diff --git a/pkg/action/install.go b/pkg/action/install.go index 6dce3ccbb..d5c35598a 100644 --- a/pkg/action/install.go +++ b/pkg/action/install.go @@ -413,9 +413,10 @@ func (i *Install) performInstallCtx(ctx context.Context, rel *release.Release, t resultChan := make(chan Msg, 1) go func() { - rel, err := i.performInstall(rel, toBeAdopted, resources) + rel, err := i.performInstall(ctx, rel, toBeAdopted, resources) resultChan <- Msg{rel, err} }() + select { case <-ctx.Done(): err := ctx.Err() @@ -433,7 +434,7 @@ func (i *Install) isDryRun() bool { return false } -func (i *Install) performInstall(rel *release.Release, toBeAdopted kube.ResourceList, resources kube.ResourceList) (*release.Release, error) { +func (i *Install) performInstall(ctx context.Context, rel *release.Release, toBeAdopted kube.ResourceList, resources kube.ResourceList) (*release.Release, error) { var err error // pre-install hooks if !i.DisableHooks { @@ -456,9 +457,9 @@ func (i *Install) performInstall(rel *release.Release, toBeAdopted kube.Resource if i.Wait { if i.WaitForJobs { - err = i.cfg.KubeClient.WaitWithJobs(resources, i.Timeout) + err = i.cfg.KubeClient.WaitWithJobsWithContext(ctx, resources, i.Timeout) } else { - err = i.cfg.KubeClient.Wait(resources, i.Timeout) + err = i.cfg.KubeClient.WaitWithContext(ctx, resources, i.Timeout) } if err != nil { return rel, err diff --git a/pkg/action/upgrade.go b/pkg/action/upgrade.go index 6d26a754e..505ffb767 100644 --- a/pkg/action/upgrade.go +++ b/pkg/action/upgrade.go @@ -367,11 +367,13 @@ func (u *Upgrade) performUpgrade(ctx context.Context, originalRelease, upgradedR if err := u.cfg.Releases.Create(upgradedRelease); err != nil { return nil, err } + rChan := make(chan resultMessage) ctxChan := make(chan resultMessage) doneChan := make(chan interface{}) defer close(doneChan) - go u.releasingUpgrade(rChan, upgradedRelease, current, target, originalRelease) + + go u.releasingUpgrade(ctx, rChan, upgradedRelease, current, target, originalRelease) go u.handleContext(ctx, doneChan, ctxChan, upgradedRelease) select { case result := <-rChan: @@ -405,7 +407,7 @@ func (u *Upgrade) handleContext(ctx context.Context, done chan interface{}, c ch return } } -func (u *Upgrade) releasingUpgrade(c chan<- resultMessage, upgradedRelease *release.Release, current kube.ResourceList, target kube.ResourceList, originalRelease *release.Release) { +func (u *Upgrade) releasingUpgrade(ctx context.Context, c chan<- resultMessage, upgradedRelease *release.Release, current kube.ResourceList, target kube.ResourceList, originalRelease *release.Release) { // pre-upgrade hooks if !u.DisableHooks { @@ -439,13 +441,13 @@ func (u *Upgrade) releasingUpgrade(c chan<- resultMessage, upgradedRelease *rele "waiting for release %s resources (created: %d updated: %d deleted: %d)", upgradedRelease.Name, len(results.Created), len(results.Updated), len(results.Deleted)) if u.WaitForJobs { - if err := u.cfg.KubeClient.WaitWithJobs(target, u.Timeout); err != nil { + if err := u.cfg.KubeClient.WaitWithJobsWithContext(ctx, target, u.Timeout); err != nil { u.cfg.recordRelease(originalRelease) u.reportToPerformUpgrade(c, upgradedRelease, results.Created, err) return } } else { - if err := u.cfg.KubeClient.Wait(target, u.Timeout); err != nil { + if err := u.cfg.KubeClient.WaitWithContext(ctx, target, u.Timeout); err != nil { u.cfg.recordRelease(originalRelease) u.reportToPerformUpgrade(c, upgradedRelease, results.Created, err) return diff --git a/pkg/kube/client.go b/pkg/kube/client.go index 9df833a43..561d9edac 100644 --- a/pkg/kube/client.go +++ b/pkg/kube/client.go @@ -296,7 +296,7 @@ func (c *Client) Wait(resources ResourceList, timeout time.Duration) error { return w.waitForResources(resources) } -// WaitWithJobs wait up to the given timeout for the specified resources to be ready, including jobs. +// WaitWithJobs wait up to the given timeout for the specified resources to be ready or until the context is Done, including jobs. func (c *Client) WaitWithJobs(resources ResourceList, timeout time.Duration) error { cs, err := c.getKubeClient() if err != nil { @@ -311,6 +311,38 @@ func (c *Client) WaitWithJobs(resources ResourceList, timeout time.Duration) err return w.waitForResources(resources) } +// WaitWithContext waits up to the given timeout for the specified resources to be ready or until the context is Done. +func (c *Client) WaitWithContext(ctx context.Context, resources ResourceList, timeout time.Duration) error { + cs, err := c.getKubeClient() + if err != nil { + return err + } + checker := NewReadyChecker(cs, c.Log, PausedAsReady(true)) + w := waiter{ + c: checker, + log: c.Log, + timeout: timeout, + ctx: ctx, + } + return w.waitForResources(resources) +} + +// WaitWithJobs wait up to the given timeout for the specified resources to be ready, including jobs. +func (c *Client) WaitWithJobsWithContext(ctx context.Context, resources ResourceList, timeout time.Duration) error { + cs, err := c.getKubeClient() + if err != nil { + return err + } + checker := NewReadyChecker(cs, c.Log, PausedAsReady(true), CheckJobs(true)) + w := waiter{ + c: checker, + log: c.Log, + timeout: timeout, + ctx: ctx, + } + return w.waitForResources(resources) +} + // WaitForDelete wait up to the given timeout for the specified resources to be deleted. func (c *Client) WaitForDelete(resources ResourceList, timeout time.Duration) error { w := waiter{ diff --git a/pkg/kube/fake/fake.go b/pkg/kube/fake/fake.go index 267020d57..9856da3e6 100644 --- a/pkg/kube/fake/fake.go +++ b/pkg/kube/fake/fake.go @@ -18,6 +18,7 @@ limitations under the License. package fake import ( + "context" "io" "time" @@ -82,6 +83,23 @@ func (f *FailingKubeClient) WaitWithJobs(resources kube.ResourceList, d time.Dur return f.PrintingKubeClient.WaitWithJobs(resources, d) } +// WaitWithContext the amount of time defined on f.WaitDuration, then returns the configured error if set or prints. +func (f *FailingKubeClient) WaitWithContext(ctx context.Context, resources kube.ResourceList, d time.Duration) error { + time.Sleep(f.WaitDuration) + if f.WaitError != nil { + return f.WaitError + } + return f.PrintingKubeClient.WaitWithContext(ctx, resources, d) +} + +// WaitWithJobsWithContext returns the configured error if set or prints +func (f *FailingKubeClient) WaitWithJobsWithContext(ctx context.Context, resources kube.ResourceList, d time.Duration) error { + if f.WaitError != nil { + return f.WaitError + } + return f.PrintingKubeClient.WaitWithJobsWithContext(ctx, resources, d) +} + // WaitForDelete returns the configured error if set or prints func (f *FailingKubeClient) WaitForDelete(resources kube.ResourceList, d time.Duration) error { if f.WaitError != nil { diff --git a/pkg/kube/fake/printer.go b/pkg/kube/fake/printer.go index cc2c84b40..397fa6d7e 100644 --- a/pkg/kube/fake/printer.go +++ b/pkg/kube/fake/printer.go @@ -17,6 +17,7 @@ limitations under the License. package fake import ( + "context" "io" "strings" "time" @@ -67,6 +68,16 @@ func (p *PrintingKubeClient) WaitWithJobs(resources kube.ResourceList, _ time.Du return err } +func (p *PrintingKubeClient) WaitWithContext(ctx context.Context, resources kube.ResourceList, _ time.Duration) error { + _, err := io.Copy(p.Out, bufferize(resources)) + return err +} + +func (p *PrintingKubeClient) WaitWithJobsWithContext(ctx context.Context, resources kube.ResourceList, _ time.Duration) error { + _, err := io.Copy(p.Out, bufferize(resources)) + return err +} + func (p *PrintingKubeClient) WaitForDelete(resources kube.ResourceList, _ time.Duration) error { _, err := io.Copy(p.Out, bufferize(resources)) return err diff --git a/pkg/kube/interface.go b/pkg/kube/interface.go index ce42ed950..f0ba8cb05 100644 --- a/pkg/kube/interface.go +++ b/pkg/kube/interface.go @@ -17,6 +17,7 @@ limitations under the License. package kube import ( + "context" "io" "time" @@ -38,6 +39,13 @@ type Interface interface { // WaitWithJobs wait up to the given timeout for the specified resources to be ready, including jobs. WaitWithJobs(resources ResourceList, timeout time.Duration) error + // WaitWithContext waits up to the given timeout for the specified resources to be ready or until the context is Done. + WaitWithContext(ctx context.Context, resources ResourceList, timeout time.Duration) error + + // WaitWithJobsWithContext waits up to the given timeout for the specified resources to be ready or until the context is Done + // including jobs. + WaitWithJobsWithContext(ctx context.Context, resources ResourceList, timeout time.Duration) error + // Delete destroys one or more resources. Delete(resources ResourceList) (*Result, []error) diff --git a/pkg/kube/wait.go b/pkg/kube/wait.go index 36110d0de..ffdc29e2a 100644 --- a/pkg/kube/wait.go +++ b/pkg/kube/wait.go @@ -42,6 +42,9 @@ type waiter struct { c ReadyChecker timeout time.Duration log func(string, ...interface{}) + + // ctx used to to cancel and avoid go routines leaking + ctx context.Context } // waitForResources polls to get the current status of all pods, PVCs, Services and @@ -49,7 +52,11 @@ type waiter struct { func (w *waiter) waitForResources(created ResourceList) error { w.log("beginning wait for %d resources with timeout of %v", len(created), w.timeout) - ctx, cancel := context.WithTimeout(context.Background(), w.timeout) + if w.ctx == nil { + w.ctx = context.Background() + } + + ctx, cancel := context.WithTimeout(w.ctx, w.timeout) defer cancel() numberOfErrors := make([]int, len(created))