From 9e62789ddea35039f8f69406df37c3683a673f13 Mon Sep 17 00:00:00 2001
From: Josh Rowley
Date: Sat, 27 May 2023 16:24:32 -0700
Subject: [PATCH 1/4] initial plumbing of time.Duration to context.Context

Signed-off-by: Josh Rowley
---
 pkg/action/hooks.go           | 11 ++--
 pkg/action/install.go         | 72 +++++++++----------------
 pkg/action/install_test.go    |  1 +
 pkg/action/release_testing.go | 13 +++--
 pkg/action/rollback.go        | 20 ++++---
 pkg/action/uninstall.go       | 16 ++++--
 pkg/action/upgrade.go         | 64 +++++-----------------
 pkg/kube/client.go            | 99 ++++++++++++++++++++++++++++-------
 pkg/kube/fake/fake.go         | 49 +++++++++++++++++
 pkg/kube/fake/printer.go      | 34 ++++++++++++
 pkg/kube/interface.go         | 15 ++++++
 pkg/kube/wait.go              | 25 ++++-----
 12 files changed, 275 insertions(+), 144 deletions(-)

diff --git a/pkg/action/hooks.go b/pkg/action/hooks.go
index 40c1ffdb6..2490613c4 100644
--- a/pkg/action/hooks.go
+++ b/pkg/action/hooks.go
@@ -17,17 +17,18 @@ package action

 import (
 	"bytes"
+	"context"
 	"sort"
-	"time"

 	"github.com/pkg/errors"

+	"helm.sh/helm/v3/pkg/kube"
 	"helm.sh/helm/v3/pkg/release"
 	helmtime "helm.sh/helm/v3/pkg/time"
 )

 // execHook executes all of the hooks for the given hook event.
-func (cfg *Configuration) execHook(rl *release.Release, hook release.HookEvent, timeout time.Duration) error {
+func (cfg *Configuration) execHook(ctx context.Context, rl *release.Release, hook release.HookEvent) error {
 	executingHooks := []*release.Hook{}

 	for _, h := range rl.Hooks {
@@ -79,8 +80,12 @@ func (cfg *Configuration) execHook(rl *release.Release, hook release.HookEvent,
 			return errors.Wrapf(err, "warning: Hook %s %s failed", hook, h.Path)
 		}

+		ctxInterface, ok := cfg.KubeClient.(kube.ContextInterface)
+		if !ok {
+			panic("KubeClient does not satisfy kube.ContextInterface")
+		}
 		// Watch hook resources until they have completed
-		err = cfg.KubeClient.WatchUntilReady(resources, timeout)
+		err = ctxInterface.WatchUntilReadyWithContext(ctx, resources)
 		// Note the time of success/failure
 		h.LastRun.CompletedAt = helmtime.Now()
 		// Mark hook as succeeded or failed
diff --git a/pkg/action/install.go b/pkg/action/install.go
index d5c34cef7..2283bb9ef 100644
--- a/pkg/action/install.go
+++ b/pkg/action/install.go
@@ -206,7 +206,9 @@ func (i *Install) installCRDs(crds []chart.CRD) error {
 // If DryRun is set to true, this will prepare the release, but not install it
 func (i *Install) Run(chrt *chart.Chart, vals map[string]interface{}) (*release.Release, error) {
-	ctx := context.Background()
+	ctx, cancel := context.WithTimeout(context.TODO(), i.Timeout)
+	defer cancel()
+
 	return i.RunWithContext(ctx, chrt, vals)
 }

@@ -360,6 +362,10 @@ func (i *Install) RunWithContext(ctx context.Context, chrt *chart.Chart, vals ma
 		}
 	}

+	// if err := ctx.Err(); err != nil {
+	// 	return rel, err
+	// }
+
 	// Store the release in history before continuing (new in Helm 3). We always know
 	// that this is a create operation.
 	if err := i.cfg.Releases.Create(rel); err != nil {
@@ -368,27 +374,16 @@ func (i *Install) RunWithContext(ctx context.Context, chrt *chart.Chart, vals ma
 		// not working.
return rel, err } - rChan := make(chan resultMessage) - ctxChan := make(chan resultMessage) - doneChan := make(chan struct{}) - defer close(doneChan) - go i.performInstall(rChan, rel, toBeAdopted, resources) - go i.handleContext(ctx, ctxChan, doneChan, rel) - select { - case result := <-rChan: - return result.r, result.e - case result := <-ctxChan: - return result.r, result.e - } + + return i.performInstall(ctx, rel, toBeAdopted, resources) } -func (i *Install) performInstall(c chan<- resultMessage, rel *release.Release, toBeAdopted kube.ResourceList, resources kube.ResourceList) { +func (i *Install) performInstall(ctx context.Context, rel *release.Release, toBeAdopted, resources kube.ResourceList) (*release.Release, error) { // pre-install hooks if !i.DisableHooks { - if err := i.cfg.execHook(rel, release.HookPreInstall, i.Timeout); err != nil { - i.reportToRun(c, rel, fmt.Errorf("failed pre-install: %s", err)) - return + if err := i.cfg.execHook(ctx, rel, release.HookPreInstall); err != nil { + return i.failRelease(rel, fmt.Errorf("failed pre-install: %w", err)) } } @@ -397,34 +392,31 @@ func (i *Install) performInstall(c chan<- resultMessage, rel *release.Release, t // to true, since that is basically an upgrade operation. if len(toBeAdopted) == 0 && len(resources) > 0 { if _, err := i.cfg.KubeClient.Create(resources); err != nil { - i.reportToRun(c, rel, err) - return + return i.failRelease(rel, err) } } else if len(resources) > 0 { if _, err := i.cfg.KubeClient.Update(toBeAdopted, resources, i.Force); err != nil { - i.reportToRun(c, rel, err) - return + return i.failRelease(rel, err) } } if i.Wait { + kubeClient := i.cfg.KubeClient.(kube.ContextInterface) + if i.WaitForJobs { - if err := i.cfg.KubeClient.WaitWithJobs(resources, i.Timeout); err != nil { - i.reportToRun(c, rel, err) - return + if err := kubeClient.WaitWithJobsContext(ctx, resources); err != nil { + return i.failRelease(rel, err) } } else { - if err := i.cfg.KubeClient.Wait(resources, i.Timeout); err != nil { - i.reportToRun(c, rel, err) - return + if err := kubeClient.WaitWithContext(ctx, resources); err != nil { + return i.failRelease(rel, err) } } } if !i.DisableHooks { - if err := i.cfg.execHook(rel, release.HookPostInstall, i.Timeout); err != nil { - i.reportToRun(c, rel, fmt.Errorf("failed post-install: %s", err)) - return + if err := i.cfg.execHook(ctx, rel, release.HookPostInstall); err != nil { + return i.failRelease(rel, fmt.Errorf("failed post-install: %w", err)) } } @@ -445,25 +437,9 @@ func (i *Install) performInstall(c chan<- resultMessage, rel *release.Release, t i.cfg.Log("failed to record the release: %s", err) } - i.reportToRun(c, rel, nil) -} -func (i *Install) handleContext(ctx context.Context, c chan<- resultMessage, done chan struct{}, rel *release.Release) { - select { - case <-ctx.Done(): - err := ctx.Err() - i.reportToRun(c, rel, err) - case <-done: - return - } -} -func (i *Install) reportToRun(c chan<- resultMessage, rel *release.Release, err error) { - i.Lock.Lock() - if err != nil { - rel, err = i.failRelease(rel, err) - } - c <- resultMessage{r: rel, e: err} - i.Lock.Unlock() + return rel, nil } + func (i *Install) failRelease(rel *release.Release, err error) (*release.Release, error) { rel.SetStatus(release.StatusFailed, fmt.Sprintf("Release %q failed: %s", i.ReleaseName, err.Error())) if i.Atomic { diff --git a/pkg/action/install_test.go b/pkg/action/install_test.go index dd0cdb54d..078608803 100644 --- a/pkg/action/install_test.go +++ b/pkg/action/install_test.go @@ -387,6 +387,7 @@ func 
TestInstallRelease_Wait_Interrupted(t *testing.T) { ctx := context.Background() ctx, cancel := context.WithCancel(ctx) time.AfterFunc(time.Second, cancel) + //cancel() res, err := instAction.RunWithContext(ctx, buildChart(), vals) is.Error(err) diff --git a/pkg/action/release_testing.go b/pkg/action/release_testing.go index ecaeaf59f..2dc4e7a41 100644 --- a/pkg/action/release_testing.go +++ b/pkg/action/release_testing.go @@ -48,8 +48,7 @@ func NewReleaseTesting(cfg *Configuration) *ReleaseTesting { } } -// Run executes 'helm test' against the given release. -func (r *ReleaseTesting) Run(name string) (*release.Release, error) { +func (r *ReleaseTesting) RunWithContext(ctx context.Context, name string) (*release.Release, error) { if err := r.cfg.KubeClient.IsReachable(); err != nil { return nil, err } @@ -88,7 +87,7 @@ func (r *ReleaseTesting) Run(name string) (*release.Release, error) { rel.Hooks = executingHooks } - if err := r.cfg.execHook(rel, release.HookTest, r.Timeout); err != nil { + if err := r.cfg.execHook(ctx, rel, release.HookTest); err != nil { rel.Hooks = append(skippedHooks, rel.Hooks...) r.cfg.Releases.Update(rel) return rel, err @@ -98,6 +97,14 @@ func (r *ReleaseTesting) Run(name string) (*release.Release, error) { return rel, r.cfg.Releases.Update(rel) } +// Run executes 'helm test' against the given release. +func (r *ReleaseTesting) Run(name string) (*release.Release, error) { + ctx, cancel := context.WithTimeout(context.TODO(), r.Timeout) + defer cancel() + + return r.RunWithContext(ctx, name) +} + // GetPodLogs will write the logs for all test pods in the given release into // the given writer. These can be immediately output to the user or captured for // other uses diff --git a/pkg/action/rollback.go b/pkg/action/rollback.go index dda8c700b..989e771f1 100644 --- a/pkg/action/rollback.go +++ b/pkg/action/rollback.go @@ -18,6 +18,7 @@ package action import ( "bytes" + "context" "fmt" "strings" "time" @@ -54,8 +55,7 @@ func NewRollback(cfg *Configuration) *Rollback { } } -// Run executes 'helm rollback' against the given release. -func (r *Rollback) Run(name string) error { +func (r *Rollback) RunWithContext(ctx context.Context, name string) error { if err := r.cfg.KubeClient.IsReachable(); err != nil { return err } @@ -76,7 +76,7 @@ func (r *Rollback) Run(name string) error { } r.cfg.Log("performing rollback of %s", name) - if _, err := r.performRollback(currentRelease, targetRelease); err != nil { + if _, err := r.performRollback(ctx, currentRelease, targetRelease); err != nil { return err } @@ -89,6 +89,14 @@ func (r *Rollback) Run(name string) error { return nil } +// Run executes 'helm rollback' against the given release. 
+func (r *Rollback) Run(name string) error { + ctx, cancel := context.WithTimeout(context.TODO(), r.Timeout) + defer cancel() + + return r.RunWithContext(ctx, name) +} + // prepareRollback finds the previous release and prepares a new release object with // the previous release's configuration func (r *Rollback) prepareRollback(name string) (*release.Release, *release.Release, error) { @@ -140,7 +148,7 @@ func (r *Rollback) prepareRollback(name string) (*release.Release, *release.Rele return currentRelease, targetRelease, nil } -func (r *Rollback) performRollback(currentRelease, targetRelease *release.Release) (*release.Release, error) { +func (r *Rollback) performRollback(ctx context.Context, currentRelease, targetRelease *release.Release) (*release.Release, error) { if r.DryRun { r.cfg.Log("dry run for %s", targetRelease.Name) return targetRelease, nil @@ -157,7 +165,7 @@ func (r *Rollback) performRollback(currentRelease, targetRelease *release.Releas // pre-rollback hooks if !r.DisableHooks { - if err := r.cfg.execHook(targetRelease, release.HookPreRollback, r.Timeout); err != nil { + if err := r.cfg.execHook(ctx, targetRelease, release.HookPreRollback); err != nil { return targetRelease, err } } else { @@ -224,7 +232,7 @@ func (r *Rollback) performRollback(currentRelease, targetRelease *release.Releas // post-rollback hooks if !r.DisableHooks { - if err := r.cfg.execHook(targetRelease, release.HookPostRollback, r.Timeout); err != nil { + if err := r.cfg.execHook(ctx, targetRelease, release.HookPostRollback); err != nil { return targetRelease, err } } diff --git a/pkg/action/uninstall.go b/pkg/action/uninstall.go index 801498544..9d2078237 100644 --- a/pkg/action/uninstall.go +++ b/pkg/action/uninstall.go @@ -17,6 +17,7 @@ limitations under the License. package action import ( + "context" "strings" "time" @@ -54,6 +55,13 @@ func NewUninstall(cfg *Configuration) *Uninstall { // Run uninstalls the given release. 
func (u *Uninstall) Run(name string) (*release.UninstallReleaseResponse, error) { + ctx, cancel := context.WithTimeout(context.TODO(), u.Timeout) + defer cancel() + + return u.RunWithContext(ctx, name) +} + +func (u *Uninstall) RunWithContext(ctx context.Context, name string) (*release.UninstallReleaseResponse, error) { if err := u.cfg.KubeClient.IsReachable(); err != nil { return nil, err } @@ -101,7 +109,7 @@ func (u *Uninstall) Run(name string) (*release.UninstallReleaseResponse, error) res := &release.UninstallReleaseResponse{Release: rel} if !u.DisableHooks { - if err := u.cfg.execHook(rel, release.HookPreDelete, u.Timeout); err != nil { + if err := u.cfg.execHook(ctx, rel, release.HookPreDelete); err != nil { return res, err } } else { @@ -126,15 +134,15 @@ func (u *Uninstall) Run(name string) (*release.UninstallReleaseResponse, error) res.Info = kept if u.Wait { - if kubeClient, ok := u.cfg.KubeClient.(kube.InterfaceExt); ok { - if err := kubeClient.WaitForDelete(deletedResources, u.Timeout); err != nil { + if kubeClient, ok := u.cfg.KubeClient.(kube.ContextInterface); ok { + if err := kubeClient.WaitForDeleteWithContext(ctx, deletedResources); err != nil { errs = append(errs, err) } } } if !u.DisableHooks { - if err := u.cfg.execHook(rel, release.HookPostDelete, u.Timeout); err != nil { + if err := u.cfg.execHook(ctx, rel, release.HookPostDelete); err != nil { errs = append(errs, err) } } diff --git a/pkg/action/upgrade.go b/pkg/action/upgrade.go index 829be51df..975c5f7e7 100644 --- a/pkg/action/upgrade.go +++ b/pkg/action/upgrade.go @@ -130,7 +130,9 @@ func (u *Upgrade) SetRegistryClient(client *registry.Client) { // Run executes the upgrade on the given release. func (u *Upgrade) Run(name string, chart *chart.Chart, vals map[string]interface{}) (*release.Release, error) { - ctx := context.Background() + ctx, cancel := context.WithTimeout(context.TODO(), u.Timeout) + defer cancel() + return u.RunWithContext(ctx, name, chart, vals) } @@ -331,51 +333,16 @@ func (u *Upgrade) performUpgrade(ctx context.Context, originalRelease, upgradedR if err := u.cfg.Releases.Create(upgradedRelease); err != nil { return nil, err } - rChan := make(chan resultMessage) - ctxChan := make(chan resultMessage) - doneChan := make(chan interface{}) - defer close(doneChan) - go u.releasingUpgrade(rChan, upgradedRelease, current, target, originalRelease) - go u.handleContext(ctx, doneChan, ctxChan, upgradedRelease) - select { - case result := <-rChan: - return result.r, result.e - case result := <-ctxChan: - return result.r, result.e - } -} -// Function used to lock the Mutex, this is important for the case when the atomic flag is set. -// In that case the upgrade will finish before the rollback is finished so it is necessary to wait for the rollback to finish. -// The rollback will be trigger by the function failRelease -func (u *Upgrade) reportToPerformUpgrade(c chan<- resultMessage, rel *release.Release, created kube.ResourceList, err error) { - u.Lock.Lock() - if err != nil { - rel, err = u.failRelease(rel, created, err) - } - c <- resultMessage{r: rel, e: err} - u.Lock.Unlock() + return u.releasingUpgrade(ctx, upgradedRelease, current, target, originalRelease) } -// Setup listener for SIGINT and SIGTERM -func (u *Upgrade) handleContext(ctx context.Context, done chan interface{}, c chan<- resultMessage, upgradedRelease *release.Release) { - select { - case <-ctx.Done(): - err := ctx.Err() - - // when the atomic flag is set the ongoing release finish first and doesn't give time for the rollback happens. 
-		u.reportToPerformUpgrade(c, upgradedRelease, kube.ResourceList{}, err)
-	case <-done:
-		return
-	}
-}
-func (u *Upgrade) releasingUpgrade(c chan<- resultMessage, upgradedRelease *release.Release, current kube.ResourceList, target kube.ResourceList, originalRelease *release.Release) {
+func (u *Upgrade) releasingUpgrade(ctx context.Context, upgradedRelease *release.Release, current, target kube.ResourceList, originalRelease *release.Release) (*release.Release, error) {
 	// pre-upgrade hooks
 	if !u.DisableHooks {
-		if err := u.cfg.execHook(upgradedRelease, release.HookPreUpgrade, u.Timeout); err != nil {
-			u.reportToPerformUpgrade(c, upgradedRelease, kube.ResourceList{}, fmt.Errorf("pre-upgrade hooks failed: %s", err))
-			return
+		if err := u.cfg.execHook(ctx, upgradedRelease, release.HookPreUpgrade); err != nil {
+			return u.failRelease(upgradedRelease, kube.ResourceList{}, fmt.Errorf("pre-upgrade hooks failed: %w", err))
 		}
 	} else {
 		u.cfg.Log("upgrade hooks disabled for %s", upgradedRelease.Name)
@@ -384,8 +351,7 @@
 	results, err := u.cfg.KubeClient.Update(current, target, u.Force)
 	if err != nil {
 		u.cfg.recordRelease(originalRelease)
-		u.reportToPerformUpgrade(c, upgradedRelease, results.Created, err)
-		return
+		return u.failRelease(upgradedRelease, results.Created, err)
 	}

 	if u.Recreate {
@@ -405,23 +371,20 @@
 		if u.WaitForJobs {
 			if err := u.cfg.KubeClient.WaitWithJobs(target, u.Timeout); err != nil {
 				u.cfg.recordRelease(originalRelease)
-				u.reportToPerformUpgrade(c, upgradedRelease, results.Created, err)
-				return
+				return u.failRelease(upgradedRelease, results.Created, err)
 			}
 		} else {
 			if err := u.cfg.KubeClient.Wait(target, u.Timeout); err != nil {
 				u.cfg.recordRelease(originalRelease)
-				u.reportToPerformUpgrade(c, upgradedRelease, results.Created, err)
-				return
+				return u.failRelease(upgradedRelease, results.Created, err)
 			}
 		}
 	}

 	// post-upgrade hooks
 	if !u.DisableHooks {
-		if err := u.cfg.execHook(upgradedRelease, release.HookPostUpgrade, u.Timeout); err != nil {
-			u.reportToPerformUpgrade(c, upgradedRelease, results.Created, fmt.Errorf("post-upgrade hooks failed: %s", err))
-			return
+		if err := u.cfg.execHook(ctx, upgradedRelease, release.HookPostUpgrade); err != nil {
+			return u.failRelease(upgradedRelease, results.Created, fmt.Errorf("post-upgrade hooks failed: %w", err))
 		}
 	}

@@ -434,7 +397,8 @@
 	} else {
 		upgradedRelease.Info.Description = "Upgrade complete"
 	}
-	u.reportToPerformUpgrade(c, upgradedRelease, nil, nil)
+
+	return upgradedRelease, nil
 }

 func (u *Upgrade) failRelease(rel *release.Release, created kube.ResourceList, err error) (*release.Release, error) {
diff --git a/pkg/kube/client.go b/pkg/kube/client.go
index 7b3c803f9..63a28b6fa 100644
--- a/pkg/kube/client.go
+++ b/pkg/kube/client.go
@@ -275,41 +275,67 @@ func getResource(info *resource.Info) (runtime.Object, error) {
 // Wait waits up to the given timeout for the specified resources to be ready.
 func (c *Client) Wait(resources ResourceList, timeout time.Duration) error {
+	// Helm 4 TODO: remove decorator around WaitWithContext.
+	ctx, cancel := context.WithTimeout(context.TODO(), timeout)
+	defer cancel()
+
+	return c.WaitWithContext(ctx, resources)
+}
+
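// Illustrative sketch, not part of the patch: the decorator above keeps the
// timeout-based Wait signature stable for existing callers, while new callers
// can target WaitWithContext directly. Under that assumption, a caller can
// combine caller-driven cancellation (e.g. Ctrl-C) with an overall deadline,
// which a bare time.Duration cannot express. waitWithCancellation is a
// hypothetical helper; Client, ResourceList, and WaitWithContext are the types
// and method from this package. Assumes imports of context, os, os/signal,
// syscall, and time.
//
//	func waitWithCancellation(c *Client, resources ResourceList) error {
//		// Cancel on SIGINT/SIGTERM and still enforce a hard deadline.
//		ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
//		defer stop()
//		ctx, cancel := context.WithTimeout(ctx, 5*time.Minute)
//		defer cancel()
//		return c.WaitWithContext(ctx, resources)
//	}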
+// WaitWithContext waits until the context deadline for the specified resources to be ready.
+func (c *Client) WaitWithContext(ctx context.Context, resources ResourceList) error {
 	cs, err := c.getKubeClient()
 	if err != nil {
 		return err
 	}
 	checker := NewReadyChecker(cs, c.Log, PausedAsReady(true))
 	w := waiter{
-		c:       checker,
-		log:     c.Log,
-		timeout: timeout,
+		c:   checker,
+		log: c.Log,
 	}
-	return w.waitForResources(resources)
+	return w.waitForResources(ctx, resources)
 }

 // WaitWithJobs wait up to the given timeout for the specified resources to be ready, including jobs.
 func (c *Client) WaitWithJobs(resources ResourceList, timeout time.Duration) error {
+	// Helm 4 TODO: remove decorator around WaitWithJobsContext.
+	ctx, cancel := context.WithTimeout(context.TODO(), timeout)
+	defer cancel()
+
+	return c.WaitWithJobsContext(ctx, resources)
+}
+
+// WaitWithJobsContext waits until the context deadline for the specified resources to be ready, including jobs.
+func (c *Client) WaitWithJobsContext(ctx context.Context, resources ResourceList) error {
 	cs, err := c.getKubeClient()
 	if err != nil {
 		return err
 	}
 	checker := NewReadyChecker(cs, c.Log, PausedAsReady(true), CheckJobs(true))
 	w := waiter{
-		c:       checker,
-		log:     c.Log,
-		timeout: timeout,
+		c:   checker,
+		log: c.Log,
 	}
-	return w.waitForResources(resources)
+
+	return w.waitForResources(ctx, resources)
 }

 // WaitForDelete wait up to the given timeout for the specified resources to be deleted.
 func (c *Client) WaitForDelete(resources ResourceList, timeout time.Duration) error {
+	// Helm 4 TODO: remove decorator around WaitForDeleteWithContext.
+	ctx, cancel := context.WithTimeout(context.TODO(), timeout)
+	defer cancel()
+
+	return c.WaitForDeleteWithContext(ctx, resources)
+}
+
+// WaitForDeleteWithContext waits until the context deadline for the specified resources to be deleted.
+func (c *Client) WaitForDeleteWithContext(ctx context.Context, resources ResourceList) error {
 	w := waiter{
-		log:     c.Log,
-		timeout: timeout,
+		log: c.Log,
 	}
-	return w.waitForDeletedResources(resources)
+
+	return w.waitForDeletedResources(ctx, resources)
 }

 func (c *Client) namespace() string {
@@ -506,9 +532,9 @@ func delete(c *Client, resources ResourceList, propagation metav1.DeletionPropag
 	return res, nil
 }

-func (c *Client) watchTimeout(t time.Duration) func(*resource.Info) error {
+func (c *Client) watchTimeout(ctx context.Context) func(*resource.Info) error {
 	return func(info *resource.Info) error {
-		return c.watchUntilReady(t, info)
+		return c.watchUntilReady(ctx, info)
 	}
 }

@@ -527,9 +553,18 @@ func (c *Client) watchTimeout(t time.Duration) func(*resource.Info) error {
 //
 // Handling for other kinds will be added as necessary.
 func (c *Client) WatchUntilReady(resources ResourceList, timeout time.Duration) error {
+	// Helm 4 TODO: remove decorator around WatchUntilReadyWithContext.
+	ctx, cancel := context.WithTimeout(context.TODO(), timeout)
+	defer cancel()
+
 	// For jobs, there's also the option to do poll c.Jobs(namespace).Get():
 	// https://github.com/adamreese/kubernetes/blob/master/test/e2e/job.go#L291-L300
-	return perform(resources, c.watchTimeout(timeout))
+	return c.WatchUntilReadyWithContext(ctx, resources)
+}
+
+// WatchUntilReadyWithContext is the context-aware variant of WatchUntilReady.
+func (c *Client) WatchUntilReadyWithContext(ctx context.Context, resources ResourceList) error { + return perform(resources, c.watchTimeout(ctx)) } func perform(infos ResourceList, fn func(*resource.Info) error) error { @@ -594,6 +629,7 @@ func createResource(info *resource.Info) error { if err != nil { return err } + return info.Refresh(obj, true) } @@ -695,7 +731,7 @@ func updateResource(c *Client, target *resource.Info, currentObj runtime.Object, return nil } -func (c *Client) watchUntilReady(timeout time.Duration, info *resource.Info) error { +func (c *Client) watchUntilReady(ctx context.Context, info *resource.Info) error { kind := info.Mapping.GroupVersionKind.Kind switch kind { case "Job", "Pod": @@ -703,7 +739,7 @@ func (c *Client) watchUntilReady(timeout time.Duration, info *resource.Info) err return nil } - c.Log("Watching for changes to %s %s with timeout of %v", kind, info.Name, timeout) + c.Log("Watching for changes to %s %s with timeout of %v", kind, info.Name, timeFromCtx(ctx)) // Use a selector on the name of the resource. This should be unique for the // given version and kind @@ -719,8 +755,6 @@ func (c *Client) watchUntilReady(timeout time.Duration, info *resource.Info) err // In the future, we might want to add some special logic for types // like Ingress, Volume, etc. - ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), timeout) - defer cancel() _, err = watchtools.UntilWithSync(ctx, lw, &unstructured.Unstructured{}, nil, func(e watch.Event) (bool, error) { // Make sure the incoming object is versioned as we use unstructured // objects when we build manifests @@ -811,6 +845,35 @@ func scrubValidationError(err error) error { return err } +func (c *Client) WaitAndGetCompletedPodPhaseWithContext(ctx context.Context, name string) (v1.PodPhase, error) { + client, err := c.getKubeClient() + if err != nil { + return v1.PodUnknown, err + } + + watcher, err := client.CoreV1().Pods(c.namespace()).Watch(ctx, metav1.ListOptions{ + FieldSelector: fmt.Sprintf("metadata.name=%s", name), + }) + if err != nil { + return v1.PodUnknown, err + } + + for event := range watcher.ResultChan() { + p, ok := event.Object.(*v1.Pod) + if !ok { + return v1.PodUnknown, fmt.Errorf("%s not a pod", name) + } + switch p.Status.Phase { + case v1.PodFailed: + return v1.PodFailed, nil + case v1.PodSucceeded: + return v1.PodSucceeded, nil + } + } + + return v1.PodUnknown, err +} + // WaitAndGetCompletedPodPhase waits up to a timeout until a pod enters a completed phase // and returns said phase (PodSucceeded or PodFailed qualify). func (c *Client) WaitAndGetCompletedPodPhase(name string, timeout time.Duration) (v1.PodPhase, error) { diff --git a/pkg/kube/fake/fake.go b/pkg/kube/fake/fake.go index 267020d57..820518ed9 100644 --- a/pkg/kube/fake/fake.go +++ b/pkg/kube/fake/fake.go @@ -18,6 +18,7 @@ limitations under the License. package fake import ( + "context" "io" "time" @@ -74,6 +75,15 @@ func (f *FailingKubeClient) Wait(resources kube.ResourceList, d time.Duration) e return f.PrintingKubeClient.Wait(resources, d) } +// Waits the amount of time defined on f.WaitDuration, then returns the configured error if set or prints. 
+func (f *FailingKubeClient) WaitWithContext(ctx context.Context, resources kube.ResourceList) error {
+	time.Sleep(f.WaitDuration)
+	if f.WaitError != nil {
+		return f.WaitError
+	}
+	return f.PrintingKubeClient.Wait(resources, 0)
+}
+
 // WaitWithJobs returns the configured error if set or prints
 func (f *FailingKubeClient) WaitWithJobs(resources kube.ResourceList, d time.Duration) error {
 	if f.WaitError != nil {
@@ -82,6 +92,14 @@ func (f *FailingKubeClient) WaitWithJobs(resources kube.ResourceList, d time.Dur
 	return f.PrintingKubeClient.WaitWithJobs(resources, d)
 }

+// WaitWithJobsContext returns the configured error if set or prints
+func (f *FailingKubeClient) WaitWithJobsContext(ctx context.Context, resources kube.ResourceList) error {
+	if f.WaitError != nil {
+		return f.WaitError
+	}
+	return f.PrintingKubeClient.WaitWithJobsContext(ctx, resources)
+}
+
 // WaitForDelete returns the configured error if set or prints
 func (f *FailingKubeClient) WaitForDelete(resources kube.ResourceList, d time.Duration) error {
 	if f.WaitError != nil {
@@ -90,6 +108,14 @@ func (f *FailingKubeClient) WaitForDelete(resources kube.ResourceList, d time.Du
 	return f.PrintingKubeClient.WaitForDelete(resources, d)
 }

+// WaitForDeleteWithContext returns the configured error if set or prints
+func (f *FailingKubeClient) WaitForDeleteWithContext(ctx context.Context, resources kube.ResourceList) error {
+	if f.WaitError != nil {
+		return f.WaitError
+	}
+	return f.PrintingKubeClient.WaitForDelete(resources, 0)
+}
+
 // Delete returns the configured error if set or prints
 func (f *FailingKubeClient) Delete(resources kube.ResourceList) (*kube.Result, []error) {
 	if f.DeleteError != nil {
@@ -141,6 +167,14 @@ func (f *FailingKubeClient) WaitAndGetCompletedPodPhase(s string, d time.Duratio
 	return f.PrintingKubeClient.WaitAndGetCompletedPodPhase(s, d)
 }

+// WaitAndGetCompletedPodPhaseWithContext returns the configured error if set or prints
+func (f *FailingKubeClient) WaitAndGetCompletedPodPhaseWithContext(ctx context.Context, s string) (v1.PodPhase, error) {
+	if f.WaitAndGetCompletedPodPhaseError != nil {
+		return v1.PodSucceeded, f.WaitAndGetCompletedPodPhaseError
+	}
+	return f.PrintingKubeClient.WaitAndGetCompletedPodPhase(s, 0)
+}
+
 // DeleteWithPropagationPolicy returns the configured error if set or prints
 func (f *FailingKubeClient) DeleteWithPropagationPolicy(resources kube.ResourceList, policy metav1.DeletionPropagation) (*kube.Result, []error) {
 	if f.DeleteWithPropagationError != nil {
@@ -149,6 +183,14 @@ func (f *FailingKubeClient) DeleteWithPropagationPolicy(resources kube.ResourceL
 	return f.PrintingKubeClient.DeleteWithPropagationPolicy(resources, policy)
 }

+func (f *FailingKubeClient) WatchUntilReadyWithContext(ctx context.Context, resources kube.ResourceList) error {
+	if f.WatchUntilReadyError != nil {
+		return f.WatchUntilReadyError
+	}
+
+	return f.PrintingKubeClient.WatchUntilReady(resources, 0)
+}
+
 func createDummyResourceList() kube.ResourceList {
 	var resInfo resource.Info
 	resInfo.Name = "dummyName"
@@ -158,3 +200,10 @@ func createDummyResourceList() kube.ResourceList {

 	return resourceList
 }
+
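// Illustrative sketch, not part of the patch: the `var _ Iface = &Impl{}`
// lines below are compile-time assertions. Assigning a value to the blank
// identifier of an interface type turns a missing method into a build error
// instead of a runtime type-assertion failure. A minimal standalone example
// of the same idiom (Greeter and English are made up for illustration):
//
//	type Greeter interface{ Greet() string }
//
//	type English struct{}
//
//	func (English) Greet() string { return "hello" }
//
//	// Compilation fails here if English ever stops implementing Greeter.
//	var _ Greeter = English{}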
+// compile time check that FailingKubeClient satisfies our interfaces.
+var (
+	_ kube.Interface        = &FailingKubeClient{}
+	_ kube.ContextInterface = &FailingKubeClient{}
+	_ kube.InterfaceExt     = &FailingKubeClient{}
+)
diff --git a/pkg/kube/fake/printer.go b/pkg/kube/fake/printer.go
index e6c4b6207..b101a4f64 100644
--- a/pkg/kube/fake/printer.go
+++ b/pkg/kube/fake/printer.go
@@ -17,6 +17,7 @@ limitations under the License.
 package fake

 import (
+	"context"
 	"io"
 	"strings"
 	"time"
@@ -62,16 +63,31 @@ func (p *PrintingKubeClient) Wait(resources kube.ResourceList, _ time.Duration)
 	return err
 }

+func (p *PrintingKubeClient) WaitWithContext(ctx context.Context, resources kube.ResourceList) error {
+	_, err := io.Copy(p.Out, bufferize(resources))
+	return err
+}
+
 func (p *PrintingKubeClient) WaitWithJobs(resources kube.ResourceList, _ time.Duration) error {
 	_, err := io.Copy(p.Out, bufferize(resources))
 	return err
 }

+func (p *PrintingKubeClient) WaitWithJobsContext(ctx context.Context, resources kube.ResourceList) error {
+	_, err := io.Copy(p.Out, bufferize(resources))
+	return err
+}
+
 func (p *PrintingKubeClient) WaitForDelete(resources kube.ResourceList, _ time.Duration) error {
 	_, err := io.Copy(p.Out, bufferize(resources))
 	return err
 }

+func (p *PrintingKubeClient) WaitForDeleteWithContext(ctx context.Context, resources kube.ResourceList) error {
+	_, err := io.Copy(p.Out, bufferize(resources))
+	return err
+}
+
 // Delete implements KubeClient delete.
 //
 // It only prints out the content to be deleted.
@@ -89,6 +105,12 @@ func (p *PrintingKubeClient) WatchUntilReady(resources kube.ResourceList, _ time
 	return err
 }

+// WatchUntilReadyWithContext implements KubeClient WatchUntilReadyWithContext.
+func (p *PrintingKubeClient) WatchUntilReadyWithContext(ctx context.Context, resources kube.ResourceList) error {
+	_, err := io.Copy(p.Out, bufferize(resources))
+	return err
+}
+
 // Update implements KubeClient Update.
 func (p *PrintingKubeClient) Update(_, modified kube.ResourceList, _ bool) (*kube.Result, error) {
 	_, err := io.Copy(p.Out, bufferize(modified))
@@ -116,6 +138,11 @@ func (p *PrintingKubeClient) WaitAndGetCompletedPodPhase(_ string, _ time.Durati
 	return v1.PodSucceeded, nil
 }

+// WaitAndGetCompletedPodPhaseWithContext implements KubeClient WaitAndGetCompletedPodPhaseWithContext.
+func (p *PrintingKubeClient) WaitAndGetCompletedPodPhaseWithContext(_ context.Context, _ string) (v1.PodPhase, error) {
+	return v1.PodSucceeded, nil
+}
+
 // DeleteWithPropagationPolicy implements KubeClient delete.
 //
 // It only prints out the content to be deleted.
@@ -134,3 +161,10 @@ func bufferize(resources kube.ResourceList) io.Reader {
 	}
 	return strings.NewReader(builder.String())
 }
+
+// compile time check that PrintingKubeClient satisfies our interfaces.
+var (
+	_ kube.Interface        = &PrintingKubeClient{}
+	_ kube.ContextInterface = &PrintingKubeClient{}
+	_ kube.InterfaceExt     = &PrintingKubeClient{}
+)
diff --git a/pkg/kube/interface.go b/pkg/kube/interface.go
index ce42ed950..35ce5a271 100644
--- a/pkg/kube/interface.go
+++ b/pkg/kube/interface.go
@@ -17,6 +17,7 @@ limitations under the License.
 package kube

 import (
+	"context"
 	"io"
 	"time"

@@ -72,6 +73,19 @@ type Interface interface {
 	IsReachable() error
 }

+// ContextInterface is introduced to avoid breaking backwards compatibility for Interface implementers.
+//
+// TODO Helm 4: Replace Interface methods that accept a time.Duration as an argument with a context.
+type ContextInterface interface {
+	// WaitWithContext waits till a ctx timeout for the specified resources to be ready.
+ WaitWithContext(ctx context.Context, resources ResourceList) error + // WaitWithJobsContext wait up to the given ctx timeout for the specified resources to be ready, including jobs. + WaitWithJobsContext(ctx context.Context, resources ResourceList) error + WatchUntilReadyWithContext(context.Context, ResourceList) error + WaitAndGetCompletedPodPhaseWithContext(context.Context, string) (v1.PodPhase, error) + WaitForDeleteWithContext(context.Context, ResourceList) error +} + // InterfaceExt is introduced to avoid breaking backwards compatibility for Interface implementers. // // TODO Helm 4: Remove InterfaceExt and integrate its method(s) into the Interface. @@ -112,5 +126,6 @@ type InterfaceResources interface { var _ Interface = (*Client)(nil) var _ InterfaceExt = (*Client)(nil) +var _ ContextInterface = (*Client)(nil) var _ InterfaceDeletionPropagation = (*Client)(nil) var _ InterfaceResources = (*Client)(nil) diff --git a/pkg/kube/wait.go b/pkg/kube/wait.go index ecdd38940..03268f59a 100644 --- a/pkg/kube/wait.go +++ b/pkg/kube/wait.go @@ -37,18 +37,22 @@ import ( ) type waiter struct { - c ReadyChecker - timeout time.Duration - log func(string, ...interface{}) + c ReadyChecker + log func(string, ...interface{}) +} + +// timeFromCtx extracts time until deadline of ctx. +func timeFromCtx(ctx context.Context) time.Duration { + deadline, _ := ctx.Deadline() + + return time.Until(deadline) } // waitForResources polls to get the current status of all pods, PVCs, Services and // Jobs(optional) until all are ready or a timeout is reached -func (w *waiter) waitForResources(created ResourceList) error { - w.log("beginning wait for %d resources with timeout of %v", len(created), w.timeout) +func (w *waiter) waitForResources(ctx context.Context, created ResourceList) error { - ctx, cancel := context.WithTimeout(context.Background(), w.timeout) - defer cancel() + w.log("beginning wait for %d resources with timeout of %v", len(created), timeFromCtx(ctx)) return wait.PollUntilContextCancel(ctx, 2*time.Second, true, func(ctx context.Context) (bool, error) { for _, v := range created { @@ -62,11 +66,8 @@ func (w *waiter) waitForResources(created ResourceList) error { } // waitForDeletedResources polls to check if all the resources are deleted or a timeout is reached -func (w *waiter) waitForDeletedResources(deleted ResourceList) error { - w.log("beginning wait for %d resources to be deleted with timeout of %v", len(deleted), w.timeout) - - ctx, cancel := context.WithTimeout(context.Background(), w.timeout) - defer cancel() +func (w *waiter) waitForDeletedResources(ctx context.Context, deleted ResourceList) error { + w.log("beginning wait for %d resources to be deleted with timeout of %v", len(deleted), timeFromCtx(ctx)) return wait.PollUntilContextCancel(ctx, 2*time.Second, true, func(ctx context.Context) (bool, error) { for _, v := range deleted { From e3d85101e713b408553c4d76178711b92fcb3c61 Mon Sep 17 00:00:00 2001 From: Josh Rowley Date: Sat, 27 May 2023 18:32:29 -0700 Subject: [PATCH 2/4] fixed uts; changed Run method calls in uts to RunWithContext Signed-off-by: Josh Rowley --- pkg/action/hooks.go | 13 ++++---- pkg/action/install.go | 27 +++++++++-------- pkg/action/install_test.go | 58 ++++++++++++++++++++---------------- pkg/action/uninstall_test.go | 16 ++++++++-- pkg/action/upgrade.go | 7 +++-- pkg/action/upgrade_test.go | 41 +++++++++++++++++-------- pkg/kube/fake/fake.go | 27 ++++++++++++++--- pkg/kube/interface.go | 14 ++++++++- 8 files changed, 136 insertions(+), 67 deletions(-) diff 
--git a/pkg/action/hooks.go b/pkg/action/hooks.go
index 2490613c4..a050b466a 100644
--- a/pkg/action/hooks.go
+++ b/pkg/action/hooks.go
@@ -80,12 +80,15 @@ func (cfg *Configuration) execHook(ctx context.Context, rl *release.Release, hoo
 		return errors.Wrapf(err, "warning: Hook %s %s failed", hook, h.Path)
 	}

-	ctxInterface, ok := cfg.KubeClient.(kube.ContextInterface)
-	if !ok {
-		panic("KubeClient does not satisfy kube.ContextInterface")
+	// Check if the kube.Interface implementation satisfies the kube.ContextInterface interface.
+	// If it doesn't, log a warning and move on; nothing we can do.
+	if kubeClient, ok := cfg.KubeClient.(kube.ContextInterface); ok {
+		// Watch hook resources until they have completed
+		err = kubeClient.WatchUntilReadyWithContext(ctx, resources)
+	} else {
+		cfg.Log("WARNING: kube.ContextInterface not satisfied")
 	}
-	// Watch hook resources until they have completed
-	err = ctxInterface.WatchUntilReadyWithContext(ctx, resources)
+
 	// Note the time of success/failure
 	h.LastRun.CompletedAt = helmtime.Now()
 	// Mark hook as succeeded or failed
diff --git a/pkg/action/install.go b/pkg/action/install.go
index 2283bb9ef..4e9b7a2b5 100644
--- a/pkg/action/install.go
+++ b/pkg/action/install.go
@@ -204,7 +204,6 @@ func (i *Install) installCRDs(crds []chart.CRD) error {
 // Run executes the installation
 //
 // If DryRun is set to true, this will prepare the release, but not install it
-
 func (i *Install) Run(chrt *chart.Chart, vals map[string]interface{}) (*release.Release, error) {
 	ctx, cancel := context.WithTimeout(context.TODO(), i.Timeout)
 	defer cancel()
@@ -379,11 +378,10 @@ func (i *Install) RunWithContext(ctx context.Context, chrt *chart.Chart, vals ma
 }

 func (i *Install) performInstall(ctx context.Context, rel *release.Release, toBeAdopted, resources kube.ResourceList) (*release.Release, error) {
-
 	// pre-install hooks
 	if !i.DisableHooks {
 		if err := i.cfg.execHook(ctx, rel, release.HookPreInstall); err != nil {
-			return i.failRelease(rel, fmt.Errorf("failed pre-install: %w", err))
+			return i.failRelease(ctx, rel, fmt.Errorf("failed pre-install: %w", err))
 		}
 	}

@@ -392,31 +390,31 @@ func (i *Install) performInstall(ctx context.Context, rel *release.Release, toBe
 	// to true, since that is basically an upgrade operation.
 	if len(toBeAdopted) == 0 && len(resources) > 0 {
 		if _, err := i.cfg.KubeClient.Create(resources); err != nil {
-			return i.failRelease(rel, err)
+			return i.failRelease(ctx, rel, err)
 		}
 	} else if len(resources) > 0 {
 		if _, err := i.cfg.KubeClient.Update(toBeAdopted, resources, i.Force); err != nil {
-			return i.failRelease(rel, err)
+			return i.failRelease(ctx, rel, err)
 		}
 	}

-	if i.Wait {
-		kubeClient := i.cfg.KubeClient.(kube.ContextInterface)
-
+	// Check if the kube.Interface implementation also satisfies kube.ContextInterface.
+ kubeClient, ok := i.cfg.KubeClient.(kube.ContextInterface) + if i.Wait && ok { if i.WaitForJobs { if err := kubeClient.WaitWithJobsContext(ctx, resources); err != nil { - return i.failRelease(rel, err) + return i.failRelease(ctx, rel, err) } } else { if err := kubeClient.WaitWithContext(ctx, resources); err != nil { - return i.failRelease(rel, err) + return i.failRelease(ctx, rel, err) } } } if !i.DisableHooks { if err := i.cfg.execHook(ctx, rel, release.HookPostInstall); err != nil { - return i.failRelease(rel, fmt.Errorf("failed post-install: %w", err)) + return i.failRelease(ctx, rel, fmt.Errorf("failed post-install: %w", err)) } } @@ -440,15 +438,18 @@ func (i *Install) performInstall(ctx context.Context, rel *release.Release, toBe return rel, nil } -func (i *Install) failRelease(rel *release.Release, err error) (*release.Release, error) { +func (i *Install) failRelease(ctx context.Context, rel *release.Release, err error) (*release.Release, error) { rel.SetStatus(release.StatusFailed, fmt.Sprintf("Release %q failed: %s", i.ReleaseName, err.Error())) if i.Atomic { i.cfg.Log("Install failed and atomic is set, uninstalling release") uninstall := NewUninstall(i.cfg) uninstall.DisableHooks = i.DisableHooks uninstall.KeepHistory = false + // TODO: Not sure if a new ctx should be created by the timeout field, + // because Timeout will be replaced by contexts. Should a background ctx be used + // so we don't timeout while uninstalling? uninstall.Timeout = i.Timeout - if _, uninstallErr := uninstall.Run(i.ReleaseName); uninstallErr != nil { + if _, uninstallErr := uninstall.RunWithContext(context.Background(), i.ReleaseName); uninstallErr != nil { return rel, errors.Wrapf(uninstallErr, "an error occurred while uninstalling the release. original install error: %s", err) } return rel, errors.Wrapf(err, "release %s failed, and has been uninstalled due to atomic being set", i.ReleaseName) diff --git a/pkg/action/install_test.go b/pkg/action/install_test.go index 078608803..ca48e6cde 100644 --- a/pkg/action/install_test.go +++ b/pkg/action/install_test.go @@ -103,7 +103,8 @@ func TestInstallReleaseWithValues(t *testing.T) { "simpleKey": "simpleValue", }, } - res, err := instAction.Run(buildChart(withSampleValues()), userVals) + + res, err := instAction.RunWithContext(context.Background(), buildChart(withSampleValues()), userVals) if err != nil { t.Fatalf("Failed install: %s", err) } @@ -129,7 +130,7 @@ func TestInstallReleaseClientOnly(t *testing.T) { is := assert.New(t) instAction := installAction(t) instAction.ClientOnly = true - instAction.Run(buildChart(), nil) // disregard output + instAction.RunWithContext(context.Background(), buildChart(), nil) // disregard output is.Equal(instAction.cfg.Capabilities, chartutil.DefaultCapabilities) is.Equal(instAction.cfg.KubeClient, &kubefake.PrintingKubeClient{Out: io.Discard}) @@ -139,7 +140,7 @@ func TestInstallRelease_NoName(t *testing.T) { instAction := installAction(t) instAction.ReleaseName = "" vals := map[string]interface{}{} - _, err := instAction.Run(buildChart(), vals) + _, err := instAction.RunWithContext(context.Background(), buildChart(), vals) if err == nil { t.Fatal("expected failure when no name is specified") } @@ -151,7 +152,8 @@ func TestInstallRelease_WithNotes(t *testing.T) { instAction := installAction(t) instAction.ReleaseName = "with-notes" vals := map[string]interface{}{} - res, err := instAction.Run(buildChart(withNotes("note here")), vals) + + res, err := instAction.RunWithContext(context.Background(), 
buildChart(withNotes("note here")), vals) if err != nil { t.Fatalf("Failed install: %s", err) } @@ -178,7 +180,8 @@ func TestInstallRelease_WithNotesRendered(t *testing.T) { instAction := installAction(t) instAction.ReleaseName = "with-notes" vals := map[string]interface{}{} - res, err := instAction.Run(buildChart(withNotes("got-{{.Release.Name}}")), vals) + + res, err := instAction.RunWithContext(context.Background(), buildChart(withNotes("got-{{.Release.Name}}")), vals) if err != nil { t.Fatalf("Failed install: %s", err) } @@ -197,7 +200,8 @@ func TestInstallRelease_WithChartAndDependencyParentNotes(t *testing.T) { instAction := installAction(t) instAction.ReleaseName = "with-notes" vals := map[string]interface{}{} - res, err := instAction.Run(buildChart(withNotes("parent"), withDependency(withNotes("child"))), vals) + + res, err := instAction.RunWithContext(context.Background(), buildChart(withNotes("parent"), withDependency(withNotes("child"))), vals) if err != nil { t.Fatalf("Failed install: %s", err) } @@ -215,8 +219,9 @@ func TestInstallRelease_WithChartAndDependencyAllNotes(t *testing.T) { instAction := installAction(t) instAction.ReleaseName = "with-notes" instAction.SubNotes = true + vals := map[string]interface{}{} - res, err := instAction.Run(buildChart(withNotes("parent"), withDependency(withNotes("child"))), vals) + res, err := instAction.RunWithContext(context.Background(), buildChart(withNotes("parent"), withDependency(withNotes("child"))), vals) if err != nil { t.Fatalf("Failed install: %s", err) } @@ -236,7 +241,7 @@ func TestInstallRelease_DryRun(t *testing.T) { instAction := installAction(t) instAction.DryRun = true vals := map[string]interface{}{} - res, err := instAction.Run(buildChart(withSampleTemplates()), vals) + res, err := instAction.RunWithContext(context.Background(), buildChart(withSampleTemplates()), vals) if err != nil { t.Fatalf("Failed install: %s", err) } @@ -267,7 +272,7 @@ func TestInstallRelease_DryRun_Lookup(t *testing.T) { Data: []byte(`goodbye: {{ lookup "v1" "Namespace" "" "___" }}`), }) - res, err := instAction.Run(mockChart, vals) + res, err := instAction.RunWithContext(context.Background(), mockChart, vals) if err != nil { t.Fatalf("Failed install: %s", err) } @@ -280,7 +285,7 @@ func TestInstallReleaseIncorrectTemplate_DryRun(t *testing.T) { instAction := installAction(t) instAction.DryRun = true vals := map[string]interface{}{} - _, err := instAction.Run(buildChart(withSampleIncludingIncorrectTemplates()), vals) + _, err := instAction.RunWithContext(context.Background(), buildChart(withSampleIncludingIncorrectTemplates()), vals) expectedErr := "\"hello/templates/incorrect\" at <.Values.bad.doh>: nil pointer evaluating interface {}.doh" if err == nil { t.Fatalf("Install should fail containing error: %s", expectedErr) @@ -298,7 +303,7 @@ func TestInstallRelease_NoHooks(t *testing.T) { instAction.cfg.Releases.Create(releaseStub()) vals := map[string]interface{}{} - res, err := instAction.Run(buildChart(), vals) + res, err := instAction.RunWithContext(context.Background(), buildChart(), vals) if err != nil { t.Fatalf("Failed install: %s", err) } @@ -315,7 +320,7 @@ func TestInstallRelease_FailedHooks(t *testing.T) { instAction.cfg.KubeClient = failer vals := map[string]interface{}{} - res, err := instAction.Run(buildChart(), vals) + res, err := instAction.RunWithContext(context.Background(), buildChart(), vals) is.Error(err) is.Contains(res.Info.Description, "failed post-install") is.Equal(release.StatusFailed, res.Info.Status) @@ -332,7 +337,8 
@@ func TestInstallRelease_ReplaceRelease(t *testing.T) { instAction.ReleaseName = rel.Name vals := map[string]interface{}{} - res, err := instAction.Run(buildChart(), vals) + + res, err := instAction.RunWithContext(context.Background(), buildChart(), vals) is.NoError(err) // This should have been auto-incremented @@ -348,13 +354,15 @@ func TestInstallRelease_KubeVersion(t *testing.T) { is := assert.New(t) instAction := installAction(t) vals := map[string]interface{}{} - _, err := instAction.Run(buildChart(withKube(">=0.0.0")), vals) + + _, err := instAction.RunWithContext(context.Background(), buildChart(withKube(">=0.0.0")), vals) is.NoError(err) // This should fail for a few hundred years instAction.ReleaseName = "should-fail" vals = map[string]interface{}{} - _, err = instAction.Run(buildChart(withKube(">=99.0.0")), vals) + + _, err = instAction.RunWithContext(context.Background(), buildChart(withKube(">=99.0.0")), vals) is.Error(err) is.Contains(err.Error(), "chart requires kubeVersion") } @@ -369,7 +377,7 @@ func TestInstallRelease_Wait(t *testing.T) { instAction.Wait = true vals := map[string]interface{}{} - res, err := instAction.Run(buildChart(), vals) + res, err := instAction.RunWithContext(context.Background(), buildChart(), vals) is.Error(err) is.Contains(res.Info.Description, "I timed out") is.Equal(res.Info.Status, release.StatusFailed) @@ -379,15 +387,13 @@ func TestInstallRelease_Wait_Interrupted(t *testing.T) { instAction := installAction(t) instAction.ReleaseName = "interrupted-release" failer := instAction.cfg.KubeClient.(*kubefake.FailingKubeClient) - failer.WaitDuration = 10 * time.Second instAction.cfg.KubeClient = failer instAction.Wait = true vals := map[string]interface{}{} ctx := context.Background() ctx, cancel := context.WithCancel(ctx) - time.AfterFunc(time.Second, cancel) - //cancel() + cancel() res, err := instAction.RunWithContext(ctx, buildChart(), vals) is.Error(err) @@ -405,7 +411,7 @@ func TestInstallRelease_WaitForJobs(t *testing.T) { instAction.WaitForJobs = true vals := map[string]interface{}{} - res, err := instAction.Run(buildChart(), vals) + res, err := instAction.RunWithContext(context.Background(), buildChart(), vals) is.Error(err) is.Contains(res.Info.Description, "I timed out") is.Equal(res.Info.Status, release.StatusFailed) @@ -421,9 +427,10 @@ func TestInstallRelease_Atomic(t *testing.T) { failer.WaitError = fmt.Errorf("I timed out") instAction.cfg.KubeClient = failer instAction.Atomic = true + vals := map[string]interface{}{} - res, err := instAction.Run(buildChart(), vals) + res, err := instAction.RunWithContext(context.Background(), buildChart(), vals) is.Error(err) is.Contains(err.Error(), "I timed out") is.Contains(err.Error(), "atomic") @@ -444,7 +451,7 @@ func TestInstallRelease_Atomic(t *testing.T) { instAction.Atomic = true vals := map[string]interface{}{} - _, err := instAction.Run(buildChart(), vals) + _, err := instAction.RunWithContext(context.Background(), buildChart(), vals) is.Error(err) is.Contains(err.Error(), "I timed out") is.Contains(err.Error(), "uninstall fail") @@ -457,14 +464,13 @@ func TestInstallRelease_Atomic_Interrupted(t *testing.T) { instAction := installAction(t) instAction.ReleaseName = "interrupted-release" failer := instAction.cfg.KubeClient.(*kubefake.FailingKubeClient) - failer.WaitDuration = 10 * time.Second instAction.cfg.KubeClient = failer instAction.Atomic = true vals := map[string]interface{}{} ctx := context.Background() ctx, cancel := context.WithCancel(ctx) - time.AfterFunc(time.Second, cancel) + 
cancel() res, err := instAction.RunWithContext(ctx, buildChart(), vals) is.Error(err) @@ -556,7 +562,7 @@ func TestInstallReleaseOutputDir(t *testing.T) { instAction.OutputDir = dir - _, err := instAction.Run(buildChart(withSampleTemplates(), withMultipleManifestTemplate()), vals) + _, err := instAction.RunWithContext(context.Background(), buildChart(withSampleTemplates(), withMultipleManifestTemplate()), vals) if err != nil { t.Fatalf("Failed install: %s", err) } @@ -592,7 +598,7 @@ func TestInstallOutputDirWithReleaseName(t *testing.T) { newDir := filepath.Join(dir, instAction.ReleaseName) - _, err := instAction.Run(buildChart(withSampleTemplates(), withMultipleManifestTemplate()), vals) + _, err := instAction.RunWithContext(context.Background(), buildChart(withSampleTemplates(), withMultipleManifestTemplate()), vals) if err != nil { t.Fatalf("Failed install: %s", err) } diff --git a/pkg/action/uninstall_test.go b/pkg/action/uninstall_test.go index 311a34923..741abc7da 100644 --- a/pkg/action/uninstall_test.go +++ b/pkg/action/uninstall_test.go @@ -17,6 +17,7 @@ limitations under the License. package action import ( + "context" "fmt" "testing" @@ -57,7 +58,10 @@ func TestUninstallRelease_deleteRelease(t *testing.T) { } }` unAction.cfg.Releases.Create(rel) - res, err := unAction.Run(rel.Name) + + ctx := context.Background() + + res, err := unAction.RunWithContext(ctx, rel.Name) is.NoError(err) expected := `These resources were kept due to the resource policy: [Secret] secret @@ -90,7 +94,10 @@ func TestUninstallRelease_Wait(t *testing.T) { failer := unAction.cfg.KubeClient.(*kubefake.FailingKubeClient) failer.WaitError = fmt.Errorf("U timed out") unAction.cfg.KubeClient = failer - res, err := unAction.Run(rel.Name) + + ctx := context.Background() + + res, err := unAction.RunWithContext(ctx, rel.Name) is.Error(err) is.Contains(err.Error(), "U timed out") is.Equal(res.Release.Info.Status, release.StatusUninstalled) @@ -123,7 +130,10 @@ func TestUninstallRelease_Cascade(t *testing.T) { failer.DeleteWithPropagationError = fmt.Errorf("Uninstall with cascade failed") failer.BuildDummy = true unAction.cfg.KubeClient = failer - _, err := unAction.Run(rel.Name) + + ctx := context.Background() + + _, err := unAction.RunWithContext(ctx, rel.Name) is.Error(err) is.Contains(err.Error(), "failed to delete release: come-fail-away") } diff --git a/pkg/action/upgrade.go b/pkg/action/upgrade.go index 975c5f7e7..cdad2e121 100644 --- a/pkg/action/upgrade.go +++ b/pkg/action/upgrade.go @@ -364,17 +364,18 @@ func (u *Upgrade) releasingUpgrade(ctx context.Context, upgradedRelease *release } } - if u.Wait { + kubeClient, ok := u.cfg.KubeClient.(kube.ContextInterface) + if u.Wait && ok { u.cfg.Log( "waiting for release %s resources (created: %d updated: %d deleted: %d)", upgradedRelease.Name, len(results.Created), len(results.Updated), len(results.Deleted)) if u.WaitForJobs { - if err := u.cfg.KubeClient.WaitWithJobs(target, u.Timeout); err != nil { + if err := kubeClient.WaitWithJobsContext(ctx, target); err != nil { u.cfg.recordRelease(originalRelease) return u.failRelease(upgradedRelease, results.Created, err) } } else { - if err := u.cfg.KubeClient.Wait(target, u.Timeout); err != nil { + if err := kubeClient.WaitWithContext(ctx, target); err != nil { u.cfg.recordRelease(originalRelease) return u.failRelease(upgradedRelease, results.Created, err) } diff --git a/pkg/action/upgrade_test.go b/pkg/action/upgrade_test.go index 62922b373..22cb1042a 100644 --- a/pkg/action/upgrade_test.go +++ 
b/pkg/action/upgrade_test.go @@ -54,6 +54,7 @@ func TestUpgradeRelease_Success(t *testing.T) { vals := map[string]interface{}{} ctx, done := context.WithCancel(context.Background()) + res, err := upAction.RunWithContext(ctx, rel.Name, buildChart(), vals) done() req.NoError(err) @@ -83,7 +84,9 @@ func TestUpgradeRelease_Wait(t *testing.T) { upAction.Wait = true vals := map[string]interface{}{} - res, err := upAction.Run(rel.Name, buildChart(), vals) + ctx := context.Background() + + res, err := upAction.RunWithContext(ctx, rel.Name, buildChart(), vals) req.Error(err) is.Contains(res.Info.Description, "I timed out") is.Equal(res.Info.Status, release.StatusFailed) @@ -106,7 +109,9 @@ func TestUpgradeRelease_WaitForJobs(t *testing.T) { upAction.WaitForJobs = true vals := map[string]interface{}{} - res, err := upAction.Run(rel.Name, buildChart(), vals) + ctx := context.Background() + + res, err := upAction.RunWithContext(ctx, rel.Name, buildChart(), vals) req.Error(err) is.Contains(res.Info.Description, "I timed out") is.Equal(res.Info.Status, release.StatusFailed) @@ -130,7 +135,9 @@ func TestUpgradeRelease_CleanupOnFail(t *testing.T) { upAction.CleanupOnFail = true vals := map[string]interface{}{} - res, err := upAction.Run(rel.Name, buildChart(), vals) + ctx := context.Background() + + res, err := upAction.RunWithContext(ctx, rel.Name, buildChart(), vals) req.Error(err) is.NotContains(err.Error(), "unable to cleanup resources") is.Contains(res.Info.Description, "I timed out") @@ -156,7 +163,9 @@ func TestUpgradeRelease_Atomic(t *testing.T) { upAction.Atomic = true vals := map[string]interface{}{} - res, err := upAction.Run(rel.Name, buildChart(), vals) + ctx := context.Background() + + res, err := upAction.RunWithContext(ctx, rel.Name, buildChart(), vals) req.Error(err) is.Contains(err.Error(), "arming key removed") is.Contains(err.Error(), "atomic") @@ -181,7 +190,9 @@ func TestUpgradeRelease_Atomic(t *testing.T) { upAction.Atomic = true vals := map[string]interface{}{} - _, err := upAction.Run(rel.Name, buildChart(), vals) + ctx := context.Background() + + _, err := upAction.RunWithContext(ctx, rel.Name, buildChart(), vals) req.Error(err) is.Contains(err.Error(), "update fail") is.Contains(err.Error(), "an error occurred while rolling back the release") @@ -220,8 +231,11 @@ func TestUpgradeRelease_ReuseValues(t *testing.T) { is.NoError(err) upAction.ReuseValues = true + + ctx := context.Background() + // setting newValues and upgrading - res, err := upAction.Run(rel.Name, buildChart(), newValues) + res, err := upAction.RunWithContext(ctx, rel.Name, buildChart(), newValues) is.NoError(err) // Now make sure it is actually upgraded @@ -282,8 +296,11 @@ func TestUpgradeRelease_ReuseValues(t *testing.T) { withDependency(withName("subchart")), withMetadataDependency(dependency), ) + + ctx := context.Background() + // reusing values and upgrading - res, err := upAction.Run(rel.Name, sampleChartWithSubChart, map[string]interface{}{}) + res, err := upAction.RunWithContext(ctx, rel.Name, sampleChartWithSubChart, map[string]interface{}{}) is.NoError(err) // Now get the upgraded release @@ -322,7 +339,9 @@ func TestUpgradeRelease_Pending(t *testing.T) { vals := map[string]interface{}{} - _, err := upAction.Run(rel.Name, buildChart(), vals) + ctx := context.Background() + + _, err := upAction.RunWithContext(ctx, rel.Name, buildChart(), vals) req.Contains(err.Error(), "progress", err) } @@ -338,14 +357,13 @@ func TestUpgradeRelease_Interrupted_Wait(t *testing.T) { upAction.cfg.Releases.Create(rel) 
failer := upAction.cfg.KubeClient.(*kubefake.FailingKubeClient) - failer.WaitDuration = 10 * time.Second upAction.cfg.KubeClient = failer upAction.Wait = true vals := map[string]interface{}{} ctx := context.Background() ctx, cancel := context.WithCancel(ctx) - time.AfterFunc(time.Second, cancel) + cancel() res, err := upAction.RunWithContext(ctx, rel.Name, buildChart(), vals) @@ -367,14 +385,13 @@ func TestUpgradeRelease_Interrupted_Atomic(t *testing.T) { upAction.cfg.Releases.Create(rel) failer := upAction.cfg.KubeClient.(*kubefake.FailingKubeClient) - failer.WaitDuration = 5 * time.Second upAction.cfg.KubeClient = failer upAction.Atomic = true vals := map[string]interface{}{} ctx := context.Background() ctx, cancel := context.WithCancel(ctx) - time.AfterFunc(time.Second, cancel) + cancel() res, err := upAction.RunWithContext(ctx, rel.Name, buildChart(), vals) diff --git a/pkg/kube/fake/fake.go b/pkg/kube/fake/fake.go index 820518ed9..a2a5186e3 100644 --- a/pkg/kube/fake/fake.go +++ b/pkg/kube/fake/fake.go @@ -77,11 +77,15 @@ func (f *FailingKubeClient) Wait(resources kube.ResourceList, d time.Duration) e // Waits the amount of time defined on f.WaitDuration, then returns the configured error if set or prints. func (f *FailingKubeClient) WaitWithContext(ctx context.Context, resources kube.ResourceList) error { - time.Sleep(f.WaitDuration) if f.WaitError != nil { return f.WaitError } - return f.PrintingKubeClient.Wait(resources, 0) + + if err := ctx.Err(); err != nil { + return err + } + + return f.PrintingKubeClient.WaitWithContext(ctx, resources) } // WaitWithJobs returns the configured error if set or prints @@ -97,6 +101,11 @@ func (f *FailingKubeClient) WaitWithJobsContext(ctx context.Context, resources k if f.WaitError != nil { return f.WaitError } + + if err := ctx.Err(); err != nil { + return err + } + return f.PrintingKubeClient.WaitWithJobsContext(ctx, resources) } @@ -113,7 +122,12 @@ func (f *FailingKubeClient) WaitForDeleteWithContext(ctx context.Context, resour if f.WaitError != nil { return f.WaitError } - return f.PrintingKubeClient.WaitForDelete(resources, 0) + + if err := ctx.Err(); err != nil { + return err + } + + return f.PrintingKubeClient.WaitForDeleteWithContext(ctx, resources) } // Delete returns the configured error if set or prints @@ -172,7 +186,8 @@ func (f *FailingKubeClient) WaitAndGetCompletedPodPhaseWithContext(ctx context.C if f.WaitAndGetCompletedPodPhaseError != nil { return v1.PodSucceeded, f.WaitAndGetCompletedPodPhaseError } - return f.PrintingKubeClient.WaitAndGetCompletedPodPhase(s, 0) + + return f.PrintingKubeClient.WaitAndGetCompletedPodPhaseWithContext(ctx, s) } // DeleteWithPropagationPolicy returns the configured error if set or prints @@ -188,6 +203,10 @@ func (f *FailingKubeClient) WatchUntilReadyWithContext(ctx context.Context, reso return f.WatchUntilReadyError } + if err := ctx.Err(); err != nil { + return err + } + return f.PrintingKubeClient.WatchUntilReady(resources, 0) } diff --git a/pkg/kube/interface.go b/pkg/kube/interface.go index 35ce5a271..1c844dd1f 100644 --- a/pkg/kube/interface.go +++ b/pkg/kube/interface.go @@ -79,10 +79,22 @@ type Interface interface { type ContextInterface interface { // WaitWithContext waits till a ctx timeout for the specified resources to be ready. WaitWithContext(ctx context.Context, resources ResourceList) error - // WaitWithJobsContext wait up to the given ctx timeout for the specified resources to be ready, including jobs. 
+	// WaitWithJobsContext waits till a ctx timeout for the specified resources to be ready, including jobs.
 	WaitWithJobsContext(ctx context.Context, resources ResourceList) error
+	// WatchUntilReadyWithContext watches the resources given and waits until they are ready.
+	//
+	// This method is mainly for hook implementations. It watches for a resource to
+	// hit a particular milestone. The milestone depends on the Kind.
+	//
+	// For Jobs, "ready" means the Job ran to completion (exited without error).
+	// For Pods, "ready" means the Pod phase is marked "succeeded".
+	// For all other kinds, it means the kind was created or modified without
+	// error.
 	WatchUntilReadyWithContext(context.Context, ResourceList) error
+	// WaitAndGetCompletedPodPhaseWithContext waits up to a timeout until a pod enters a completed phase
+	// and returns said phase (PodSucceeded or PodFailed qualify).
 	WaitAndGetCompletedPodPhaseWithContext(context.Context, string) (v1.PodPhase, error)
+	// WaitForDeleteWithContext waits till a ctx timeout for the specified resources to be deleted.
 	WaitForDeleteWithContext(context.Context, ResourceList) error
 }

From 0f22ab8223032a0e3fd65ca5a9bbea3fa4ec3634 Mon Sep 17 00:00:00 2001
From: Josh Rowley
Date: Sat, 27 May 2023 18:34:25 -0700
Subject: [PATCH 3/4] removed deadcode

Signed-off-by: Josh Rowley
---
 pkg/action/install.go | 4 ----
 1 file changed, 4 deletions(-)

diff --git a/pkg/action/install.go b/pkg/action/install.go
index 4e9b7a2b5..b1764c1db 100644
--- a/pkg/action/install.go
+++ b/pkg/action/install.go
@@ -361,10 +361,6 @@ func (i *Install) RunWithContext(ctx context.Context, chrt *chart.Chart, vals ma
 		}
 	}
 
-	// if err := ctx.Err(); err != nil {
-	// 	return rel, err
-	// }
-
 	// Store the release in history before continuing (new in Helm 3). We always know
 	// that this is a create operation.
 	if err := i.cfg.Releases.Create(rel); err != nil {

From ab0f14fe2ee7aea6ab23d0754d7096f6df039974 Mon Sep 17 00:00:00 2001
From: Josh Rowley
Date: Wed, 7 Jun 2023 20:43:06 -0700
Subject: [PATCH 4/4] addressed PR feedback

Signed-off-by: Josh Rowley
---
 pkg/action/hooks.go           | 12 ++++++--
 pkg/action/install.go         | 58 +++++++++++++++++++----------------
 pkg/action/install_test.go    | 13 +++-----
 pkg/action/release_testing.go |  7 ++---
 pkg/action/rollback.go        |  9 ++----
 pkg/action/uninstall.go       | 22 +++++++++----
 pkg/action/upgrade.go         | 39 ++++++++++++++---------
 pkg/action/upgrade_test.go    | 17 +++++-----
 pkg/kube/client.go            |  2 +-
 pkg/kube/fake/fake.go         | 20 +-----------
 pkg/kube/wait.go              | 14 ++++++---
 11 files changed, 110 insertions(+), 103 deletions(-)

diff --git a/pkg/action/hooks.go b/pkg/action/hooks.go
index a050b466a..600a4c006 100644
--- a/pkg/action/hooks.go
+++ b/pkg/action/hooks.go
@@ -19,6 +19,7 @@ import (
 	"bytes"
 	"context"
 	"sort"
+	"time"
 
 	"github.com/pkg/errors"
 
@@ -28,7 +29,7 @@ import (
 )
 
 // execHook executes all of the hooks for the given hook event.
-func (cfg *Configuration) execHook(ctx context.Context, rl *release.Release, hook release.HookEvent) error {
+func (cfg *Configuration) execHook(timeout time.Duration, rl *release.Release, hook release.HookEvent) error {
 	executingHooks := []*release.Hook{}
 
 	for _, h := range rl.Hooks {
@@ -81,12 +82,17 @@ func (cfg *Configuration) execHook(ctx context.Context, rl *release.Release, hoo
 		}
 
 		// Check if kube.Interface implementation satisfies kube.ContextInterface interface.
-		// If it doesn't log a warning and move on, nothing we can do.
+		// If not, fall back to a time-based watch to maintain backwards compatibility.
 		if kubeClient, ok := cfg.KubeClient.(kube.ContextInterface); ok {
+			// Helm 4 TODO: WatchUntilReady should be replaced with its context
+			// aware counterpart.
+			ctx, cancel := context.WithTimeout(context.TODO(), timeout)
+			defer cancel()
 			// Watch hook resources until they have completed
 			err = kubeClient.WatchUntilReadyWithContext(ctx, resources)
 		} else {
-			cfg.Log("WARNING: kube.ContextInterface not satisfied")
+			// Fall back to the legacy timeout-based watch.
+			err = cfg.KubeClient.WatchUntilReady(resources, timeout)
 		}
 		// Note the time of success/failure

diff --git a/pkg/action/install.go b/pkg/action/install.go
index b1764c1db..5e43f47a8 100644
--- a/pkg/action/install.go
+++ b/pkg/action/install.go
@@ -205,10 +205,7 @@ func (i *Install) installCRDs(crds []chart.CRD) error {
 //
 // If DryRun is set to true, this will prepare the release, but not install it
 func (i *Install) Run(chrt *chart.Chart, vals map[string]interface{}) (*release.Release, error) {
-	ctx, cancel := context.WithTimeout(context.TODO(), i.Timeout)
-	defer cancel()
-
-	return i.RunWithContext(ctx, chrt, vals)
+	return i.RunWithContext(context.TODO(), chrt, vals)
 }
 
 // Run executes the installation with Context
@@ -376,8 +373,8 @@ func (i *Install) RunWithContext(ctx context.Context, chrt *chart.Chart, vals ma
 func (i *Install) performInstall(ctx context.Context, rel *release.Release, toBeAdopted, resources kube.ResourceList) (*release.Release, error) {
 	// pre-install hooks
 	if !i.DisableHooks {
-		if err := i.cfg.execHook(ctx, rel, release.HookPreInstall); err != nil {
-			return i.failRelease(ctx, rel, fmt.Errorf("failed pre-install: %w", err))
+		if err := i.cfg.execHook(i.Timeout, rel, release.HookPreInstall); err != nil {
+			return i.failRelease(rel, fmt.Errorf("failed pre-install: %w", err))
 		}
 	}
 
@@ -386,31 +383,42 @@ func (i *Install) performInstall(ctx context.Context, rel *release.Release, toBe
 	// to true, since that is basically an upgrade operation.
 	if len(toBeAdopted) == 0 && len(resources) > 0 {
 		if _, err := i.cfg.KubeClient.Create(resources); err != nil {
-			return i.failRelease(ctx, rel, err)
+			return i.failRelease(rel, err)
 		}
 	} else if len(resources) > 0 {
 		if _, err := i.cfg.KubeClient.Update(toBeAdopted, resources, i.Force); err != nil {
-			return i.failRelease(ctx, rel, err)
+			return i.failRelease(rel, err)
 		}
 	}
 
-	// Check if kube.Interface implementation, also satisfies kube.ContextInterface.
-	kubeClient, ok := i.cfg.KubeClient.(kube.ContextInterface)
-	if i.Wait && ok {
-		if i.WaitForJobs {
-			if err := kubeClient.WaitWithJobsContext(ctx, resources); err != nil {
-				return i.failRelease(ctx, rel, err)
-			}
-		} else {
-			if err := kubeClient.WaitWithContext(ctx, resources); err != nil {
-				return i.failRelease(ctx, rel, err)
-			}
+	if i.Wait {
+		var err error
+
+		ctx, cancel := context.WithTimeout(ctx, i.Timeout)
+		defer cancel()
+
+		kubeClient, ok := i.cfg.KubeClient.(kube.ContextInterface)
+		// Helm 4 TODO: WaitWithJobs and Wait should be replaced with their context
+		// aware counterparts.
+ switch { + case ok && i.WaitForJobs: + err = kubeClient.WaitWithJobsContext(ctx, resources) + case ok && !i.WaitForJobs: + err = kubeClient.WaitWithContext(ctx, resources) + case i.WaitForJobs: + err = i.cfg.KubeClient.WaitWithJobs(resources, i.Timeout) + case !i.WaitForJobs: + err = i.cfg.KubeClient.Wait(resources, i.Timeout) + } + + if err != nil { + return i.failRelease(rel, err) } } if !i.DisableHooks { - if err := i.cfg.execHook(ctx, rel, release.HookPostInstall); err != nil { - return i.failRelease(ctx, rel, fmt.Errorf("failed post-install: %w", err)) + if err := i.cfg.execHook(i.Timeout, rel, release.HookPostInstall); err != nil { + return i.failRelease(rel, fmt.Errorf("failed post-install: %w", err)) } } @@ -434,18 +442,16 @@ func (i *Install) performInstall(ctx context.Context, rel *release.Release, toBe return rel, nil } -func (i *Install) failRelease(ctx context.Context, rel *release.Release, err error) (*release.Release, error) { +func (i *Install) failRelease(rel *release.Release, err error) (*release.Release, error) { rel.SetStatus(release.StatusFailed, fmt.Sprintf("Release %q failed: %s", i.ReleaseName, err.Error())) if i.Atomic { i.cfg.Log("Install failed and atomic is set, uninstalling release") uninstall := NewUninstall(i.cfg) uninstall.DisableHooks = i.DisableHooks uninstall.KeepHistory = false - // TODO: Not sure if a new ctx should be created by the timeout field, - // because Timeout will be replaced by contexts. Should a background ctx be used - // so we don't timeout while uninstalling? uninstall.Timeout = i.Timeout - if _, uninstallErr := uninstall.RunWithContext(context.Background(), i.ReleaseName); uninstallErr != nil { + // Helm 4 TODO: Uninstalling needs to be handled properly on a failed atomic install. + if _, uninstallErr := uninstall.RunWithContext(context.TODO(), i.ReleaseName); uninstallErr != nil { return rel, errors.Wrapf(uninstallErr, "an error occurred while uninstalling the release. 
original install error: %s", err) } return rel, errors.Wrapf(err, "release %s failed, and has been uninstalled due to atomic being set", i.ReleaseName) diff --git a/pkg/action/install_test.go b/pkg/action/install_test.go index ca48e6cde..7354547e9 100644 --- a/pkg/action/install_test.go +++ b/pkg/action/install_test.go @@ -387,15 +387,13 @@ func TestInstallRelease_Wait_Interrupted(t *testing.T) { instAction := installAction(t) instAction.ReleaseName = "interrupted-release" failer := instAction.cfg.KubeClient.(*kubefake.FailingKubeClient) + failer.WaitError = context.Canceled instAction.cfg.KubeClient = failer instAction.Wait = true vals := map[string]interface{}{} - ctx := context.Background() - ctx, cancel := context.WithCancel(ctx) - cancel() + res, err := instAction.RunWithContext(context.Background(), buildChart(), vals) - res, err := instAction.RunWithContext(ctx, buildChart(), vals) is.Error(err) is.Contains(res.Info.Description, "Release \"interrupted-release\" failed: context canceled") is.Equal(res.Info.Status, release.StatusFailed) @@ -464,15 +462,12 @@ func TestInstallRelease_Atomic_Interrupted(t *testing.T) { instAction := installAction(t) instAction.ReleaseName = "interrupted-release" failer := instAction.cfg.KubeClient.(*kubefake.FailingKubeClient) + failer.WaitError = context.Canceled instAction.cfg.KubeClient = failer instAction.Atomic = true vals := map[string]interface{}{} - ctx := context.Background() - ctx, cancel := context.WithCancel(ctx) - cancel() - - res, err := instAction.RunWithContext(ctx, buildChart(), vals) + res, err := instAction.RunWithContext(context.Background(), buildChart(), vals) is.Error(err) is.Contains(err.Error(), "context canceled") is.Contains(err.Error(), "atomic") diff --git a/pkg/action/release_testing.go b/pkg/action/release_testing.go index 2dc4e7a41..0ef9490e2 100644 --- a/pkg/action/release_testing.go +++ b/pkg/action/release_testing.go @@ -87,7 +87,7 @@ func (r *ReleaseTesting) RunWithContext(ctx context.Context, name string) (*rele rel.Hooks = executingHooks } - if err := r.cfg.execHook(ctx, rel, release.HookTest); err != nil { + if err := r.cfg.execHook(r.Timeout, rel, release.HookTest); err != nil { rel.Hooks = append(skippedHooks, rel.Hooks...) r.cfg.Releases.Update(rel) return rel, err @@ -99,10 +99,7 @@ func (r *ReleaseTesting) RunWithContext(ctx context.Context, name string) (*rele // Run executes 'helm test' against the given release. func (r *ReleaseTesting) Run(name string) (*release.Release, error) { - ctx, cancel := context.WithTimeout(context.TODO(), r.Timeout) - defer cancel() - - return r.RunWithContext(ctx, name) + return r.RunWithContext(context.TODO(), name) } // GetPodLogs will write the logs for all test pods in the given release into diff --git a/pkg/action/rollback.go b/pkg/action/rollback.go index 989e771f1..11e3cb52c 100644 --- a/pkg/action/rollback.go +++ b/pkg/action/rollback.go @@ -91,10 +91,7 @@ func (r *Rollback) RunWithContext(ctx context.Context, name string) error { // Run executes 'helm rollback' against the given release. 
 func (r *Rollback) Run(name string) error {
-	ctx, cancel := context.WithTimeout(context.TODO(), r.Timeout)
-	defer cancel()
-
-	return r.RunWithContext(ctx, name)
+	return r.RunWithContext(context.TODO(), name)
 }
 
 // prepareRollback finds the previous release and prepares a new release object with
@@ -165,7 +162,7 @@ func (r *Rollback) performRollback(ctx context.Context, currentRelease, targetRe
 
 	// pre-rollback hooks
 	if !r.DisableHooks {
-		if err := r.cfg.execHook(ctx, targetRelease, release.HookPreRollback); err != nil {
+		if err := r.cfg.execHook(r.Timeout, targetRelease, release.HookPreRollback); err != nil {
 			return targetRelease, err
 		}
 	} else {
@@ -232,7 +229,7 @@ func (r *Rollback) performRollback(ctx context.Context, currentRelease, targetRe
 
 	// post-rollback hooks
 	if !r.DisableHooks {
-		if err := r.cfg.execHook(ctx, targetRelease, release.HookPostRollback); err != nil {
+		if err := r.cfg.execHook(r.Timeout, targetRelease, release.HookPostRollback); err != nil {
 			return targetRelease, err
 		}
 	}

diff --git a/pkg/action/uninstall.go b/pkg/action/uninstall.go
index 9d2078237..87a77e879 100644
--- a/pkg/action/uninstall.go
+++ b/pkg/action/uninstall.go
@@ -109,7 +109,7 @@ func (u *Uninstall) RunWithContext(ctx context.Context, name string) (*release.U
 	res := &release.UninstallReleaseResponse{Release: rel}
 
 	if !u.DisableHooks {
-		if err := u.cfg.execHook(ctx, rel, release.HookPreDelete); err != nil {
+		if err := u.cfg.execHook(u.Timeout, rel, release.HookPreDelete); err != nil {
 			return res, err
 		}
 	} else {
@@ -134,15 +134,25 @@ func (u *Uninstall) RunWithContext(ctx context.Context, name string) (*release.U
 	res.Info = kept
 
 	if u.Wait {
-		if kubeClient, ok := u.cfg.KubeClient.(kube.ContextInterface); ok {
-			if err := kubeClient.WaitForDeleteWithContext(ctx, deletedResources); err != nil {
-				errs = append(errs, err)
-			}
+		var err error
+		// Helm 4 TODO: WaitForDelete should be replaced with its context
+		// aware counterpart.
+		switch kubeClient := u.cfg.KubeClient.(type) {
+		case kube.ContextInterface:
+			err = kubeClient.WaitForDeleteWithContext(ctx, deletedResources)
+		case kube.InterfaceExt:
+			err = kubeClient.WaitForDelete(deletedResources, u.Timeout)
+		default:
+			u.cfg.Log("WARNING: KubeClient does not satisfy ContextInterface or InterfaceExt")
+		}
+
+		if err != nil {
+			errs = append(errs, err)
 		}
 	}
 
 	if !u.DisableHooks {
-		if err := u.cfg.execHook(ctx, rel, release.HookPostDelete); err != nil {
+		if err := u.cfg.execHook(u.Timeout, rel, release.HookPostDelete); err != nil {
 			errs = append(errs, err)
 		}
 	}

diff --git a/pkg/action/upgrade.go b/pkg/action/upgrade.go
index cdad2e121..fe4b3277d 100644
--- a/pkg/action/upgrade.go
+++ b/pkg/action/upgrade.go
@@ -341,7 +341,7 @@ func (u *Upgrade) releasingUpgrade(ctx context.Context, upgradedRelease *release
 
 	// pre-upgrade hooks
 	if !u.DisableHooks {
-		if err := u.cfg.execHook(ctx, upgradedRelease, release.HookPreUpgrade); err != nil {
+		if err := u.cfg.execHook(u.Timeout, upgradedRelease, release.HookPreUpgrade); err != nil {
 			return u.failRelease(upgradedRelease, kube.ResourceList{}, fmt.Errorf("pre-upgrade hooks failed: %w", err))
 		}
 	} else {
@@ -364,27 +364,38 @@ func (u *Upgrade) releasingUpgrade(ctx context.Context, upgradedRelease *release
 		}
 	}
 
-	kubeClient, ok := u.cfg.KubeClient.(kube.ContextInterface)
-	if u.Wait && ok {
+	if u.Wait {
 		u.cfg.Log(
 			"waiting for release %s resources (created: %d updated: %d deleted: %d)",
 			upgradedRelease.Name, len(results.Created),
 			len(results.Updated), len(results.Deleted))
-		if u.WaitForJobs {
-			if err := kubeClient.WaitWithJobsContext(ctx, target); err != nil {
-				u.cfg.recordRelease(originalRelease)
-				return u.failRelease(upgradedRelease, results.Created, err)
-			}
-		} else {
-			if err := kubeClient.WaitWithContext(ctx, target); err != nil {
-				u.cfg.recordRelease(originalRelease)
-				return u.failRelease(upgradedRelease, results.Created, err)
-			}
+		var err error
+
+		ctx, cancel := context.WithTimeout(ctx, u.Timeout)
+		defer cancel()
+
+		kubeClient, ok := u.cfg.KubeClient.(kube.ContextInterface)
+		// Helm 4 TODO: WaitWithJobs and Wait should be replaced with their context
+		// aware counterparts.
+		switch {
+		case ok && u.WaitForJobs:
+			err = kubeClient.WaitWithJobsContext(ctx, target)
+		case ok && !u.WaitForJobs:
+			err = kubeClient.WaitWithContext(ctx, target)
+		case u.WaitForJobs:
+			err = u.cfg.KubeClient.WaitWithJobs(target, u.Timeout)
+		case !u.WaitForJobs:
+			err = u.cfg.KubeClient.Wait(target, u.Timeout)
+		}
+
+		if err != nil {
+			u.cfg.recordRelease(originalRelease)
+			return u.failRelease(upgradedRelease, results.Created, err)
 		}
 	}
 
 	// post-upgrade hooks
 	if !u.DisableHooks {
-		if err := u.cfg.execHook(ctx, upgradedRelease, release.HookPostUpgrade); err != nil {
+		if err := u.cfg.execHook(u.Timeout, upgradedRelease, release.HookPostUpgrade); err != nil {
 			return u.failRelease(upgradedRelease, results.Created, fmt.Errorf("post-upgrade hooks failed: %w", err))
 		}
 	}

diff --git a/pkg/action/upgrade_test.go b/pkg/action/upgrade_test.go
index 22cb1042a..55b294e6d 100644
--- a/pkg/action/upgrade_test.go
+++ b/pkg/action/upgrade_test.go
@@ -357,15 +357,12 @@ func TestUpgradeRelease_Interrupted_Wait(t *testing.T) {
 	upAction.cfg.Releases.Create(rel)
 
 	failer := upAction.cfg.KubeClient.(*kubefake.FailingKubeClient)
+	failer.WaitError = context.Canceled
 	upAction.cfg.KubeClient = failer
 	upAction.Wait = true
 	vals := map[string]interface{}{}
 
-	ctx := context.Background()
-	ctx, cancel := context.WithCancel(ctx)
-	cancel()
-
-	res, err := upAction.RunWithContext(ctx, rel.Name, buildChart(), vals)
+	res, err := upAction.RunWithContext(context.Background(), rel.Name, buildChart(), vals)
 
 	req.Error(err)
 	is.Contains(res.Info.Description, "Upgrade \"interrupted-release\" failed: context canceled")
@@ -385,15 +382,17 @@ func TestUpgradeRelease_Interrupted_Atomic(t *testing.T) {
 	upAction.cfg.Releases.Create(rel)
 
 	failer := upAction.cfg.KubeClient.(*kubefake.FailingKubeClient)
+	failer.WaitError = context.Canceled
+	failer.WaitDuration = 2 * time.Second
 	upAction.cfg.KubeClient = failer
 	upAction.Atomic = true
 	vals := map[string]interface{}{}
 
-	ctx := context.Background()
-	ctx, cancel := context.WithCancel(ctx)
-	cancel()
+	// After the first Wait error, the error needs to be set to nil
+	// so the atomic cleanup passes.
+	time.AfterFunc(failer.WaitDuration, func() { failer.WaitError = nil })
 
-	res, err := upAction.RunWithContext(ctx, rel.Name, buildChart(), vals)
+	res, err := upAction.RunWithContext(context.Background(), rel.Name, buildChart(), vals)
 
 	req.Error(err)
 	is.Contains(err.Error(), "release interrupted-release failed, and has been rolled back due to atomic being set: context canceled")

diff --git a/pkg/kube/client.go b/pkg/kube/client.go
index 63a28b6fa..647175235 100644
--- a/pkg/kube/client.go
+++ b/pkg/kube/client.go
@@ -739,7 +739,7 @@ func (c *Client) watchUntilReady(ctx context.Context, info *resource.Info) error
 		return nil
 	}
 
-	c.Log("Watching for changes to %s %s with timeout of %v", kind, info.Name, timeFromCtx(ctx))
+	c.Log("Watching for changes to %s %s with timeout of %q", kind, info.Name, timeFromCtx(ctx))
 
 	// Use a selector on the name of the resource. This should be unique for the
 	// given version and kind
diff --git a/pkg/kube/fake/fake.go b/pkg/kube/fake/fake.go
index a2a5186e3..7addb364a 100644
--- a/pkg/kube/fake/fake.go
+++ b/pkg/kube/fake/fake.go
@@ -77,14 +77,11 @@ func (f *FailingKubeClient) Wait(resources kube.ResourceList, d time.Duration) e
 
 // Waits the amount of time defined on f.WaitDuration, then returns the configured error if set or prints.
 func (f *FailingKubeClient) WaitWithContext(ctx context.Context, resources kube.ResourceList) error {
+	time.Sleep(f.WaitDuration)
 	if f.WaitError != nil {
 		return f.WaitError
 	}
 
-	if err := ctx.Err(); err != nil {
-		return err
-	}
-
 	return f.PrintingKubeClient.WaitWithContext(ctx, resources)
 }
 
@@ -101,11 +98,6 @@ func (f *FailingKubeClient) WaitWithJobsContext(ctx context.Context, resources k
 	if f.WaitError != nil {
 		return f.WaitError
 	}
-
-	if err := ctx.Err(); err != nil {
-		return err
-	}
-
 	return f.PrintingKubeClient.WaitWithJobsContext(ctx, resources)
 }
 
@@ -122,11 +114,6 @@ func (f *FailingKubeClient) WaitForDeleteWithContext(ctx context.Context, resour
 	if f.WaitError != nil {
 		return f.WaitError
 	}
-
-	if err := ctx.Err(); err != nil {
-		return err
-	}
-
 	return f.PrintingKubeClient.WaitForDeleteWithContext(ctx, resources)
 }
 
@@ -202,11 +189,6 @@ func (f *FailingKubeClient) WatchUntilReadyWithContext(ctx context.Context, reso
 	if f.WatchUntilReadyError != nil {
 		return f.WatchUntilReadyError
 	}
-
-	if err := ctx.Err(); err != nil {
-		return err
-	}
-
 	return f.PrintingKubeClient.WatchUntilReady(resources, 0)
 }

diff --git a/pkg/kube/wait.go b/pkg/kube/wait.go
index 03268f59a..6d353719c 100644
--- a/pkg/kube/wait.go
+++ b/pkg/kube/wait.go
@@ -42,17 +42,21 @@ type waiter struct {
 }
 
 // timeFromCtx extracts time until deadline of ctx.
-func timeFromCtx(ctx context.Context) time.Duration {
-	deadline, _ := ctx.Deadline()
+func timeFromCtx(ctx context.Context) string {
+	deadline, ok := ctx.Deadline()
+	// No deadline means the context won't time out
+	if !ok {
+		return "none"
+	}
 
-	return time.Until(deadline)
+	return time.Until(deadline).String()
 }
 
 // waitForResources polls to get the current status of all pods, PVCs, Services and
 // Jobs(optional) until all are ready or a timeout is reached
 func (w *waiter) waitForResources(ctx context.Context, created ResourceList) error {
-	w.log("beginning wait for %d resources with timeout of %v", len(created), timeFromCtx(ctx))
+	w.log("beginning wait for %d resources with timeout of %q", len(created), timeFromCtx(ctx))
 
 	return wait.PollUntilContextCancel(ctx, 2*time.Second, true, func(ctx context.Context) (bool, error) {
 		for _, v := range created {
@@ -67,7 +71,7 @@ func (w *waiter) waitForResources(ctx context.Context, created ResourceList) err
 
 // waitForDeletedResources polls to check if all the resources are deleted or a timeout is reached
 func (w *waiter) waitForDeletedResources(ctx context.Context, deleted ResourceList) error {
-	w.log("beginning wait for %d resources to be deleted with timeout of %v", len(deleted), timeFromCtx(ctx))
+	w.log("beginning wait for %d resources to be deleted with timeout of %q", len(deleted), timeFromCtx(ctx))
 
 	return wait.PollUntilContextCancel(ctx, 2*time.Second, true, func(ctx context.Context) (bool, error) {
 		for _, v := range deleted {
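
A minimal sketch (not part of this series) of how a caller might drive the
new context-aware entry points. The RunWithContext signature follows the
diffs above; the Configuration setup is assumed to exist elsewhere, and the
helper name upgradeWithDeadline is hypothetical:

package main

import (
	"context"
	"log"
	"time"

	"helm.sh/helm/v3/pkg/action"
	"helm.sh/helm/v3/pkg/chart/loader"
)

// upgradeWithDeadline lets the caller own cancellation: ctx bounds the
// resource waits, while Timeout still bounds hook execution (see the
// hooks.go change in PATCH 4/4).
func upgradeWithDeadline(cfg *action.Configuration, name, chartPath string, vals map[string]interface{}) error {
	chrt, err := loader.Load(chartPath)
	if err != nil {
		return err
	}

	up := action.NewUpgrade(cfg)
	up.Wait = true
	up.Timeout = 5 * time.Minute

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()

	rel, err := up.RunWithContext(ctx, name, chrt, vals)
	if err != nil {
		return err
	}
	log.Printf("upgraded release %q to revision %d", rel.Name, rel.Version)
	return nil
}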
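
The recurring shape in PATCH 4/4 (performInstall, releasingUpgrade, and
uninstall) is an optional-interface upgrade: assert kube.ContextInterface
and prefer the context-aware wait, otherwise fall back to the
duration-based call so that out-of-tree kube.Interface implementations keep
working. Condensed into a hypothetical helper:

// waitReady illustrates the fallback pattern; it is not part of the patch.
func waitReady(ctx context.Context, client kube.Interface, resources kube.ResourceList, timeout time.Duration) error {
	if ctxClient, ok := client.(kube.ContextInterface); ok {
		// Context-aware path: derive the deadline from the caller's ctx.
		ctx, cancel := context.WithTimeout(ctx, timeout)
		defer cancel()
		return ctxClient.WaitWithContext(ctx, resources)
	}
	// Legacy path for clients that predate ContextInterface.
	return client.Wait(resources, timeout)
}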
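
On the wait.go change: timeFromCtx now returns a string so that a context
without a deadline logs as "none" rather than the large negative duration
time.Until would compute from a zero-value deadline. Roughly, from inside
package kube:

	timeFromCtx(context.Background()) // "none"

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	timeFromCtx(ctx) // e.g. "29.999999958s", the time remaining until the deadline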