From 5019f1cd6973debc231ec54a0973d61a72089f13 Mon Sep 17 00:00:00 2001 From: saisrikark Date: Tue, 18 Jun 2024 12:33:59 +0530 Subject: [PATCH] changs --- pkg/action/install.go | 11 +++++------ pkg/action/upgrade.go | 10 ++++------ pkg/kube/client.go | 33 ++++++++++++++++++++++++++++++++- pkg/kube/fake/fake.go | 18 ++++++++++++++++++ pkg/kube/fake/printer.go | 11 +++++++++++ pkg/kube/interface.go | 8 ++++++++ pkg/kube/wait.go | 4 ++++ 7 files changed, 82 insertions(+), 13 deletions(-) diff --git a/pkg/action/install.go b/pkg/action/install.go index dd5d4f40e..d5c35598a 100644 --- a/pkg/action/install.go +++ b/pkg/action/install.go @@ -412,12 +412,11 @@ func (i *Install) performInstallCtx(ctx context.Context, rel *release.Release, t } resultChan := make(chan Msg, 1) - // TODOS we are not handling context here - // figure out a way to handle the context go func() { - rel, err := i.performInstall(rel, toBeAdopted, resources) + rel, err := i.performInstall(ctx, rel, toBeAdopted, resources) resultChan <- Msg{rel, err} }() + select { case <-ctx.Done(): err := ctx.Err() @@ -435,7 +434,7 @@ func (i *Install) isDryRun() bool { return false } -func (i *Install) performInstall(rel *release.Release, toBeAdopted kube.ResourceList, resources kube.ResourceList) (*release.Release, error) { +func (i *Install) performInstall(ctx context.Context, rel *release.Release, toBeAdopted kube.ResourceList, resources kube.ResourceList) (*release.Release, error) { var err error // pre-install hooks if !i.DisableHooks { @@ -458,9 +457,9 @@ func (i *Install) performInstall(rel *release.Release, toBeAdopted kube.Resource if i.Wait { if i.WaitForJobs { - err = i.cfg.KubeClient.WaitWithJobs(resources, i.Timeout) + err = i.cfg.KubeClient.WaitWithJobsWithContext(ctx, resources, i.Timeout) } else { - err = i.cfg.KubeClient.Wait(resources, i.Timeout) + err = i.cfg.KubeClient.WaitWithContext(ctx, resources, i.Timeout) } if err != nil { return rel, err diff --git a/pkg/action/upgrade.go b/pkg/action/upgrade.go index 772dd55e9..505ffb767 100644 --- a/pkg/action/upgrade.go +++ b/pkg/action/upgrade.go @@ -363,8 +363,6 @@ func (u *Upgrade) performUpgrade(ctx context.Context, originalRelease, upgradedR return upgradedRelease, nil } - // TODOS here - // ensure that when we do a RunWithContext and the context is cancelled, we should remove all the existing go routines u.cfg.Log("creating upgraded release for %s", upgradedRelease.Name) if err := u.cfg.Releases.Create(upgradedRelease); err != nil { return nil, err @@ -375,7 +373,7 @@ func (u *Upgrade) performUpgrade(ctx context.Context, originalRelease, upgradedR doneChan := make(chan interface{}) defer close(doneChan) - go u.releasingUpgrade(rChan, upgradedRelease, current, target, originalRelease) + go u.releasingUpgrade(ctx, rChan, upgradedRelease, current, target, originalRelease) go u.handleContext(ctx, doneChan, ctxChan, upgradedRelease) select { case result := <-rChan: @@ -409,7 +407,7 @@ func (u *Upgrade) handleContext(ctx context.Context, done chan interface{}, c ch return } } -func (u *Upgrade) releasingUpgrade(c chan<- resultMessage, upgradedRelease *release.Release, current kube.ResourceList, target kube.ResourceList, originalRelease *release.Release) { +func (u *Upgrade) releasingUpgrade(ctx context.Context, c chan<- resultMessage, upgradedRelease *release.Release, current kube.ResourceList, target kube.ResourceList, originalRelease *release.Release) { // pre-upgrade hooks if !u.DisableHooks { @@ -443,13 +441,13 @@ func (u *Upgrade) releasingUpgrade(c chan<- resultMessage, upgradedRelease *rele "waiting for release %s resources (created: %d updated: %d deleted: %d)", upgradedRelease.Name, len(results.Created), len(results.Updated), len(results.Deleted)) if u.WaitForJobs { - if err := u.cfg.KubeClient.WaitWithJobs(target, u.Timeout); err != nil { + if err := u.cfg.KubeClient.WaitWithJobsWithContext(ctx, target, u.Timeout); err != nil { u.cfg.recordRelease(originalRelease) u.reportToPerformUpgrade(c, upgradedRelease, results.Created, err) return } } else { - if err := u.cfg.KubeClient.Wait(target, u.Timeout); err != nil { + if err := u.cfg.KubeClient.WaitWithContext(ctx, target, u.Timeout); err != nil { u.cfg.recordRelease(originalRelease) u.reportToPerformUpgrade(c, upgradedRelease, results.Created, err) return diff --git a/pkg/kube/client.go b/pkg/kube/client.go index 5a36700b6..4353db5d0 100644 --- a/pkg/kube/client.go +++ b/pkg/kube/client.go @@ -320,8 +320,39 @@ func (c *Client) Wait(resources ResourceList, timeout time.Duration) error { return w.waitForResources(resources) } -// WaitWithJobs wait up to the given timeout for the specified resources to be ready, including jobs. +// WaitWithJobs wait up to the given timeout for the specified resources to be ready or until the context is Done, including jobs. func (c *Client) WaitWithJobs(resources ResourceList, timeout time.Duration) error { + cs, err := c.getKubeClient() + if err != nil { + return err + } + checker := NewReadyChecker(cs, c.Log, PausedAsReady(true), CheckJobs(true)) + w := waiter{ + c: checker, + log: c.Log, + timeout: timeout, + } + return w.waitForResources(resources) +} + +// WaitWithContext waits up to the given timeout for the specified resources to be ready or until the context is Done. +func (c *Client) WaitWithContext(ctx context.Context, resources ResourceList, timeout time.Duration) error { + cs, err := c.getKubeClient() + if err != nil { + return err + } + checker := NewReadyChecker(cs, c.Log, PausedAsReady(true)) + w := waiter{ + c: checker, + log: c.Log, + timeout: timeout, + ctx: ctx, + } + return w.waitForResources(resources) +} + +// WaitWithJobs wait up to the given timeout for the specified resources to be ready, including jobs. +func (c *Client) WaitWithJobsWithContext(ctx context.Context, resources ResourceList, timeout time.Duration) error { cs, err := c.getKubeClient() if err != nil { return err diff --git a/pkg/kube/fake/fake.go b/pkg/kube/fake/fake.go index 267020d57..9856da3e6 100644 --- a/pkg/kube/fake/fake.go +++ b/pkg/kube/fake/fake.go @@ -18,6 +18,7 @@ limitations under the License. package fake import ( + "context" "io" "time" @@ -82,6 +83,23 @@ func (f *FailingKubeClient) WaitWithJobs(resources kube.ResourceList, d time.Dur return f.PrintingKubeClient.WaitWithJobs(resources, d) } +// WaitWithContext the amount of time defined on f.WaitDuration, then returns the configured error if set or prints. +func (f *FailingKubeClient) WaitWithContext(ctx context.Context, resources kube.ResourceList, d time.Duration) error { + time.Sleep(f.WaitDuration) + if f.WaitError != nil { + return f.WaitError + } + return f.PrintingKubeClient.WaitWithContext(ctx, resources, d) +} + +// WaitWithJobsWithContext returns the configured error if set or prints +func (f *FailingKubeClient) WaitWithJobsWithContext(ctx context.Context, resources kube.ResourceList, d time.Duration) error { + if f.WaitError != nil { + return f.WaitError + } + return f.PrintingKubeClient.WaitWithJobsWithContext(ctx, resources, d) +} + // WaitForDelete returns the configured error if set or prints func (f *FailingKubeClient) WaitForDelete(resources kube.ResourceList, d time.Duration) error { if f.WaitError != nil { diff --git a/pkg/kube/fake/printer.go b/pkg/kube/fake/printer.go index cc2c84b40..397fa6d7e 100644 --- a/pkg/kube/fake/printer.go +++ b/pkg/kube/fake/printer.go @@ -17,6 +17,7 @@ limitations under the License. package fake import ( + "context" "io" "strings" "time" @@ -67,6 +68,16 @@ func (p *PrintingKubeClient) WaitWithJobs(resources kube.ResourceList, _ time.Du return err } +func (p *PrintingKubeClient) WaitWithContext(ctx context.Context, resources kube.ResourceList, _ time.Duration) error { + _, err := io.Copy(p.Out, bufferize(resources)) + return err +} + +func (p *PrintingKubeClient) WaitWithJobsWithContext(ctx context.Context, resources kube.ResourceList, _ time.Duration) error { + _, err := io.Copy(p.Out, bufferize(resources)) + return err +} + func (p *PrintingKubeClient) WaitForDelete(resources kube.ResourceList, _ time.Duration) error { _, err := io.Copy(p.Out, bufferize(resources)) return err diff --git a/pkg/kube/interface.go b/pkg/kube/interface.go index ce42ed950..f0ba8cb05 100644 --- a/pkg/kube/interface.go +++ b/pkg/kube/interface.go @@ -17,6 +17,7 @@ limitations under the License. package kube import ( + "context" "io" "time" @@ -38,6 +39,13 @@ type Interface interface { // WaitWithJobs wait up to the given timeout for the specified resources to be ready, including jobs. WaitWithJobs(resources ResourceList, timeout time.Duration) error + // WaitWithContext waits up to the given timeout for the specified resources to be ready or until the context is Done. + WaitWithContext(ctx context.Context, resources ResourceList, timeout time.Duration) error + + // WaitWithJobsWithContext waits up to the given timeout for the specified resources to be ready or until the context is Done + // including jobs. + WaitWithJobsWithContext(ctx context.Context, resources ResourceList, timeout time.Duration) error + // Delete destroys one or more resources. Delete(resources ResourceList) (*Result, []error) diff --git a/pkg/kube/wait.go b/pkg/kube/wait.go index ea2e0fe37..ffdc29e2a 100644 --- a/pkg/kube/wait.go +++ b/pkg/kube/wait.go @@ -52,6 +52,10 @@ type waiter struct { func (w *waiter) waitForResources(created ResourceList) error { w.log("beginning wait for %d resources with timeout of %v", len(created), w.timeout) + if w.ctx == nil { + w.ctx = context.Background() + } + ctx, cancel := context.WithTimeout(w.ctx, w.timeout) defer cancel()