From 9c88acb97af795cd8f799aa78e072a96c5a3cf27 Mon Sep 17 00:00:00 2001 From: maplemiao Date: Mon, 13 Apr 2026 10:32:42 +0800 Subject: [PATCH 1/3] fix: enable concurrent status computation to prevent multi-minute delays Set StatusComputeWorkers=8 on DefaultStatusWatcher for Wait, WaitWithJobs, and WatchUntilReady. This opts in to the async status computation added in fluxcd/cli-utils#20, preventing the informer notification pipeline from being blocked by slow API calls when many resources are updated simultaneously. Without this, status computation for resources like Deployments (which require additional LIST ReplicaSets/Pods calls) runs serially inside the informer, causing growing delays of 1-3+ minutes when upgrading many resources at once (e.g., ~20 Deployments via Helm). Signed-off-by: maplemiao --- pkg/kube/statuswait.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/pkg/kube/statuswait.go b/pkg/kube/statuswait.go index 59c1218ff..8f5ae25a6 100644 --- a/pkg/kube/statuswait.go +++ b/pkg/kube/statuswait.go @@ -61,6 +61,14 @@ type statusWaiter struct { // when they don't set a timeout. var DefaultStatusWatcherTimeout = 30 * time.Second +// DefaultStatusComputeWorkers controls the number of concurrent goroutines +// used to compute object status per informer. This prevents the informer +// notification pipeline from being blocked by slow API calls (e.g., LIST +// ReplicaSets/Pods for Deployments) when many resources are updated +// simultaneously. +// See https://github.com/fluxcd/cli-utils/pull/20 +var DefaultStatusComputeWorkers = 8 + func alwaysReady(_ *unstructured.Unstructured) (*status.Result, error) { return &status.Result{ Status: status.CurrentStatus, @@ -76,6 +84,7 @@ func (w *statusWaiter) WatchUntilReady(resourceList ResourceList, timeout time.D defer cancel() w.Logger().Debug("waiting for resources", "count", len(resourceList), "timeout", timeout) sw := watcher.NewDefaultStatusWatcher(w.client, w.restMapper) + sw.StatusComputeWorkers = DefaultStatusComputeWorkers jobSR := helmStatusReaders.NewCustomJobStatusReader(w.restMapper) podSR := helmStatusReaders.NewCustomPodStatusReader(w.restMapper) // We don't want to wait on any other resources as watchUntilReady is only for Helm hooks. @@ -98,6 +107,7 @@ func (w *statusWaiter) Wait(resourceList ResourceList, timeout time.Duration) er defer cancel() w.Logger().Debug("waiting for resources", "count", len(resourceList), "timeout", timeout) sw := watcher.NewDefaultStatusWatcher(w.client, w.restMapper) + sw.StatusComputeWorkers = DefaultStatusComputeWorkers sw.StatusReader = statusreaders.NewStatusReader(w.restMapper, w.readers...) return w.wait(ctx, resourceList, sw) } @@ -110,6 +120,7 @@ func (w *statusWaiter) WaitWithJobs(resourceList ResourceList, timeout time.Dura defer cancel() w.Logger().Debug("waiting for resources", "count", len(resourceList), "timeout", timeout) sw := watcher.NewDefaultStatusWatcher(w.client, w.restMapper) + sw.StatusComputeWorkers = DefaultStatusComputeWorkers newCustomJobStatusReader := helmStatusReaders.NewCustomJobStatusReader(w.restMapper) readers := append([]engine.StatusReader(nil), w.readers...) readers = append(readers, newCustomJobStatusReader) From d23a392b6823079e33e106a5a658b932d701d6e9 Mon Sep 17 00:00:00 2001 From: maplemiao Date: Tue, 21 Apr 2026 17:49:02 +0800 Subject: [PATCH 2/3] refactor(kube): expose StatusComputeWorkers as a WaitOption Replace the package-level DefaultStatusComputeWorkers variable with a WithStatusComputeWorkers WaitOption threaded through waitOptions into the statusWaiter. This removes global mutable state from pkg/kube and lets callers opt in explicitly. SDK consumers (e.g. helm-controller) inherit the zero value, which preserves the upstream cli-utils synchronous behavior and avoids an unexpected fan-out of status-compute goroutines when many releases reconcile concurrently. The Helm CLI continues to enable 8 workers by default via a shared pkg/cmd/flags.go helper, so install/upgrade/ rollback/uninstall/test users still get the fix for multi-minute informer stalls out of the box. Signed-off-by: maplemiao --- pkg/cmd/flags.go | 20 ++++++++++++++++++++ pkg/cmd/install.go | 1 + pkg/cmd/release_testing.go | 1 + pkg/cmd/rollback.go | 1 + pkg/cmd/uninstall.go | 1 + pkg/cmd/upgrade.go | 2 ++ pkg/kube/client.go | 17 +++++++++-------- pkg/kube/options.go | 30 ++++++++++++++++++++++++------ pkg/kube/statuswait.go | 31 ++++++++++++------------------- pkg/kube/statuswait_test.go | 14 ++++++++++++++ 10 files changed, 85 insertions(+), 33 deletions(-) diff --git a/pkg/cmd/flags.go b/pkg/cmd/flags.go index 5a220d1ce..2ccf01b7d 100644 --- a/pkg/cmd/flags.go +++ b/pkg/cmd/flags.go @@ -65,6 +65,26 @@ func AddWaitFlag(cmd *cobra.Command, wait *kube.WaitStrategy) { cmd.Flags().Lookup("wait").NoOptDefVal = string(kube.StatusWatcherStrategy) } +// cliDefaultStatusComputeWorkers is the number of concurrent status-compute +// workers the Helm CLI enables by default. This prevents the informer +// notification pipeline from being blocked by slow API calls (e.g. LIST +// ReplicaSets/Pods for Deployments) when many resources are updated +// simultaneously. See https://github.com/fluxcd/cli-utils/pull/20. +// +// SDK consumers (e.g. helm-controller) inherit the zero value and can opt in +// via kube.WithStatusComputeWorkers when they want the same behavior. +const cliDefaultStatusComputeWorkers = 8 + +// defaultCLIWaitOptions returns the set of WaitOptions the Helm CLI applies +// by default to every wait-enabled command. Keeping these in one place keeps +// behavior consistent across install/upgrade/rollback/uninstall and makes the +// CLI-vs-SDK default asymmetry explicit. +func defaultCLIWaitOptions() []kube.WaitOption { + return []kube.WaitOption{ + kube.WithStatusComputeWorkers(cliDefaultStatusComputeWorkers), + } +} + type waitValue kube.WaitStrategy func newWaitValue(defaultValue kube.WaitStrategy, ws *kube.WaitStrategy) *waitValue { diff --git a/pkg/cmd/install.go b/pkg/cmd/install.go index ed10513c9..feac18d73 100644 --- a/pkg/cmd/install.go +++ b/pkg/cmd/install.go @@ -131,6 +131,7 @@ charts in a repository, use 'helm search'. func newInstallCmd(cfg *action.Configuration, out io.Writer) *cobra.Command { client := action.NewInstall(cfg) + client.WaitOptions = append(client.WaitOptions, defaultCLIWaitOptions()...) valueOpts := &values.Options{} var outfmt output.Format diff --git a/pkg/cmd/release_testing.go b/pkg/cmd/release_testing.go index 5a6159e7d..6be83c966 100644 --- a/pkg/cmd/release_testing.go +++ b/pkg/cmd/release_testing.go @@ -40,6 +40,7 @@ The tests to be run are defined in the chart that was installed. func newReleaseTestCmd(cfg *action.Configuration, out io.Writer) *cobra.Command { client := action.NewReleaseTesting(cfg) + client.WaitOptions = append(client.WaitOptions, defaultCLIWaitOptions()...) outfmt := output.Table var outputLogs bool var filter []string diff --git a/pkg/cmd/rollback.go b/pkg/cmd/rollback.go index 01d8b1866..b716dae10 100644 --- a/pkg/cmd/rollback.go +++ b/pkg/cmd/rollback.go @@ -40,6 +40,7 @@ To see revision numbers, run 'helm history RELEASE'. func newRollbackCmd(cfg *action.Configuration, out io.Writer) *cobra.Command { client := action.NewRollback(cfg) + client.WaitOptions = append(client.WaitOptions, defaultCLIWaitOptions()...) cmd := &cobra.Command{ Use: "rollback [REVISION]", diff --git a/pkg/cmd/uninstall.go b/pkg/cmd/uninstall.go index 49f7bd19d..f024f912a 100644 --- a/pkg/cmd/uninstall.go +++ b/pkg/cmd/uninstall.go @@ -42,6 +42,7 @@ are fully deleted before the command returns. func newUninstallCmd(cfg *action.Configuration, out io.Writer) *cobra.Command { client := action.NewUninstall(cfg) + client.WaitOptions = append(client.WaitOptions, defaultCLIWaitOptions()...) cmd := &cobra.Command{ Use: "uninstall RELEASE_NAME [...]", diff --git a/pkg/cmd/upgrade.go b/pkg/cmd/upgrade.go index b71c4ae2d..020c4f6dd 100644 --- a/pkg/cmd/upgrade.go +++ b/pkg/cmd/upgrade.go @@ -84,6 +84,7 @@ which can contain sensitive values. To hide Kubernetes Secrets use the func newUpgradeCmd(cfg *action.Configuration, out io.Writer) *cobra.Command { client := action.NewUpgrade(cfg) + client.WaitOptions = append(client.WaitOptions, defaultCLIWaitOptions()...) valueOpts := &values.Options{} var outfmt output.Format var createNamespace bool @@ -139,6 +140,7 @@ func newUpgradeCmd(cfg *action.Configuration, out io.Writer) *cobra.Command { instClient.SkipCRDs = client.SkipCRDs instClient.Timeout = client.Timeout instClient.WaitStrategy = client.WaitStrategy + instClient.WaitOptions = client.WaitOptions instClient.WaitForJobs = client.WaitForJobs instClient.Devel = client.Devel instClient.Namespace = client.Namespace diff --git a/pkg/kube/client.go b/pkg/kube/client.go index 44f31cdbe..2f1c21819 100644 --- a/pkg/kube/client.go +++ b/pkg/kube/client.go @@ -167,14 +167,15 @@ func (c *Client) newStatusWatcher(opts ...WaitOption) (*statusWaiter, error) { waitContext = c.WaitContext } sw := &statusWaiter{ - restMapper: restMapper, - client: dynamicClient, - ctx: waitContext, - watchUntilReadyCtx: o.watchUntilReadyCtx, - waitCtx: o.waitCtx, - waitWithJobsCtx: o.waitWithJobsCtx, - waitForDeleteCtx: o.waitForDeleteCtx, - readers: o.statusReaders, + restMapper: restMapper, + client: dynamicClient, + ctx: waitContext, + watchUntilReadyCtx: o.watchUntilReadyCtx, + waitCtx: o.waitCtx, + waitWithJobsCtx: o.waitWithJobsCtx, + waitForDeleteCtx: o.waitForDeleteCtx, + readers: o.statusReaders, + statusComputeWorkers: o.statusComputeWorkers, } sw.SetLogger(c.Logger().Handler()) return sw, nil diff --git a/pkg/kube/options.go b/pkg/kube/options.go index 3326c284b..db3de2f33 100644 --- a/pkg/kube/options.go +++ b/pkg/kube/options.go @@ -72,11 +72,29 @@ func WithKStatusReaders(readers ...engine.StatusReader) WaitOption { } } +// WithStatusComputeWorkers sets the number of concurrent goroutines used to +// compute object status per informer. This prevents the informer notification +// pipeline from being blocked by slow API calls (e.g., LIST ReplicaSets/Pods +// for Deployments) when many resources are updated simultaneously. +// +// A value of 0 (the default) keeps the underlying cli-utils behavior, where +// status is computed synchronously on the informer goroutine. SDK consumers +// (for example helm-controller) inherit this conservative default and can +// opt in explicitly. The Helm CLI passes a non-zero value so that +// `helm install/upgrade/rollback` users get the fix for multi-minute waits +// out of the box. See https://github.com/fluxcd/cli-utils/pull/20. +func WithStatusComputeWorkers(n int) WaitOption { + return func(wo *waitOptions) { + wo.statusComputeWorkers = n + } +} + type waitOptions struct { - ctx context.Context - watchUntilReadyCtx context.Context - waitCtx context.Context - waitWithJobsCtx context.Context - waitForDeleteCtx context.Context - statusReaders []engine.StatusReader + ctx context.Context + watchUntilReadyCtx context.Context + waitCtx context.Context + waitWithJobsCtx context.Context + waitForDeleteCtx context.Context + statusReaders []engine.StatusReader + statusComputeWorkers int } diff --git a/pkg/kube/statuswait.go b/pkg/kube/statuswait.go index 8f5ae25a6..fd4b4b4e5 100644 --- a/pkg/kube/statuswait.go +++ b/pkg/kube/statuswait.go @@ -43,14 +43,15 @@ import ( ) type statusWaiter struct { - client dynamic.Interface - restMapper meta.RESTMapper - ctx context.Context - watchUntilReadyCtx context.Context - waitCtx context.Context - waitWithJobsCtx context.Context - waitForDeleteCtx context.Context - readers []engine.StatusReader + client dynamic.Interface + restMapper meta.RESTMapper + ctx context.Context + watchUntilReadyCtx context.Context + waitCtx context.Context + waitWithJobsCtx context.Context + waitForDeleteCtx context.Context + readers []engine.StatusReader + statusComputeWorkers int logging.LogHolder } @@ -61,14 +62,6 @@ type statusWaiter struct { // when they don't set a timeout. var DefaultStatusWatcherTimeout = 30 * time.Second -// DefaultStatusComputeWorkers controls the number of concurrent goroutines -// used to compute object status per informer. This prevents the informer -// notification pipeline from being blocked by slow API calls (e.g., LIST -// ReplicaSets/Pods for Deployments) when many resources are updated -// simultaneously. -// See https://github.com/fluxcd/cli-utils/pull/20 -var DefaultStatusComputeWorkers = 8 - func alwaysReady(_ *unstructured.Unstructured) (*status.Result, error) { return &status.Result{ Status: status.CurrentStatus, @@ -84,7 +77,7 @@ func (w *statusWaiter) WatchUntilReady(resourceList ResourceList, timeout time.D defer cancel() w.Logger().Debug("waiting for resources", "count", len(resourceList), "timeout", timeout) sw := watcher.NewDefaultStatusWatcher(w.client, w.restMapper) - sw.StatusComputeWorkers = DefaultStatusComputeWorkers + sw.StatusComputeWorkers = w.statusComputeWorkers jobSR := helmStatusReaders.NewCustomJobStatusReader(w.restMapper) podSR := helmStatusReaders.NewCustomPodStatusReader(w.restMapper) // We don't want to wait on any other resources as watchUntilReady is only for Helm hooks. @@ -107,7 +100,7 @@ func (w *statusWaiter) Wait(resourceList ResourceList, timeout time.Duration) er defer cancel() w.Logger().Debug("waiting for resources", "count", len(resourceList), "timeout", timeout) sw := watcher.NewDefaultStatusWatcher(w.client, w.restMapper) - sw.StatusComputeWorkers = DefaultStatusComputeWorkers + sw.StatusComputeWorkers = w.statusComputeWorkers sw.StatusReader = statusreaders.NewStatusReader(w.restMapper, w.readers...) return w.wait(ctx, resourceList, sw) } @@ -120,7 +113,7 @@ func (w *statusWaiter) WaitWithJobs(resourceList ResourceList, timeout time.Dura defer cancel() w.Logger().Debug("waiting for resources", "count", len(resourceList), "timeout", timeout) sw := watcher.NewDefaultStatusWatcher(w.client, w.restMapper) - sw.StatusComputeWorkers = DefaultStatusComputeWorkers + sw.StatusComputeWorkers = w.statusComputeWorkers newCustomJobStatusReader := helmStatusReaders.NewCustomJobStatusReader(w.restMapper) readers := append([]engine.StatusReader(nil), w.readers...) readers = append(readers, newCustomJobStatusReader) diff --git a/pkg/kube/statuswait_test.go b/pkg/kube/statuswait_test.go index 73a424720..56d4b98e2 100644 --- a/pkg/kube/statuswait_test.go +++ b/pkg/kube/statuswait_test.go @@ -1293,6 +1293,20 @@ func TestWaitOptionFunctions(t *testing.T) { WithWaitForDeleteMethodContext(ctx)(opts) assert.Equal(t, ctx, opts.waitForDeleteCtx) }) + + t.Run("WithStatusComputeWorkers sets statusComputeWorkers", func(t *testing.T) { + t.Parallel() + opts := &waitOptions{} + WithStatusComputeWorkers(8)(opts) + assert.Equal(t, 8, opts.statusComputeWorkers) + }) + + t.Run("waitOptions.statusComputeWorkers defaults to zero", func(t *testing.T) { + t.Parallel() + opts := &waitOptions{} + assert.Equal(t, 0, opts.statusComputeWorkers, + "SDK consumers must opt in to concurrent status computation") + }) } func TestMethodSpecificContextCancellation(t *testing.T) { From 86a68d7ce7ffda8dbd4b7dfefdd0fce32f187fd3 Mon Sep 17 00:00:00 2001 From: maplemiao Date: Wed, 22 Apr 2026 10:55:31 +0800 Subject: [PATCH 3/3] refactor(kube): clamp negative StatusComputeWorkers values to zero Address review feedback: when a caller passes a negative value to WithStatusComputeWorkers, coerce it to zero rather than propagating it to the underlying cli-utils watcher, where the behavior is undefined. Zero is the safe default and matches the SDK opt-in contract. Signed-off-by: maplemiao --- pkg/kube/options.go | 15 ++++++++++----- pkg/kube/statuswait_test.go | 8 ++++++++ 2 files changed, 18 insertions(+), 5 deletions(-) diff --git a/pkg/kube/options.go b/pkg/kube/options.go index db3de2f33..7853cb63a 100644 --- a/pkg/kube/options.go +++ b/pkg/kube/options.go @@ -78,13 +78,18 @@ func WithKStatusReaders(readers ...engine.StatusReader) WaitOption { // for Deployments) when many resources are updated simultaneously. // // A value of 0 (the default) keeps the underlying cli-utils behavior, where -// status is computed synchronously on the informer goroutine. SDK consumers -// (for example helm-controller) inherit this conservative default and can -// opt in explicitly. The Helm CLI passes a non-zero value so that -// `helm install/upgrade/rollback` users get the fix for multi-minute waits -// out of the box. See https://github.com/fluxcd/cli-utils/pull/20. +// status is computed synchronously on the informer goroutine. Negative values +// are clamped to 0 so callers cannot propagate invalid counts to the +// underlying watcher. SDK consumers (for example helm-controller) inherit +// this conservative default and can opt in explicitly. The Helm CLI passes +// a non-zero value so that `helm install/upgrade/rollback` users get the +// fix for multi-minute waits out of the box. +// See https://github.com/fluxcd/cli-utils/pull/20. func WithStatusComputeWorkers(n int) WaitOption { return func(wo *waitOptions) { + if n < 0 { + n = 0 + } wo.statusComputeWorkers = n } } diff --git a/pkg/kube/statuswait_test.go b/pkg/kube/statuswait_test.go index 56d4b98e2..0a0052f1a 100644 --- a/pkg/kube/statuswait_test.go +++ b/pkg/kube/statuswait_test.go @@ -1301,6 +1301,14 @@ func TestWaitOptionFunctions(t *testing.T) { assert.Equal(t, 8, opts.statusComputeWorkers) }) + t.Run("WithStatusComputeWorkers clamps negative values to zero", func(t *testing.T) { + t.Parallel() + opts := &waitOptions{} + WithStatusComputeWorkers(-1)(opts) + assert.Equal(t, 0, opts.statusComputeWorkers, + "negative worker counts must not propagate to the underlying watcher") + }) + t.Run("waitOptions.statusComputeWorkers defaults to zero", func(t *testing.T) { t.Parallel() opts := &waitOptions{}