pull/13134/head^2
saisrikark 1 year ago
parent 31c4974644
commit 5019f1cd69

@ -412,12 +412,11 @@ func (i *Install) performInstallCtx(ctx context.Context, rel *release.Release, t
} }
resultChan := make(chan Msg, 1) resultChan := make(chan Msg, 1)
// TODOS we are not handling context here
// figure out a way to handle the context
go func() { go func() {
rel, err := i.performInstall(rel, toBeAdopted, resources) rel, err := i.performInstall(ctx, rel, toBeAdopted, resources)
resultChan <- Msg{rel, err} resultChan <- Msg{rel, err}
}() }()
select { select {
case <-ctx.Done(): case <-ctx.Done():
err := ctx.Err() err := ctx.Err()
@ -435,7 +434,7 @@ func (i *Install) isDryRun() bool {
return false return false
} }
func (i *Install) performInstall(rel *release.Release, toBeAdopted kube.ResourceList, resources kube.ResourceList) (*release.Release, error) { func (i *Install) performInstall(ctx context.Context, rel *release.Release, toBeAdopted kube.ResourceList, resources kube.ResourceList) (*release.Release, error) {
var err error var err error
// pre-install hooks // pre-install hooks
if !i.DisableHooks { if !i.DisableHooks {
@ -458,9 +457,9 @@ func (i *Install) performInstall(rel *release.Release, toBeAdopted kube.Resource
if i.Wait { if i.Wait {
if i.WaitForJobs { if i.WaitForJobs {
err = i.cfg.KubeClient.WaitWithJobs(resources, i.Timeout) err = i.cfg.KubeClient.WaitWithJobsWithContext(ctx, resources, i.Timeout)
} else { } else {
err = i.cfg.KubeClient.Wait(resources, i.Timeout) err = i.cfg.KubeClient.WaitWithContext(ctx, resources, i.Timeout)
} }
if err != nil { if err != nil {
return rel, err return rel, err

@ -363,8 +363,6 @@ func (u *Upgrade) performUpgrade(ctx context.Context, originalRelease, upgradedR
return upgradedRelease, nil return upgradedRelease, nil
} }
// TODOS here
// ensure that when we do a RunWithContext and the context is cancelled, we should remove all the existing go routines
u.cfg.Log("creating upgraded release for %s", upgradedRelease.Name) u.cfg.Log("creating upgraded release for %s", upgradedRelease.Name)
if err := u.cfg.Releases.Create(upgradedRelease); err != nil { if err := u.cfg.Releases.Create(upgradedRelease); err != nil {
return nil, err return nil, err
@ -375,7 +373,7 @@ func (u *Upgrade) performUpgrade(ctx context.Context, originalRelease, upgradedR
doneChan := make(chan interface{}) doneChan := make(chan interface{})
defer close(doneChan) defer close(doneChan)
go u.releasingUpgrade(rChan, upgradedRelease, current, target, originalRelease) go u.releasingUpgrade(ctx, rChan, upgradedRelease, current, target, originalRelease)
go u.handleContext(ctx, doneChan, ctxChan, upgradedRelease) go u.handleContext(ctx, doneChan, ctxChan, upgradedRelease)
select { select {
case result := <-rChan: case result := <-rChan:
@ -409,7 +407,7 @@ func (u *Upgrade) handleContext(ctx context.Context, done chan interface{}, c ch
return return
} }
} }
func (u *Upgrade) releasingUpgrade(c chan<- resultMessage, upgradedRelease *release.Release, current kube.ResourceList, target kube.ResourceList, originalRelease *release.Release) { func (u *Upgrade) releasingUpgrade(ctx context.Context, c chan<- resultMessage, upgradedRelease *release.Release, current kube.ResourceList, target kube.ResourceList, originalRelease *release.Release) {
// pre-upgrade hooks // pre-upgrade hooks
if !u.DisableHooks { if !u.DisableHooks {
@ -443,13 +441,13 @@ func (u *Upgrade) releasingUpgrade(c chan<- resultMessage, upgradedRelease *rele
"waiting for release %s resources (created: %d updated: %d deleted: %d)", "waiting for release %s resources (created: %d updated: %d deleted: %d)",
upgradedRelease.Name, len(results.Created), len(results.Updated), len(results.Deleted)) upgradedRelease.Name, len(results.Created), len(results.Updated), len(results.Deleted))
if u.WaitForJobs { if u.WaitForJobs {
if err := u.cfg.KubeClient.WaitWithJobs(target, u.Timeout); err != nil { if err := u.cfg.KubeClient.WaitWithJobsWithContext(ctx, target, u.Timeout); err != nil {
u.cfg.recordRelease(originalRelease) u.cfg.recordRelease(originalRelease)
u.reportToPerformUpgrade(c, upgradedRelease, results.Created, err) u.reportToPerformUpgrade(c, upgradedRelease, results.Created, err)
return return
} }
} else { } else {
if err := u.cfg.KubeClient.Wait(target, u.Timeout); err != nil { if err := u.cfg.KubeClient.WaitWithContext(ctx, target, u.Timeout); err != nil {
u.cfg.recordRelease(originalRelease) u.cfg.recordRelease(originalRelease)
u.reportToPerformUpgrade(c, upgradedRelease, results.Created, err) u.reportToPerformUpgrade(c, upgradedRelease, results.Created, err)
return return

@ -320,8 +320,39 @@ func (c *Client) Wait(resources ResourceList, timeout time.Duration) error {
return w.waitForResources(resources) return w.waitForResources(resources)
} }
// WaitWithJobs wait up to the given timeout for the specified resources to be ready, including jobs. // WaitWithJobs wait up to the given timeout for the specified resources to be ready or until the context is Done, including jobs.
func (c *Client) WaitWithJobs(resources ResourceList, timeout time.Duration) error { func (c *Client) WaitWithJobs(resources ResourceList, timeout time.Duration) error {
cs, err := c.getKubeClient()
if err != nil {
return err
}
checker := NewReadyChecker(cs, c.Log, PausedAsReady(true), CheckJobs(true))
w := waiter{
c: checker,
log: c.Log,
timeout: timeout,
}
return w.waitForResources(resources)
}
// WaitWithContext waits up to the given timeout for the specified resources to be ready or until the context is Done.
func (c *Client) WaitWithContext(ctx context.Context, resources ResourceList, timeout time.Duration) error {
cs, err := c.getKubeClient()
if err != nil {
return err
}
checker := NewReadyChecker(cs, c.Log, PausedAsReady(true))
w := waiter{
c: checker,
log: c.Log,
timeout: timeout,
ctx: ctx,
}
return w.waitForResources(resources)
}
// WaitWithJobs wait up to the given timeout for the specified resources to be ready, including jobs.
func (c *Client) WaitWithJobsWithContext(ctx context.Context, resources ResourceList, timeout time.Duration) error {
cs, err := c.getKubeClient() cs, err := c.getKubeClient()
if err != nil { if err != nil {
return err return err

@ -18,6 +18,7 @@ limitations under the License.
package fake package fake
import ( import (
"context"
"io" "io"
"time" "time"
@ -82,6 +83,23 @@ func (f *FailingKubeClient) WaitWithJobs(resources kube.ResourceList, d time.Dur
return f.PrintingKubeClient.WaitWithJobs(resources, d) return f.PrintingKubeClient.WaitWithJobs(resources, d)
} }
// WaitWithContext the amount of time defined on f.WaitDuration, then returns the configured error if set or prints.
func (f *FailingKubeClient) WaitWithContext(ctx context.Context, resources kube.ResourceList, d time.Duration) error {
time.Sleep(f.WaitDuration)
if f.WaitError != nil {
return f.WaitError
}
return f.PrintingKubeClient.WaitWithContext(ctx, resources, d)
}
// WaitWithJobsWithContext returns the configured error if set or prints
func (f *FailingKubeClient) WaitWithJobsWithContext(ctx context.Context, resources kube.ResourceList, d time.Duration) error {
if f.WaitError != nil {
return f.WaitError
}
return f.PrintingKubeClient.WaitWithJobsWithContext(ctx, resources, d)
}
// WaitForDelete returns the configured error if set or prints // WaitForDelete returns the configured error if set or prints
func (f *FailingKubeClient) WaitForDelete(resources kube.ResourceList, d time.Duration) error { func (f *FailingKubeClient) WaitForDelete(resources kube.ResourceList, d time.Duration) error {
if f.WaitError != nil { if f.WaitError != nil {

@ -17,6 +17,7 @@ limitations under the License.
package fake package fake
import ( import (
"context"
"io" "io"
"strings" "strings"
"time" "time"
@ -67,6 +68,16 @@ func (p *PrintingKubeClient) WaitWithJobs(resources kube.ResourceList, _ time.Du
return err return err
} }
func (p *PrintingKubeClient) WaitWithContext(ctx context.Context, resources kube.ResourceList, _ time.Duration) error {
_, err := io.Copy(p.Out, bufferize(resources))
return err
}
func (p *PrintingKubeClient) WaitWithJobsWithContext(ctx context.Context, resources kube.ResourceList, _ time.Duration) error {
_, err := io.Copy(p.Out, bufferize(resources))
return err
}
func (p *PrintingKubeClient) WaitForDelete(resources kube.ResourceList, _ time.Duration) error { func (p *PrintingKubeClient) WaitForDelete(resources kube.ResourceList, _ time.Duration) error {
_, err := io.Copy(p.Out, bufferize(resources)) _, err := io.Copy(p.Out, bufferize(resources))
return err return err

@ -17,6 +17,7 @@ limitations under the License.
package kube package kube
import ( import (
"context"
"io" "io"
"time" "time"
@ -38,6 +39,13 @@ type Interface interface {
// WaitWithJobs wait up to the given timeout for the specified resources to be ready, including jobs. // WaitWithJobs wait up to the given timeout for the specified resources to be ready, including jobs.
WaitWithJobs(resources ResourceList, timeout time.Duration) error WaitWithJobs(resources ResourceList, timeout time.Duration) error
// WaitWithContext waits up to the given timeout for the specified resources to be ready or until the context is Done.
WaitWithContext(ctx context.Context, resources ResourceList, timeout time.Duration) error
// WaitWithJobsWithContext waits up to the given timeout for the specified resources to be ready or until the context is Done
// including jobs.
WaitWithJobsWithContext(ctx context.Context, resources ResourceList, timeout time.Duration) error
// Delete destroys one or more resources. // Delete destroys one or more resources.
Delete(resources ResourceList) (*Result, []error) Delete(resources ResourceList) (*Result, []error)

@ -52,6 +52,10 @@ type waiter struct {
func (w *waiter) waitForResources(created ResourceList) error { func (w *waiter) waitForResources(created ResourceList) error {
w.log("beginning wait for %d resources with timeout of %v", len(created), w.timeout) w.log("beginning wait for %d resources with timeout of %v", len(created), w.timeout)
if w.ctx == nil {
w.ctx = context.Background()
}
ctx, cancel := context.WithTimeout(w.ctx, w.timeout) ctx, cancel := context.WithTimeout(w.ctx, w.timeout)
defer cancel() defer cancel()

Loading…
Cancel
Save