Merge pull request #10486 from gridai/fix-install-leak

Fix install memory/goroutine leak
pull/10618/head
Matt Farina 2 years ago committed by GitHub
commit 1ec0aacb88
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -344,8 +344,10 @@ func (i *Install) RunWithContext(ctx context.Context, chrt *chart.Chart, vals ma
return rel, err
}
rChan := make(chan resultMessage)
doneChan := make(chan struct{})
defer close(doneChan)
go i.performInstall(rChan, rel, toBeAdopted, resources)
go i.handleContext(ctx, rChan, rel)
go i.handleContext(ctx, rChan, doneChan, rel)
result := <-rChan
//start preformInstall go routine
return result.r, result.e
@ -416,12 +418,14 @@ func (i *Install) performInstall(c chan<- resultMessage, rel *release.Release, t
i.reportToRun(c, rel, nil)
}
func (i *Install) handleContext(ctx context.Context, c chan<- resultMessage, rel *release.Release) {
go func() {
<-ctx.Done()
func (i *Install) handleContext(ctx context.Context, c chan<- resultMessage, done chan struct{}, rel *release.Release) {
select {
case <-ctx.Done():
err := ctx.Err()
i.reportToRun(c, rel, err)
}()
case <-done:
return
}
}
func (i *Install) reportToRun(c chan<- resultMessage, rel *release.Release, err error) {
i.Lock.Lock()

@ -29,6 +29,7 @@ import (
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"helm.sh/helm/v3/internal/test"
"helm.sh/helm/v3/pkg/chart"
@ -56,9 +57,12 @@ func installAction(t *testing.T) *Install {
func TestInstallRelease(t *testing.T) {
is := assert.New(t)
req := require.New(t)
instAction := installAction(t)
vals := map[string]interface{}{}
res, err := instAction.Run(buildChart(), vals)
ctx, done := context.WithCancel(context.Background())
res, err := instAction.RunWithContext(ctx, buildChart(), vals)
if err != nil {
t.Fatalf("Failed install: %s", err)
}
@ -77,6 +81,14 @@ func TestInstallRelease(t *testing.T) {
is.NotEqual(len(rel.Manifest), 0)
is.Contains(rel.Manifest, "---\n# Source: hello/templates/hello\nhello: world")
is.Equal(rel.Info.Description, "Install complete")
// Detecting previous bug where context termination after successful release
// caused release to fail.
done()
time.Sleep(time.Millisecond * 100)
lastRelease, err := instAction.cfg.Releases.Last(rel.Name)
req.NoError(err)
is.Equal(lastRelease.Info.Status, release.StatusDeployed)
}
func TestInstallReleaseWithValues(t *testing.T) {

@ -326,11 +326,11 @@ func (u *Upgrade) performUpgrade(ctx context.Context, originalRelease, upgradedR
rChan := make(chan resultMessage)
ctxChan := make(chan resultMessage)
doneChan := make(chan interface{})
defer close(doneChan)
go u.releasingUpgrade(rChan, upgradedRelease, current, target, originalRelease)
go u.handleContext(ctx, doneChan, ctxChan, upgradedRelease)
select {
case result := <-rChan:
doneChan <- true
return result.r, result.e
case result := <-ctxChan:
return result.r, result.e
@ -351,17 +351,15 @@ func (u *Upgrade) reportToPerformUpgrade(c chan<- resultMessage, rel *release.Re
// Setup listener for SIGINT and SIGTERM
func (u *Upgrade) handleContext(ctx context.Context, done chan interface{}, c chan<- resultMessage, upgradedRelease *release.Release) {
go func() {
select {
case <-ctx.Done():
err := ctx.Err()
// when the atomic flag is set the ongoing release finish first and doesn't give time for the rollback happens.
u.reportToPerformUpgrade(c, upgradedRelease, kube.ResourceList{}, err)
case <-done:
return
}
}()
select {
case <-ctx.Done():
err := ctx.Err()
// when the atomic flag is set the ongoing release finish first and doesn't give time for the rollback happens.
u.reportToPerformUpgrade(c, upgradedRelease, kube.ResourceList{}, err)
case <-done:
return
}
}
func (u *Upgrade) releasingUpgrade(c chan<- resultMessage, upgradedRelease *release.Release, current kube.ResourceList, target kube.ResourceList, originalRelease *release.Release) {
// pre-upgrade hooks

@ -40,6 +40,33 @@ func upgradeAction(t *testing.T) *Upgrade {
return upAction
}
func TestUpgradeRelease_Success(t *testing.T) {
is := assert.New(t)
req := require.New(t)
upAction := upgradeAction(t)
rel := releaseStub()
rel.Name = "previous-release"
rel.Info.Status = release.StatusDeployed
req.NoError(upAction.cfg.Releases.Create(rel))
upAction.Wait = true
vals := map[string]interface{}{}
ctx, done := context.WithCancel(context.Background())
res, err := upAction.RunWithContext(ctx, rel.Name, buildChart(), vals)
done()
req.NoError(err)
is.Equal(res.Info.Status, release.StatusDeployed)
// Detecting previous bug where context termination after successful release
// caused release to fail.
time.Sleep(time.Millisecond * 100)
lastRelease, err := upAction.cfg.Releases.Last(rel.Name)
req.NoError(err)
is.Equal(lastRelease.Info.Status, release.StatusDeployed)
}
func TestUpgradeRelease_Wait(t *testing.T) {
is := assert.New(t)
req := require.New(t)

Loading…
Cancel
Save