Fix install memory/goroutine leak

Signed-off-by: Neven Miculinic <neven.miculinic@gmail.com>
pull/10486/head
Neven Miculinic 3 years ago
parent 8ca401398d
commit 5059ae843e

@ -344,8 +344,10 @@ func (i *Install) RunWithContext(ctx context.Context, chrt *chart.Chart, vals ma
return rel, err return rel, err
} }
rChan := make(chan resultMessage) rChan := make(chan resultMessage)
doneChan := make(chan struct{})
defer close(doneChan)
go i.performInstall(rChan, rel, toBeAdopted, resources) go i.performInstall(rChan, rel, toBeAdopted, resources)
go i.handleContext(ctx, rChan, rel) go i.handleContext(ctx, rChan, doneChan, rel)
result := <-rChan result := <-rChan
//start preformInstall go routine //start preformInstall go routine
return result.r, result.e return result.r, result.e
@ -416,12 +418,14 @@ func (i *Install) performInstall(c chan<- resultMessage, rel *release.Release, t
i.reportToRun(c, rel, nil) i.reportToRun(c, rel, nil)
} }
func (i *Install) handleContext(ctx context.Context, c chan<- resultMessage, rel *release.Release) { func (i *Install) handleContext(ctx context.Context, c chan<- resultMessage, done chan struct{}, rel *release.Release) {
go func() { select {
<-ctx.Done() case <-ctx.Done():
err := ctx.Err() err := ctx.Err()
i.reportToRun(c, rel, err) i.reportToRun(c, rel, err)
}() case <-done:
return
}
} }
func (i *Install) reportToRun(c chan<- resultMessage, rel *release.Release, err error) { func (i *Install) reportToRun(c chan<- resultMessage, rel *release.Release, err error) {
i.Lock.Lock() i.Lock.Lock()

@ -29,6 +29,7 @@ import (
"time" "time"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"helm.sh/helm/v3/internal/test" "helm.sh/helm/v3/internal/test"
"helm.sh/helm/v3/pkg/chart" "helm.sh/helm/v3/pkg/chart"
@ -56,9 +57,12 @@ func installAction(t *testing.T) *Install {
func TestInstallRelease(t *testing.T) { func TestInstallRelease(t *testing.T) {
is := assert.New(t) is := assert.New(t)
req := require.New(t)
instAction := installAction(t) instAction := installAction(t)
vals := map[string]interface{}{} vals := map[string]interface{}{}
res, err := instAction.Run(buildChart(), vals) ctx, done := context.WithCancel(context.Background())
res, err := instAction.RunWithContext(ctx, buildChart(), vals)
if err != nil { if err != nil {
t.Fatalf("Failed install: %s", err) t.Fatalf("Failed install: %s", err)
} }
@ -77,6 +81,14 @@ func TestInstallRelease(t *testing.T) {
is.NotEqual(len(rel.Manifest), 0) is.NotEqual(len(rel.Manifest), 0)
is.Contains(rel.Manifest, "---\n# Source: hello/templates/hello\nhello: world") is.Contains(rel.Manifest, "---\n# Source: hello/templates/hello\nhello: world")
is.Equal(rel.Info.Description, "Install complete") is.Equal(rel.Info.Description, "Install complete")
// Detecting previous bug where context termination after successful release
// caused release to fail.
done()
time.Sleep(time.Millisecond * 100)
lastRelease, err := instAction.cfg.Releases.Last(rel.Name)
req.NoError(err)
is.Equal(lastRelease.Info.Status, release.StatusDeployed)
} }
func TestInstallReleaseWithValues(t *testing.T) { func TestInstallReleaseWithValues(t *testing.T) {

@ -323,11 +323,11 @@ func (u *Upgrade) performUpgrade(ctx context.Context, originalRelease, upgradedR
rChan := make(chan resultMessage) rChan := make(chan resultMessage)
ctxChan := make(chan resultMessage) ctxChan := make(chan resultMessage)
doneChan := make(chan interface{}) doneChan := make(chan interface{})
defer close(doneChan)
go u.releasingUpgrade(rChan, upgradedRelease, current, target, originalRelease) go u.releasingUpgrade(rChan, upgradedRelease, current, target, originalRelease)
go u.handleContext(ctx, doneChan, ctxChan, upgradedRelease) go u.handleContext(ctx, doneChan, ctxChan, upgradedRelease)
select { select {
case result := <-rChan: case result := <-rChan:
doneChan <- true
return result.r, result.e return result.r, result.e
case result := <-ctxChan: case result := <-ctxChan:
return result.r, result.e return result.r, result.e
@ -348,17 +348,15 @@ func (u *Upgrade) reportToPerformUpgrade(c chan<- resultMessage, rel *release.Re
// Setup listener for SIGINT and SIGTERM // Setup listener for SIGINT and SIGTERM
func (u *Upgrade) handleContext(ctx context.Context, done chan interface{}, c chan<- resultMessage, upgradedRelease *release.Release) { func (u *Upgrade) handleContext(ctx context.Context, done chan interface{}, c chan<- resultMessage, upgradedRelease *release.Release) {
go func() { select {
select { case <-ctx.Done():
case <-ctx.Done(): err := ctx.Err()
err := ctx.Err()
// when the atomic flag is set the ongoing release finish first and doesn't give time for the rollback happens.
// when the atomic flag is set the ongoing release finish first and doesn't give time for the rollback happens. u.reportToPerformUpgrade(c, upgradedRelease, kube.ResourceList{}, err)
u.reportToPerformUpgrade(c, upgradedRelease, kube.ResourceList{}, err) case <-done:
case <-done: return
return }
}
}()
} }
func (u *Upgrade) releasingUpgrade(c chan<- resultMessage, upgradedRelease *release.Release, current kube.ResourceList, target kube.ResourceList, originalRelease *release.Release) { func (u *Upgrade) releasingUpgrade(c chan<- resultMessage, upgradedRelease *release.Release, current kube.ResourceList, target kube.ResourceList, originalRelease *release.Release) {
// pre-upgrade hooks // pre-upgrade hooks

@ -40,6 +40,33 @@ func upgradeAction(t *testing.T) *Upgrade {
return upAction return upAction
} }
func TestUpgradeRelease_Success(t *testing.T) {
is := assert.New(t)
req := require.New(t)
upAction := upgradeAction(t)
rel := releaseStub()
rel.Name = "previous-release"
rel.Info.Status = release.StatusDeployed
req.NoError(upAction.cfg.Releases.Create(rel))
upAction.Wait = true
vals := map[string]interface{}{}
ctx, done := context.WithCancel(context.Background())
res, err := upAction.RunWithContext(ctx, rel.Name, buildChart(), vals)
done()
req.NoError(err)
is.Equal(res.Info.Status, release.StatusDeployed)
// Detecting previous bug where context termination after successful release
// caused release to fail.
time.Sleep(time.Millisecond * 100)
lastRelease, err := upAction.cfg.Releases.Last(rel.Name)
req.NoError(err)
is.Equal(lastRelease.Info.Status, release.StatusDeployed)
}
func TestUpgradeRelease_Wait(t *testing.T) { func TestUpgradeRelease_Wait(t *testing.T) {
is := assert.New(t) is := assert.New(t)
req := require.New(t) req := require.New(t)

Loading…
Cancel
Save