diff --git a/pkg/action/hooks.go b/pkg/action/hooks.go index 40c1ffdb6..2490613c4 100644 --- a/pkg/action/hooks.go +++ b/pkg/action/hooks.go @@ -17,17 +17,18 @@ package action import ( "bytes" + "context" "sort" - "time" "github.com/pkg/errors" + "helm.sh/helm/v3/pkg/kube" "helm.sh/helm/v3/pkg/release" helmtime "helm.sh/helm/v3/pkg/time" ) // execHook executes all of the hooks for the given hook event. -func (cfg *Configuration) execHook(rl *release.Release, hook release.HookEvent, timeout time.Duration) error { +func (cfg *Configuration) execHook(ctx context.Context, rl *release.Release, hook release.HookEvent) error { executingHooks := []*release.Hook{} for _, h := range rl.Hooks { @@ -79,8 +80,12 @@ func (cfg *Configuration) execHook(rl *release.Release, hook release.HookEvent, return errors.Wrapf(err, "warning: Hook %s %s failed", hook, h.Path) } + ctxInterface, ok := cfg.KubeClient.(kube.ContextInterface) + if !ok { + panic("KubeClient does not satisfies kube.ContextInterface") + } // Watch hook resources until they have completed - err = cfg.KubeClient.WatchUntilReady(resources, timeout) + err = ctxInterface.WatchUntilReadyWithContext(ctx, resources) // Note the time of success/failure h.LastRun.CompletedAt = helmtime.Now() // Mark hook as succeeded or failed diff --git a/pkg/action/install.go b/pkg/action/install.go index d5c34cef7..2283bb9ef 100644 --- a/pkg/action/install.go +++ b/pkg/action/install.go @@ -206,7 +206,9 @@ func (i *Install) installCRDs(crds []chart.CRD) error { // If DryRun is set to true, this will prepare the release, but not install it func (i *Install) Run(chrt *chart.Chart, vals map[string]interface{}) (*release.Release, error) { - ctx := context.Background() + ctx, cancel := context.WithTimeout(context.TODO(), i.Timeout) + defer cancel() + return i.RunWithContext(ctx, chrt, vals) } @@ -360,6 +362,10 @@ func (i *Install) RunWithContext(ctx context.Context, chrt *chart.Chart, vals ma } } + // if err := ctx.Err(); err != nil { + // return rel, err + // } + // Store the release in history before continuing (new in Helm 3). We always know // that this is a create operation. if err := i.cfg.Releases.Create(rel); err != nil { @@ -368,27 +374,16 @@ func (i *Install) RunWithContext(ctx context.Context, chrt *chart.Chart, vals ma // not working. return rel, err } - rChan := make(chan resultMessage) - ctxChan := make(chan resultMessage) - doneChan := make(chan struct{}) - defer close(doneChan) - go i.performInstall(rChan, rel, toBeAdopted, resources) - go i.handleContext(ctx, ctxChan, doneChan, rel) - select { - case result := <-rChan: - return result.r, result.e - case result := <-ctxChan: - return result.r, result.e - } + + return i.performInstall(ctx, rel, toBeAdopted, resources) } -func (i *Install) performInstall(c chan<- resultMessage, rel *release.Release, toBeAdopted kube.ResourceList, resources kube.ResourceList) { +func (i *Install) performInstall(ctx context.Context, rel *release.Release, toBeAdopted, resources kube.ResourceList) (*release.Release, error) { // pre-install hooks if !i.DisableHooks { - if err := i.cfg.execHook(rel, release.HookPreInstall, i.Timeout); err != nil { - i.reportToRun(c, rel, fmt.Errorf("failed pre-install: %s", err)) - return + if err := i.cfg.execHook(ctx, rel, release.HookPreInstall); err != nil { + return i.failRelease(rel, fmt.Errorf("failed pre-install: %w", err)) } } @@ -397,34 +392,31 @@ func (i *Install) performInstall(c chan<- resultMessage, rel *release.Release, t // to true, since that is basically an upgrade operation. if len(toBeAdopted) == 0 && len(resources) > 0 { if _, err := i.cfg.KubeClient.Create(resources); err != nil { - i.reportToRun(c, rel, err) - return + return i.failRelease(rel, err) } } else if len(resources) > 0 { if _, err := i.cfg.KubeClient.Update(toBeAdopted, resources, i.Force); err != nil { - i.reportToRun(c, rel, err) - return + return i.failRelease(rel, err) } } if i.Wait { + kubeClient := i.cfg.KubeClient.(kube.ContextInterface) + if i.WaitForJobs { - if err := i.cfg.KubeClient.WaitWithJobs(resources, i.Timeout); err != nil { - i.reportToRun(c, rel, err) - return + if err := kubeClient.WaitWithJobsContext(ctx, resources); err != nil { + return i.failRelease(rel, err) } } else { - if err := i.cfg.KubeClient.Wait(resources, i.Timeout); err != nil { - i.reportToRun(c, rel, err) - return + if err := kubeClient.WaitWithContext(ctx, resources); err != nil { + return i.failRelease(rel, err) } } } if !i.DisableHooks { - if err := i.cfg.execHook(rel, release.HookPostInstall, i.Timeout); err != nil { - i.reportToRun(c, rel, fmt.Errorf("failed post-install: %s", err)) - return + if err := i.cfg.execHook(ctx, rel, release.HookPostInstall); err != nil { + return i.failRelease(rel, fmt.Errorf("failed post-install: %w", err)) } } @@ -445,25 +437,9 @@ func (i *Install) performInstall(c chan<- resultMessage, rel *release.Release, t i.cfg.Log("failed to record the release: %s", err) } - i.reportToRun(c, rel, nil) -} -func (i *Install) handleContext(ctx context.Context, c chan<- resultMessage, done chan struct{}, rel *release.Release) { - select { - case <-ctx.Done(): - err := ctx.Err() - i.reportToRun(c, rel, err) - case <-done: - return - } -} -func (i *Install) reportToRun(c chan<- resultMessage, rel *release.Release, err error) { - i.Lock.Lock() - if err != nil { - rel, err = i.failRelease(rel, err) - } - c <- resultMessage{r: rel, e: err} - i.Lock.Unlock() + return rel, nil } + func (i *Install) failRelease(rel *release.Release, err error) (*release.Release, error) { rel.SetStatus(release.StatusFailed, fmt.Sprintf("Release %q failed: %s", i.ReleaseName, err.Error())) if i.Atomic { diff --git a/pkg/action/install_test.go b/pkg/action/install_test.go index dd0cdb54d..078608803 100644 --- a/pkg/action/install_test.go +++ b/pkg/action/install_test.go @@ -387,6 +387,7 @@ func TestInstallRelease_Wait_Interrupted(t *testing.T) { ctx := context.Background() ctx, cancel := context.WithCancel(ctx) time.AfterFunc(time.Second, cancel) + //cancel() res, err := instAction.RunWithContext(ctx, buildChart(), vals) is.Error(err) diff --git a/pkg/action/release_testing.go b/pkg/action/release_testing.go index ecaeaf59f..2dc4e7a41 100644 --- a/pkg/action/release_testing.go +++ b/pkg/action/release_testing.go @@ -48,8 +48,7 @@ func NewReleaseTesting(cfg *Configuration) *ReleaseTesting { } } -// Run executes 'helm test' against the given release. -func (r *ReleaseTesting) Run(name string) (*release.Release, error) { +func (r *ReleaseTesting) RunWithContext(ctx context.Context, name string) (*release.Release, error) { if err := r.cfg.KubeClient.IsReachable(); err != nil { return nil, err } @@ -88,7 +87,7 @@ func (r *ReleaseTesting) Run(name string) (*release.Release, error) { rel.Hooks = executingHooks } - if err := r.cfg.execHook(rel, release.HookTest, r.Timeout); err != nil { + if err := r.cfg.execHook(ctx, rel, release.HookTest); err != nil { rel.Hooks = append(skippedHooks, rel.Hooks...) r.cfg.Releases.Update(rel) return rel, err @@ -98,6 +97,14 @@ func (r *ReleaseTesting) Run(name string) (*release.Release, error) { return rel, r.cfg.Releases.Update(rel) } +// Run executes 'helm test' against the given release. +func (r *ReleaseTesting) Run(name string) (*release.Release, error) { + ctx, cancel := context.WithTimeout(context.TODO(), r.Timeout) + defer cancel() + + return r.RunWithContext(ctx, name) +} + // GetPodLogs will write the logs for all test pods in the given release into // the given writer. These can be immediately output to the user or captured for // other uses diff --git a/pkg/action/rollback.go b/pkg/action/rollback.go index dda8c700b..989e771f1 100644 --- a/pkg/action/rollback.go +++ b/pkg/action/rollback.go @@ -18,6 +18,7 @@ package action import ( "bytes" + "context" "fmt" "strings" "time" @@ -54,8 +55,7 @@ func NewRollback(cfg *Configuration) *Rollback { } } -// Run executes 'helm rollback' against the given release. -func (r *Rollback) Run(name string) error { +func (r *Rollback) RunWithContext(ctx context.Context, name string) error { if err := r.cfg.KubeClient.IsReachable(); err != nil { return err } @@ -76,7 +76,7 @@ func (r *Rollback) Run(name string) error { } r.cfg.Log("performing rollback of %s", name) - if _, err := r.performRollback(currentRelease, targetRelease); err != nil { + if _, err := r.performRollback(ctx, currentRelease, targetRelease); err != nil { return err } @@ -89,6 +89,14 @@ func (r *Rollback) Run(name string) error { return nil } +// Run executes 'helm rollback' against the given release. +func (r *Rollback) Run(name string) error { + ctx, cancel := context.WithTimeout(context.TODO(), r.Timeout) + defer cancel() + + return r.RunWithContext(ctx, name) +} + // prepareRollback finds the previous release and prepares a new release object with // the previous release's configuration func (r *Rollback) prepareRollback(name string) (*release.Release, *release.Release, error) { @@ -140,7 +148,7 @@ func (r *Rollback) prepareRollback(name string) (*release.Release, *release.Rele return currentRelease, targetRelease, nil } -func (r *Rollback) performRollback(currentRelease, targetRelease *release.Release) (*release.Release, error) { +func (r *Rollback) performRollback(ctx context.Context, currentRelease, targetRelease *release.Release) (*release.Release, error) { if r.DryRun { r.cfg.Log("dry run for %s", targetRelease.Name) return targetRelease, nil @@ -157,7 +165,7 @@ func (r *Rollback) performRollback(currentRelease, targetRelease *release.Releas // pre-rollback hooks if !r.DisableHooks { - if err := r.cfg.execHook(targetRelease, release.HookPreRollback, r.Timeout); err != nil { + if err := r.cfg.execHook(ctx, targetRelease, release.HookPreRollback); err != nil { return targetRelease, err } } else { @@ -224,7 +232,7 @@ func (r *Rollback) performRollback(currentRelease, targetRelease *release.Releas // post-rollback hooks if !r.DisableHooks { - if err := r.cfg.execHook(targetRelease, release.HookPostRollback, r.Timeout); err != nil { + if err := r.cfg.execHook(ctx, targetRelease, release.HookPostRollback); err != nil { return targetRelease, err } } diff --git a/pkg/action/uninstall.go b/pkg/action/uninstall.go index 801498544..9d2078237 100644 --- a/pkg/action/uninstall.go +++ b/pkg/action/uninstall.go @@ -17,6 +17,7 @@ limitations under the License. package action import ( + "context" "strings" "time" @@ -54,6 +55,13 @@ func NewUninstall(cfg *Configuration) *Uninstall { // Run uninstalls the given release. func (u *Uninstall) Run(name string) (*release.UninstallReleaseResponse, error) { + ctx, cancel := context.WithTimeout(context.TODO(), u.Timeout) + defer cancel() + + return u.RunWithContext(ctx, name) +} + +func (u *Uninstall) RunWithContext(ctx context.Context, name string) (*release.UninstallReleaseResponse, error) { if err := u.cfg.KubeClient.IsReachable(); err != nil { return nil, err } @@ -101,7 +109,7 @@ func (u *Uninstall) Run(name string) (*release.UninstallReleaseResponse, error) res := &release.UninstallReleaseResponse{Release: rel} if !u.DisableHooks { - if err := u.cfg.execHook(rel, release.HookPreDelete, u.Timeout); err != nil { + if err := u.cfg.execHook(ctx, rel, release.HookPreDelete); err != nil { return res, err } } else { @@ -126,15 +134,15 @@ func (u *Uninstall) Run(name string) (*release.UninstallReleaseResponse, error) res.Info = kept if u.Wait { - if kubeClient, ok := u.cfg.KubeClient.(kube.InterfaceExt); ok { - if err := kubeClient.WaitForDelete(deletedResources, u.Timeout); err != nil { + if kubeClient, ok := u.cfg.KubeClient.(kube.ContextInterface); ok { + if err := kubeClient.WaitForDeleteWithContext(ctx, deletedResources); err != nil { errs = append(errs, err) } } } if !u.DisableHooks { - if err := u.cfg.execHook(rel, release.HookPostDelete, u.Timeout); err != nil { + if err := u.cfg.execHook(ctx, rel, release.HookPostDelete); err != nil { errs = append(errs, err) } } diff --git a/pkg/action/upgrade.go b/pkg/action/upgrade.go index 829be51df..975c5f7e7 100644 --- a/pkg/action/upgrade.go +++ b/pkg/action/upgrade.go @@ -130,7 +130,9 @@ func (u *Upgrade) SetRegistryClient(client *registry.Client) { // Run executes the upgrade on the given release. func (u *Upgrade) Run(name string, chart *chart.Chart, vals map[string]interface{}) (*release.Release, error) { - ctx := context.Background() + ctx, cancel := context.WithTimeout(context.TODO(), u.Timeout) + defer cancel() + return u.RunWithContext(ctx, name, chart, vals) } @@ -331,51 +333,16 @@ func (u *Upgrade) performUpgrade(ctx context.Context, originalRelease, upgradedR if err := u.cfg.Releases.Create(upgradedRelease); err != nil { return nil, err } - rChan := make(chan resultMessage) - ctxChan := make(chan resultMessage) - doneChan := make(chan interface{}) - defer close(doneChan) - go u.releasingUpgrade(rChan, upgradedRelease, current, target, originalRelease) - go u.handleContext(ctx, doneChan, ctxChan, upgradedRelease) - select { - case result := <-rChan: - return result.r, result.e - case result := <-ctxChan: - return result.r, result.e - } -} -// Function used to lock the Mutex, this is important for the case when the atomic flag is set. -// In that case the upgrade will finish before the rollback is finished so it is necessary to wait for the rollback to finish. -// The rollback will be trigger by the function failRelease -func (u *Upgrade) reportToPerformUpgrade(c chan<- resultMessage, rel *release.Release, created kube.ResourceList, err error) { - u.Lock.Lock() - if err != nil { - rel, err = u.failRelease(rel, created, err) - } - c <- resultMessage{r: rel, e: err} - u.Lock.Unlock() + return u.releasingUpgrade(ctx, upgradedRelease, current, target, originalRelease) } -// Setup listener for SIGINT and SIGTERM -func (u *Upgrade) handleContext(ctx context.Context, done chan interface{}, c chan<- resultMessage, upgradedRelease *release.Release) { - select { - case <-ctx.Done(): - err := ctx.Err() - - // when the atomic flag is set the ongoing release finish first and doesn't give time for the rollback happens. - u.reportToPerformUpgrade(c, upgradedRelease, kube.ResourceList{}, err) - case <-done: - return - } -} -func (u *Upgrade) releasingUpgrade(c chan<- resultMessage, upgradedRelease *release.Release, current kube.ResourceList, target kube.ResourceList, originalRelease *release.Release) { +func (u *Upgrade) releasingUpgrade(ctx context.Context, upgradedRelease *release.Release, current, target kube.ResourceList, originalRelease *release.Release) (*release.Release, error) { // pre-upgrade hooks if !u.DisableHooks { - if err := u.cfg.execHook(upgradedRelease, release.HookPreUpgrade, u.Timeout); err != nil { - u.reportToPerformUpgrade(c, upgradedRelease, kube.ResourceList{}, fmt.Errorf("pre-upgrade hooks failed: %s", err)) - return + if err := u.cfg.execHook(ctx, upgradedRelease, release.HookPreUpgrade); err != nil { + return u.failRelease(upgradedRelease, kube.ResourceList{}, fmt.Errorf("pre-upgrade hooks failed: %w", err)) } } else { u.cfg.Log("upgrade hooks disabled for %s", upgradedRelease.Name) @@ -384,8 +351,7 @@ func (u *Upgrade) releasingUpgrade(c chan<- resultMessage, upgradedRelease *rele results, err := u.cfg.KubeClient.Update(current, target, u.Force) if err != nil { u.cfg.recordRelease(originalRelease) - u.reportToPerformUpgrade(c, upgradedRelease, results.Created, err) - return + return u.failRelease(upgradedRelease, results.Created, err) } if u.Recreate { @@ -405,23 +371,20 @@ func (u *Upgrade) releasingUpgrade(c chan<- resultMessage, upgradedRelease *rele if u.WaitForJobs { if err := u.cfg.KubeClient.WaitWithJobs(target, u.Timeout); err != nil { u.cfg.recordRelease(originalRelease) - u.reportToPerformUpgrade(c, upgradedRelease, results.Created, err) - return + return u.failRelease(upgradedRelease, results.Created, err) } } else { if err := u.cfg.KubeClient.Wait(target, u.Timeout); err != nil { u.cfg.recordRelease(originalRelease) - u.reportToPerformUpgrade(c, upgradedRelease, results.Created, err) - return + return u.failRelease(upgradedRelease, results.Created, err) } } } // post-upgrade hooks if !u.DisableHooks { - if err := u.cfg.execHook(upgradedRelease, release.HookPostUpgrade, u.Timeout); err != nil { - u.reportToPerformUpgrade(c, upgradedRelease, results.Created, fmt.Errorf("post-upgrade hooks failed: %s", err)) - return + if err := u.cfg.execHook(ctx, upgradedRelease, release.HookPostUpgrade); err != nil { + return u.failRelease(upgradedRelease, results.Created, fmt.Errorf("post-upgrade hooks failed: %w", err)) } } @@ -434,7 +397,8 @@ func (u *Upgrade) releasingUpgrade(c chan<- resultMessage, upgradedRelease *rele } else { upgradedRelease.Info.Description = "Upgrade complete" } - u.reportToPerformUpgrade(c, upgradedRelease, nil, nil) + + return upgradedRelease, nil } func (u *Upgrade) failRelease(rel *release.Release, created kube.ResourceList, err error) (*release.Release, error) { diff --git a/pkg/kube/client.go b/pkg/kube/client.go index 7b3c803f9..63a28b6fa 100644 --- a/pkg/kube/client.go +++ b/pkg/kube/client.go @@ -275,41 +275,67 @@ func getResource(info *resource.Info) (runtime.Object, error) { // Wait waits up to the given timeout for the specified resources to be ready. func (c *Client) Wait(resources ResourceList, timeout time.Duration) error { + // Helm 4 TODO: remove decarator around WaitWithContext. + ctx, cancel := context.WithTimeout(context.TODO(), timeout) + defer cancel() + + return c.WaitWithContext(ctx, resources) +} + +// WaitWithContext waits up till ctx timeout for the specified resources to be ready. +func (c *Client) WaitWithContext(ctx context.Context, resources ResourceList) error { cs, err := c.getKubeClient() if err != nil { return err } checker := NewReadyChecker(cs, c.Log, PausedAsReady(true)) w := waiter{ - c: checker, - log: c.Log, - timeout: timeout, + c: checker, + log: c.Log, } - return w.waitForResources(resources) + return w.waitForResources(ctx, resources) } // WaitWithJobs wait up to the given timeout for the specified resources to be ready, including jobs. func (c *Client) WaitWithJobs(resources ResourceList, timeout time.Duration) error { + // Helm 4 TODO: remove decarator around WaitWithJobsContext. + ctx, cancel := context.WithTimeout(context.TODO(), timeout) + defer cancel() + + return c.WaitWithJobsContext(ctx, resources) +} + +// WaitWithJobsContext wait up to the given ctx timeout for the specified resources to be ready, including jobs. +func (c *Client) WaitWithJobsContext(ctx context.Context, resources ResourceList) error { cs, err := c.getKubeClient() if err != nil { return err } checker := NewReadyChecker(cs, c.Log, PausedAsReady(true), CheckJobs(true)) w := waiter{ - c: checker, - log: c.Log, - timeout: timeout, + c: checker, + log: c.Log, } - return w.waitForResources(resources) + + return w.waitForResources(ctx, resources) } // WaitForDelete wait up to the given timeout for the specified resources to be deleted. func (c *Client) WaitForDelete(resources ResourceList, timeout time.Duration) error { + // Helm 4 TODO: remove decarator around WaitWithJobsContext. + ctx, cancel := context.WithTimeout(context.TODO(), timeout) + defer cancel() + + return c.WaitForDeleteWithContext(ctx, resources) +} + +// WaitForDelete wait up to the given timeout for the specified resources to be deleted. +func (c *Client) WaitForDeleteWithContext(ctx context.Context, resources ResourceList) error { w := waiter{ - log: c.Log, - timeout: timeout, + log: c.Log, } - return w.waitForDeletedResources(resources) + + return w.waitForDeletedResources(ctx, resources) } func (c *Client) namespace() string { @@ -506,9 +532,9 @@ func delete(c *Client, resources ResourceList, propagation metav1.DeletionPropag return res, nil } -func (c *Client) watchTimeout(t time.Duration) func(*resource.Info) error { +func (c *Client) watchTimeout(ctx context.Context) func(*resource.Info) error { return func(info *resource.Info) error { - return c.watchUntilReady(t, info) + return c.watchUntilReady(ctx, info) } } @@ -527,9 +553,18 @@ func (c *Client) watchTimeout(t time.Duration) func(*resource.Info) error { // // Handling for other kinds will be added as necessary. func (c *Client) WatchUntilReady(resources ResourceList, timeout time.Duration) error { + // Helm 4 TODO: remove decarator around WatchUntilReadyWithContext. + ctx, cancel := context.WithTimeout(context.TODO(), timeout) + defer cancel() + // For jobs, there's also the option to do poll c.Jobs(namespace).Get(): // https://github.com/adamreese/kubernetes/blob/master/test/e2e/job.go#L291-L300 - return perform(resources, c.watchTimeout(timeout)) + return c.WatchUntilReadyWithContext(ctx, resources) +} + +// WatchUntilReadyWithContext -. +func (c *Client) WatchUntilReadyWithContext(ctx context.Context, resources ResourceList) error { + return perform(resources, c.watchTimeout(ctx)) } func perform(infos ResourceList, fn func(*resource.Info) error) error { @@ -594,6 +629,7 @@ func createResource(info *resource.Info) error { if err != nil { return err } + return info.Refresh(obj, true) } @@ -695,7 +731,7 @@ func updateResource(c *Client, target *resource.Info, currentObj runtime.Object, return nil } -func (c *Client) watchUntilReady(timeout time.Duration, info *resource.Info) error { +func (c *Client) watchUntilReady(ctx context.Context, info *resource.Info) error { kind := info.Mapping.GroupVersionKind.Kind switch kind { case "Job", "Pod": @@ -703,7 +739,7 @@ func (c *Client) watchUntilReady(timeout time.Duration, info *resource.Info) err return nil } - c.Log("Watching for changes to %s %s with timeout of %v", kind, info.Name, timeout) + c.Log("Watching for changes to %s %s with timeout of %v", kind, info.Name, timeFromCtx(ctx)) // Use a selector on the name of the resource. This should be unique for the // given version and kind @@ -719,8 +755,6 @@ func (c *Client) watchUntilReady(timeout time.Duration, info *resource.Info) err // In the future, we might want to add some special logic for types // like Ingress, Volume, etc. - ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), timeout) - defer cancel() _, err = watchtools.UntilWithSync(ctx, lw, &unstructured.Unstructured{}, nil, func(e watch.Event) (bool, error) { // Make sure the incoming object is versioned as we use unstructured // objects when we build manifests @@ -811,6 +845,35 @@ func scrubValidationError(err error) error { return err } +func (c *Client) WaitAndGetCompletedPodPhaseWithContext(ctx context.Context, name string) (v1.PodPhase, error) { + client, err := c.getKubeClient() + if err != nil { + return v1.PodUnknown, err + } + + watcher, err := client.CoreV1().Pods(c.namespace()).Watch(ctx, metav1.ListOptions{ + FieldSelector: fmt.Sprintf("metadata.name=%s", name), + }) + if err != nil { + return v1.PodUnknown, err + } + + for event := range watcher.ResultChan() { + p, ok := event.Object.(*v1.Pod) + if !ok { + return v1.PodUnknown, fmt.Errorf("%s not a pod", name) + } + switch p.Status.Phase { + case v1.PodFailed: + return v1.PodFailed, nil + case v1.PodSucceeded: + return v1.PodSucceeded, nil + } + } + + return v1.PodUnknown, err +} + // WaitAndGetCompletedPodPhase waits up to a timeout until a pod enters a completed phase // and returns said phase (PodSucceeded or PodFailed qualify). func (c *Client) WaitAndGetCompletedPodPhase(name string, timeout time.Duration) (v1.PodPhase, error) { diff --git a/pkg/kube/fake/fake.go b/pkg/kube/fake/fake.go index 267020d57..820518ed9 100644 --- a/pkg/kube/fake/fake.go +++ b/pkg/kube/fake/fake.go @@ -18,6 +18,7 @@ limitations under the License. package fake import ( + "context" "io" "time" @@ -74,6 +75,15 @@ func (f *FailingKubeClient) Wait(resources kube.ResourceList, d time.Duration) e return f.PrintingKubeClient.Wait(resources, d) } +// Waits the amount of time defined on f.WaitDuration, then returns the configured error if set or prints. +func (f *FailingKubeClient) WaitWithContext(ctx context.Context, resources kube.ResourceList) error { + time.Sleep(f.WaitDuration) + if f.WaitError != nil { + return f.WaitError + } + return f.PrintingKubeClient.Wait(resources, 0) +} + // WaitWithJobs returns the configured error if set or prints func (f *FailingKubeClient) WaitWithJobs(resources kube.ResourceList, d time.Duration) error { if f.WaitError != nil { @@ -82,6 +92,14 @@ func (f *FailingKubeClient) WaitWithJobs(resources kube.ResourceList, d time.Dur return f.PrintingKubeClient.WaitWithJobs(resources, d) } +// WaitWithJobs returns the configured error if set or prints +func (f *FailingKubeClient) WaitWithJobsContext(ctx context.Context, resources kube.ResourceList) error { + if f.WaitError != nil { + return f.WaitError + } + return f.PrintingKubeClient.WaitWithJobsContext(ctx, resources) +} + // WaitForDelete returns the configured error if set or prints func (f *FailingKubeClient) WaitForDelete(resources kube.ResourceList, d time.Duration) error { if f.WaitError != nil { @@ -90,6 +108,14 @@ func (f *FailingKubeClient) WaitForDelete(resources kube.ResourceList, d time.Du return f.PrintingKubeClient.WaitForDelete(resources, d) } +// WaitForDeleteWithContext returns the configured error if set or prints +func (f *FailingKubeClient) WaitForDeleteWithContext(ctx context.Context, resources kube.ResourceList) error { + if f.WaitError != nil { + return f.WaitError + } + return f.PrintingKubeClient.WaitForDelete(resources, 0) +} + // Delete returns the configured error if set or prints func (f *FailingKubeClient) Delete(resources kube.ResourceList) (*kube.Result, []error) { if f.DeleteError != nil { @@ -141,6 +167,14 @@ func (f *FailingKubeClient) WaitAndGetCompletedPodPhase(s string, d time.Duratio return f.PrintingKubeClient.WaitAndGetCompletedPodPhase(s, d) } +// WaitAndGetCompletedPodPhaseWithContext returns the configured error if set or prints +func (f *FailingKubeClient) WaitAndGetCompletedPodPhaseWithContext(ctx context.Context, s string) (v1.PodPhase, error) { + if f.WaitAndGetCompletedPodPhaseError != nil { + return v1.PodSucceeded, f.WaitAndGetCompletedPodPhaseError + } + return f.PrintingKubeClient.WaitAndGetCompletedPodPhase(s, 0) +} + // DeleteWithPropagationPolicy returns the configured error if set or prints func (f *FailingKubeClient) DeleteWithPropagationPolicy(resources kube.ResourceList, policy metav1.DeletionPropagation) (*kube.Result, []error) { if f.DeleteWithPropagationError != nil { @@ -149,6 +183,14 @@ func (f *FailingKubeClient) DeleteWithPropagationPolicy(resources kube.ResourceL return f.PrintingKubeClient.DeleteWithPropagationPolicy(resources, policy) } +func (f *FailingKubeClient) WatchUntilReadyWithContext(ctx context.Context, resources kube.ResourceList) error { + if f.WatchUntilReadyError != nil { + return f.WatchUntilReadyError + } + + return f.PrintingKubeClient.WatchUntilReady(resources, 0) +} + func createDummyResourceList() kube.ResourceList { var resInfo resource.Info resInfo.Name = "dummyName" @@ -158,3 +200,10 @@ func createDummyResourceList() kube.ResourceList { return resourceList } + +// compile time check that FailingKubeClient satiesfies our interfaces. +var ( + _ kube.Interface = &FailingKubeClient{} + _ kube.ContextInterface = &FailingKubeClient{} + _ kube.InterfaceExt = &FailingKubeClient{} +) diff --git a/pkg/kube/fake/printer.go b/pkg/kube/fake/printer.go index e6c4b6207..b101a4f64 100644 --- a/pkg/kube/fake/printer.go +++ b/pkg/kube/fake/printer.go @@ -17,6 +17,7 @@ limitations under the License. package fake import ( + "context" "io" "strings" "time" @@ -62,16 +63,31 @@ func (p *PrintingKubeClient) Wait(resources kube.ResourceList, _ time.Duration) return err } +func (p *PrintingKubeClient) WaitWithContext(ctx context.Context, resources kube.ResourceList) error { + _, err := io.Copy(p.Out, bufferize(resources)) + return err +} + func (p *PrintingKubeClient) WaitWithJobs(resources kube.ResourceList, _ time.Duration) error { _, err := io.Copy(p.Out, bufferize(resources)) return err } +func (p *PrintingKubeClient) WaitWithJobsContext(ctx context.Context, resources kube.ResourceList) error { + _, err := io.Copy(p.Out, bufferize(resources)) + return err +} + func (p *PrintingKubeClient) WaitForDelete(resources kube.ResourceList, _ time.Duration) error { _, err := io.Copy(p.Out, bufferize(resources)) return err } +func (p *PrintingKubeClient) WaitForDeleteWithContext(ctx context.Context, resources kube.ResourceList) error { + _, err := io.Copy(p.Out, bufferize(resources)) + return err +} + // Delete implements KubeClient delete. // // It only prints out the content to be deleted. @@ -89,6 +105,12 @@ func (p *PrintingKubeClient) WatchUntilReady(resources kube.ResourceList, _ time return err } +// WatchUntilReadyWithContext implements KubeClient WatchUntilReadyWithContext. +func (p *PrintingKubeClient) WatchUntilReadyWithContext(ctx context.Context, resources kube.ResourceList) error { + _, err := io.Copy(p.Out, bufferize(resources)) + return err +} + // Update implements KubeClient Update. func (p *PrintingKubeClient) Update(_, modified kube.ResourceList, _ bool) (*kube.Result, error) { _, err := io.Copy(p.Out, bufferize(modified)) @@ -116,6 +138,11 @@ func (p *PrintingKubeClient) WaitAndGetCompletedPodPhase(_ string, _ time.Durati return v1.PodSucceeded, nil } +// WaitAndGetCompletedPodPhaseWithContext implements KubeClient WaitAndGetCompletedPodPhaseWithContext. +func (p *PrintingKubeClient) WaitAndGetCompletedPodPhaseWithContext(_ context.Context, _ string) (v1.PodPhase, error) { + return v1.PodSucceeded, nil +} + // DeleteWithPropagationPolicy implements KubeClient delete. // // It only prints out the content to be deleted. @@ -134,3 +161,10 @@ func bufferize(resources kube.ResourceList) io.Reader { } return strings.NewReader(builder.String()) } + +// compile time check that PrintingKubeClient satiesfies our interfaces. +var ( + _ kube.Interface = &PrintingKubeClient{} + _ kube.ContextInterface = &PrintingKubeClient{} + _ kube.InterfaceExt = &PrintingKubeClient{} +) diff --git a/pkg/kube/interface.go b/pkg/kube/interface.go index ce42ed950..35ce5a271 100644 --- a/pkg/kube/interface.go +++ b/pkg/kube/interface.go @@ -17,6 +17,7 @@ limitations under the License. package kube import ( + "context" "io" "time" @@ -72,6 +73,19 @@ type Interface interface { IsReachable() error } +// ContextInterface is introduced to avoid breaking backwards compatability for Interface implementers. +// +// TODO Helm 4: Replace Interface methods that accept a time.Duration as an argument, with a context. +type ContextInterface interface { + // WaitWithContext waits till a ctx timeout for the specified resources to be ready. + WaitWithContext(ctx context.Context, resources ResourceList) error + // WaitWithJobsContext wait up to the given ctx timeout for the specified resources to be ready, including jobs. + WaitWithJobsContext(ctx context.Context, resources ResourceList) error + WatchUntilReadyWithContext(context.Context, ResourceList) error + WaitAndGetCompletedPodPhaseWithContext(context.Context, string) (v1.PodPhase, error) + WaitForDeleteWithContext(context.Context, ResourceList) error +} + // InterfaceExt is introduced to avoid breaking backwards compatibility for Interface implementers. // // TODO Helm 4: Remove InterfaceExt and integrate its method(s) into the Interface. @@ -112,5 +126,6 @@ type InterfaceResources interface { var _ Interface = (*Client)(nil) var _ InterfaceExt = (*Client)(nil) +var _ ContextInterface = (*Client)(nil) var _ InterfaceDeletionPropagation = (*Client)(nil) var _ InterfaceResources = (*Client)(nil) diff --git a/pkg/kube/wait.go b/pkg/kube/wait.go index ecdd38940..03268f59a 100644 --- a/pkg/kube/wait.go +++ b/pkg/kube/wait.go @@ -37,18 +37,22 @@ import ( ) type waiter struct { - c ReadyChecker - timeout time.Duration - log func(string, ...interface{}) + c ReadyChecker + log func(string, ...interface{}) +} + +// timeFromCtx extracts time until deadline of ctx. +func timeFromCtx(ctx context.Context) time.Duration { + deadline, _ := ctx.Deadline() + + return time.Until(deadline) } // waitForResources polls to get the current status of all pods, PVCs, Services and // Jobs(optional) until all are ready or a timeout is reached -func (w *waiter) waitForResources(created ResourceList) error { - w.log("beginning wait for %d resources with timeout of %v", len(created), w.timeout) +func (w *waiter) waitForResources(ctx context.Context, created ResourceList) error { - ctx, cancel := context.WithTimeout(context.Background(), w.timeout) - defer cancel() + w.log("beginning wait for %d resources with timeout of %v", len(created), timeFromCtx(ctx)) return wait.PollUntilContextCancel(ctx, 2*time.Second, true, func(ctx context.Context) (bool, error) { for _, v := range created { @@ -62,11 +66,8 @@ func (w *waiter) waitForResources(created ResourceList) error { } // waitForDeletedResources polls to check if all the resources are deleted or a timeout is reached -func (w *waiter) waitForDeletedResources(deleted ResourceList) error { - w.log("beginning wait for %d resources to be deleted with timeout of %v", len(deleted), w.timeout) - - ctx, cancel := context.WithTimeout(context.Background(), w.timeout) - defer cancel() +func (w *waiter) waitForDeletedResources(ctx context.Context, deleted ResourceList) error { + w.log("beginning wait for %d resources to be deleted with timeout of %v", len(deleted), timeFromCtx(ctx)) return wait.PollUntilContextCancel(ctx, 2*time.Second, true, func(ctx context.Context) (bool, error) { for _, v := range deleted {