From 5059ae843ef6b504fe55f914953e249a14ff5838 Mon Sep 17 00:00:00 2001 From: Neven Miculinic Date: Mon, 20 Dec 2021 10:13:36 +0100 Subject: [PATCH] Fix install memory/goroutine leak Signed-off-by: Neven Miculinic --- pkg/action/install.go | 14 +++++++++----- pkg/action/install_test.go | 14 +++++++++++++- pkg/action/upgrade.go | 22 ++++++++++------------ pkg/action/upgrade_test.go | 27 +++++++++++++++++++++++++++ 4 files changed, 59 insertions(+), 18 deletions(-) diff --git a/pkg/action/install.go b/pkg/action/install.go index e91154515..d645a810a 100644 --- a/pkg/action/install.go +++ b/pkg/action/install.go @@ -344,8 +344,10 @@ func (i *Install) RunWithContext(ctx context.Context, chrt *chart.Chart, vals ma return rel, err } rChan := make(chan resultMessage) + doneChan := make(chan struct{}) + defer close(doneChan) go i.performInstall(rChan, rel, toBeAdopted, resources) - go i.handleContext(ctx, rChan, rel) + go i.handleContext(ctx, rChan, doneChan, rel) result := <-rChan //start preformInstall go routine return result.r, result.e @@ -416,12 +418,14 @@ func (i *Install) performInstall(c chan<- resultMessage, rel *release.Release, t i.reportToRun(c, rel, nil) } -func (i *Install) handleContext(ctx context.Context, c chan<- resultMessage, rel *release.Release) { - go func() { - <-ctx.Done() +func (i *Install) handleContext(ctx context.Context, c chan<- resultMessage, done chan struct{}, rel *release.Release) { + select { + case <-ctx.Done(): err := ctx.Err() i.reportToRun(c, rel, err) - }() + case <-done: + return + } } func (i *Install) reportToRun(c chan<- resultMessage, rel *release.Release, err error) { i.Lock.Lock() diff --git a/pkg/action/install_test.go b/pkg/action/install_test.go index b1844b2ce..111374f04 100644 --- a/pkg/action/install_test.go +++ b/pkg/action/install_test.go @@ -29,6 +29,7 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "helm.sh/helm/v3/internal/test" "helm.sh/helm/v3/pkg/chart" @@ -56,9 +57,12 @@ func installAction(t *testing.T) *Install { func TestInstallRelease(t *testing.T) { is := assert.New(t) + req := require.New(t) + instAction := installAction(t) vals := map[string]interface{}{} - res, err := instAction.Run(buildChart(), vals) + ctx, done := context.WithCancel(context.Background()) + res, err := instAction.RunWithContext(ctx, buildChart(), vals) if err != nil { t.Fatalf("Failed install: %s", err) } @@ -77,6 +81,14 @@ func TestInstallRelease(t *testing.T) { is.NotEqual(len(rel.Manifest), 0) is.Contains(rel.Manifest, "---\n# Source: hello/templates/hello\nhello: world") is.Equal(rel.Info.Description, "Install complete") + + // Detecting previous bug where context termination after successful release + // caused release to fail. + done() + time.Sleep(time.Millisecond * 100) + lastRelease, err := instAction.cfg.Releases.Last(rel.Name) + req.NoError(err) + is.Equal(lastRelease.Info.Status, release.StatusDeployed) } func TestInstallReleaseWithValues(t *testing.T) { diff --git a/pkg/action/upgrade.go b/pkg/action/upgrade.go index 1e7054118..2488d4432 100644 --- a/pkg/action/upgrade.go +++ b/pkg/action/upgrade.go @@ -323,11 +323,11 @@ func (u *Upgrade) performUpgrade(ctx context.Context, originalRelease, upgradedR rChan := make(chan resultMessage) ctxChan := make(chan resultMessage) doneChan := make(chan interface{}) + defer close(doneChan) go u.releasingUpgrade(rChan, upgradedRelease, current, target, originalRelease) go u.handleContext(ctx, doneChan, ctxChan, upgradedRelease) select { case result := <-rChan: - doneChan <- true return result.r, result.e case result := <-ctxChan: return result.r, result.e @@ -348,17 +348,15 @@ func (u *Upgrade) reportToPerformUpgrade(c chan<- resultMessage, rel *release.Re // Setup listener for SIGINT and SIGTERM func (u *Upgrade) handleContext(ctx context.Context, done chan interface{}, c chan<- resultMessage, upgradedRelease *release.Release) { - go func() { - select { - case <-ctx.Done(): - err := ctx.Err() - - // when the atomic flag is set the ongoing release finish first and doesn't give time for the rollback happens. - u.reportToPerformUpgrade(c, upgradedRelease, kube.ResourceList{}, err) - case <-done: - return - } - }() + select { + case <-ctx.Done(): + err := ctx.Err() + + // when the atomic flag is set the ongoing release finish first and doesn't give time for the rollback happens. + u.reportToPerformUpgrade(c, upgradedRelease, kube.ResourceList{}, err) + case <-done: + return + } } func (u *Upgrade) releasingUpgrade(c chan<- resultMessage, upgradedRelease *release.Release, current kube.ResourceList, target kube.ResourceList, originalRelease *release.Release) { // pre-upgrade hooks diff --git a/pkg/action/upgrade_test.go b/pkg/action/upgrade_test.go index 7c286093e..62922b373 100644 --- a/pkg/action/upgrade_test.go +++ b/pkg/action/upgrade_test.go @@ -40,6 +40,33 @@ func upgradeAction(t *testing.T) *Upgrade { return upAction } +func TestUpgradeRelease_Success(t *testing.T) { + is := assert.New(t) + req := require.New(t) + + upAction := upgradeAction(t) + rel := releaseStub() + rel.Name = "previous-release" + rel.Info.Status = release.StatusDeployed + req.NoError(upAction.cfg.Releases.Create(rel)) + + upAction.Wait = true + vals := map[string]interface{}{} + + ctx, done := context.WithCancel(context.Background()) + res, err := upAction.RunWithContext(ctx, rel.Name, buildChart(), vals) + done() + req.NoError(err) + is.Equal(res.Info.Status, release.StatusDeployed) + + // Detecting previous bug where context termination after successful release + // caused release to fail. + time.Sleep(time.Millisecond * 100) + lastRelease, err := upAction.cfg.Releases.Last(rel.Name) + req.NoError(err) + is.Equal(lastRelease.Info.Status, release.StatusDeployed) +} + func TestUpgradeRelease_Wait(t *testing.T) { is := assert.New(t) req := require.New(t)