diff --git a/pkg/action/hooks.go b/pkg/action/hooks.go
index a050b466a..600a4c006 100644
--- a/pkg/action/hooks.go
+++ b/pkg/action/hooks.go
@@ -19,6 +19,7 @@ import (
 	"bytes"
 	"context"
 	"sort"
+	"time"
 
 	"github.com/pkg/errors"
 
@@ -28,7 +29,7 @@ import (
 )
 
 // execHook executes all of the hooks for the given hook event.
-func (cfg *Configuration) execHook(ctx context.Context, rl *release.Release, hook release.HookEvent) error {
+func (cfg *Configuration) execHook(timeout time.Duration, rl *release.Release, hook release.HookEvent) error {
 	executingHooks := []*release.Hook{}
 
 	for _, h := range rl.Hooks {
@@ -81,12 +82,17 @@ func (cfg *Configuration) execHook(ctx context.Context, rl *release.Release, hoo
 		}
 
 		// Check if kube.Interface implementation satisfies kube.ContextInterface interface.
-		// If it doesn't log a warning and move on, nothing we can do.
+		// If not, fall back to a time-based watch to maintain backwards compatibility.
 		if kubeClient, ok := cfg.KubeClient.(kube.ContextInterface); ok {
+			// Helm 4 TODO: WatchUntilReady should be replaced with its context
+			// aware counterpart.
+			ctx, cancel := context.WithTimeout(context.TODO(), timeout)
+			defer cancel()
 			// Watch hook resources until they have completed
 			err = kubeClient.WatchUntilReadyWithContext(ctx, resources)
 		} else {
-			cfg.Log("WARNING: kube.ContextInterface not satisfied")
+			// Fall back to the timeout-based watch.
+			err = cfg.KubeClient.WatchUntilReady(resources, timeout)
 		}
 
 		// Note the time of success/failure
diff --git a/pkg/action/install.go b/pkg/action/install.go
index b1764c1db..5e43f47a8 100644
--- a/pkg/action/install.go
+++ b/pkg/action/install.go
@@ -205,10 +205,7 @@ func (i *Install) installCRDs(crds []chart.CRD) error {
 //
 // If DryRun is set to true, this will prepare the release, but not install it
 func (i *Install) Run(chrt *chart.Chart, vals map[string]interface{}) (*release.Release, error) {
-	ctx, cancel := context.WithTimeout(context.TODO(), i.Timeout)
-	defer cancel()
-
-	return i.RunWithContext(ctx, chrt, vals)
+	return i.RunWithContext(context.TODO(), chrt, vals)
 }
 
 // Run executes the installation with Context
@@ -376,8 +373,8 @@ func (i *Install) RunWithContext(ctx context.Context, chrt *chart.Chart, vals ma
 func (i *Install) performInstall(ctx context.Context, rel *release.Release, toBeAdopted, resources kube.ResourceList) (*release.Release, error) {
 	// pre-install hooks
 	if !i.DisableHooks {
-		if err := i.cfg.execHook(ctx, rel, release.HookPreInstall); err != nil {
-			return i.failRelease(ctx, rel, fmt.Errorf("failed pre-install: %w", err))
+		if err := i.cfg.execHook(i.Timeout, rel, release.HookPreInstall); err != nil {
+			return i.failRelease(rel, fmt.Errorf("failed pre-install: %w", err))
 		}
 	}
 
@@ -386,31 +383,42 @@ func (i *Install) performInstall(ctx context.Context, rel *release.Release, toBe
 	// to true, since that is basically an upgrade operation.
 	if len(toBeAdopted) == 0 && len(resources) > 0 {
 		if _, err := i.cfg.KubeClient.Create(resources); err != nil {
-			return i.failRelease(ctx, rel, err)
+			return i.failRelease(rel, err)
 		}
 	} else if len(resources) > 0 {
 		if _, err := i.cfg.KubeClient.Update(toBeAdopted, resources, i.Force); err != nil {
-			return i.failRelease(ctx, rel, err)
+			return i.failRelease(rel, err)
 		}
 	}
 
-	// Check if kube.Interface implementation, also satisfies kube.ContextInterface.
-	kubeClient, ok := i.cfg.KubeClient.(kube.ContextInterface)
-	if i.Wait && ok {
-		if i.WaitForJobs {
-			if err := kubeClient.WaitWithJobsContext(ctx, resources); err != nil {
-				return i.failRelease(ctx, rel, err)
-			}
-		} else {
-			if err := kubeClient.WaitWithContext(ctx, resources); err != nil {
-				return i.failRelease(ctx, rel, err)
-			}
+	if i.Wait {
+		var err error
+
+		ctx, cancel := context.WithTimeout(ctx, i.Timeout)
+		defer cancel()
+
+		kubeClient, ok := i.cfg.KubeClient.(kube.ContextInterface)
+		// Helm 4 TODO: WaitWithJobs and Wait should be replaced with their context
+		// aware counterparts.
+		switch {
+		case ok && i.WaitForJobs:
+			err = kubeClient.WaitWithJobsContext(ctx, resources)
+		case ok && !i.WaitForJobs:
+			err = kubeClient.WaitWithContext(ctx, resources)
+		case i.WaitForJobs:
+			err = i.cfg.KubeClient.WaitWithJobs(resources, i.Timeout)
+		case !i.WaitForJobs:
+			err = i.cfg.KubeClient.Wait(resources, i.Timeout)
+		}
+
+		if err != nil {
+			return i.failRelease(rel, err)
 		}
 	}
 
 	if !i.DisableHooks {
-		if err := i.cfg.execHook(ctx, rel, release.HookPostInstall); err != nil {
-			return i.failRelease(ctx, rel, fmt.Errorf("failed post-install: %w", err))
+		if err := i.cfg.execHook(i.Timeout, rel, release.HookPostInstall); err != nil {
+			return i.failRelease(rel, fmt.Errorf("failed post-install: %w", err))
 		}
 	}
 
@@ -434,18 +442,16 @@ func (i *Install) performInstall(ctx context.Context, rel *release.Release, toBe
 	return rel, nil
 }
 
-func (i *Install) failRelease(ctx context.Context, rel *release.Release, err error) (*release.Release, error) {
+func (i *Install) failRelease(rel *release.Release, err error) (*release.Release, error) {
 	rel.SetStatus(release.StatusFailed, fmt.Sprintf("Release %q failed: %s", i.ReleaseName, err.Error()))
 	if i.Atomic {
 		i.cfg.Log("Install failed and atomic is set, uninstalling release")
 		uninstall := NewUninstall(i.cfg)
 		uninstall.DisableHooks = i.DisableHooks
 		uninstall.KeepHistory = false
-		// TODO: Not sure if a new ctx should be created by the timeout field,
-		// because Timeout will be replaced by contexts. Should a background ctx be used
-		// so we don't timeout while uninstalling?
 		uninstall.Timeout = i.Timeout
-		if _, uninstallErr := uninstall.RunWithContext(context.Background(), i.ReleaseName); uninstallErr != nil {
+		// Helm 4 TODO: Uninstalling needs to be handled properly on a failed atomic install.
+		if _, uninstallErr := uninstall.RunWithContext(context.TODO(), i.ReleaseName); uninstallErr != nil {
 			return rel, errors.Wrapf(uninstallErr, "an error occurred while uninstalling the release. original install error: %s", err)
 		}
 		return rel, errors.Wrapf(err, "release %s failed, and has been uninstalled due to atomic being set", i.ReleaseName)
diff --git a/pkg/action/install_test.go b/pkg/action/install_test.go
index ca48e6cde..7354547e9 100644
--- a/pkg/action/install_test.go
+++ b/pkg/action/install_test.go
@@ -387,15 +387,13 @@ func TestInstallRelease_Wait_Interrupted(t *testing.T) {
 	instAction := installAction(t)
 	instAction.ReleaseName = "interrupted-release"
 	failer := instAction.cfg.KubeClient.(*kubefake.FailingKubeClient)
+	failer.WaitError = context.Canceled
 	instAction.cfg.KubeClient = failer
 	instAction.Wait = true
 	vals := map[string]interface{}{}
 
-	ctx := context.Background()
-	ctx, cancel := context.WithCancel(ctx)
-	cancel()
+	res, err := instAction.RunWithContext(context.Background(), buildChart(), vals)
 
-	res, err := instAction.RunWithContext(ctx, buildChart(), vals)
 	is.Error(err)
 	is.Contains(res.Info.Description, "Release \"interrupted-release\" failed: context canceled")
 	is.Equal(res.Info.Status, release.StatusFailed)
@@ -464,15 +462,12 @@ func TestInstallRelease_Atomic_Interrupted(t *testing.T) {
 	instAction := installAction(t)
 	instAction.ReleaseName = "interrupted-release"
 	failer := instAction.cfg.KubeClient.(*kubefake.FailingKubeClient)
+	failer.WaitError = context.Canceled
 	instAction.cfg.KubeClient = failer
 	instAction.Atomic = true
 	vals := map[string]interface{}{}
 
-	ctx := context.Background()
-	ctx, cancel := context.WithCancel(ctx)
-	cancel()
-
-	res, err := instAction.RunWithContext(ctx, buildChart(), vals)
+	res, err := instAction.RunWithContext(context.Background(), buildChart(), vals)
 	is.Error(err)
 	is.Contains(err.Error(), "context canceled")
 	is.Contains(err.Error(), "atomic")
diff --git a/pkg/action/release_testing.go b/pkg/action/release_testing.go
index 2dc4e7a41..0ef9490e2 100644
--- a/pkg/action/release_testing.go
+++ b/pkg/action/release_testing.go
@@ -87,7 +87,7 @@ func (r *ReleaseTesting) RunWithContext(ctx context.Context, name string) (*rele
 		rel.Hooks = executingHooks
 	}
 
-	if err := r.cfg.execHook(ctx, rel, release.HookTest); err != nil {
+	if err := r.cfg.execHook(r.Timeout, rel, release.HookTest); err != nil {
 		rel.Hooks = append(skippedHooks, rel.Hooks...)
 		r.cfg.Releases.Update(rel)
 		return rel, err
@@ -99,10 +99,7 @@ func (r *ReleaseTesting) RunWithContext(ctx context.Context, name string) (*rele
 
 // Run executes 'helm test' against the given release.
 func (r *ReleaseTesting) Run(name string) (*release.Release, error) {
-	ctx, cancel := context.WithTimeout(context.TODO(), r.Timeout)
-	defer cancel()
-
-	return r.RunWithContext(ctx, name)
+	return r.RunWithContext(context.TODO(), name)
 }
 
 // GetPodLogs will write the logs for all test pods in the given release into
diff --git a/pkg/action/rollback.go b/pkg/action/rollback.go
index 989e771f1..11e3cb52c 100644
--- a/pkg/action/rollback.go
+++ b/pkg/action/rollback.go
@@ -91,10 +91,7 @@ func (r *Rollback) RunWithContext(ctx context.Context, name string) error {
 
 // Run executes 'helm rollback' against the given release.
 func (r *Rollback) Run(name string) error {
-	ctx, cancel := context.WithTimeout(context.TODO(), r.Timeout)
-	defer cancel()
-
-	return r.RunWithContext(ctx, name)
+	return r.RunWithContext(context.TODO(), name)
 }
 
 // prepareRollback finds the previous release and prepares a new release object with
@@ -165,7 +162,7 @@ func (r *Rollback) performRollback(ctx context.Context, currentRelease, targetRe
 
 	// pre-rollback hooks
 	if !r.DisableHooks {
-		if err := r.cfg.execHook(ctx, targetRelease, release.HookPreRollback); err != nil {
+		if err := r.cfg.execHook(r.Timeout, targetRelease, release.HookPreRollback); err != nil {
 			return targetRelease, err
 		}
 	} else {
@@ -232,7 +229,7 @@ func (r *Rollback) performRollback(ctx context.Context, currentRelease, targetRe
 
 	// post-rollback hooks
 	if !r.DisableHooks {
-		if err := r.cfg.execHook(ctx, targetRelease, release.HookPostRollback); err != nil {
+		if err := r.cfg.execHook(r.Timeout, targetRelease, release.HookPostRollback); err != nil {
 			return targetRelease, err
 		}
 	}
diff --git a/pkg/action/uninstall.go b/pkg/action/uninstall.go
index 9d2078237..87a77e879 100644
--- a/pkg/action/uninstall.go
+++ b/pkg/action/uninstall.go
@@ -109,7 +109,7 @@ func (u *Uninstall) RunWithContext(ctx context.Context, name string) (*release.U
 	res := &release.UninstallReleaseResponse{Release: rel}
 
 	if !u.DisableHooks {
-		if err := u.cfg.execHook(ctx, rel, release.HookPreDelete); err != nil {
+		if err := u.cfg.execHook(u.Timeout, rel, release.HookPreDelete); err != nil {
 			return res, err
 		}
 	} else {
@@ -134,15 +134,25 @@ func (u *Uninstall) RunWithContext(ctx context.Context, name string) (*release.U
 	res.Info = kept
 
 	if u.Wait {
-		if kubeClient, ok := u.cfg.KubeClient.(kube.ContextInterface); ok {
-			if err := kubeClient.WaitForDeleteWithContext(ctx, deletedResources); err != nil {
-				errs = append(errs, err)
-			}
+		var err error
+		// Helm 4 TODO: WaitForDelete should be replaced with its context
+		// aware counterpart.
+		switch kubeClient := u.cfg.KubeClient.(type) {
+		case kube.ContextInterface:
+			err = kubeClient.WaitForDeleteWithContext(ctx, deletedResources)
+		case kube.InterfaceExt:
+			err = kubeClient.WaitForDelete(deletedResources, u.Timeout)
+		default:
+			u.cfg.Log("WARNING: KubeClient does not satisfy ContextInterface or InterfaceExt")
+		}
+
+		if err != nil {
+			errs = append(errs, err)
 		}
 	}
 
 	if !u.DisableHooks {
-		if err := u.cfg.execHook(ctx, rel, release.HookPostDelete); err != nil {
+		if err := u.cfg.execHook(u.Timeout, rel, release.HookPostDelete); err != nil {
 			errs = append(errs, err)
 		}
 	}
diff --git a/pkg/action/upgrade.go b/pkg/action/upgrade.go
index cdad2e121..fe4b3277d 100644
--- a/pkg/action/upgrade.go
+++ b/pkg/action/upgrade.go
@@ -341,7 +341,7 @@ func (u *Upgrade) releasingUpgrade(ctx context.Context, upgradedRelease *release
 
 	// pre-upgrade hooks
 	if !u.DisableHooks {
-		if err := u.cfg.execHook(ctx, upgradedRelease, release.HookPreUpgrade); err != nil {
+		if err := u.cfg.execHook(u.Timeout, upgradedRelease, release.HookPreUpgrade); err != nil {
 			return u.failRelease(upgradedRelease, kube.ResourceList{}, fmt.Errorf("pre-upgrade hooks failed: %w", err))
 		}
 	} else {
@@ -364,27 +364,38 @@ func (u *Upgrade) releasingUpgrade(ctx context.Context, upgradedRelease *release
 		}
 	}
 
-	kubeClient, ok := u.cfg.KubeClient.(kube.ContextInterface)
-	if u.Wait && ok {
+	if u.Wait {
 		u.cfg.Log(
 			"waiting for release %s resources (created: %d updated: %d deleted: %d)",
 			upgradedRelease.Name, len(results.Created), len(results.Updated), len(results.Deleted))
-		if u.WaitForJobs {
-			if err := kubeClient.WaitWithJobsContext(ctx, target); err != nil {
-				u.cfg.recordRelease(originalRelease)
-				return u.failRelease(upgradedRelease, results.Created, err)
-			}
-		} else {
-			if err := kubeClient.WaitWithContext(ctx, target); err != nil {
-				u.cfg.recordRelease(originalRelease)
-				return u.failRelease(upgradedRelease, results.Created, err)
-			}
+		var err error
+
+		ctx, cancel := context.WithTimeout(ctx, u.Timeout)
+		defer cancel()
+
+		kubeClient, ok := u.cfg.KubeClient.(kube.ContextInterface)
+		// Helm 4 TODO: WaitWithJobs and Wait should be replaced with their context
+		// aware counterparts.
+		switch {
+		case ok && u.WaitForJobs:
+			err = kubeClient.WaitWithJobsContext(ctx, target)
+		case ok && !u.WaitForJobs:
+			err = kubeClient.WaitWithContext(ctx, target)
+		case u.WaitForJobs:
+			err = u.cfg.KubeClient.WaitWithJobs(target, u.Timeout)
+		case !u.WaitForJobs:
+			err = u.cfg.KubeClient.Wait(target, u.Timeout)
+		}
+
+		if err != nil {
+			u.cfg.recordRelease(originalRelease)
+			return u.failRelease(upgradedRelease, results.Created, err)
 		}
 	}
 
 	// post-upgrade hooks
 	if !u.DisableHooks {
-		if err := u.cfg.execHook(ctx, upgradedRelease, release.HookPostUpgrade); err != nil {
+		if err := u.cfg.execHook(u.Timeout, upgradedRelease, release.HookPostUpgrade); err != nil {
 			return u.failRelease(upgradedRelease, results.Created, fmt.Errorf("post-upgrade hooks failed: %w", err))
 		}
 	}
diff --git a/pkg/action/upgrade_test.go b/pkg/action/upgrade_test.go
index 22cb1042a..55b294e6d 100644
--- a/pkg/action/upgrade_test.go
+++ b/pkg/action/upgrade_test.go
@@ -357,15 +357,12 @@ func TestUpgradeRelease_Interrupted_Wait(t *testing.T) {
 	upAction.cfg.Releases.Create(rel)
 
 	failer := upAction.cfg.KubeClient.(*kubefake.FailingKubeClient)
+	failer.WaitError = context.Canceled
 	upAction.cfg.KubeClient = failer
 	upAction.Wait = true
 	vals := map[string]interface{}{}
 
-	ctx := context.Background()
-	ctx, cancel := context.WithCancel(ctx)
-	cancel()
-
-	res, err := upAction.RunWithContext(ctx, rel.Name, buildChart(), vals)
+	res, err := upAction.RunWithContext(context.Background(), rel.Name, buildChart(), vals)
 
 	req.Error(err)
 	is.Contains(res.Info.Description, "Upgrade \"interrupted-release\" failed: context canceled")
@@ -385,15 +382,17 @@ func TestUpgradeRelease_Interrupted_Atomic(t *testing.T) {
 	upAction.cfg.Releases.Create(rel)
 
 	failer := upAction.cfg.KubeClient.(*kubefake.FailingKubeClient)
+	failer.WaitError = context.Canceled
+	failer.WaitDuration = 2 * time.Second
 	upAction.cfg.KubeClient = failer
 	upAction.Atomic = true
 	vals := map[string]interface{}{}
 
-	ctx := context.Background()
-	ctx, cancel := context.WithCancel(ctx)
-	cancel()
+	// After the first Wait error, the error needs to be set to nil
+	// so the atomic cleanup passes.
+	time.AfterFunc(failer.WaitDuration, func() { failer.WaitError = nil })
 
-	res, err := upAction.RunWithContext(ctx, rel.Name, buildChart(), vals)
+	res, err := upAction.RunWithContext(context.Background(), rel.Name, buildChart(), vals)
 
 	req.Error(err)
 	is.Contains(err.Error(), "release interrupted-release failed, and has been rolled back due to atomic being set: context canceled")
diff --git a/pkg/kube/client.go b/pkg/kube/client.go
index 63a28b6fa..647175235 100644
--- a/pkg/kube/client.go
+++ b/pkg/kube/client.go
@@ -739,7 +739,7 @@ func (c *Client) watchUntilReady(ctx context.Context, info *resource.Info) error
 		return nil
 	}
 
-	c.Log("Watching for changes to %s %s with timeout of %v", kind, info.Name, timeFromCtx(ctx))
+	c.Log("Watching for changes to %s %s with timeout of %q", kind, info.Name, timeFromCtx(ctx))
 
 	// Use a selector on the name of the resource. This should be unique for the
 	// given version and kind
diff --git a/pkg/kube/fake/fake.go b/pkg/kube/fake/fake.go
index a2a5186e3..7addb364a 100644
--- a/pkg/kube/fake/fake.go
+++ b/pkg/kube/fake/fake.go
@@ -77,14 +77,11 @@ func (f *FailingKubeClient) Wait(resources kube.ResourceList, d time.Duration) e
 
 // Waits the amount of time defined on f.WaitDuration, then returns the configured error if set or prints.
 func (f *FailingKubeClient) WaitWithContext(ctx context.Context, resources kube.ResourceList) error {
+	time.Sleep(f.WaitDuration)
 	if f.WaitError != nil {
 		return f.WaitError
 	}
 
-	if err := ctx.Err(); err != nil {
-		return err
-	}
-
 	return f.PrintingKubeClient.WaitWithContext(ctx, resources)
 }
 
@@ -101,11 +98,6 @@ func (f *FailingKubeClient) WaitWithJobsContext(ctx context.Context, resources k
 	if f.WaitError != nil {
 		return f.WaitError
 	}
-
-	if err := ctx.Err(); err != nil {
-		return err
-	}
-
 	return f.PrintingKubeClient.WaitWithJobsContext(ctx, resources)
 }
 
@@ -122,11 +114,6 @@ func (f *FailingKubeClient) WaitForDeleteWithContext(ctx context.Context, resour
 	if f.WaitError != nil {
 		return f.WaitError
 	}
-
-	if err := ctx.Err(); err != nil {
-		return err
-	}
-
 	return f.PrintingKubeClient.WaitForDeleteWithContext(ctx, resources)
 }
 
@@ -202,11 +189,6 @@ func (f *FailingKubeClient) WatchUntilReadyWithContext(ctx context.Context, reso
 	if f.WatchUntilReadyError != nil {
 		return f.WatchUntilReadyError
 	}
-
-	if err := ctx.Err(); err != nil {
-		return err
-	}
-
 	return f.PrintingKubeClient.WatchUntilReady(resources, 0)
 }
 
diff --git a/pkg/kube/wait.go b/pkg/kube/wait.go
index 03268f59a..6d353719c 100644
--- a/pkg/kube/wait.go
+++ b/pkg/kube/wait.go
@@ -42,17 +42,21 @@ type waiter struct {
 }
 
 // timeFromCtx extracts time until deadline of ctx.
-func timeFromCtx(ctx context.Context) time.Duration {
-	deadline, _ := ctx.Deadline()
+func timeFromCtx(ctx context.Context) string {
+	deadline, ok := ctx.Deadline()
+	// No deadline means the context won't time out
+	if !ok {
+		return "none"
+	}
 
-	return time.Until(deadline)
+	return time.Until(deadline).String()
 }
 
 // waitForResources polls to get the current status of all pods, PVCs, Services and
 // Jobs(optional) until all are ready or a timeout is reached
 func (w *waiter) waitForResources(ctx context.Context, created ResourceList) error {
-	w.log("beginning wait for %d resources with timeout of %v", len(created), timeFromCtx(ctx))
+	w.log("beginning wait for %d resources with timeout of %q", len(created), timeFromCtx(ctx))
 
 	return wait.PollUntilContextCancel(ctx, 2*time.Second, true, func(ctx context.Context) (bool, error) {
 		for _, v := range created {
@@ -67,7 +71,7 @@ func (w *waiter) waitForResources(ctx context.Context, created ResourceList) err
 
 // waitForDeletedResources polls to check if all the resources are deleted or a timeout is reached
 func (w *waiter) waitForDeletedResources(ctx context.Context, deleted ResourceList) error {
-	w.log("beginning wait for %d resources to be deleted with timeout of %v", len(deleted), timeFromCtx(ctx))
+	w.log("beginning wait for %d resources to be deleted with timeout of %q", len(deleted), timeFromCtx(ctx))
 
 	return wait.PollUntilContextCancel(ctx, 2*time.Second, true, func(ctx context.Context) (bool, error) {
 		for _, v := range deleted {