Expand kubeclient interface to avoid leaking goroutines in Upgrade/Install RunWithContext when context is cancelled

Signed-off-by: saisrikark <saisrikark@gmail.com>
pull/13135/head
saisrikark 1 year ago
parent 15f76cf83c
commit 5e73cd9aff

@ -413,9 +413,10 @@ func (i *Install) performInstallCtx(ctx context.Context, rel *release.Release, t
resultChan := make(chan Msg, 1)
go func() {
rel, err := i.performInstall(rel, toBeAdopted, resources)
rel, err := i.performInstall(ctx, rel, toBeAdopted, resources)
resultChan <- Msg{rel, err}
}()
select {
case <-ctx.Done():
err := ctx.Err()
@ -433,7 +434,7 @@ func (i *Install) isDryRun() bool {
return false
}
func (i *Install) performInstall(rel *release.Release, toBeAdopted kube.ResourceList, resources kube.ResourceList) (*release.Release, error) {
func (i *Install) performInstall(ctx context.Context, rel *release.Release, toBeAdopted kube.ResourceList, resources kube.ResourceList) (*release.Release, error) {
var err error
// pre-install hooks
if !i.DisableHooks {
@ -456,9 +457,9 @@ func (i *Install) performInstall(rel *release.Release, toBeAdopted kube.Resource
if i.Wait {
if i.WaitForJobs {
err = i.cfg.KubeClient.WaitWithJobs(resources, i.Timeout)
err = i.cfg.KubeClient.WaitWithJobsWithContext(ctx, resources, i.Timeout)
} else {
err = i.cfg.KubeClient.Wait(resources, i.Timeout)
err = i.cfg.KubeClient.WaitWithContext(ctx, resources, i.Timeout)
}
if err != nil {
return rel, err

@ -367,11 +367,13 @@ func (u *Upgrade) performUpgrade(ctx context.Context, originalRelease, upgradedR
if err := u.cfg.Releases.Create(upgradedRelease); err != nil {
return nil, err
}
rChan := make(chan resultMessage)
ctxChan := make(chan resultMessage)
doneChan := make(chan interface{})
defer close(doneChan)
go u.releasingUpgrade(rChan, upgradedRelease, current, target, originalRelease)
go u.releasingUpgrade(ctx, rChan, upgradedRelease, current, target, originalRelease)
go u.handleContext(ctx, doneChan, ctxChan, upgradedRelease)
select {
case result := <-rChan:
@ -405,7 +407,7 @@ func (u *Upgrade) handleContext(ctx context.Context, done chan interface{}, c ch
return
}
}
func (u *Upgrade) releasingUpgrade(c chan<- resultMessage, upgradedRelease *release.Release, current kube.ResourceList, target kube.ResourceList, originalRelease *release.Release) {
func (u *Upgrade) releasingUpgrade(ctx context.Context, c chan<- resultMessage, upgradedRelease *release.Release, current kube.ResourceList, target kube.ResourceList, originalRelease *release.Release) {
// pre-upgrade hooks
if !u.DisableHooks {
@ -439,13 +441,13 @@ func (u *Upgrade) releasingUpgrade(c chan<- resultMessage, upgradedRelease *rele
"waiting for release %s resources (created: %d updated: %d deleted: %d)",
upgradedRelease.Name, len(results.Created), len(results.Updated), len(results.Deleted))
if u.WaitForJobs {
if err := u.cfg.KubeClient.WaitWithJobs(target, u.Timeout); err != nil {
if err := u.cfg.KubeClient.WaitWithJobsWithContext(ctx, target, u.Timeout); err != nil {
u.cfg.recordRelease(originalRelease)
u.reportToPerformUpgrade(c, upgradedRelease, results.Created, err)
return
}
} else {
if err := u.cfg.KubeClient.Wait(target, u.Timeout); err != nil {
if err := u.cfg.KubeClient.WaitWithContext(ctx, target, u.Timeout); err != nil {
u.cfg.recordRelease(originalRelease)
u.reportToPerformUpgrade(c, upgradedRelease, results.Created, err)
return

@ -296,7 +296,7 @@ func (c *Client) Wait(resources ResourceList, timeout time.Duration) error {
return w.waitForResources(resources)
}
// WaitWithJobs wait up to the given timeout for the specified resources to be ready, including jobs.
// WaitWithJobs wait up to the given timeout for the specified resources to be ready or until the context is Done, including jobs.
func (c *Client) WaitWithJobs(resources ResourceList, timeout time.Duration) error {
cs, err := c.getKubeClient()
if err != nil {
@ -311,6 +311,38 @@ func (c *Client) WaitWithJobs(resources ResourceList, timeout time.Duration) err
return w.waitForResources(resources)
}
// WaitWithContext waits up to the given timeout for the specified resources to be ready or until the context is Done.
func (c *Client) WaitWithContext(ctx context.Context, resources ResourceList, timeout time.Duration) error {
cs, err := c.getKubeClient()
if err != nil {
return err
}
checker := NewReadyChecker(cs, c.Log, PausedAsReady(true))
w := waiter{
c: checker,
log: c.Log,
timeout: timeout,
ctx: ctx,
}
return w.waitForResources(resources)
}
// WaitWithJobs wait up to the given timeout for the specified resources to be ready, including jobs.
func (c *Client) WaitWithJobsWithContext(ctx context.Context, resources ResourceList, timeout time.Duration) error {
cs, err := c.getKubeClient()
if err != nil {
return err
}
checker := NewReadyChecker(cs, c.Log, PausedAsReady(true), CheckJobs(true))
w := waiter{
c: checker,
log: c.Log,
timeout: timeout,
ctx: ctx,
}
return w.waitForResources(resources)
}
// WaitForDelete wait up to the given timeout for the specified resources to be deleted.
func (c *Client) WaitForDelete(resources ResourceList, timeout time.Duration) error {
w := waiter{

@ -18,6 +18,7 @@ limitations under the License.
package fake
import (
"context"
"io"
"time"
@ -82,6 +83,23 @@ func (f *FailingKubeClient) WaitWithJobs(resources kube.ResourceList, d time.Dur
return f.PrintingKubeClient.WaitWithJobs(resources, d)
}
// WaitWithContext the amount of time defined on f.WaitDuration, then returns the configured error if set or prints.
func (f *FailingKubeClient) WaitWithContext(ctx context.Context, resources kube.ResourceList, d time.Duration) error {
time.Sleep(f.WaitDuration)
if f.WaitError != nil {
return f.WaitError
}
return f.PrintingKubeClient.WaitWithContext(ctx, resources, d)
}
// WaitWithJobsWithContext returns the configured error if set or prints
func (f *FailingKubeClient) WaitWithJobsWithContext(ctx context.Context, resources kube.ResourceList, d time.Duration) error {
if f.WaitError != nil {
return f.WaitError
}
return f.PrintingKubeClient.WaitWithJobsWithContext(ctx, resources, d)
}
// WaitForDelete returns the configured error if set or prints
func (f *FailingKubeClient) WaitForDelete(resources kube.ResourceList, d time.Duration) error {
if f.WaitError != nil {

@ -17,6 +17,7 @@ limitations under the License.
package fake
import (
"context"
"io"
"strings"
"time"
@ -67,6 +68,16 @@ func (p *PrintingKubeClient) WaitWithJobs(resources kube.ResourceList, _ time.Du
return err
}
func (p *PrintingKubeClient) WaitWithContext(ctx context.Context, resources kube.ResourceList, _ time.Duration) error {
_, err := io.Copy(p.Out, bufferize(resources))
return err
}
func (p *PrintingKubeClient) WaitWithJobsWithContext(ctx context.Context, resources kube.ResourceList, _ time.Duration) error {
_, err := io.Copy(p.Out, bufferize(resources))
return err
}
func (p *PrintingKubeClient) WaitForDelete(resources kube.ResourceList, _ time.Duration) error {
_, err := io.Copy(p.Out, bufferize(resources))
return err

@ -17,6 +17,7 @@ limitations under the License.
package kube
import (
"context"
"io"
"time"
@ -38,6 +39,13 @@ type Interface interface {
// WaitWithJobs wait up to the given timeout for the specified resources to be ready, including jobs.
WaitWithJobs(resources ResourceList, timeout time.Duration) error
// WaitWithContext waits up to the given timeout for the specified resources to be ready or until the context is Done.
WaitWithContext(ctx context.Context, resources ResourceList, timeout time.Duration) error
// WaitWithJobsWithContext waits up to the given timeout for the specified resources to be ready or until the context is Done
// including jobs.
WaitWithJobsWithContext(ctx context.Context, resources ResourceList, timeout time.Duration) error
// Delete destroys one or more resources.
Delete(resources ResourceList) (*Result, []error)

@ -42,6 +42,9 @@ type waiter struct {
c ReadyChecker
timeout time.Duration
log func(string, ...interface{})
// ctx used to to cancel and avoid go routines leaking
ctx context.Context
}
// waitForResources polls to get the current status of all pods, PVCs, Services and
@ -49,7 +52,11 @@ type waiter struct {
func (w *waiter) waitForResources(created ResourceList) error {
w.log("beginning wait for %d resources with timeout of %v", len(created), w.timeout)
ctx, cancel := context.WithTimeout(context.Background(), w.timeout)
if w.ctx == nil {
w.ctx = context.Background()
}
ctx, cancel := context.WithTimeout(w.ctx, w.timeout)
defer cancel()
numberOfErrors := make([]int, len(created))

Loading…
Cancel
Save