fix: enable concurrent status computation to prevent multi-minute delays

Set StatusComputeWorkers=8 on DefaultStatusWatcher for Wait,
WaitWithJobs, and WatchUntilReady. This opts in to the async status
computation added in fluxcd/cli-utils#20, preventing the informer
notification pipeline from being blocked by slow API calls when many
resources are updated simultaneously.

Without this, status computation for resources like Deployments (which
require additional LIST ReplicaSets/Pods calls) runs serially inside
the informer, causing delays that grow to 1-3+ minutes when upgrading
many resources at once (e.g., ~20 Deployments via Helm).

Signed-off-by: Maple Miao <mapleeit@gmail.com>
Signed-off-by: maplemiao <maplemiao@tencent.com>
Made-with: Cursor
pull/32043/head
maplemiao 17 hours ago
parent 1164a5fbda
commit 54a2e6b47b

@ -61,6 +61,14 @@ type statusWaiter struct {
// when they don't set a timeout.
var DefaultStatusWatcherTimeout = 30 * time.Second
// defaultStatusComputeWorkers controls the number of concurrent goroutines
// used to compute object status per informer. This prevents the informer
// notification pipeline from being blocked by slow API calls (e.g., LIST
// ReplicaSets/Pods for Deployments) when many resources are updated
// simultaneously. A value of 0 would disable concurrent computation
// (status is then computed serially inside the informer callback).
// See https://github.com/fluxcd/cli-utils/pull/20
const defaultStatusComputeWorkers = 8
func alwaysReady(_ *unstructured.Unstructured) (*status.Result, error) {
return &status.Result{
Status: status.CurrentStatus,
@ -76,6 +84,7 @@ func (w *statusWaiter) WatchUntilReady(resourceList ResourceList, timeout time.D
defer cancel()
w.Logger().Debug("waiting for resources", "count", len(resourceList), "timeout", timeout)
sw := watcher.NewDefaultStatusWatcher(w.client, w.restMapper)
sw.StatusComputeWorkers = defaultStatusComputeWorkers
jobSR := helmStatusReaders.NewCustomJobStatusReader(w.restMapper)
podSR := helmStatusReaders.NewCustomPodStatusReader(w.restMapper)
// We don't want to wait on any other resources as watchUntilReady is only for Helm hooks.
@ -98,6 +107,7 @@ func (w *statusWaiter) Wait(resourceList ResourceList, timeout time.Duration) er
defer cancel()
w.Logger().Debug("waiting for resources", "count", len(resourceList), "timeout", timeout)
sw := watcher.NewDefaultStatusWatcher(w.client, w.restMapper)
sw.StatusComputeWorkers = defaultStatusComputeWorkers
sw.StatusReader = statusreaders.NewStatusReader(w.restMapper, w.readers...)
return w.wait(ctx, resourceList, sw)
}
@ -110,6 +120,7 @@ func (w *statusWaiter) WaitWithJobs(resourceList ResourceList, timeout time.Dura
defer cancel()
w.Logger().Debug("waiting for resources", "count", len(resourceList), "timeout", timeout)
sw := watcher.NewDefaultStatusWatcher(w.client, w.restMapper)
sw.StatusComputeWorkers = defaultStatusComputeWorkers
newCustomJobStatusReader := helmStatusReaders.NewCustomJobStatusReader(w.restMapper)
readers := append([]engine.StatusReader(nil), w.readers...)
readers = append(readers, newCustomJobStatusReader)

Loading…
Cancel
Save