diff --git a/pkg/action/hooks.go b/pkg/action/hooks.go index 40c1ffdb6..600a4c006 100644 --- a/pkg/action/hooks.go +++ b/pkg/action/hooks.go @@ -17,17 +17,19 @@ package action import ( "bytes" + "context" "sort" "time" "github.com/pkg/errors" + "helm.sh/helm/v3/pkg/kube" "helm.sh/helm/v3/pkg/release" helmtime "helm.sh/helm/v3/pkg/time" ) // execHook executes all of the hooks for the given hook event. -func (cfg *Configuration) execHook(rl *release.Release, hook release.HookEvent, timeout time.Duration) error { +func (cfg *Configuration) execHook(timeout time.Duration, rl *release.Release, hook release.HookEvent) error { executingHooks := []*release.Hook{} for _, h := range rl.Hooks { @@ -79,8 +81,20 @@ func (cfg *Configuration) execHook(rl *release.Release, hook release.HookEvent, return errors.Wrapf(err, "warning: Hook %s %s failed", hook, h.Path) } - // Watch hook resources until they have completed - err = cfg.KubeClient.WatchUntilReady(resources, timeout) + // Check whether the kube.Interface implementation also satisfies kube.ContextInterface. + // If not, fall back to the time-based watch to maintain backwards compatibility. + if kubeClient, ok := cfg.KubeClient.(kube.ContextInterface); ok { + // Helm 4 TODO: WatchUntilReady should be replaced with its context + // aware counterpart. + ctx, cancel := context.WithTimeout(context.TODO(), timeout) + defer cancel() + // Watch hook resources until they have completed + err = kubeClient.WatchUntilReadyWithContext(ctx, resources) + } else { + // Watch hook resources with the legacy time-based watch. + err = cfg.KubeClient.WatchUntilReady(resources, timeout) + } + // Note the time of success/failure h.LastRun.CompletedAt = helmtime.Now() // Mark hook as succeeded or failed diff --git a/pkg/action/install.go b/pkg/action/install.go index e3538a4f5..ca8e76cf6 100644 --- a/pkg/action/install.go +++ b/pkg/action/install.go @@ -212,10 +212,8 @@ func (i *Install) installCRDs(crds []chart.CRD) error { // Run executes the installation // // If DryRun is set to true, this will prepare the release, but not install it - func (i *Install) Run(chrt *chart.Chart, vals map[string]interface{}) (*release.Release, error) { - ctx := context.Background() - return i.RunWithContext(ctx, chrt, vals) + return i.RunWithContext(context.TODO(), chrt, vals) } // Run executes the installation with Context @@ -389,31 +387,12 @@ func (i *Install) RunWithContext(ctx context.Context, chrt *chart.Chart, vals ma return rel, err } - rel, err = i.performInstallCtx(ctx, rel, toBeAdopted, resources) + rel, err = i.performInstall(ctx, rel, toBeAdopted, resources) if err != nil { rel, err = i.failRelease(rel, err) } - return rel, err -} - -func (i *Install) performInstallCtx(ctx context.Context, rel *release.Release, toBeAdopted kube.ResourceList, resources kube.ResourceList) (*release.Release, error) { - type Msg struct { - r *release.Release - e error - } - resultChan := make(chan Msg, 1) - go func() { - rel, err := i.performInstall(rel, toBeAdopted, resources) - resultChan <- Msg{rel, err} - }() - select { - case <-ctx.Done(): - err := ctx.Err() - return rel, err - case msg := <-resultChan: - return msg.r, msg.e - } + return rel, err } // isDryRun returns true if Upgrade is set to run as a DryRun @@ -424,12 +403,12 @@ func (i *Install) isDryRun() bool { return false } -func (i *Install) performInstall(rel *release.Release, toBeAdopted kube.ResourceList, resources kube.ResourceList) (*release.Release, error) { +func (i *Install) performInstall(ctx context.Context, rel *release.Release, toBeAdopted, resources kube.ResourceList)
(*release.Release, error) { var err error // pre-install hooks if !i.DisableHooks { - if err := i.cfg.execHook(rel, release.HookPreInstall, i.Timeout); err != nil { - return rel, fmt.Errorf("failed pre-install: %s", err) + if err := i.cfg.execHook(i.Timeout, rel, release.HookPreInstall); err != nil { + return rel, fmt.Errorf("failed pre-install: %w", err) } } @@ -442,23 +421,37 @@ func (i *Install) performInstall(rel *release.Release, toBeAdopted kube.Resource _, err = i.cfg.KubeClient.Update(toBeAdopted, resources, i.Force) } if err != nil { - return rel, err + return nil, err } if i.Wait { - if i.WaitForJobs { + var err error + + ctx, cancel := context.WithTimeout(ctx, i.Timeout) + defer cancel() + + kubeClient, ok := i.cfg.KubeClient.(kube.ContextInterface) + // Helm 4 TODO: WaitWithJobs and Wait should be replaced with their context + // aware counterparts. + switch { + case ok && i.WaitForJobs: + err = kubeClient.WaitWithJobsContext(ctx, resources) + case ok && !i.WaitForJobs: + err = kubeClient.WaitWithContext(ctx, resources) + case i.WaitForJobs: err = i.cfg.KubeClient.WaitWithJobs(resources, i.Timeout) - } else { + case !i.WaitForJobs: err = i.cfg.KubeClient.Wait(resources, i.Timeout) } + if err != nil { return rel, err } } if !i.DisableHooks { - if err := i.cfg.execHook(rel, release.HookPostInstall, i.Timeout); err != nil { - return rel, fmt.Errorf("failed post-install: %s", err) + if err := i.cfg.execHook(i.Timeout, rel, release.HookPostInstall); err != nil { + return rel, fmt.Errorf("failed post-install: %w", err) } } @@ -490,7 +483,8 @@ func (i *Install) failRelease(rel *release.Release, err error) (*release.Release uninstall.DisableHooks = i.DisableHooks uninstall.KeepHistory = false uninstall.Timeout = i.Timeout - if _, uninstallErr := uninstall.Run(i.ReleaseName); uninstallErr != nil { + // Helm 4 TODO: Uninstalling needs to be handled properly on a failed atomic install. + if _, uninstallErr := uninstall.RunWithContext(context.TODO(), i.ReleaseName); uninstallErr != nil { return rel, errors.Wrapf(uninstallErr, "an error occurred while uninstalling the release. 
original install error: %s", err) } return rel, errors.Wrapf(err, "release %s failed, and has been uninstalled due to atomic being set", i.ReleaseName) diff --git a/pkg/action/install_test.go b/pkg/action/install_test.go index bc0890115..db0024319 100644 --- a/pkg/action/install_test.go +++ b/pkg/action/install_test.go @@ -104,7 +104,8 @@ func TestInstallReleaseWithValues(t *testing.T) { "simpleKey": "simpleValue", }, } - res, err := instAction.Run(buildChart(withSampleValues()), userVals) + + res, err := instAction.RunWithContext(context.Background(), buildChart(withSampleValues()), userVals) if err != nil { t.Fatalf("Failed install: %s", err) } @@ -130,7 +131,7 @@ func TestInstallReleaseClientOnly(t *testing.T) { is := assert.New(t) instAction := installAction(t) instAction.ClientOnly = true - instAction.Run(buildChart(), nil) // disregard output + instAction.RunWithContext(context.Background(), buildChart(), nil) // disregard output is.Equal(instAction.cfg.Capabilities, chartutil.DefaultCapabilities) is.Equal(instAction.cfg.KubeClient, &kubefake.PrintingKubeClient{Out: io.Discard}) @@ -140,7 +141,7 @@ func TestInstallRelease_NoName(t *testing.T) { instAction := installAction(t) instAction.ReleaseName = "" vals := map[string]interface{}{} - _, err := instAction.Run(buildChart(), vals) + _, err := instAction.RunWithContext(context.Background(), buildChart(), vals) if err == nil { t.Fatal("expected failure when no name is specified") } @@ -152,7 +153,8 @@ func TestInstallRelease_WithNotes(t *testing.T) { instAction := installAction(t) instAction.ReleaseName = "with-notes" vals := map[string]interface{}{} - res, err := instAction.Run(buildChart(withNotes("note here")), vals) + + res, err := instAction.RunWithContext(context.Background(), buildChart(withNotes("note here")), vals) if err != nil { t.Fatalf("Failed install: %s", err) } @@ -179,7 +181,8 @@ func TestInstallRelease_WithNotesRendered(t *testing.T) { instAction := installAction(t) instAction.ReleaseName = "with-notes" vals := map[string]interface{}{} - res, err := instAction.Run(buildChart(withNotes("got-{{.Release.Name}}")), vals) + + res, err := instAction.RunWithContext(context.Background(), buildChart(withNotes("got-{{.Release.Name}}")), vals) if err != nil { t.Fatalf("Failed install: %s", err) } @@ -198,7 +201,8 @@ func TestInstallRelease_WithChartAndDependencyParentNotes(t *testing.T) { instAction := installAction(t) instAction.ReleaseName = "with-notes" vals := map[string]interface{}{} - res, err := instAction.Run(buildChart(withNotes("parent"), withDependency(withNotes("child"))), vals) + + res, err := instAction.RunWithContext(context.Background(), buildChart(withNotes("parent"), withDependency(withNotes("child"))), vals) if err != nil { t.Fatalf("Failed install: %s", err) } @@ -216,8 +220,9 @@ func TestInstallRelease_WithChartAndDependencyAllNotes(t *testing.T) { instAction := installAction(t) instAction.ReleaseName = "with-notes" instAction.SubNotes = true + vals := map[string]interface{}{} - res, err := instAction.Run(buildChart(withNotes("parent"), withDependency(withNotes("child"))), vals) + res, err := instAction.RunWithContext(context.Background(), buildChart(withNotes("parent"), withDependency(withNotes("child"))), vals) if err != nil { t.Fatalf("Failed install: %s", err) } @@ -237,7 +242,7 @@ func TestInstallRelease_DryRun(t *testing.T) { instAction := installAction(t) instAction.DryRun = true vals := map[string]interface{}{} - res, err := instAction.Run(buildChart(withSampleTemplates()), vals) + res, err := 
instAction.RunWithContext(context.Background(), buildChart(withSampleTemplates()), vals) if err != nil { t.Fatalf("Failed install: %s", err) } @@ -268,7 +273,7 @@ func TestInstallRelease_DryRun_Lookup(t *testing.T) { Data: []byte(`goodbye: {{ lookup "v1" "Namespace" "" "___" }}`), }) - res, err := instAction.Run(mockChart, vals) + res, err := instAction.RunWithContext(context.Background(), mockChart, vals) if err != nil { t.Fatalf("Failed install: %s", err) } @@ -281,7 +286,7 @@ func TestInstallReleaseIncorrectTemplate_DryRun(t *testing.T) { instAction := installAction(t) instAction.DryRun = true vals := map[string]interface{}{} - _, err := instAction.Run(buildChart(withSampleIncludingIncorrectTemplates()), vals) + _, err := instAction.RunWithContext(context.Background(), buildChart(withSampleIncludingIncorrectTemplates()), vals) expectedErr := "\"hello/templates/incorrect\" at <.Values.bad.doh>: nil pointer evaluating interface {}.doh" if err == nil { t.Fatalf("Install should fail containing error: %s", expectedErr) @@ -299,7 +304,7 @@ func TestInstallRelease_NoHooks(t *testing.T) { instAction.cfg.Releases.Create(releaseStub()) vals := map[string]interface{}{} - res, err := instAction.Run(buildChart(), vals) + res, err := instAction.RunWithContext(context.Background(), buildChart(), vals) if err != nil { t.Fatalf("Failed install: %s", err) } @@ -316,7 +321,7 @@ func TestInstallRelease_FailedHooks(t *testing.T) { instAction.cfg.KubeClient = failer vals := map[string]interface{}{} - res, err := instAction.Run(buildChart(), vals) + res, err := instAction.RunWithContext(context.Background(), buildChart(), vals) is.Error(err) is.Contains(res.Info.Description, "failed post-install") is.Equal(release.StatusFailed, res.Info.Status) @@ -333,7 +338,8 @@ func TestInstallRelease_ReplaceRelease(t *testing.T) { instAction.ReleaseName = rel.Name vals := map[string]interface{}{} - res, err := instAction.Run(buildChart(), vals) + + res, err := instAction.RunWithContext(context.Background(), buildChart(), vals) is.NoError(err) // This should have been auto-incremented @@ -349,13 +355,15 @@ func TestInstallRelease_KubeVersion(t *testing.T) { is := assert.New(t) instAction := installAction(t) vals := map[string]interface{}{} - _, err := instAction.Run(buildChart(withKube(">=0.0.0")), vals) + + _, err := instAction.RunWithContext(context.Background(), buildChart(withKube(">=0.0.0")), vals) is.NoError(err) // This should fail for a few hundred years instAction.ReleaseName = "should-fail" vals = map[string]interface{}{} - _, err = instAction.Run(buildChart(withKube(">=99.0.0")), vals) + + _, err = instAction.RunWithContext(context.Background(), buildChart(withKube(">=99.0.0")), vals) is.Error(err) is.Contains(err.Error(), "chart requires kubeVersion") } @@ -372,7 +380,7 @@ func TestInstallRelease_Wait(t *testing.T) { goroutines := runtime.NumGoroutine() - res, err := instAction.Run(buildChart(), vals) + res, err := instAction.RunWithContext(context.Background(), buildChart(), vals) is.Error(err) is.Contains(res.Info.Description, "I timed out") is.Equal(res.Info.Status, release.StatusFailed) @@ -384,25 +392,15 @@ func TestInstallRelease_Wait_Interrupted(t *testing.T) { instAction := installAction(t) instAction.ReleaseName = "interrupted-release" failer := instAction.cfg.KubeClient.(*kubefake.FailingKubeClient) - failer.WaitDuration = 10 * time.Second + failer.WaitError = context.Canceled instAction.cfg.KubeClient = failer instAction.Wait = true vals := map[string]interface{}{} - ctx := context.Background() - 
ctx, cancel := context.WithCancel(ctx) - time.AfterFunc(time.Second, cancel) - - goroutines := runtime.NumGoroutine() - - res, err := instAction.RunWithContext(ctx, buildChart(), vals) + res, err := instAction.RunWithContext(context.Background(), buildChart(), vals) is.Error(err) is.Contains(res.Info.Description, "Release \"interrupted-release\" failed: context canceled") is.Equal(res.Info.Status, release.StatusFailed) - - is.Equal(goroutines+1, runtime.NumGoroutine()) // installation goroutine still is in background - time.Sleep(10 * time.Second) // wait for goroutine to finish - is.Equal(goroutines, runtime.NumGoroutine()) } func TestInstallRelease_WaitForJobs(t *testing.T) { is := assert.New(t) @@ -415,7 +413,7 @@ func TestInstallRelease_WaitForJobs(t *testing.T) { instAction.WaitForJobs = true vals := map[string]interface{}{} - res, err := instAction.Run(buildChart(), vals) + res, err := instAction.RunWithContext(context.Background(), buildChart(), vals) is.Error(err) is.Contains(res.Info.Description, "I timed out") is.Equal(res.Info.Status, release.StatusFailed) @@ -431,9 +429,10 @@ func TestInstallRelease_Atomic(t *testing.T) { failer.WaitError = fmt.Errorf("I timed out") instAction.cfg.KubeClient = failer instAction.Atomic = true + vals := map[string]interface{}{} - res, err := instAction.Run(buildChart(), vals) + res, err := instAction.RunWithContext(context.Background(), buildChart(), vals) is.Error(err) is.Contains(err.Error(), "I timed out") is.Contains(err.Error(), "atomic") @@ -454,7 +453,7 @@ func TestInstallRelease_Atomic(t *testing.T) { instAction.Atomic = true vals := map[string]interface{}{} - _, err := instAction.Run(buildChart(), vals) + _, err := instAction.RunWithContext(context.Background(), buildChart(), vals) is.Error(err) is.Contains(err.Error(), "I timed out") is.Contains(err.Error(), "uninstall fail") @@ -467,16 +466,12 @@ func TestInstallRelease_Atomic_Interrupted(t *testing.T) { instAction := installAction(t) instAction.ReleaseName = "interrupted-release" failer := instAction.cfg.KubeClient.(*kubefake.FailingKubeClient) - failer.WaitDuration = 10 * time.Second + failer.WaitError = context.Canceled instAction.cfg.KubeClient = failer instAction.Atomic = true vals := map[string]interface{}{} - ctx := context.Background() - ctx, cancel := context.WithCancel(ctx) - time.AfterFunc(time.Second, cancel) - - res, err := instAction.RunWithContext(ctx, buildChart(), vals) + res, err := instAction.RunWithContext(context.Background(), buildChart(), vals) is.Error(err) is.Contains(err.Error(), "context canceled") is.Contains(err.Error(), "atomic") @@ -566,7 +561,7 @@ func TestInstallReleaseOutputDir(t *testing.T) { instAction.OutputDir = dir - _, err := instAction.Run(buildChart(withSampleTemplates(), withMultipleManifestTemplate()), vals) + _, err := instAction.RunWithContext(context.Background(), buildChart(withSampleTemplates(), withMultipleManifestTemplate()), vals) if err != nil { t.Fatalf("Failed install: %s", err) } @@ -602,7 +597,7 @@ func TestInstallOutputDirWithReleaseName(t *testing.T) { newDir := filepath.Join(dir, instAction.ReleaseName) - _, err := instAction.Run(buildChart(withSampleTemplates(), withMultipleManifestTemplate()), vals) + _, err := instAction.RunWithContext(context.Background(), buildChart(withSampleTemplates(), withMultipleManifestTemplate()), vals) if err != nil { t.Fatalf("Failed install: %s", err) } diff --git a/pkg/action/release_testing.go b/pkg/action/release_testing.go index 3c10cecf8..9a91e522f 100644 --- a/pkg/action/release_testing.go 
+++ b/pkg/action/release_testing.go @@ -54,8 +54,7 @@ func NewReleaseTesting(cfg *Configuration) *ReleaseTesting { } } -// Run executes 'helm test' against the given release. -func (r *ReleaseTesting) Run(name string) (*release.Release, error) { +func (r *ReleaseTesting) RunWithContext(ctx context.Context, name string) (*release.Release, error) { if err := r.cfg.KubeClient.IsReachable(); err != nil { return nil, err } @@ -94,7 +93,7 @@ func (r *ReleaseTesting) Run(name string) (*release.Release, error) { rel.Hooks = executingHooks } - if err := r.cfg.execHook(rel, release.HookTest, r.Timeout); err != nil { + if err := r.cfg.execHook(r.Timeout, rel, release.HookTest); err != nil { rel.Hooks = append(skippedHooks, rel.Hooks...) r.cfg.Releases.Update(rel) return rel, err @@ -104,6 +103,11 @@ func (r *ReleaseTesting) Run(name string) (*release.Release, error) { return rel, r.cfg.Releases.Update(rel) } +// Run executes 'helm test' against the given release. +func (r *ReleaseTesting) Run(name string) (*release.Release, error) { + return r.RunWithContext(context.TODO(), name) +} + // GetPodLogs will write the logs for all test pods in the given release into // the given writer. These can be immediately output to the user or captured for // other uses diff --git a/pkg/action/rollback.go b/pkg/action/rollback.go index b0be17d13..7ca1a905b 100644 --- a/pkg/action/rollback.go +++ b/pkg/action/rollback.go @@ -18,6 +18,7 @@ package action import ( "bytes" + "context" "fmt" "strings" "time" @@ -54,8 +55,7 @@ func NewRollback(cfg *Configuration) *Rollback { } } -// Run executes 'helm rollback' against the given release. -func (r *Rollback) Run(name string) error { +func (r *Rollback) RunWithContext(ctx context.Context, name string) error { if err := r.cfg.KubeClient.IsReachable(); err != nil { return err } @@ -76,7 +76,7 @@ func (r *Rollback) Run(name string) error { } r.cfg.Log("performing rollback of %s", name) - if _, err := r.performRollback(currentRelease, targetRelease); err != nil { + if _, err := r.performRollback(ctx, currentRelease, targetRelease); err != nil { return err } @@ -89,6 +89,11 @@ func (r *Rollback) Run(name string) error { return nil } +// Run executes 'helm rollback' against the given release. 
+func (r *Rollback) Run(name string) error { + return r.RunWithContext(context.TODO(), name) +} + // prepareRollback finds the previous release and prepares a new release object with // the previous release's configuration func (r *Rollback) prepareRollback(name string) (*release.Release, *release.Release, error) { @@ -159,7 +164,7 @@ func (r *Rollback) prepareRollback(name string) (*release.Release, *release.Rele return currentRelease, targetRelease, nil } -func (r *Rollback) performRollback(currentRelease, targetRelease *release.Release) (*release.Release, error) { +func (r *Rollback) performRollback(ctx context.Context, currentRelease, targetRelease *release.Release) (*release.Release, error) { if r.DryRun { r.cfg.Log("dry run for %s", targetRelease.Name) return targetRelease, nil @@ -176,7 +181,7 @@ func (r *Rollback) performRollback(currentRelease, targetRelease *release.Releas // pre-rollback hooks if !r.DisableHooks { - if err := r.cfg.execHook(targetRelease, release.HookPreRollback, r.Timeout); err != nil { + if err := r.cfg.execHook(r.Timeout, targetRelease, release.HookPreRollback); err != nil { return targetRelease, err } } else { @@ -243,7 +248,7 @@ func (r *Rollback) performRollback(currentRelease, targetRelease *release.Releas // post-rollback hooks if !r.DisableHooks { - if err := r.cfg.execHook(targetRelease, release.HookPostRollback, r.Timeout); err != nil { + if err := r.cfg.execHook(r.Timeout, targetRelease, release.HookPostRollback); err != nil { return targetRelease, err } } diff --git a/pkg/action/uninstall.go b/pkg/action/uninstall.go index 40d82243e..3282f25a9 100644 --- a/pkg/action/uninstall.go +++ b/pkg/action/uninstall.go @@ -17,6 +17,7 @@ limitations under the License. package action import ( + "context" "strings" "time" @@ -56,6 +57,13 @@ func NewUninstall(cfg *Configuration) *Uninstall { // Run uninstalls the given release. func (u *Uninstall) Run(name string) (*release.UninstallReleaseResponse, error) { + ctx, cancel := context.WithTimeout(context.TODO(), u.Timeout) + defer cancel() + + return u.RunWithContext(ctx, name) +} + +func (u *Uninstall) RunWithContext(ctx context.Context, name string) (*release.UninstallReleaseResponse, error) { if err := u.cfg.KubeClient.IsReachable(); err != nil { return nil, err } @@ -106,7 +114,7 @@ func (u *Uninstall) Run(name string) (*release.UninstallReleaseResponse, error) res := &release.UninstallReleaseResponse{Release: rel} if !u.DisableHooks { - if err := u.cfg.execHook(rel, release.HookPreDelete, u.Timeout); err != nil { + if err := u.cfg.execHook(u.Timeout, rel, release.HookPreDelete); err != nil { return res, err } } else { @@ -131,15 +139,25 @@ func (u *Uninstall) Run(name string) (*release.UninstallReleaseResponse, error) res.Info = kept if u.Wait { - if kubeClient, ok := u.cfg.KubeClient.(kube.InterfaceExt); ok { - if err := kubeClient.WaitForDelete(deletedResources, u.Timeout); err != nil { - errs = append(errs, err) - } + var err error + // Helm 4 TODO: WaitForDelete should be replaced with its context + // aware counterpart.
+ switch kubeClient := u.cfg.KubeClient.(type) { + case kube.ContextInterface: + err = kubeClient.WaitForDeleteWithContext(ctx, deletedResources) + case kube.InterfaceExt: + err = kubeClient.WaitForDelete(deletedResources, u.Timeout) + default: + u.cfg.Log("WARNING: KubeClient does not satisfy ContextInterface, or InterfaceExt") + } + + if err != nil { + errs = append(errs, err) } } if !u.DisableHooks { - if err := u.cfg.execHook(rel, release.HookPostDelete, u.Timeout); err != nil { + if err := u.cfg.execHook(u.Timeout, rel, release.HookPostDelete); err != nil { errs = append(errs, err) } } diff --git a/pkg/action/uninstall_test.go b/pkg/action/uninstall_test.go index 869ffb8c7..7ab5dd05f 100644 --- a/pkg/action/uninstall_test.go +++ b/pkg/action/uninstall_test.go @@ -17,6 +17,7 @@ limitations under the License. package action import ( + "context" "fmt" "testing" @@ -68,7 +69,10 @@ func TestUninstallRelease_deleteRelease(t *testing.T) { } }` unAction.cfg.Releases.Create(rel) - res, err := unAction.Run(rel.Name) + + ctx := context.Background() + + res, err := unAction.RunWithContext(ctx, rel.Name) is.NoError(err) expected := `These resources were kept due to the resource policy: [Secret] secret @@ -101,7 +105,10 @@ func TestUninstallRelease_Wait(t *testing.T) { failer := unAction.cfg.KubeClient.(*kubefake.FailingKubeClient) failer.WaitError = fmt.Errorf("U timed out") unAction.cfg.KubeClient = failer - res, err := unAction.Run(rel.Name) + + ctx := context.Background() + + res, err := unAction.RunWithContext(ctx, rel.Name) is.Error(err) is.Contains(err.Error(), "U timed out") is.Equal(res.Release.Info.Status, release.StatusUninstalled) @@ -134,7 +141,10 @@ func TestUninstallRelease_Cascade(t *testing.T) { failer.DeleteWithPropagationError = fmt.Errorf("Uninstall with cascade failed") failer.BuildDummy = true unAction.cfg.KubeClient = failer - _, err := unAction.Run(rel.Name) + + ctx := context.Background() + + _, err := unAction.RunWithContext(ctx, rel.Name) is.Error(err) is.Contains(err.Error(), "failed to delete release: come-fail-away") } diff --git a/pkg/action/upgrade.go b/pkg/action/upgrade.go index ffb7538a6..911299376 100644 --- a/pkg/action/upgrade.go +++ b/pkg/action/upgrade.go @@ -134,7 +134,9 @@ func (u *Upgrade) SetRegistryClient(client *registry.Client) { // Run executes the upgrade on the given release. func (u *Upgrade) Run(name string, chart *chart.Chart, vals map[string]interface{}) (*release.Release, error) { - ctx := context.Background() + ctx, cancel := context.WithTimeout(context.TODO(), u.Timeout) + defer cancel() + return u.RunWithContext(ctx, name, chart, vals) } @@ -357,51 +359,16 @@ func (u *Upgrade) performUpgrade(ctx context.Context, originalRelease, upgradedR if err := u.cfg.Releases.Create(upgradedRelease); err != nil { return nil, err } - rChan := make(chan resultMessage) - ctxChan := make(chan resultMessage) - doneChan := make(chan interface{}) - defer close(doneChan) - go u.releasingUpgrade(rChan, upgradedRelease, current, target, originalRelease) - go u.handleContext(ctx, doneChan, ctxChan, upgradedRelease) - select { - case result := <-rChan: - return result.r, result.e - case result := <-ctxChan: - return result.r, result.e - } -} -// Function used to lock the Mutex, this is important for the case when the atomic flag is set. -// In that case the upgrade will finish before the rollback is finished so it is necessary to wait for the rollback to finish. 
-// The rollback will be trigger by the function failRelease -func (u *Upgrade) reportToPerformUpgrade(c chan<- resultMessage, rel *release.Release, created kube.ResourceList, err error) { - u.Lock.Lock() - if err != nil { - rel, err = u.failRelease(rel, created, err) - } - c <- resultMessage{r: rel, e: err} - u.Lock.Unlock() + return u.releasingUpgrade(ctx, upgradedRelease, current, target, originalRelease) } -// Setup listener for SIGINT and SIGTERM -func (u *Upgrade) handleContext(ctx context.Context, done chan interface{}, c chan<- resultMessage, upgradedRelease *release.Release) { - select { - case <-ctx.Done(): - err := ctx.Err() - - // when the atomic flag is set the ongoing release finish first and doesn't give time for the rollback happens. - u.reportToPerformUpgrade(c, upgradedRelease, kube.ResourceList{}, err) - case <-done: - return - } -} -func (u *Upgrade) releasingUpgrade(c chan<- resultMessage, upgradedRelease *release.Release, current kube.ResourceList, target kube.ResourceList, originalRelease *release.Release) { +func (u *Upgrade) releasingUpgrade(ctx context.Context, upgradedRelease *release.Release, current, target kube.ResourceList, originalRelease *release.Release) (*release.Release, error) { // pre-upgrade hooks if !u.DisableHooks { - if err := u.cfg.execHook(upgradedRelease, release.HookPreUpgrade, u.Timeout); err != nil { - u.reportToPerformUpgrade(c, upgradedRelease, kube.ResourceList{}, fmt.Errorf("pre-upgrade hooks failed: %s", err)) - return + if err := u.cfg.execHook(u.Timeout, upgradedRelease, release.HookPreUpgrade); err != nil { + return u.failRelease(upgradedRelease, kube.ResourceList{}, fmt.Errorf("pre-upgrade hooks failed: %w", err)) } } else { u.cfg.Log("upgrade hooks disabled for %s", upgradedRelease.Name) @@ -410,8 +377,7 @@ func (u *Upgrade) releasingUpgrade(c chan<- resultMessage, upgradedRelease *rele results, err := u.cfg.KubeClient.Update(current, target, u.Force) if err != nil { u.cfg.recordRelease(originalRelease) - u.reportToPerformUpgrade(c, upgradedRelease, results.Created, err) - return + return u.failRelease(upgradedRelease, results.Created, err) } if u.Recreate { @@ -428,26 +394,35 @@ func (u *Upgrade) releasingUpgrade(c chan<- resultMessage, upgradedRelease *rele u.cfg.Log( "waiting for release %s resources (created: %d updated: %d deleted: %d)", upgradedRelease.Name, len(results.Created), len(results.Updated), len(results.Deleted)) - if u.WaitForJobs { - if err := u.cfg.KubeClient.WaitWithJobs(target, u.Timeout); err != nil { - u.cfg.recordRelease(originalRelease) - u.reportToPerformUpgrade(c, upgradedRelease, results.Created, err) - return - } - } else { - if err := u.cfg.KubeClient.Wait(target, u.Timeout); err != nil { - u.cfg.recordRelease(originalRelease) - u.reportToPerformUpgrade(c, upgradedRelease, results.Created, err) - return - } + var err error + + ctx, cancel := context.WithTimeout(ctx, u.Timeout) + defer cancel() + + kubeClient, ok := u.cfg.KubeClient.(kube.ContextInterface) + // Helm 4 TODO: WaitWithJobs and Wait should be replaced with their context + // aware counterparts. 
+ switch { + case ok && u.WaitForJobs: + err = kubeClient.WaitWithJobsContext(ctx, target) + case ok && !u.WaitForJobs: + err = kubeClient.WaitWithContext(ctx, target) + case u.WaitForJobs: + err = u.cfg.KubeClient.WaitWithJobs(target, u.Timeout) + case !u.WaitForJobs: + err = u.cfg.KubeClient.Wait(target, u.Timeout) + } + + if err != nil { + u.cfg.recordRelease(originalRelease) + return u.failRelease(upgradedRelease, results.Created, err) } } // post-upgrade hooks if !u.DisableHooks { - if err := u.cfg.execHook(upgradedRelease, release.HookPostUpgrade, u.Timeout); err != nil { - u.reportToPerformUpgrade(c, upgradedRelease, results.Created, fmt.Errorf("post-upgrade hooks failed: %s", err)) - return + if err := u.cfg.execHook(u.Timeout, upgradedRelease, release.HookPostUpgrade); err != nil { + return u.failRelease(upgradedRelease, results.Created, fmt.Errorf("post-upgrade hooks failed: %w", err)) } } @@ -460,7 +435,8 @@ func (u *Upgrade) releasingUpgrade(c chan<- resultMessage, upgradedRelease *rele } else { upgradedRelease.Info.Description = "Upgrade complete" } - u.reportToPerformUpgrade(c, upgradedRelease, nil, nil) + + return upgradedRelease, nil } func (u *Upgrade) failRelease(rel *release.Release, created kube.ResourceList, err error) (*release.Release, error) { diff --git a/pkg/action/upgrade_test.go b/pkg/action/upgrade_test.go index e259605ce..f8f5dd231 100644 --- a/pkg/action/upgrade_test.go +++ b/pkg/action/upgrade_test.go @@ -56,6 +56,7 @@ func TestUpgradeRelease_Success(t *testing.T) { vals := map[string]interface{}{} ctx, done := context.WithCancel(context.Background()) + res, err := upAction.RunWithContext(ctx, rel.Name, buildChart(), vals) done() req.NoError(err) @@ -85,7 +86,9 @@ func TestUpgradeRelease_Wait(t *testing.T) { upAction.Wait = true vals := map[string]interface{}{} - res, err := upAction.Run(rel.Name, buildChart(), vals) + ctx := context.Background() + + res, err := upAction.RunWithContext(ctx, rel.Name, buildChart(), vals) req.Error(err) is.Contains(res.Info.Description, "I timed out") is.Equal(res.Info.Status, release.StatusFailed) @@ -108,7 +111,9 @@ func TestUpgradeRelease_WaitForJobs(t *testing.T) { upAction.WaitForJobs = true vals := map[string]interface{}{} - res, err := upAction.Run(rel.Name, buildChart(), vals) + ctx := context.Background() + + res, err := upAction.RunWithContext(ctx, rel.Name, buildChart(), vals) req.Error(err) is.Contains(res.Info.Description, "I timed out") is.Equal(res.Info.Status, release.StatusFailed) @@ -132,7 +137,9 @@ func TestUpgradeRelease_CleanupOnFail(t *testing.T) { upAction.CleanupOnFail = true vals := map[string]interface{}{} - res, err := upAction.Run(rel.Name, buildChart(), vals) + ctx := context.Background() + + res, err := upAction.RunWithContext(ctx, rel.Name, buildChart(), vals) req.Error(err) is.NotContains(err.Error(), "unable to cleanup resources") is.Contains(res.Info.Description, "I timed out") @@ -158,7 +165,9 @@ func TestUpgradeRelease_Atomic(t *testing.T) { upAction.Atomic = true vals := map[string]interface{}{} - res, err := upAction.Run(rel.Name, buildChart(), vals) + ctx := context.Background() + + res, err := upAction.RunWithContext(ctx, rel.Name, buildChart(), vals) req.Error(err) is.Contains(err.Error(), "arming key removed") is.Contains(err.Error(), "atomic") @@ -183,7 +192,9 @@ func TestUpgradeRelease_Atomic(t *testing.T) { upAction.Atomic = true vals := map[string]interface{}{} - _, err := upAction.Run(rel.Name, buildChart(), vals) + ctx := context.Background() + + _, err := 
upAction.RunWithContext(ctx, rel.Name, buildChart(), vals) req.Error(err) is.Contains(err.Error(), "update fail") is.Contains(err.Error(), "an error occurred while rolling back the release") @@ -222,8 +233,11 @@ func TestUpgradeRelease_ReuseValues(t *testing.T) { is.NoError(err) upAction.ReuseValues = true + + ctx := context.Background() + // setting newValues and upgrading - res, err := upAction.Run(rel.Name, buildChart(), newValues) + res, err := upAction.RunWithContext(ctx, rel.Name, buildChart(), newValues) is.NoError(err) // Now make sure it is actually upgraded @@ -284,8 +298,11 @@ func TestUpgradeRelease_ReuseValues(t *testing.T) { withDependency(withName("subchart")), withMetadataDependency(dependency), ) + + ctx := context.Background() + // reusing values and upgrading - res, err := upAction.Run(rel.Name, sampleChartWithSubChart, map[string]interface{}{}) + res, err := upAction.RunWithContext(ctx, rel.Name, sampleChartWithSubChart, map[string]interface{}{}) is.NoError(err) // Now get the upgraded release @@ -377,7 +394,9 @@ func TestUpgradeRelease_Pending(t *testing.T) { vals := map[string]interface{}{} - _, err := upAction.Run(rel.Name, buildChart(), vals) + ctx := context.Background() + + _, err := upAction.RunWithContext(ctx, rel.Name, buildChart(), vals) req.Contains(err.Error(), "progress", err) } @@ -393,16 +412,12 @@ func TestUpgradeRelease_Interrupted_Wait(t *testing.T) { upAction.cfg.Releases.Create(rel) failer := upAction.cfg.KubeClient.(*kubefake.FailingKubeClient) - failer.WaitDuration = 10 * time.Second + failer.WaitError = context.Canceled upAction.cfg.KubeClient = failer upAction.Wait = true vals := map[string]interface{}{} - ctx := context.Background() - ctx, cancel := context.WithCancel(ctx) - time.AfterFunc(time.Second, cancel) - - res, err := upAction.RunWithContext(ctx, rel.Name, buildChart(), vals) + res, err := upAction.RunWithContext(context.Background(), rel.Name, buildChart(), vals) req.Error(err) is.Contains(res.Info.Description, "Upgrade \"interrupted-release\" failed: context canceled") @@ -422,16 +437,17 @@ func TestUpgradeRelease_Interrupted_Atomic(t *testing.T) { upAction.cfg.Releases.Create(rel) failer := upAction.cfg.KubeClient.(*kubefake.FailingKubeClient) - failer.WaitDuration = 5 * time.Second + failer.WaitError = context.Canceled + failer.WaitDuration = 2 * time.Second upAction.cfg.KubeClient = failer upAction.Atomic = true vals := map[string]interface{}{} - ctx := context.Background() - ctx, cancel := context.WithCancel(ctx) - time.AfterFunc(time.Second, cancel) + // After the first Wait error, the error needs to be reset to nil + // so the atomic cleanup can pass. + time.AfterFunc(failer.WaitDuration, func() { failer.WaitError = nil }) - res, err := upAction.RunWithContext(ctx, rel.Name, buildChart(), vals) + res, err := upAction.RunWithContext(context.Background(), rel.Name, buildChart(), vals) req.Error(err) is.Contains(err.Error(), "release interrupted-release failed, and has been rolled back due to atomic being set: context canceled") diff --git a/pkg/kube/client.go b/pkg/kube/client.go index 0772678d1..77e428e0a 100644 --- a/pkg/kube/client.go +++ b/pkg/kube/client.go @@ -283,41 +283,67 @@ func getResource(info *resource.Info) (runtime.Object, error) { // Wait waits up to the given timeout for the specified resources to be ready. func (c *Client) Wait(resources ResourceList, timeout time.Duration) error { + // Helm 4 TODO: remove decorator around WaitWithContext.
+ ctx, cancel := context.WithTimeout(context.TODO(), timeout) + defer cancel() + + return c.WaitWithContext(ctx, resources) +} + +// WaitWithContext waits until the context deadline for the specified resources to be ready. +func (c *Client) WaitWithContext(ctx context.Context, resources ResourceList) error { cs, err := c.getKubeClient() if err != nil { return err } checker := NewReadyChecker(cs, c.Log, PausedAsReady(true)) w := waiter{ - c: checker, - log: c.Log, - timeout: timeout, + c: checker, + log: c.Log, } - return w.waitForResources(resources) + return w.waitForResources(ctx, resources) } // WaitWithJobs wait up to the given timeout for the specified resources to be ready, including jobs. func (c *Client) WaitWithJobs(resources ResourceList, timeout time.Duration) error { + // Helm 4 TODO: remove decorator around WaitWithJobsContext. + ctx, cancel := context.WithTimeout(context.TODO(), timeout) + defer cancel() + + return c.WaitWithJobsContext(ctx, resources) +} + +// WaitWithJobsContext waits until the context deadline for the specified resources to be ready, including jobs. +func (c *Client) WaitWithJobsContext(ctx context.Context, resources ResourceList) error { cs, err := c.getKubeClient() if err != nil { return err } checker := NewReadyChecker(cs, c.Log, PausedAsReady(true), CheckJobs(true)) w := waiter{ - c: checker, - log: c.Log, - timeout: timeout, + c: checker, + log: c.Log, } - return w.waitForResources(resources) + + return w.waitForResources(ctx, resources) } // WaitForDelete wait up to the given timeout for the specified resources to be deleted. func (c *Client) WaitForDelete(resources ResourceList, timeout time.Duration) error { + // Helm 4 TODO: remove decorator around WaitForDeleteWithContext. + ctx, cancel := context.WithTimeout(context.TODO(), timeout) + defer cancel() + + return c.WaitForDeleteWithContext(ctx, resources) +} + +// WaitForDeleteWithContext waits until the context deadline for the specified resources to be deleted. +func (c *Client) WaitForDeleteWithContext(ctx context.Context, resources ResourceList) error { w := waiter{ - log: c.Log, - timeout: timeout, + log: c.Log, } - return w.waitForDeletedResources(resources) + + return w.waitForDeletedResources(ctx, resources) } func (c *Client) namespace() string { @@ -512,9 +538,9 @@ func delete(c *Client, resources ResourceList, propagation metav1.DeletionPropag return res, nil } -func (c *Client) watchTimeout(t time.Duration) func(*resource.Info) error { +func (c *Client) watchTimeout(ctx context.Context) func(*resource.Info) error { return func(info *resource.Info) error { - return c.watchUntilReady(t, info) + return c.watchUntilReady(ctx, info) } } @@ -533,9 +559,18 @@ func (c *Client) watchTimeout(t time.Duration) func(*resource.Info) error { // // Handling for other kinds will be added as necessary. func (c *Client) WatchUntilReady(resources ResourceList, timeout time.Duration) error { + // Helm 4 TODO: remove decorator around WatchUntilReadyWithContext. + ctx, cancel := context.WithTimeout(context.TODO(), timeout) + defer cancel() + // For jobs, there's also the option to do poll c.Jobs(namespace).Get(): // https://github.com/adamreese/kubernetes/blob/master/test/e2e/job.go#L291-L300 - return perform(resources, c.watchTimeout(timeout)) + return c.WatchUntilReadyWithContext(ctx, resources) +} + +// WatchUntilReadyWithContext is the context-aware counterpart of WatchUntilReady.
+func (c *Client) WatchUntilReadyWithContext(ctx context.Context, resources ResourceList) error { + return perform(resources, c.watchTimeout(ctx)) } func perform(infos ResourceList, fn func(*resource.Info) error) error { @@ -600,6 +635,7 @@ func createResource(info *resource.Info) error { if err != nil { return err } + return info.Refresh(obj, true) } @@ -701,7 +737,7 @@ func updateResource(c *Client, target *resource.Info, currentObj runtime.Object, return nil } -func (c *Client) watchUntilReady(timeout time.Duration, info *resource.Info) error { +func (c *Client) watchUntilReady(ctx context.Context, info *resource.Info) error { kind := info.Mapping.GroupVersionKind.Kind switch kind { case "Job", "Pod": @@ -709,7 +745,7 @@ func (c *Client) watchUntilReady(timeout time.Duration, info *resource.Info) err return nil } - c.Log("Watching for changes to %s %s with timeout of %v", kind, info.Name, timeout) + c.Log("Watching for changes to %s %s with timeout of %q", kind, info.Name, timeFromCtx(ctx)) // Use a selector on the name of the resource. This should be unique for the // given version and kind @@ -725,8 +761,6 @@ func (c *Client) watchUntilReady(timeout time.Duration, info *resource.Info) err // In the future, we might want to add some special logic for types // like Ingress, Volume, etc. - ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), timeout) - defer cancel() _, err = watchtools.UntilWithSync(ctx, lw, &unstructured.Unstructured{}, nil, func(e watch.Event) (bool, error) { // Make sure the incoming object is versioned as we use unstructured // objects when we build manifests @@ -817,6 +851,35 @@ func scrubValidationError(err error) error { return err } +func (c *Client) WaitAndGetCompletedPodPhaseWithContext(ctx context.Context, name string) (v1.PodPhase, error) { + client, err := c.getKubeClient() + if err != nil { + return v1.PodUnknown, err + } + + watcher, err := client.CoreV1().Pods(c.namespace()).Watch(ctx, metav1.ListOptions{ + FieldSelector: fmt.Sprintf("metadata.name=%s", name), + }) + if err != nil { + return v1.PodUnknown, err + } + + for event := range watcher.ResultChan() { + p, ok := event.Object.(*v1.Pod) + if !ok { + return v1.PodUnknown, fmt.Errorf("%s not a pod", name) + } + switch p.Status.Phase { + case v1.PodFailed: + return v1.PodFailed, nil + case v1.PodSucceeded: + return v1.PodSucceeded, nil + } + } + + return v1.PodUnknown, err +} + // WaitAndGetCompletedPodPhase waits up to a timeout until a pod enters a completed phase // and returns said phase (PodSucceeded or PodFailed qualify). func (c *Client) WaitAndGetCompletedPodPhase(name string, timeout time.Duration) (v1.PodPhase, error) { diff --git a/pkg/kube/fake/fake.go b/pkg/kube/fake/fake.go index 267020d57..7addb364a 100644 --- a/pkg/kube/fake/fake.go +++ b/pkg/kube/fake/fake.go @@ -18,6 +18,7 @@ limitations under the License. package fake import ( + "context" "io" "time" @@ -74,6 +75,16 @@ func (f *FailingKubeClient) Wait(resources kube.ResourceList, d time.Duration) e return f.PrintingKubeClient.Wait(resources, d) } +// Waits the amount of time defined on f.WaitDuration, then returns the configured error if set or prints. 
+func (f *FailingKubeClient) WaitWithContext(ctx context.Context, resources kube.ResourceList) error { + time.Sleep(f.WaitDuration) + if f.WaitError != nil { + return f.WaitError + } + + return f.PrintingKubeClient.WaitWithContext(ctx, resources) +} + // WaitWithJobs returns the configured error if set or prints func (f *FailingKubeClient) WaitWithJobs(resources kube.ResourceList, d time.Duration) error { if f.WaitError != nil { @@ -82,6 +93,14 @@ func (f *FailingKubeClient) WaitWithJobs(resources kube.ResourceList, d time.Dur return f.PrintingKubeClient.WaitWithJobs(resources, d) } +// WaitWithJobsContext returns the configured error if set or prints +func (f *FailingKubeClient) WaitWithJobsContext(ctx context.Context, resources kube.ResourceList) error { + if f.WaitError != nil { + return f.WaitError + } + return f.PrintingKubeClient.WaitWithJobsContext(ctx, resources) +} + // WaitForDelete returns the configured error if set or prints func (f *FailingKubeClient) WaitForDelete(resources kube.ResourceList, d time.Duration) error { if f.WaitError != nil { @@ -90,6 +109,14 @@ func (f *FailingKubeClient) WaitForDelete(resources kube.ResourceList, d time.Du return f.PrintingKubeClient.WaitForDelete(resources, d) } +// WaitForDeleteWithContext returns the configured error if set or prints +func (f *FailingKubeClient) WaitForDeleteWithContext(ctx context.Context, resources kube.ResourceList) error { + if f.WaitError != nil { + return f.WaitError + } + return f.PrintingKubeClient.WaitForDeleteWithContext(ctx, resources) +} + // Delete returns the configured error if set or prints func (f *FailingKubeClient) Delete(resources kube.ResourceList) (*kube.Result, []error) { if f.DeleteError != nil { @@ -141,6 +168,15 @@ func (f *FailingKubeClient) WaitAndGetCompletedPodPhase(s string, d time.Duratio return f.PrintingKubeClient.WaitAndGetCompletedPodPhase(s, d) } +// WaitAndGetCompletedPodPhaseWithContext returns the configured error if set or prints +func (f *FailingKubeClient) WaitAndGetCompletedPodPhaseWithContext(ctx context.Context, s string) (v1.PodPhase, error) { + if f.WaitAndGetCompletedPodPhaseError != nil { + return v1.PodSucceeded, f.WaitAndGetCompletedPodPhaseError + } + + return f.PrintingKubeClient.WaitAndGetCompletedPodPhaseWithContext(ctx, s) +} + // DeleteWithPropagationPolicy returns the configured error if set or prints func (f *FailingKubeClient) DeleteWithPropagationPolicy(resources kube.ResourceList, policy metav1.DeletionPropagation) (*kube.Result, []error) { if f.DeleteWithPropagationError != nil { @@ -149,6 +185,13 @@ func (f *FailingKubeClient) DeleteWithPropagationPolicy(resources kube.ResourceL return f.PrintingKubeClient.DeleteWithPropagationPolicy(resources, policy) } +func (f *FailingKubeClient) WatchUntilReadyWithContext(ctx context.Context, resources kube.ResourceList) error { + if f.WatchUntilReadyError != nil { + return f.WatchUntilReadyError + } + return f.PrintingKubeClient.WatchUntilReady(resources, 0) +} + func createDummyResourceList() kube.ResourceList { var resInfo resource.Info resInfo.Name = "dummyName" resInfo.Namespace = "dummyNamespace" var resourceList kube.ResourceList resourceList.Append(&resInfo) return resourceList } + +// compile time check that FailingKubeClient satisfies our interfaces.
+var ( + _ kube.Interface = &FailingKubeClient{} + _ kube.ContextInterface = &FailingKubeClient{} + _ kube.InterfaceExt = &FailingKubeClient{} +) diff --git a/pkg/kube/fake/printer.go b/pkg/kube/fake/printer.go index e6c4b6207..b101a4f64 100644 --- a/pkg/kube/fake/printer.go +++ b/pkg/kube/fake/printer.go @@ -17,6 +17,7 @@ limitations under the License. package fake import ( + "context" "io" "strings" "time" @@ -62,16 +63,31 @@ func (p *PrintingKubeClient) Wait(resources kube.ResourceList, _ time.Duration) return err } +func (p *PrintingKubeClient) WaitWithContext(ctx context.Context, resources kube.ResourceList) error { + _, err := io.Copy(p.Out, bufferize(resources)) + return err +} + func (p *PrintingKubeClient) WaitWithJobs(resources kube.ResourceList, _ time.Duration) error { _, err := io.Copy(p.Out, bufferize(resources)) return err } +func (p *PrintingKubeClient) WaitWithJobsContext(ctx context.Context, resources kube.ResourceList) error { + _, err := io.Copy(p.Out, bufferize(resources)) + return err +} + func (p *PrintingKubeClient) WaitForDelete(resources kube.ResourceList, _ time.Duration) error { _, err := io.Copy(p.Out, bufferize(resources)) return err } +func (p *PrintingKubeClient) WaitForDeleteWithContext(ctx context.Context, resources kube.ResourceList) error { + _, err := io.Copy(p.Out, bufferize(resources)) + return err +} + // Delete implements KubeClient delete. // // It only prints out the content to be deleted. @@ -89,6 +105,12 @@ func (p *PrintingKubeClient) WatchUntilReady(resources kube.ResourceList, _ time return err } +// WatchUntilReadyWithContext implements KubeClient WatchUntilReadyWithContext. +func (p *PrintingKubeClient) WatchUntilReadyWithContext(ctx context.Context, resources kube.ResourceList) error { + _, err := io.Copy(p.Out, bufferize(resources)) + return err +} + // Update implements KubeClient Update. func (p *PrintingKubeClient) Update(_, modified kube.ResourceList, _ bool) (*kube.Result, error) { _, err := io.Copy(p.Out, bufferize(modified)) @@ -116,6 +138,11 @@ func (p *PrintingKubeClient) WaitAndGetCompletedPodPhase(_ string, _ time.Durati return v1.PodSucceeded, nil } +// WaitAndGetCompletedPodPhaseWithContext implements KubeClient WaitAndGetCompletedPodPhaseWithContext. +func (p *PrintingKubeClient) WaitAndGetCompletedPodPhaseWithContext(_ context.Context, _ string) (v1.PodPhase, error) { + return v1.PodSucceeded, nil +} + // DeleteWithPropagationPolicy implements KubeClient delete. // // It only prints out the content to be deleted. @@ -134,3 +161,10 @@ func bufferize(resources kube.ResourceList) io.Reader { } return strings.NewReader(builder.String()) } + +// compile time check that PrintingKubeClient satisfies our interfaces. +var ( + _ kube.Interface = &PrintingKubeClient{} + _ kube.ContextInterface = &PrintingKubeClient{} + _ kube.InterfaceExt = &PrintingKubeClient{} +) diff --git a/pkg/kube/interface.go b/pkg/kube/interface.go index ce42ed950..1c844dd1f 100644 --- a/pkg/kube/interface.go +++ b/pkg/kube/interface.go @@ -17,6 +17,7 @@ limitations under the License. package kube import ( + "context" "io" "time" @@ -72,6 +73,31 @@ type Interface interface { IsReachable() error } +// ContextInterface is introduced to avoid breaking backwards compatibility for Interface implementers. +// +// TODO Helm 4: Replace Interface methods that accept a time.Duration as an argument with a context. +type ContextInterface interface { + // WaitWithContext waits until the context deadline for the specified resources to be ready.
+ WaitWithContext(ctx context.Context, resources ResourceList) error + // WaitWithJobsContext waits until the context deadline for the specified resources to be ready, including jobs. + WaitWithJobsContext(ctx context.Context, resources ResourceList) error + // WatchUntilReadyWithContext watches the given resources and waits until they are ready. + // + // This method is mainly for hook implementations. It watches for a resource to + // hit a particular milestone. The milestone depends on the Kind. + // + // For Jobs, "ready" means the Job ran to completion (exited without error). + // For Pods, "ready" means the Pod phase is marked "succeeded". + // For all other kinds, it means the kind was created or modified without + // error. + WatchUntilReadyWithContext(context.Context, ResourceList) error + // WaitAndGetCompletedPodPhaseWithContext waits up to a timeout until a pod enters a completed phase + // and returns said phase (PodSucceeded or PodFailed qualify). + WaitAndGetCompletedPodPhaseWithContext(context.Context, string) (v1.PodPhase, error) + // WaitForDeleteWithContext waits until the context deadline for the specified resources to be deleted. + WaitForDeleteWithContext(context.Context, ResourceList) error +} + // InterfaceExt is introduced to avoid breaking backwards compatibility for Interface implementers. // // TODO Helm 4: Remove InterfaceExt and integrate its method(s) into the Interface. @@ -112,5 +138,6 @@ type InterfaceResources interface { var _ Interface = (*Client)(nil) var _ InterfaceExt = (*Client)(nil) +var _ ContextInterface = (*Client)(nil) var _ InterfaceDeletionPropagation = (*Client)(nil) var _ InterfaceResources = (*Client)(nil) diff --git a/pkg/kube/wait.go b/pkg/kube/wait.go index ecdd38940..6d353719c 100644 --- a/pkg/kube/wait.go +++ b/pkg/kube/wait.go @@ -37,18 +37,26 @@ import ( ) type waiter struct { - c ReadyChecker - timeout time.Duration - log func(string, ...interface{}) + c ReadyChecker + log func(string, ...interface{}) +} + +// timeFromCtx returns the time remaining until the deadline of ctx, as a string.
+func timeFromCtx(ctx context.Context) string { + deadline, ok := ctx.Deadline() + // No deadline means the context won't time out + if !ok { + return "none" + } + + return time.Until(deadline).String() } // waitForResources polls to get the current status of all pods, PVCs, Services and // Jobs(optional) until all are ready or a timeout is reached -func (w *waiter) waitForResources(created ResourceList) error { - w.log("beginning wait for %d resources with timeout of %v", len(created), w.timeout) +func (w *waiter) waitForResources(ctx context.Context, created ResourceList) error { - ctx, cancel := context.WithTimeout(context.Background(), w.timeout) - defer cancel() + w.log("beginning wait for %d resources with timeout of %q", len(created), timeFromCtx(ctx)) return wait.PollUntilContextCancel(ctx, 2*time.Second, true, func(ctx context.Context) (bool, error) { for _, v := range created { @@ -62,11 +70,8 @@ func (w *waiter) waitForResources(created ResourceList) error { } // waitForDeletedResources polls to check if all the resources are deleted or a timeout is reached -func (w *waiter) waitForDeletedResources(deleted ResourceList) error { - w.log("beginning wait for %d resources to be deleted with timeout of %v", len(deleted), w.timeout) - - ctx, cancel := context.WithTimeout(context.Background(), w.timeout) - defer cancel() +func (w *waiter) waitForDeletedResources(ctx context.Context, deleted ResourceList) error { + w.log("beginning wait for %d resources to be deleted with timeout of %q", len(deleted), timeFromCtx(ctx)) return wait.PollUntilContextCancel(ctx, 2*time.Second, true, func(ctx context.Context) (bool, error) { for _, v := range deleted {