fix: enable concurrent status computation to prevent multi-minute delays

Set StatusComputeWorkers=8 on DefaultStatusWatcher for Wait,
WaitWithJobs, and WatchUntilReady. This opts in to the async status
computation added in fluxcd/cli-utils#20, preventing the informer
notification pipeline from being blocked by slow API calls when many
resources are updated simultaneously.

Without this, status computation for resources like Deployments (which
require additional LIST ReplicaSets/Pods calls) runs serially inside
the informer, causing growing delays of 1-3+ minutes when upgrading
many resources at once (e.g., ~20 Deployments via Helm).

Signed-off-by: maplemiao <maplemiao@tencent.com>
pull/32043/head
maplemiao 1 month ago
parent 1164a5fbda
commit 9c88acb97a

@ -61,6 +61,14 @@ type statusWaiter struct {
// when they don't set a timeout.
var DefaultStatusWatcherTimeout = 30 * time.Second
var (
	// DefaultStatusComputeWorkers is the per-informer goroutine count used
	// for asynchronous object status computation. Running status checks
	// concurrently keeps slow API requests (for example, the extra LIST of
	// ReplicaSets/Pods needed to resolve a Deployment's status) from
	// stalling the informer notification pipeline when a large number of
	// resources change at the same time.
	//
	// See https://github.com/fluxcd/cli-utils/pull/20
	DefaultStatusComputeWorkers = 8
)
func alwaysReady(_ *unstructured.Unstructured) (*status.Result, error) {
return &status.Result{
Status: status.CurrentStatus,
@ -76,6 +84,7 @@ func (w *statusWaiter) WatchUntilReady(resourceList ResourceList, timeout time.D
defer cancel()
w.Logger().Debug("waiting for resources", "count", len(resourceList), "timeout", timeout)
sw := watcher.NewDefaultStatusWatcher(w.client, w.restMapper)
sw.StatusComputeWorkers = DefaultStatusComputeWorkers
jobSR := helmStatusReaders.NewCustomJobStatusReader(w.restMapper)
podSR := helmStatusReaders.NewCustomPodStatusReader(w.restMapper)
// We don't want to wait on any other resources as watchUntilReady is only for Helm hooks.
@ -98,6 +107,7 @@ func (w *statusWaiter) Wait(resourceList ResourceList, timeout time.Duration) er
defer cancel()
w.Logger().Debug("waiting for resources", "count", len(resourceList), "timeout", timeout)
sw := watcher.NewDefaultStatusWatcher(w.client, w.restMapper)
sw.StatusComputeWorkers = DefaultStatusComputeWorkers
sw.StatusReader = statusreaders.NewStatusReader(w.restMapper, w.readers...)
return w.wait(ctx, resourceList, sw)
}
@ -110,6 +120,7 @@ func (w *statusWaiter) WaitWithJobs(resourceList ResourceList, timeout time.Dura
defer cancel()
w.Logger().Debug("waiting for resources", "count", len(resourceList), "timeout", timeout)
sw := watcher.NewDefaultStatusWatcher(w.client, w.restMapper)
sw.StatusComputeWorkers = DefaultStatusComputeWorkers
newCustomJobStatusReader := helmStatusReaders.NewCustomJobStatusReader(w.restMapper)
readers := append([]engine.StatusReader(nil), w.readers...)
readers = append(readers, newCustomJobStatusReader)

Loading…
Cancel
Save