Merge pull request #9180 from Moser-ss/feature-handle-SIGINT

Handle SIGTERMs during helm upgrade and helm install
pull/10081/head
Matt Farina 3 years ago committed by GitHub
commit accf82ba1c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -17,8 +17,13 @@ limitations under the License.
package main
import (
"context"
"fmt"
"io"
"log"
"os"
"os/signal"
"syscall"
"time"
"github.com/pkg/errors"
@ -119,7 +124,7 @@ func newInstallCmd(cfg *action.Configuration, out io.Writer) *cobra.Command {
RunE: func(_ *cobra.Command, args []string) error {
rel, err := runInstall(args, client, valueOpts, out)
if err != nil {
return err
return errors.Wrap(err, "INSTALLATION FAILED")
}
return outfmt.Write(out, &statusPrinter{rel, settings.Debug, false})
@ -243,7 +248,21 @@ func runInstall(args []string, client *action.Install, valueOpts *values.Options
}
client.Namespace = settings.Namespace()
return client.Run(chartRequested, vals)
// Create context and prepare the handle of SIGTERM
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
// Handle SIGTERM
cSignal := make(chan os.Signal)
signal.Notify(cSignal, os.Interrupt, syscall.SIGTERM)
go func() {
<-cSignal
fmt.Fprintf(out, "Release %s has been cancelled.\n", args[0])
cancel()
}()
return client.RunWithContext(ctx, chartRequested, vals)
}
// checkIfInstallable validates if a chart can be installed

@ -169,7 +169,7 @@ func TestInstall(t *testing.T) {
name: "install library chart",
cmd: "install libchart testdata/testcharts/lib-chart",
wantError: true,
golden: "output/template-lib-chart.txt",
golden: "output/install-lib-chart.txt",
},
// Install, chart with bad type
{

@ -62,7 +62,7 @@ func TestTemplateCmd(t *testing.T) {
name: "check chart bad type",
cmd: fmt.Sprintf("template '%s'", "testdata/testcharts/chart-bad-type"),
wantError: true,
golden: "output/install-chart-bad-type.txt",
golden: "output/template-chart-bad-type.txt",
},
{
name: "check chart with dependency which is an app chart acting as a library chart",

@ -1 +1 @@
Error: validation: chart.metadata.type must be application or library
Error: INSTALLATION FAILED: validation: chart.metadata.type must be application or library

@ -0,0 +1 @@
Error: INSTALLATION FAILED: validation: chart.metadata.type must be application or library

@ -1,4 +1,4 @@
Error: values don't meet the specifications of the schema(s) in the following chart(s):
Error: INSTALLATION FAILED: values don't meet the specifications of the schema(s) in the following chart(s):
empty:
- age: Must be greater than or equal to 0

@ -1,4 +1,4 @@
Error: values don't meet the specifications of the schema(s) in the following chart(s):
Error: INSTALLATION FAILED: values don't meet the specifications of the schema(s) in the following chart(s):
empty:
- (root): employmentInfo is required
- age: Must be greater than or equal to 0

@ -1,4 +1,4 @@
Error: values don't meet the specifications of the schema(s) in the following chart(s):
Error: INSTALLATION FAILED: values don't meet the specifications of the schema(s) in the following chart(s):
subchart-with-schema:
- age: Must be greater than or equal to 0

@ -1,4 +1,4 @@
Error: values don't meet the specifications of the schema(s) in the following chart(s):
Error: INSTALLATION FAILED: values don't meet the specifications of the schema(s) in the following chart(s):
chart-without-schema:
- (root): lastname is required
subchart-with-schema:

@ -0,0 +1 @@
Error: validation: chart.metadata.type must be application or library

@ -17,9 +17,13 @@ limitations under the License.
package main
import (
"context"
"fmt"
"io"
"log"
"os"
"os/signal"
"syscall"
"time"
"github.com/pkg/errors"
@ -178,7 +182,20 @@ func newUpgradeCmd(cfg *action.Configuration, out io.Writer) *cobra.Command {
warning("This chart is deprecated")
}
rel, err := client.Run(args[0], ch, vals)
// Create context and prepare the handle of SIGTERM
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
// Handle SIGTERM
cSignal := make(chan os.Signal)
signal.Notify(cSignal, os.Interrupt, syscall.SIGTERM)
go func() {
<-cSignal
fmt.Fprintf(out, "Release %s has been cancelled.\n", args[0])
cancel()
}()
rel, err := client.RunWithContext(ctx, args[0], ch, vals)
if err != nil {
return errors.Wrap(err, "UPGRADE FAILED")
}

@ -18,6 +18,7 @@ package action
import (
"bytes"
"context"
"fmt"
"io/ioutil"
"net/url"
@ -25,6 +26,7 @@ import (
"path"
"path/filepath"
"strings"
"sync"
"text/template"
"time"
@ -105,6 +107,8 @@ type Install struct {
// OutputDir/<ReleaseName>
UseReleaseName bool
PostRenderer postrender.PostRenderer
// Lock to control raceconditions when the process receives a SIGTERM
Lock sync.Mutex
}
// ChartPathOptions captures common options used for controlling chart paths
@ -175,7 +179,14 @@ func (i *Install) installCRDs(crds []chart.CRD) error {
// Run executes the installation
//
// If DryRun is set to true, this will prepare the release, but not install it
func (i *Install) Run(chrt *chart.Chart, vals map[string]interface{}) (*release.Release, error) {
ctx := context.Background()
return i.RunWithContext(ctx, chrt, vals)
}
// Run executes the installation with Context
func (i *Install) RunWithContext(ctx context.Context, chrt *chart.Chart, vals map[string]interface{}) (*release.Release, error) {
// Check reachability of cluster unless in client-only mode (e.g. `helm template` without `--validate`)
if !i.ClientOnly {
if err := i.cfg.KubeClient.IsReachable(); err != nil {
@ -332,11 +343,21 @@ func (i *Install) Run(chrt *chart.Chart, vals map[string]interface{}) (*release.
// not working.
return rel, err
}
rChan := make(chan resultMessage)
go i.performInstall(rChan, rel, toBeAdopted, resources)
go i.handleContext(ctx, rChan, rel)
result := <-rChan
//start preformInstall go routine
return result.r, result.e
}
func (i *Install) performInstall(c chan<- resultMessage, rel *release.Release, toBeAdopted kube.ResourceList, resources kube.ResourceList) {
// pre-install hooks
if !i.DisableHooks {
if err := i.cfg.execHook(rel, release.HookPreInstall, i.Timeout); err != nil {
return i.failRelease(rel, fmt.Errorf("failed pre-install: %s", err))
i.reportToRun(c, rel, fmt.Errorf("failed pre-install: %s", err))
return
}
}
@ -345,29 +366,34 @@ func (i *Install) Run(chrt *chart.Chart, vals map[string]interface{}) (*release.
// to true, since that is basically an upgrade operation.
if len(toBeAdopted) == 0 && len(resources) > 0 {
if _, err := i.cfg.KubeClient.Create(resources); err != nil {
return i.failRelease(rel, err)
i.reportToRun(c, rel, err)
return
}
} else if len(resources) > 0 {
if _, err := i.cfg.KubeClient.Update(toBeAdopted, resources, false); err != nil {
return i.failRelease(rel, err)
i.reportToRun(c, rel, err)
return
}
}
if i.Wait {
if i.WaitForJobs {
if err := i.cfg.KubeClient.WaitWithJobs(resources, i.Timeout); err != nil {
return i.failRelease(rel, err)
i.reportToRun(c, rel, err)
return
}
} else {
if err := i.cfg.KubeClient.Wait(resources, i.Timeout); err != nil {
return i.failRelease(rel, err)
i.reportToRun(c, rel, err)
return
}
}
}
if !i.DisableHooks {
if err := i.cfg.execHook(rel, release.HookPostInstall, i.Timeout); err != nil {
return i.failRelease(rel, fmt.Errorf("failed post-install: %s", err))
i.reportToRun(c, rel, fmt.Errorf("failed post-install: %s", err))
return
}
}
@ -388,9 +414,23 @@ func (i *Install) Run(chrt *chart.Chart, vals map[string]interface{}) (*release.
i.cfg.Log("failed to record the release: %s", err)
}
return rel, nil
i.reportToRun(c, rel, nil)
}
func (i *Install) handleContext(ctx context.Context, c chan<- resultMessage, rel *release.Release) {
go func() {
<-ctx.Done()
err := ctx.Err()
i.reportToRun(c, rel, err)
}()
}
func (i *Install) reportToRun(c chan<- resultMessage, rel *release.Release, err error) {
i.Lock.Lock()
if err != nil {
rel, err = i.failRelease(rel, err)
}
c <- resultMessage{r: rel, e: err}
i.Lock.Unlock()
}
func (i *Install) failRelease(rel *release.Release, err error) (*release.Release, error) {
rel.SetStatus(release.StatusFailed, fmt.Sprintf("Release %q failed: %s", i.ReleaseName, err.Error()))
if i.Atomic {

@ -17,6 +17,7 @@ limitations under the License.
package action
import (
"context"
"fmt"
"io/ioutil"
"log"
@ -25,6 +26,7 @@ import (
"regexp"
"strings"
"testing"
"time"
"github.com/stretchr/testify/assert"
@ -34,7 +36,7 @@ import (
kubefake "helm.sh/helm/v3/pkg/kube/fake"
"helm.sh/helm/v3/pkg/release"
"helm.sh/helm/v3/pkg/storage/driver"
"helm.sh/helm/v3/pkg/time"
helmtime "helm.sh/helm/v3/pkg/time"
)
type nameTemplateTestCase struct {
@ -361,7 +363,25 @@ func TestInstallRelease_Wait(t *testing.T) {
is.Contains(res.Info.Description, "I timed out")
is.Equal(res.Info.Status, release.StatusFailed)
}
func TestInstallRelease_Wait_Interrupted(t *testing.T) {
is := assert.New(t)
instAction := installAction(t)
instAction.ReleaseName = "interrupted-release"
failer := instAction.cfg.KubeClient.(*kubefake.FailingKubeClient)
failer.WaitDuration = 10 * time.Second
instAction.cfg.KubeClient = failer
instAction.Wait = true
vals := map[string]interface{}{}
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
time.AfterFunc(time.Second, cancel)
res, err := instAction.RunWithContext(ctx, buildChart(), vals)
is.Error(err)
is.Contains(res.Info.Description, "Release \"interrupted-release\" failed: context canceled")
is.Equal(res.Info.Status, release.StatusFailed)
}
func TestInstallRelease_WaitForJobs(t *testing.T) {
is := assert.New(t)
instAction := installAction(t)
@ -419,7 +439,33 @@ func TestInstallRelease_Atomic(t *testing.T) {
is.Contains(err.Error(), "an error occurred while uninstalling the release")
})
}
func TestInstallRelease_Atomic_Interrupted(t *testing.T) {
is := assert.New(t)
instAction := installAction(t)
instAction.ReleaseName = "interrupted-release"
failer := instAction.cfg.KubeClient.(*kubefake.FailingKubeClient)
failer.WaitDuration = 10 * time.Second
instAction.cfg.KubeClient = failer
instAction.Atomic = true
vals := map[string]interface{}{}
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
time.AfterFunc(time.Second, cancel)
res, err := instAction.RunWithContext(ctx, buildChart(), vals)
is.Error(err)
is.Contains(err.Error(), "context canceled")
is.Contains(err.Error(), "atomic")
is.Contains(err.Error(), "uninstalled")
// Now make sure it isn't in storage any more
_, err = instAction.cfg.Releases.Get(res.Name, res.Version)
is.Error(err)
is.Equal(err, driver.ErrReleaseNotFound)
}
func TestNameTemplate(t *testing.T) {
testCases := []nameTemplateTestCase{
// Just a straight up nop please
@ -624,32 +670,32 @@ func TestNameAndChartGenerateName(t *testing.T) {
{
"local filepath",
"./chart",
fmt.Sprintf("chart-%d", time.Now().Unix()),
fmt.Sprintf("chart-%d", helmtime.Now().Unix()),
},
{
"dot filepath",
".",
fmt.Sprintf("chart-%d", time.Now().Unix()),
fmt.Sprintf("chart-%d", helmtime.Now().Unix()),
},
{
"empty filepath",
"",
fmt.Sprintf("chart-%d", time.Now().Unix()),
fmt.Sprintf("chart-%d", helmtime.Now().Unix()),
},
{
"packaged chart",
"chart.tgz",
fmt.Sprintf("chart-%d", time.Now().Unix()),
fmt.Sprintf("chart-%d", helmtime.Now().Unix()),
},
{
"packaged chart with .tar.gz extension",
"chart.tar.gz",
fmt.Sprintf("chart-%d", time.Now().Unix()),
fmt.Sprintf("chart-%d", helmtime.Now().Unix()),
},
{
"packaged chart with local extension",
"./chart.tgz",
fmt.Sprintf("chart-%d", time.Now().Unix()),
fmt.Sprintf("chart-%d", helmtime.Now().Unix()),
},
}

@ -21,6 +21,7 @@ import (
"context"
"fmt"
"strings"
"sync"
"time"
"github.com/pkg/errors"
@ -100,6 +101,13 @@ type Upgrade struct {
DisableOpenAPIValidation bool
// Get missing dependencies
DependencyUpdate bool
// Lock to control raceconditions when the process receives a SIGTERM
Lock sync.Mutex
}
type resultMessage struct {
r *release.Release
e error
}
// NewUpgrade creates a new Upgrade object with the given configuration.
@ -109,8 +117,14 @@ func NewUpgrade(cfg *Configuration) *Upgrade {
}
}
// Run executes the upgrade on the given release.
// Run executes the upgrade on the given release
func (u *Upgrade) Run(name string, chart *chart.Chart, vals map[string]interface{}) (*release.Release, error) {
ctx := context.Background()
return u.RunWithContext(ctx, name, chart, vals)
}
// Run executes the upgrade on the given release with context.
func (u *Upgrade) RunWithContext(ctx context.Context, name string, chart *chart.Chart, vals map[string]interface{}) (*release.Release, error) {
if err := u.cfg.KubeClient.IsReachable(); err != nil {
return nil, err
}
@ -131,7 +145,7 @@ func (u *Upgrade) Run(name string, chart *chart.Chart, vals map[string]interface
u.cfg.Releases.MaxHistory = u.MaxHistory
u.cfg.Log("performing update for %s", name)
res, err := u.performUpgrade(currentRelease, upgradedRelease)
res, err := u.performUpgrade(ctx, currentRelease, upgradedRelease)
if err != nil {
return res, err
}
@ -243,7 +257,7 @@ func (u *Upgrade) prepareUpgrade(name string, chart *chart.Chart, vals map[strin
return currentRelease, upgradedRelease, err
}
func (u *Upgrade) performUpgrade(originalRelease, upgradedRelease *release.Release) (*release.Release, error) {
func (u *Upgrade) performUpgrade(ctx context.Context, originalRelease, upgradedRelease *release.Release) (*release.Release, error) {
current, err := u.cfg.KubeClient.Build(bytes.NewBufferString(originalRelease.Manifest), false)
if err != nil {
// Checking for removed Kubernetes API error so can provide a more informative error message to the user
@ -306,11 +320,43 @@ func (u *Upgrade) performUpgrade(originalRelease, upgradedRelease *release.Relea
if err := u.cfg.Releases.Create(upgradedRelease); err != nil {
return nil, err
}
rChan := make(chan resultMessage)
go u.releasingUpgrade(rChan, upgradedRelease, current, target, originalRelease)
go u.handleContext(ctx, rChan, upgradedRelease)
result := <-rChan
return result.r, result.e
}
// Function used to lock the Mutex, this is important for the case when the atomic flag is set.
// In that case the upgrade will finish before the rollback is finished so it is necessary to wait for the rollback to finish.
// The rollback will be trigger by the function failRelease
func (u *Upgrade) reportToPerformUpgrade(c chan<- resultMessage, rel *release.Release, created kube.ResourceList, err error) {
u.Lock.Lock()
if err != nil {
rel, err = u.failRelease(rel, created, err)
}
c <- resultMessage{r: rel, e: err}
u.Lock.Unlock()
}
// Setup listener for SIGINT and SIGTERM
func (u *Upgrade) handleContext(ctx context.Context, c chan<- resultMessage, upgradedRelease *release.Release) {
go func() {
<-ctx.Done()
err := ctx.Err()
// when the atomic flag is set the ongoing release finish first and doesn't give time for the rollback happens.
u.reportToPerformUpgrade(c, upgradedRelease, kube.ResourceList{}, err)
}()
}
func (u *Upgrade) releasingUpgrade(c chan<- resultMessage, upgradedRelease *release.Release, current kube.ResourceList, target kube.ResourceList, originalRelease *release.Release) {
// pre-upgrade hooks
if !u.DisableHooks {
if err := u.cfg.execHook(upgradedRelease, release.HookPreUpgrade, u.Timeout); err != nil {
return u.failRelease(upgradedRelease, kube.ResourceList{}, fmt.Errorf("pre-upgrade hooks failed: %s", err))
u.reportToPerformUpgrade(c, upgradedRelease, kube.ResourceList{}, fmt.Errorf("pre-upgrade hooks failed: %s", err))
return
}
} else {
u.cfg.Log("upgrade hooks disabled for %s", upgradedRelease.Name)
@ -319,7 +365,8 @@ func (u *Upgrade) performUpgrade(originalRelease, upgradedRelease *release.Relea
results, err := u.cfg.KubeClient.Update(current, target, u.Force)
if err != nil {
u.cfg.recordRelease(originalRelease)
return u.failRelease(upgradedRelease, results.Created, err)
u.reportToPerformUpgrade(c, upgradedRelease, results.Created, err)
return
}
if u.Recreate {
@ -336,12 +383,14 @@ func (u *Upgrade) performUpgrade(originalRelease, upgradedRelease *release.Relea
if u.WaitForJobs {
if err := u.cfg.KubeClient.WaitWithJobs(target, u.Timeout); err != nil {
u.cfg.recordRelease(originalRelease)
return u.failRelease(upgradedRelease, results.Created, err)
u.reportToPerformUpgrade(c, upgradedRelease, results.Created, err)
return
}
} else {
if err := u.cfg.KubeClient.Wait(target, u.Timeout); err != nil {
u.cfg.recordRelease(originalRelease)
return u.failRelease(upgradedRelease, results.Created, err)
u.reportToPerformUpgrade(c, upgradedRelease, results.Created, err)
return
}
}
}
@ -349,7 +398,8 @@ func (u *Upgrade) performUpgrade(originalRelease, upgradedRelease *release.Relea
// post-upgrade hooks
if !u.DisableHooks {
if err := u.cfg.execHook(upgradedRelease, release.HookPostUpgrade, u.Timeout); err != nil {
return u.failRelease(upgradedRelease, results.Created, fmt.Errorf("post-upgrade hooks failed: %s", err))
u.reportToPerformUpgrade(c, upgradedRelease, results.Created, fmt.Errorf("post-upgrade hooks failed: %s", err))
return
}
}
@ -362,8 +412,7 @@ func (u *Upgrade) performUpgrade(originalRelease, upgradedRelease *release.Relea
} else {
upgradedRelease.Info.Description = "Upgrade complete"
}
return upgradedRelease, nil
u.reportToPerformUpgrade(c, upgradedRelease, nil, nil)
}
func (u *Upgrade) failRelease(rel *release.Release, created kube.ResourceList, err error) (*release.Release, error) {

@ -17,8 +17,10 @@ limitations under the License.
package action
import (
"context"
"fmt"
"testing"
"time"
"helm.sh/helm/v3/pkg/chart"
@ -27,7 +29,7 @@ import (
kubefake "helm.sh/helm/v3/pkg/kube/fake"
"helm.sh/helm/v3/pkg/release"
"helm.sh/helm/v3/pkg/time"
helmtime "helm.sh/helm/v3/pkg/time"
)
func upgradeAction(t *testing.T) *Upgrade {
@ -225,7 +227,7 @@ func TestUpgradeRelease_ReuseValues(t *testing.T) {
withValues(chartDefaultValues),
withMetadataDependency(dependency),
)
now := time.Now()
now := helmtime.Now()
existingValues := map[string]interface{}{
"subchart": map[string]interface{}{
"enabled": false,
@ -296,3 +298,66 @@ func TestUpgradeRelease_Pending(t *testing.T) {
_, err := upAction.Run(rel.Name, buildChart(), vals)
req.Contains(err.Error(), "progress", err)
}
func TestUpgradeRelease_Interrupted_Wait(t *testing.T) {
is := assert.New(t)
req := require.New(t)
upAction := upgradeAction(t)
rel := releaseStub()
rel.Name = "interrupted-release"
rel.Info.Status = release.StatusDeployed
upAction.cfg.Releases.Create(rel)
failer := upAction.cfg.KubeClient.(*kubefake.FailingKubeClient)
failer.WaitDuration = 10 * time.Second
upAction.cfg.KubeClient = failer
upAction.Wait = true
vals := map[string]interface{}{}
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
time.AfterFunc(time.Second, cancel)
res, err := upAction.RunWithContext(ctx, rel.Name, buildChart(), vals)
req.Error(err)
is.Contains(res.Info.Description, "Upgrade \"interrupted-release\" failed: context canceled")
is.Equal(res.Info.Status, release.StatusFailed)
}
func TestUpgradeRelease_Interrupted_Atomic(t *testing.T) {
is := assert.New(t)
req := require.New(t)
upAction := upgradeAction(t)
rel := releaseStub()
rel.Name = "interrupted-release"
rel.Info.Status = release.StatusDeployed
upAction.cfg.Releases.Create(rel)
failer := upAction.cfg.KubeClient.(*kubefake.FailingKubeClient)
failer.WaitDuration = 5 * time.Second
upAction.cfg.KubeClient = failer
upAction.Atomic = true
vals := map[string]interface{}{}
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
time.AfterFunc(time.Second, cancel)
res, err := upAction.RunWithContext(ctx, rel.Name, buildChart(), vals)
req.Error(err)
is.Contains(err.Error(), "release interrupted-release failed, and has been rolled back due to atomic being set: context canceled")
// Now make sure it is actually upgraded
updatedRes, err := upAction.cfg.Releases.Get(res.Name, 3)
is.NoError(err)
// Should have rolled back to the previous
is.Equal(updatedRes.Info.Status, release.StatusDeployed)
}

@ -40,6 +40,7 @@ type FailingKubeClient struct {
BuildError error
BuildUnstructuredError error
WaitAndGetCompletedPodPhaseError error
WaitDuration time.Duration
}
// Create returns the configured error if set or prints
@ -50,8 +51,9 @@ func (f *FailingKubeClient) Create(resources kube.ResourceList) (*kube.Result, e
return f.PrintingKubeClient.Create(resources)
}
// Wait returns the configured error if set or prints
// Waits the amount of time defined on f.WaitDuration, then returns the configured error if set or prints.
func (f *FailingKubeClient) Wait(resources kube.ResourceList, d time.Duration) error {
time.Sleep(f.WaitDuration)
if f.WaitError != nil {
return f.WaitError
}

Loading…
Cancel
Save