diff --git a/cmd/helm/install.go b/cmd/helm/install.go index 7edd98091..7f4cf2bda 100644 --- a/cmd/helm/install.go +++ b/cmd/helm/install.go @@ -137,6 +137,7 @@ func addInstallFlags(cmd *cobra.Command, f *pflag.FlagSet, client *action.Instal f.BoolVar(&client.CreateNamespace, "create-namespace", false, "create the release namespace if not present") f.BoolVar(&client.DryRun, "dry-run", false, "simulate an install") f.BoolVar(&client.DisableHooks, "no-hooks", false, "prevent hooks from running during install") + f.IntVar(&client.HookParallelism, "hook-parallelism", 1, "maximum number of hooks to execute in parallel") f.BoolVar(&client.Replace, "replace", false, "re-use the given name, only if that name is a deleted release which remains in the history. This is unsafe in production") f.DurationVar(&client.Timeout, "timeout", 300*time.Second, "time to wait for any individual Kubernetes operation (like Jobs for hooks)") f.BoolVar(&client.Wait, "wait", false, "if set, will wait until all Pods, PVCs, Services, and minimum number of Pods of a Deployment, StatefulSet, or ReplicaSet are in a ready state before marking the release as successful. It will wait for as long as --timeout") diff --git a/cmd/helm/release_testing.go b/cmd/helm/release_testing.go index e4e09ef3b..31dcedec2 100644 --- a/cmd/helm/release_testing.go +++ b/cmd/helm/release_testing.go @@ -79,6 +79,7 @@ func newReleaseTestCmd(cfg *action.Configuration, out io.Writer) *cobra.Command f := cmd.Flags() f.DurationVar(&client.Timeout, "timeout", 300*time.Second, "time to wait for any individual Kubernetes operation (like Jobs for hooks)") + f.IntVar(&client.HookParallelism, "hook-parallelism", 1, "maximum number of hooks to execute in parallel") f.BoolVar(&outputLogs, "logs", false, "dump the logs from test pods (this runs after all tests are complete, but before any cleanup)") return cmd diff --git a/cmd/helm/rollback.go b/cmd/helm/rollback.go index 2cd6fa2cb..9986d7ec5 100644 --- a/cmd/helm/rollback.go +++ b/cmd/helm/rollback.go @@ -80,6 +80,7 @@ func newRollbackCmd(cfg *action.Configuration, out io.Writer) *cobra.Command { f.BoolVar(&client.Recreate, "recreate-pods", false, "performs pods restart for the resource if applicable") f.BoolVar(&client.Force, "force", false, "force resource update through delete/recreate if needed") f.BoolVar(&client.DisableHooks, "no-hooks", false, "prevent hooks from running during rollback") + f.IntVar(&client.HookParallelism, "hook-parallelism", 1, "maximum number of hooks to execute in parallel") f.DurationVar(&client.Timeout, "timeout", 300*time.Second, "time to wait for any individual Kubernetes operation (like Jobs for hooks)") f.BoolVar(&client.Wait, "wait", false, "if set, will wait until all Pods, PVCs, Services, and minimum number of Pods of a Deployment, StatefulSet, or ReplicaSet are in a ready state before marking the release as successful. It will wait for as long as --timeout") f.BoolVar(&client.CleanupOnFail, "cleanup-on-fail", false, "allow deletion of new resources created in this rollback when rollback fails") diff --git a/cmd/helm/uninstall.go b/cmd/helm/uninstall.go index 509918e53..c6076858a 100644 --- a/cmd/helm/uninstall.go +++ b/cmd/helm/uninstall.go @@ -73,6 +73,7 @@ func newUninstallCmd(cfg *action.Configuration, out io.Writer) *cobra.Command { f := cmd.Flags() f.BoolVar(&client.DryRun, "dry-run", false, "simulate a uninstall") f.BoolVar(&client.DisableHooks, "no-hooks", false, "prevent hooks from running during uninstallation") + f.IntVar(&client.HookParallelism, "hook-parallelism", 1, "maximum number of hooks to execute in parallel") f.BoolVar(&client.KeepHistory, "keep-history", false, "remove all associated resources and mark the release as deleted, but retain the release history") f.DurationVar(&client.Timeout, "timeout", 300*time.Second, "time to wait for any individual Kubernetes operation (like Jobs for hooks)") f.StringVar(&client.Description, "description", "", "add a custom description") diff --git a/cmd/helm/upgrade.go b/cmd/helm/upgrade.go index 12d797545..326ae6351 100644 --- a/cmd/helm/upgrade.go +++ b/cmd/helm/upgrade.go @@ -100,6 +100,7 @@ func newUpgradeCmd(cfg *action.Configuration, out io.Writer) *cobra.Command { instClient.ChartPathOptions = client.ChartPathOptions instClient.DryRun = client.DryRun instClient.DisableHooks = client.DisableHooks + instClient.HookParallelism = client.HookParallelism instClient.SkipCRDs = client.SkipCRDs instClient.Timeout = client.Timeout instClient.Wait = client.Wait @@ -173,6 +174,7 @@ func newUpgradeCmd(cfg *action.Configuration, out io.Writer) *cobra.Command { f.MarkDeprecated("recreate-pods", "functionality will no longer be updated. Consult the documentation for other methods to recreate pods") f.BoolVar(&client.Force, "force", false, "force resource updates through a replacement strategy") f.BoolVar(&client.DisableHooks, "no-hooks", false, "disable pre/post upgrade hooks") + f.IntVar(&client.HookParallelism, "hook-parallelism", 1, "maximum number of hooks to execute in parallel") f.BoolVar(&client.DisableOpenAPIValidation, "disable-openapi-validation", false, "if set, the upgrade process will not validate rendered templates against the Kubernetes OpenAPI Schema") f.BoolVar(&client.SkipCRDs, "skip-crds", false, "if set, no CRDs will be installed when an upgrade is performed with install flag enabled. By default, CRDs are installed if not already present, when an upgrade is performed with install flag enabled") f.DurationVar(&client.Timeout, "timeout", 300*time.Second, "time to wait for any individual Kubernetes operation (like Jobs for hooks)") diff --git a/go.sum b/go.sum index 4186e039c..8f2bc2873 100644 --- a/go.sum +++ b/go.sum @@ -1168,6 +1168,8 @@ gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo= gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw= gotest.tools/v3 v3.0.2/go.mod h1:3SzNCllyD9/Y+b5r9JIKQ474KzkZyqLqEfYqMsX94Bk= +helm.sh/helm v1.2.1 h1:Jrn7kKQqQ/hnFWZEX+9pMFvYqFexkzrBnGqYBmIph7c= +helm.sh/helm v2.16.12+incompatible h1:nQfifk10KcpAGD1RJaNZVW/fWiqluV0JMuuDwdba4rw= honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/pkg/action/action_test.go b/pkg/action/action_test.go index fedf260fb..d2537a5ac 100644 --- a/pkg/action/action_test.go +++ b/pkg/action/action_test.go @@ -262,6 +262,14 @@ func withKube(version string) chartOption { } } +func withSecondHook(hookManifest string) chartOption { + return func(opts *chartOptions) { + opts.Templates = append(opts.Templates, + &chart.File{Name: "templates/hooks-test", Data: []byte(hookManifest)}, + ) + } +} + // releaseStub creates a release stub, complete with the chartStub as its chart. func releaseStub() *release.Release { return namedReleaseStub("angry-panda", release.StatusDeployed) diff --git a/pkg/action/hooks.go b/pkg/action/hooks.go index 40c1ffdb6..29a460e06 100644 --- a/pkg/action/hooks.go +++ b/pkg/action/hooks.go @@ -18,6 +18,7 @@ package action import ( "bytes" "sort" + "sync" "time" "github.com/pkg/errors" @@ -26,100 +27,163 @@ import ( helmtime "helm.sh/helm/v3/pkg/time" ) -// execHook executes all of the hooks for the given hook event. -func (cfg *Configuration) execHook(rl *release.Release, hook release.HookEvent, timeout time.Duration) error { - executingHooks := []*release.Hook{} +// execHookEvent executes all of the hooks for the given hook event. +func (cfg *Configuration) execHookEvent(rl *release.Release, event release.HookEvent, timeout time.Duration, parallelism int) error { + if parallelism < 1 { + parallelism = 1 + } + weightedHooks := make(map[int][]*release.Hook) for _, h := range rl.Hooks { for _, e := range h.Events { - if e == hook { - executingHooks = append(executingHooks, h) + if e == event { + // Set default delete policy to before-hook-creation + if h.DeletePolicies == nil || len(h.DeletePolicies) == 0 { + // TODO(jlegrone): Only apply before-hook-creation delete policy to run to completion + // resources. For all other resource types update in place if a + // resource with the same name already exists and is owned by the + // current release. + h.DeletePolicies = []release.HookDeletePolicy{release.HookBeforeHookCreation} + } + weightedHooks[h.Weight] = append(weightedHooks[h.Weight], h) } } } - // hooke are pre-ordered by kind, so keep order stable - sort.Stable(hookByWeight(executingHooks)) - - for _, h := range executingHooks { - // Set default delete policy to before-hook-creation - if h.DeletePolicies == nil || len(h.DeletePolicies) == 0 { - // TODO(jlegrone): Only apply before-hook-creation delete policy to run to completion - // resources. For all other resource types update in place if a - // resource with the same name already exists and is owned by the - // current release. - h.DeletePolicies = []release.HookDeletePolicy{release.HookBeforeHookCreation} - } - - if err := cfg.deleteHookByPolicy(h, release.HookBeforeHookCreation); err != nil { - return err + weights := make([]int, 0, len(weightedHooks)) + for w := range weightedHooks { + weights = append(weights, w) + // sort hooks in each weighted group by name + sort.Slice(weightedHooks[w], func(i, j int) bool { + return weightedHooks[w][i].Name < weightedHooks[w][j].Name + }) + } + sort.Ints(weights) + + var mut sync.RWMutex + for _, w := range weights { + sem := make(chan struct{}, parallelism) + errsChan := make(chan error) + errs := make([]error, 0) + for _, h := range weightedHooks[w] { + // execute hooks in parallel (with limited parallelism enforced by semaphore) + go func(h *release.Hook) { + sem <- struct{}{} + errsChan <- cfg.execHook(rl, h, &mut, timeout) + <-sem + }(h) } - - resources, err := cfg.KubeClient.Build(bytes.NewBufferString(h.Manifest), true) - if err != nil { - return errors.Wrapf(err, "unable to build kubernetes object for %s hook %s", hook, h.Path) + // collect errors + for range weightedHooks[w] { + if err := <-errsChan; err != nil { + errs = append(errs, err) + } } - // Record the time at which the hook was applied to the cluster - h.LastRun = release.HookExecution{ - StartedAt: helmtime.Now(), - Phase: release.HookPhaseRunning, - } - cfg.recordRelease(rl) - - // As long as the implementation of WatchUntilReady does not panic, HookPhaseFailed or HookPhaseSucceeded - // should always be set by this function. If we fail to do that for any reason, then HookPhaseUnknown is - // the most appropriate value to surface. - h.LastRun.Phase = release.HookPhaseUnknown - - // Create hook resources - if _, err := cfg.KubeClient.Create(resources); err != nil { - h.LastRun.CompletedAt = helmtime.Now() - h.LastRun.Phase = release.HookPhaseFailed - return errors.Wrapf(err, "warning: Hook %s %s failed", hook, h.Path) + if len(errs) > 0 { + return errors.Errorf("%s hook event failed with %d error(s): %s", event, len(errs), joinErrors(errs)) } + } - // Watch hook resources until they have completed - err = cfg.KubeClient.WatchUntilReady(resources, timeout) - // Note the time of success/failure - h.LastRun.CompletedAt = helmtime.Now() - // Mark hook as succeeded or failed - if err != nil { - h.LastRun.Phase = release.HookPhaseFailed - // If a hook is failed, check the annotation of the hook to determine whether the hook should be deleted - // under failed condition. If so, then clear the corresponding resource object in the hook - if err := cfg.deleteHookByPolicy(h, release.HookFailed); err != nil { + // If all hooks are successful, check the annotation of each hook to determine whether the hook should be deleted + // under succeeded condition. If so, then clear the corresponding resource object in each hook + for _, w := range weights { + for _, h := range weightedHooks[w] { + if err := cfg.deleteHookByPolicy(h, release.HookSucceeded); err != nil { return err } - return err } - h.LastRun.Phase = release.HookPhaseSucceeded } + return nil +} - // If all hooks are successful, check the annotation of each hook to determine whether the hook should be deleted - // under succeeded condition. If so, then clear the corresponding resource object in each hook - for _, h := range executingHooks { - if err := cfg.deleteHookByPolicy(h, release.HookSucceeded); err != nil { - return err +// // hooke are pre-ordered by kind, so keep order stable +// sort.Stable(hookByWeight(executingHooks)) + +// for _, h := range executingHooks { +// // Set default delete policy to before-hook-creation +// if h.DeletePolicies == nil || len(h.DeletePolicies) == 0 { +// // TODO(jlegrone): Only apply before-hook-creation delete policy to run to completion +// // resources. For all other resource types update in place if a +// // resource with the same name already exists and is owned by the +// // current release. +// h.DeletePolicies = []release.HookDeletePolicy{release.HookBeforeHookCreation} +// } + +// if err := cfg.deleteHookByPolicy(h, release.HookBeforeHookCreation); err != nil { +// return err +// } + +// resources, err := cfg.KubeClient.Build(bytes.NewBufferString(h.Manifest), true) +// if err != nil { +// return errors.Wrapf(err, "unable to build kubernetes object for %s hook %s", hook, h.Path) +// } + +// execHook executes a hook. +func (cfg *Configuration) execHook(rl *release.Release, h *release.Hook, mut *sync.RWMutex, timeout time.Duration) (err error) { + if err := cfg.deleteHookByPolicy(h, release.HookBeforeHookCreation); err != nil { + return err + } + + resources, err := cfg.KubeClient.Build(bytes.NewBufferString(h.Manifest), true) + if err != nil { + return errors.Wrapf(err, "unable to build kubernetes object for applying hook %s", h.Path) + } + + // Record the time at which the hook was applied to the cluster + updateHookPhase(h, mut, release.HookPhaseRunning) + // Thread safety: exclusive lock is necessary to ensure that none of the hook structs are modified during recordRelease + mut.Lock() + cfg.recordRelease(rl) + mut.Unlock() + + // As long as the implementation of WatchUntilReady does not panic, HookPhaseFailed or HookPhaseSucceeded + // should always be set by this function. If we fail to do that for any reason, then HookPhaseUnknown is + // the most appropriate value to surface. + defer func() { + if panic := recover(); panic != nil { + updateHookPhase(h, mut, release.HookPhaseUnknown) + err = errors.Errorf("panicked while executing hook %s", h.Path) } + }() + + // Create hook resources + if _, err = cfg.KubeClient.Create(resources); err != nil { + updateHookPhase(h, mut, release.HookPhaseFailed) + return errors.Wrapf(err, "warning: hook %s failed", h.Path) } + // Watch hook resources until they have completed then mark hook as succeeded or failed + if err = cfg.KubeClient.WatchUntilReady(resources, timeout); err != nil { + updateHookPhase(h, mut, release.HookPhaseFailed) + // If a hook is failed, check the annotation of the hook to determine whether the hook should be deleted + // under failed condition. If so, then clear the corresponding resource object in the hook. + if deleteHookErr := cfg.deleteHookByPolicy(h, release.HookFailed); deleteHookErr != nil { + return deleteHookErr + } + return err + } + updateHookPhase(h, mut, release.HookPhaseSucceeded) return nil } -// hookByWeight is a sorter for hooks -type hookByWeight []*release.Hook - -func (x hookByWeight) Len() int { return len(x) } -func (x hookByWeight) Swap(i, j int) { x[i], x[j] = x[j], x[i] } -func (x hookByWeight) Less(i, j int) bool { - if x[i].Weight == x[j].Weight { - return x[i].Name < x[j].Name +// updateHookPhase updates the phase of a hook in a thread-safe manner. +func updateHookPhase(h *release.Hook, mut *sync.RWMutex, phase release.HookPhase) { + // Thread safety: shared lock is sufficient because each execHook goroutine operates on a different hook + completedAtTime := helmtime.Now() + mut.RLock() + startedAtTime := helmtime.Now() + switch phase { + case release.HookPhaseRunning: + h.LastRun.StartedAt = startedAtTime + case release.HookPhaseSucceeded, release.HookPhaseFailed: + h.LastRun.CompletedAt = completedAtTime } - return x[i].Weight < x[j].Weight + h.LastRun.Phase = phase + mut.RUnlock() } -// deleteHookByPolicy deletes a hook if the hook policy instructs it to +// deleteHookByPolicy deletes a hook if the hook policy instructs it to. func (cfg *Configuration) deleteHookByPolicy(h *release.Hook, policy release.HookDeletePolicy) error { // Never delete CustomResourceDefinitions; this could cause lots of // cascading garbage collection. diff --git a/pkg/action/install.go b/pkg/action/install.go index caeefca68..1f2a0a342 100644 --- a/pkg/action/install.go +++ b/pkg/action/install.go @@ -75,6 +75,7 @@ type Install struct { CreateNamespace bool DryRun bool DisableHooks bool + HookParallelism int Replace bool Wait bool Devel bool @@ -326,7 +327,7 @@ func (i *Install) Run(chrt *chart.Chart, vals map[string]interface{}) (*release. // pre-install hooks if !i.DisableHooks { - if err := i.cfg.execHook(rel, release.HookPreInstall, i.Timeout); err != nil { + if err := i.cfg.execHookEvent(rel, release.HookPreInstall, i.Timeout, i.HookParallelism); err != nil { return i.failRelease(rel, fmt.Errorf("failed pre-install: %s", err)) } } @@ -352,7 +353,7 @@ func (i *Install) Run(chrt *chart.Chart, vals map[string]interface{}) (*release. } if !i.DisableHooks { - if err := i.cfg.execHook(rel, release.HookPostInstall, i.Timeout); err != nil { + if err := i.cfg.execHookEvent(rel, release.HookPostInstall, i.Timeout, i.HookParallelism); err != nil { return i.failRelease(rel, fmt.Errorf("failed post-install: %s", err)) } } @@ -383,6 +384,7 @@ func (i *Install) failRelease(rel *release.Release, err error) (*release.Release i.cfg.Log("Install failed and atomic is set, uninstalling release") uninstall := NewUninstall(i.cfg) uninstall.DisableHooks = i.DisableHooks + uninstall.HookParallelism = i.HookParallelism uninstall.KeepHistory = false uninstall.Timeout = i.Timeout if _, uninstallErr := uninstall.Run(i.ReleaseName); uninstallErr != nil { diff --git a/pkg/action/install_test.go b/pkg/action/install_test.go index 428e90295..2ae72e29d 100644 --- a/pkg/action/install_test.go +++ b/pkg/action/install_test.go @@ -403,6 +403,88 @@ func TestInstallRelease_Atomic(t *testing.T) { }) } +func TestInstallRelease_HookParallelism(t *testing.T) { + is := assert.New(t) + t.Run("hook parallelism of 0 defaults to 1", func(t *testing.T) { + instAction := installAction(t) + instAction.HookParallelism = 0 + vals := map[string]interface{}{} + res, err := instAction.Run(buildChart(), vals) + if err != nil { + t.Fatalf("Failed install: %s", err) + } + is.Equal(res.Name, "test-install-release", "Expected release name.") + is.Equal(res.Namespace, "spaced") + + rel, err := instAction.cfg.Releases.Get(res.Name, res.Version) + is.NoError(err) + + is.Len(rel.Hooks, 1) + is.Equal(rel.Hooks[0].Manifest, manifestWithHook) + is.Equal(rel.Hooks[0].Events[0], release.HookPostInstall) + is.Equal(rel.Hooks[0].Events[1], release.HookPreDelete, "Expected event 0 is pre-delete") + + is.NotEqual(len(res.Manifest), 0) + is.NotEqual(len(rel.Manifest), 0) + is.Contains(rel.Manifest, "---\n# Source: hello/templates/hello\nhello: world") + is.Equal(rel.Info.Description, "Install complete") + }) + + t.Run("hook parallelism greater than number of hooks", func(t *testing.T) { + instAction := installAction(t) + instAction.HookParallelism = 10 + vals := map[string]interface{}{} + res, err := instAction.Run(buildChart(), vals) + if err != nil { + t.Fatalf("Failed install: %s", err) + } + is.Equal(res.Name, "test-install-release", "Expected release name.") + is.Equal(res.Namespace, "spaced") + + rel, err := instAction.cfg.Releases.Get(res.Name, res.Version) + is.NoError(err) + + is.Len(rel.Hooks, 1) + is.Equal(rel.Hooks[0].Manifest, manifestWithHook) + is.Equal(rel.Hooks[0].Events[0], release.HookPostInstall) + is.Equal(rel.Hooks[0].Events[1], release.HookPreDelete, "Expected event 0 is pre-delete") + + is.NotEqual(len(res.Manifest), 0) + is.NotEqual(len(rel.Manifest), 0) + is.Contains(rel.Manifest, "---\n# Source: hello/templates/hello\nhello: world") + is.Equal(rel.Info.Description, "Install complete") + }) + + t.Run("hook parallelism with multiple hooks", func(t *testing.T) { + instAction := installAction(t) + instAction.HookParallelism = 2 + vals := map[string]interface{}{} + res, err := instAction.Run(buildChart(withSecondHook(manifestWithHook)), vals) + if err != nil { + t.Fatalf("Failed install: %s", err) + } + is.Equal(res.Name, "test-install-release", "Expected release name.") + is.Equal(res.Namespace, "spaced") + + rel, err := instAction.cfg.Releases.Get(res.Name, res.Version) + is.NoError(err) + + is.Len(rel.Hooks, 2) + is.Equal(rel.Hooks[0].Manifest, manifestWithHook) + is.Equal(rel.Hooks[0].Events[0], release.HookPostInstall) + is.Equal(rel.Hooks[0].Events[1], release.HookPreDelete, "Expected event 0 is pre-delete") + + is.Equal(rel.Hooks[1].Manifest, manifestWithHook) + is.Equal(rel.Hooks[1].Events[0], release.HookPostInstall) + is.Equal(rel.Hooks[1].Events[1], release.HookPreDelete, "Expected event 1 is pre-delete") + + is.NotEqual(len(res.Manifest), 0) + is.NotEqual(len(rel.Manifest), 0) + is.Contains(rel.Manifest, "---\n# Source: hello/templates/hello\nhello: world") + is.Equal(rel.Info.Description, "Install complete") + }) +} + func TestNameTemplate(t *testing.T) { testCases := []nameTemplateTestCase{ // Just a straight up nop please diff --git a/pkg/action/release_testing.go b/pkg/action/release_testing.go index 2f6f5cfce..e0fc73379 100644 --- a/pkg/action/release_testing.go +++ b/pkg/action/release_testing.go @@ -33,8 +33,9 @@ import ( // // It provides the implementation of 'helm test'. type ReleaseTesting struct { - cfg *Configuration - Timeout time.Duration + cfg *Configuration + Timeout time.Duration + HookParallelism int // Used for fetching logs from test pods Namespace string } @@ -62,7 +63,7 @@ func (r *ReleaseTesting) Run(name string) (*release.Release, error) { return rel, err } - if err := r.cfg.execHook(rel, release.HookTest, r.Timeout); err != nil { + if err := r.cfg.execHookEvent(rel, release.HookTest, r.Timeout, r.HookParallelism); err != nil { r.cfg.Releases.Update(rel) return rel, err } diff --git a/pkg/action/rollback.go b/pkg/action/rollback.go index 542acefae..7a2b91ec7 100644 --- a/pkg/action/rollback.go +++ b/pkg/action/rollback.go @@ -35,15 +35,16 @@ import ( type Rollback struct { cfg *Configuration - Version int - Timeout time.Duration - Wait bool - DisableHooks bool - DryRun bool - Recreate bool // will (if true) recreate pods after a rollback. - Force bool // will (if true) force resource upgrade through uninstall/recreate if needed - CleanupOnFail bool - MaxHistory int // MaxHistory limits the maximum number of revisions saved per release + Version int + Timeout time.Duration + Wait bool + DisableHooks bool + HookParallelism int + DryRun bool + Recreate bool // will (if true) recreate pods after a rollback. + Force bool // will (if true) force resource upgrade through uninstall/recreate if needed + CleanupOnFail bool + MaxHistory int // MaxHistory limits the maximum number of revisions saved per release } // NewRollback creates a new Rollback object with the given configuration. @@ -156,7 +157,7 @@ func (r *Rollback) performRollback(currentRelease, targetRelease *release.Releas // pre-rollback hooks if !r.DisableHooks { - if err := r.cfg.execHook(targetRelease, release.HookPreRollback, r.Timeout); err != nil { + if err := r.cfg.execHookEvent(targetRelease, release.HookPreRollback, r.Timeout, r.HookParallelism); err != nil { return targetRelease, err } } else { @@ -209,7 +210,7 @@ func (r *Rollback) performRollback(currentRelease, targetRelease *release.Releas // post-rollback hooks if !r.DisableHooks { - if err := r.cfg.execHook(targetRelease, release.HookPostRollback, r.Timeout); err != nil { + if err := r.cfg.execHookEvent(targetRelease, release.HookPostRollback, r.Timeout, r.HookParallelism); err != nil { return targetRelease, err } } diff --git a/pkg/action/uninstall.go b/pkg/action/uninstall.go index c466c6ee2..2b6468744 100644 --- a/pkg/action/uninstall.go +++ b/pkg/action/uninstall.go @@ -34,11 +34,12 @@ import ( type Uninstall struct { cfg *Configuration - DisableHooks bool - DryRun bool - KeepHistory bool - Timeout time.Duration - Description string + DisableHooks bool + HookParallelism int + DryRun bool + KeepHistory bool + Timeout time.Duration + Description string } // NewUninstall creates a new Uninstall object with the given configuration. @@ -97,7 +98,7 @@ func (u *Uninstall) Run(name string) (*release.UninstallReleaseResponse, error) res := &release.UninstallReleaseResponse{Release: rel} if !u.DisableHooks { - if err := u.cfg.execHook(rel, release.HookPreDelete, u.Timeout); err != nil { + if err := u.cfg.execHookEvent(rel, release.HookPreDelete, u.Timeout, u.HookParallelism); err != nil { return res, err } } else { @@ -114,7 +115,7 @@ func (u *Uninstall) Run(name string) (*release.UninstallReleaseResponse, error) res.Info = kept if !u.DisableHooks { - if err := u.cfg.execHook(rel, release.HookPostDelete, u.Timeout); err != nil { + if err := u.cfg.execHookEvent(rel, release.HookPostDelete, u.Timeout, u.HookParallelism); err != nil { errs = append(errs, err) } } diff --git a/pkg/action/upgrade.go b/pkg/action/upgrade.go index c439af79d..7683378f4 100644 --- a/pkg/action/upgrade.go +++ b/pkg/action/upgrade.go @@ -66,6 +66,8 @@ type Upgrade struct { Wait bool // DisableHooks disables hook processing if set to true. DisableHooks bool + // HookParallelism controls the maximum number of hooks to run in parallel + HookParallelism int // DryRun controls whether the operation is prepared, but not executed. // If `true`, the upgrade is prepared but not performed. DryRun bool @@ -305,7 +307,7 @@ func (u *Upgrade) performUpgrade(originalRelease, upgradedRelease *release.Relea // pre-upgrade hooks if !u.DisableHooks { - if err := u.cfg.execHook(upgradedRelease, release.HookPreUpgrade, u.Timeout); err != nil { + if err := u.cfg.execHookEvent(upgradedRelease, release.HookPreUpgrade, u.Timeout, u.HookParallelism); err != nil { return u.failRelease(upgradedRelease, kube.ResourceList{}, fmt.Errorf("pre-upgrade hooks failed: %s", err)) } } else { @@ -337,7 +339,7 @@ func (u *Upgrade) performUpgrade(originalRelease, upgradedRelease *release.Relea // post-upgrade hooks if !u.DisableHooks { - if err := u.cfg.execHook(upgradedRelease, release.HookPostUpgrade, u.Timeout); err != nil { + if err := u.cfg.execHookEvent(upgradedRelease, release.HookPostUpgrade, u.Timeout, u.HookParallelism); err != nil { return u.failRelease(upgradedRelease, results.Created, fmt.Errorf("post-upgrade hooks failed: %s", err)) } } @@ -401,6 +403,7 @@ func (u *Upgrade) failRelease(rel *release.Release, created kube.ResourceList, e rollin.Version = filteredHistory[0].Version rollin.Wait = true rollin.DisableHooks = u.DisableHooks + rollin.HookParallelism = u.HookParallelism rollin.Recreate = u.Recreate rollin.Force = u.Force rollin.Timeout = u.Timeout diff --git a/pkg/action/upgrade_test.go b/pkg/action/upgrade_test.go index f16de6479..a93ed308b 100644 --- a/pkg/action/upgrade_test.go +++ b/pkg/action/upgrade_test.go @@ -273,3 +273,169 @@ func TestUpgradeRelease_Pending(t *testing.T) { _, err := upAction.Run(rel.Name, buildChart(), vals) req.Contains(err.Error(), "progress", err) } + +func TestUpgradeRelease_HookParallelism(t *testing.T) { + is := assert.New(t) + t.Run("hook parallelism of 0 defaults to 1", func(t *testing.T) { + upAction := upgradeAction(t) + upAction.HookParallelism = 0 + chartDefaultValues := map[string]interface{}{ + "subchart": map[string]interface{}{ + "enabled": true, + }, + } + dependency := chart.Dependency{ + Name: "subchart", + Version: "0.1.0", + Repository: "http://some-repo.com", + Condition: "subchart.enabled", + } + sampleChart := buildChart( + withName("sample"), + withValues(chartDefaultValues), + withMetadataDependency(dependency), + ) + now := time.Now() + rel := &release.Release{ + Name: "nuketown", + Info: &release.Info{ + FirstDeployed: now, + LastDeployed: now, + Status: release.StatusDeployed, + Description: "Named Release Stub", + }, + Chart: sampleChart, + Version: 1, + } + err := upAction.cfg.Releases.Create(rel) + is.NoError(err) + res, err := upAction.Run(rel.Name, sampleChart, map[string]interface{}{}) + if err != nil { + t.Fatalf("Failed upgrade: %s", err) + } + is.Equal(res.Name, "nuketown", "Expected release name.") + + rel, err = upAction.cfg.Releases.Get(res.Name, res.Version) + is.NoError(err) + + is.Len(rel.Hooks, 1) + is.Equal(rel.Hooks[0].Manifest, manifestWithHook) + is.Equal(rel.Hooks[0].Events[0], release.HookPostInstall) + is.Equal(rel.Hooks[0].Events[1], release.HookPreDelete, "Expected event 0 is pre-delete") + + is.NotEqual(len(res.Manifest), 0) + is.NotEqual(len(rel.Manifest), 0) + is.Contains(rel.Manifest, "---\n# Source: sample/templates/hello\nhello: world") + is.Equal(rel.Info.Description, "Upgrade complete") + }) + + t.Run("hook parallelism greater than number of hooks", func(t *testing.T) { + upAction := upgradeAction(t) + upAction.HookParallelism = 10 + chartDefaultValues := map[string]interface{}{ + "subchart": map[string]interface{}{ + "enabled": true, + }, + } + dependency := chart.Dependency{ + Name: "subchart", + Version: "0.1.0", + Repository: "http://some-repo.com", + Condition: "subchart.enabled", + } + sampleChart := buildChart( + withName("sample"), + withValues(chartDefaultValues), + withMetadataDependency(dependency), + ) + now := time.Now() + rel := &release.Release{ + Name: "nuketown", + Info: &release.Info{ + FirstDeployed: now, + LastDeployed: now, + Status: release.StatusDeployed, + Description: "Named Release Stub", + }, + Chart: sampleChart, + Version: 1, + } + err := upAction.cfg.Releases.Create(rel) + is.NoError(err) + res, err := upAction.Run(rel.Name, sampleChart, map[string]interface{}{}) + if err != nil { + t.Fatalf("Failed upgrade: %s", err) + } + is.Equal(res.Name, "nuketown", "Expected release name.") + + rel, err = upAction.cfg.Releases.Get(res.Name, res.Version) + is.NoError(err) + + is.Len(rel.Hooks, 1) + is.Equal(rel.Hooks[0].Manifest, manifestWithHook) + is.Equal(rel.Hooks[0].Events[0], release.HookPostInstall) + is.Equal(rel.Hooks[0].Events[1], release.HookPreDelete, "Expected event 0 is pre-delete") + + is.NotEqual(len(res.Manifest), 0) + is.NotEqual(len(rel.Manifest), 0) + is.Contains(rel.Manifest, "---\n# Source: sample/templates/hello\nhello: world") + is.Equal(rel.Info.Description, "Upgrade complete") + }) + + t.Run("hook parallelism with multiple hooks", func(t *testing.T) { + upAction := upgradeAction(t) + upAction.HookParallelism = 2 + chartDefaultValues := map[string]interface{}{ + "subchart": map[string]interface{}{ + "enabled": true, + }, + } + dependency := chart.Dependency{ + Name: "subchart", + Version: "0.1.0", + Repository: "http://some-repo.com", + Condition: "subchart.enabled", + } + sampleChart := buildChart( + withName("sample"), + withValues(chartDefaultValues), + withMetadataDependency(dependency), + withSecondHook(manifestWithHook), + ) + now := time.Now() + rel := &release.Release{ + Name: "nuketown", + Info: &release.Info{ + FirstDeployed: now, + LastDeployed: now, + Status: release.StatusDeployed, + Description: "Named Release Stub", + }, + Chart: sampleChart, + Version: 1, + } + err := upAction.cfg.Releases.Create(rel) + is.NoError(err) + res, err := upAction.Run(rel.Name, sampleChart, map[string]interface{}{}) + if err != nil { + t.Fatalf("Failed upgrade: %s", err) + } + is.Equal(res.Name, "nuketown", "Expected release name.") + + rel, err = upAction.cfg.Releases.Get(res.Name, res.Version) + is.NoError(err) + + is.Len(rel.Hooks, 2) + is.Equal(rel.Hooks[0].Manifest, manifestWithHook) + is.Equal(rel.Hooks[0].Events[0], release.HookPostInstall) + is.Equal(rel.Hooks[0].Events[1], release.HookPreDelete, "Expected event 0 is pre-delete") + is.Equal(rel.Hooks[1].Manifest, manifestWithHook) + is.Equal(rel.Hooks[1].Events[0], release.HookPostInstall) + is.Equal(rel.Hooks[1].Events[1], release.HookPreDelete, "Expected event 0 is pre-delete") + + is.NotEqual(len(res.Manifest), 0) + is.NotEqual(len(rel.Manifest), 0) + is.Contains(rel.Manifest, "---\n# Source: sample/templates/hello\nhello: world") + is.Equal(rel.Info.Description, "Upgrade complete") + }) +}