Zhaofeng Miao 1 week ago committed by GitHub
commit f6b666f7c8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -65,6 +65,26 @@ func AddWaitFlag(cmd *cobra.Command, wait *kube.WaitStrategy) {
cmd.Flags().Lookup("wait").NoOptDefVal = string(kube.StatusWatcherStrategy)
}
// cliDefaultStatusComputeWorkers is how many concurrent status-compute
// workers the Helm CLI turns on by default. Moving status computation off
// the informer goroutine keeps the notification pipeline from stalling on
// slow API calls (e.g. LIST ReplicaSets/Pods for Deployments) when many
// resources change at once. See https://github.com/fluxcd/cli-utils/pull/20.
//
// SDK consumers (e.g. helm-controller) keep the zero-value default and may
// opt in explicitly via kube.WithStatusComputeWorkers.
const cliDefaultStatusComputeWorkers = 8

// defaultCLIWaitOptions builds the WaitOptions that every wait-enabled CLI
// command applies by default. Centralizing them in one helper keeps behavior
// consistent across install/upgrade/rollback/uninstall and makes the
// CLI-vs-SDK default asymmetry explicit.
func defaultCLIWaitOptions() []kube.WaitOption {
	opts := make([]kube.WaitOption, 0, 1)
	opts = append(opts, kube.WithStatusComputeWorkers(cliDefaultStatusComputeWorkers))
	return opts
}
type waitValue kube.WaitStrategy
func newWaitValue(defaultValue kube.WaitStrategy, ws *kube.WaitStrategy) *waitValue {

@ -131,6 +131,7 @@ charts in a repository, use 'helm search'.
func newInstallCmd(cfg *action.Configuration, out io.Writer) *cobra.Command {
client := action.NewInstall(cfg)
client.WaitOptions = append(client.WaitOptions, defaultCLIWaitOptions()...)
valueOpts := &values.Options{}
var outfmt output.Format

@ -40,6 +40,7 @@ The tests to be run are defined in the chart that was installed.
func newReleaseTestCmd(cfg *action.Configuration, out io.Writer) *cobra.Command {
client := action.NewReleaseTesting(cfg)
client.WaitOptions = append(client.WaitOptions, defaultCLIWaitOptions()...)
outfmt := output.Table
var outputLogs bool
var filter []string

@ -40,6 +40,7 @@ To see revision numbers, run 'helm history RELEASE'.
func newRollbackCmd(cfg *action.Configuration, out io.Writer) *cobra.Command {
client := action.NewRollback(cfg)
client.WaitOptions = append(client.WaitOptions, defaultCLIWaitOptions()...)
cmd := &cobra.Command{
Use: "rollback <RELEASE> [REVISION]",

@ -42,6 +42,7 @@ are fully deleted before the command returns.
func newUninstallCmd(cfg *action.Configuration, out io.Writer) *cobra.Command {
client := action.NewUninstall(cfg)
client.WaitOptions = append(client.WaitOptions, defaultCLIWaitOptions()...)
cmd := &cobra.Command{
Use: "uninstall RELEASE_NAME [...]",

@ -84,6 +84,7 @@ which can contain sensitive values. To hide Kubernetes Secrets use the
func newUpgradeCmd(cfg *action.Configuration, out io.Writer) *cobra.Command {
client := action.NewUpgrade(cfg)
client.WaitOptions = append(client.WaitOptions, defaultCLIWaitOptions()...)
valueOpts := &values.Options{}
var outfmt output.Format
var createNamespace bool
@ -139,6 +140,7 @@ func newUpgradeCmd(cfg *action.Configuration, out io.Writer) *cobra.Command {
instClient.SkipCRDs = client.SkipCRDs
instClient.Timeout = client.Timeout
instClient.WaitStrategy = client.WaitStrategy
instClient.WaitOptions = client.WaitOptions
instClient.WaitForJobs = client.WaitForJobs
instClient.Devel = client.Devel
instClient.Namespace = client.Namespace

@ -167,14 +167,15 @@ func (c *Client) newStatusWatcher(opts ...WaitOption) (*statusWaiter, error) {
waitContext = c.WaitContext
}
sw := &statusWaiter{
restMapper: restMapper,
client: dynamicClient,
ctx: waitContext,
watchUntilReadyCtx: o.watchUntilReadyCtx,
waitCtx: o.waitCtx,
waitWithJobsCtx: o.waitWithJobsCtx,
waitForDeleteCtx: o.waitForDeleteCtx,
readers: o.statusReaders,
restMapper: restMapper,
client: dynamicClient,
ctx: waitContext,
watchUntilReadyCtx: o.watchUntilReadyCtx,
waitCtx: o.waitCtx,
waitWithJobsCtx: o.waitWithJobsCtx,
waitForDeleteCtx: o.waitForDeleteCtx,
readers: o.statusReaders,
statusComputeWorkers: o.statusComputeWorkers,
}
sw.SetLogger(c.Logger().Handler())
return sw, nil

@ -72,11 +72,34 @@ func WithKStatusReaders(readers ...engine.StatusReader) WaitOption {
}
}
// WithStatusComputeWorkers sets the number of concurrent goroutines used to
// compute object status per informer, so that slow API calls (e.g. LIST
// ReplicaSets/Pods for Deployments) cannot block the informer notification
// pipeline when many resources are updated simultaneously.
//
// The zero value (the default) preserves the underlying cli-utils behavior,
// where status is computed synchronously on the informer goroutine. Negative
// inputs are clamped to 0 so callers cannot propagate invalid counts to the
// underlying watcher. SDK consumers (for example helm-controller) inherit
// this conservative default and can opt in explicitly; the Helm CLI passes a
// non-zero value so that `helm install/upgrade/rollback` users get the fix
// for multi-minute waits out of the box.
// See https://github.com/fluxcd/cli-utils/pull/20.
func WithStatusComputeWorkers(n int) WaitOption {
	return func(wo *waitOptions) {
		workers := n
		if workers < 0 {
			// Clamp: invalid counts must never reach the watcher.
			workers = 0
		}
		wo.statusComputeWorkers = workers
	}
}
// waitOptions accumulates the per-call configuration applied by WaitOption
// functions before a status waiter is constructed.
//
// NOTE(review): as rendered, this struct listed every field twice — the
// removed and added sides of a diff were merged without their +/- markers.
// Go forbids duplicate field names in a struct, so the reconstruction below
// keeps the post-change ("+") field set exactly once.
type waitOptions struct {
	ctx                context.Context
	watchUntilReadyCtx context.Context
	waitCtx            context.Context
	waitWithJobsCtx    context.Context
	waitForDeleteCtx   context.Context
	statusReaders      []engine.StatusReader
	// statusComputeWorkers is forwarded to the status watcher; 0 keeps the
	// underlying cli-utils default of synchronous status computation.
	statusComputeWorkers int
}

@ -43,14 +43,15 @@ import (
)
// statusWaiter waits on Kubernetes resources via dynamic-client watches,
// carrying the contexts and status readers configured through WaitOption.
//
// NOTE(review): as rendered, this struct listed its shared fields twice —
// the removed and added sides of a diff were merged without their +/-
// markers. Go forbids duplicate field names, so the reconstruction below
// keeps the post-change ("+") field set exactly once.
type statusWaiter struct {
	client             dynamic.Interface
	restMapper         meta.RESTMapper
	ctx                context.Context
	watchUntilReadyCtx context.Context
	waitCtx            context.Context
	waitWithJobsCtx    context.Context
	waitForDeleteCtx   context.Context
	readers            []engine.StatusReader
	// statusComputeWorkers is copied onto each DefaultStatusWatcher the
	// waiter builds; 0 keeps synchronous on-informer-goroutine computation.
	statusComputeWorkers int
	logging.LogHolder
}
@ -76,6 +77,7 @@ func (w *statusWaiter) WatchUntilReady(resourceList ResourceList, timeout time.D
defer cancel()
w.Logger().Debug("waiting for resources", "count", len(resourceList), "timeout", timeout)
sw := watcher.NewDefaultStatusWatcher(w.client, w.restMapper)
sw.StatusComputeWorkers = w.statusComputeWorkers
jobSR := helmStatusReaders.NewCustomJobStatusReader(w.restMapper)
podSR := helmStatusReaders.NewCustomPodStatusReader(w.restMapper)
// We don't want to wait on any other resources as watchUntilReady is only for Helm hooks.
@ -98,6 +100,7 @@ func (w *statusWaiter) Wait(resourceList ResourceList, timeout time.Duration) er
defer cancel()
w.Logger().Debug("waiting for resources", "count", len(resourceList), "timeout", timeout)
sw := watcher.NewDefaultStatusWatcher(w.client, w.restMapper)
sw.StatusComputeWorkers = w.statusComputeWorkers
sw.StatusReader = statusreaders.NewStatusReader(w.restMapper, w.readers...)
return w.wait(ctx, resourceList, sw)
}
@ -110,6 +113,7 @@ func (w *statusWaiter) WaitWithJobs(resourceList ResourceList, timeout time.Dura
defer cancel()
w.Logger().Debug("waiting for resources", "count", len(resourceList), "timeout", timeout)
sw := watcher.NewDefaultStatusWatcher(w.client, w.restMapper)
sw.StatusComputeWorkers = w.statusComputeWorkers
newCustomJobStatusReader := helmStatusReaders.NewCustomJobStatusReader(w.restMapper)
readers := append([]engine.StatusReader(nil), w.readers...)
readers = append(readers, newCustomJobStatusReader)

@ -1293,6 +1293,28 @@ func TestWaitOptionFunctions(t *testing.T) {
WithWaitForDeleteMethodContext(ctx)(opts)
assert.Equal(t, ctx, opts.waitForDeleteCtx)
})
t.Run("WithStatusComputeWorkers sets statusComputeWorkers", func(t *testing.T) {
t.Parallel()
opts := &waitOptions{}
WithStatusComputeWorkers(8)(opts)
assert.Equal(t, 8, opts.statusComputeWorkers)
})
t.Run("WithStatusComputeWorkers clamps negative values to zero", func(t *testing.T) {
t.Parallel()
opts := &waitOptions{}
WithStatusComputeWorkers(-1)(opts)
assert.Equal(t, 0, opts.statusComputeWorkers,
"negative worker counts must not propagate to the underlying watcher")
})
t.Run("waitOptions.statusComputeWorkers defaults to zero", func(t *testing.T) {
t.Parallel()
opts := &waitOptions{}
assert.Equal(t, 0, opts.statusComputeWorkers,
"SDK consumers must opt in to concurrent status computation")
})
}
func TestMethodSpecificContextCancellation(t *testing.T) {

Loading…
Cancel
Save