diff --git a/pkg/kube/statuswait.go b/pkg/kube/statuswait.go index 59c1218ff..a3e495ae7 100644 --- a/pkg/kube/statuswait.go +++ b/pkg/kube/statuswait.go @@ -61,6 +61,14 @@ type statusWaiter struct { // when they don't set a timeout. var DefaultStatusWatcherTimeout = 30 * time.Second +// defaultStatusComputeWorkers controls the number of concurrent goroutines +// used to compute object status per informer. This prevents the informer +// notification pipeline from being blocked by slow API calls (e.g., LIST +// ReplicaSets/Pods for Deployments) when many resources are updated +// simultaneously. +// See https://github.com/fluxcd/cli-utils/pull/20 +const defaultStatusComputeWorkers = 8 + func alwaysReady(_ *unstructured.Unstructured) (*status.Result, error) { return &status.Result{ Status: status.CurrentStatus, @@ -76,6 +84,7 @@ func (w *statusWaiter) WatchUntilReady(resourceList ResourceList, timeout time.D defer cancel() w.Logger().Debug("waiting for resources", "count", len(resourceList), "timeout", timeout) sw := watcher.NewDefaultStatusWatcher(w.client, w.restMapper) + sw.StatusComputeWorkers = defaultStatusComputeWorkers jobSR := helmStatusReaders.NewCustomJobStatusReader(w.restMapper) podSR := helmStatusReaders.NewCustomPodStatusReader(w.restMapper) // We don't want to wait on any other resources as watchUntilReady is only for Helm hooks. @@ -98,6 +107,7 @@ func (w *statusWaiter) Wait(resourceList ResourceList, timeout time.Duration) er defer cancel() w.Logger().Debug("waiting for resources", "count", len(resourceList), "timeout", timeout) sw := watcher.NewDefaultStatusWatcher(w.client, w.restMapper) + sw.StatusComputeWorkers = defaultStatusComputeWorkers sw.StatusReader = statusreaders.NewStatusReader(w.restMapper, w.readers...) return w.wait(ctx, resourceList, sw) } @@ -110,6 +120,7 @@ func (w *statusWaiter) WaitWithJobs(resourceList ResourceList, timeout time.Dura defer cancel() w.Logger().Debug("waiting for resources", "count", len(resourceList), "timeout", timeout) sw := watcher.NewDefaultStatusWatcher(w.client, w.restMapper) + sw.StatusComputeWorkers = defaultStatusComputeWorkers newCustomJobStatusReader := helmStatusReaders.NewCustomJobStatusReader(w.restMapper) readers := append([]engine.StatusReader(nil), w.readers...) readers = append(readers, newCustomJobStatusReader)