Merge pull request #12088 from mslapek/fix/11971-run-with-ctx

Fix leaking goroutines in Install
pull/12405/head
Matt Farina 1 year ago committed by GitHub
commit 30254deff4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -219,6 +219,9 @@ func (i *Install) Run(chrt *chart.Chart, vals map[string]interface{}) (*release.
}
// Run executes the installation with Context
//
// When the task is cancelled through ctx, the function returns and the install
// proceeds in the background.
func (i *Install) RunWithContext(ctx context.Context, chrt *chart.Chart, vals map[string]interface{}) (*release.Release, error) {
// Check reachability of cluster unless in client-only mode (e.g. `helm template` without `--validate`)
if !i.ClientOnly {
@ -385,17 +388,31 @@ func (i *Install) RunWithContext(ctx context.Context, chrt *chart.Chart, vals ma
// not working.
return rel, err
}
rChan := make(chan resultMessage)
ctxChan := make(chan resultMessage)
doneChan := make(chan struct{})
defer close(doneChan)
go i.performInstall(rChan, rel, toBeAdopted, resources)
go i.handleContext(ctx, ctxChan, doneChan, rel)
rel, err = i.performInstallCtx(ctx, rel, toBeAdopted, resources)
if err != nil {
rel, err = i.failRelease(rel, err)
}
return rel, err
}
func (i *Install) performInstallCtx(ctx context.Context, rel *release.Release, toBeAdopted kube.ResourceList, resources kube.ResourceList) (*release.Release, error) {
type Msg struct {
r *release.Release
e error
}
resultChan := make(chan Msg, 1)
go func() {
rel, err := i.performInstall(rel, toBeAdopted, resources)
resultChan <- Msg{rel, err}
}()
select {
case result := <-rChan:
return result.r, result.e
case result := <-ctxChan:
return result.r, result.e
case <-ctx.Done():
err := ctx.Err()
return rel, err
case msg := <-resultChan:
return msg.r, msg.e
}
}
@ -407,13 +424,12 @@ func (i *Install) isDryRun() bool {
return false
}
func (i *Install) performInstall(c chan<- resultMessage, rel *release.Release, toBeAdopted kube.ResourceList, resources kube.ResourceList) {
func (i *Install) performInstall(rel *release.Release, toBeAdopted kube.ResourceList, resources kube.ResourceList) (*release.Release, error) {
var err error
// pre-install hooks
if !i.DisableHooks {
if err := i.cfg.execHook(rel, release.HookPreInstall, i.Timeout); err != nil {
i.reportToRun(c, rel, fmt.Errorf("failed pre-install: %s", err))
return
return rel, fmt.Errorf("failed pre-install: %s", err)
}
}
@ -421,35 +437,28 @@ func (i *Install) performInstall(c chan<- resultMessage, rel *release.Release, t
// do an update, but it's not clear whether we WANT to do an update if the re-use is set
// to true, since that is basically an upgrade operation.
if len(toBeAdopted) == 0 && len(resources) > 0 {
if _, err := i.cfg.KubeClient.Create(resources); err != nil {
i.reportToRun(c, rel, err)
return
}
_, err = i.cfg.KubeClient.Create(resources)
} else if len(resources) > 0 {
if _, err := i.cfg.KubeClient.Update(toBeAdopted, resources, i.Force); err != nil {
i.reportToRun(c, rel, err)
return
_, err = i.cfg.KubeClient.Update(toBeAdopted, resources, i.Force)
}
if err != nil {
return rel, err
}
if i.Wait {
if i.WaitForJobs {
if err := i.cfg.KubeClient.WaitWithJobs(resources, i.Timeout); err != nil {
i.reportToRun(c, rel, err)
return
}
err = i.cfg.KubeClient.WaitWithJobs(resources, i.Timeout)
} else {
if err := i.cfg.KubeClient.Wait(resources, i.Timeout); err != nil {
i.reportToRun(c, rel, err)
return
err = i.cfg.KubeClient.Wait(resources, i.Timeout)
}
if err != nil {
return rel, err
}
}
if !i.DisableHooks {
if err := i.cfg.execHook(rel, release.HookPostInstall, i.Timeout); err != nil {
i.reportToRun(c, rel, fmt.Errorf("failed post-install: %s", err))
return
return rel, fmt.Errorf("failed post-install: %s", err)
}
}
@ -470,25 +479,9 @@ func (i *Install) performInstall(c chan<- resultMessage, rel *release.Release, t
i.cfg.Log("failed to record the release: %s", err)
}
i.reportToRun(c, rel, nil)
}
func (i *Install) handleContext(ctx context.Context, c chan<- resultMessage, done chan struct{}, rel *release.Release) {
select {
case <-ctx.Done():
err := ctx.Err()
i.reportToRun(c, rel, err)
case <-done:
return
}
}
func (i *Install) reportToRun(c chan<- resultMessage, rel *release.Release, err error) {
i.Lock.Lock()
if err != nil {
rel, err = i.failRelease(rel, err)
}
c <- resultMessage{r: rel, e: err}
i.Lock.Unlock()
return rel, nil
}
func (i *Install) failRelease(rel *release.Release, err error) (*release.Release, error) {
rel.SetStatus(release.StatusFailed, fmt.Sprintf("Release %q failed: %s", i.ReleaseName, err.Error()))
if i.Atomic {

@ -23,6 +23,7 @@ import (
"os"
"path/filepath"
"regexp"
"runtime"
"strings"
"testing"
"time"
@ -369,10 +370,14 @@ func TestInstallRelease_Wait(t *testing.T) {
instAction.Wait = true
vals := map[string]interface{}{}
goroutines := runtime.NumGoroutine()
res, err := instAction.Run(buildChart(), vals)
is.Error(err)
is.Contains(res.Info.Description, "I timed out")
is.Equal(res.Info.Status, release.StatusFailed)
is.Equal(goroutines, runtime.NumGoroutine())
}
func TestInstallRelease_Wait_Interrupted(t *testing.T) {
is := assert.New(t)
@ -388,10 +393,16 @@ func TestInstallRelease_Wait_Interrupted(t *testing.T) {
ctx, cancel := context.WithCancel(ctx)
time.AfterFunc(time.Second, cancel)
goroutines := runtime.NumGoroutine()
res, err := instAction.RunWithContext(ctx, buildChart(), vals)
is.Error(err)
is.Contains(res.Info.Description, "Release \"interrupted-release\" failed: context canceled")
is.Equal(res.Info.Status, release.StatusFailed)
is.Equal(goroutines+1, runtime.NumGoroutine()) // installation goroutine still is in background
time.Sleep(10 * time.Second) // wait for goroutine to finish
is.Equal(goroutines, runtime.NumGoroutine())
}
func TestInstallRelease_WaitForJobs(t *testing.T) {
is := assert.New(t)

Loading…
Cancel
Save