diff --git a/cmd/helm/install.go b/cmd/helm/install.go index 04419d730..f72c3d388 100644 --- a/cmd/helm/install.go +++ b/cmd/helm/install.go @@ -17,8 +17,13 @@ limitations under the License. package main import ( + "context" + "fmt" "io" "log" + "os" + "os/signal" + "syscall" "time" "github.com/pkg/errors" @@ -239,7 +244,21 @@ func runInstall(args []string, client *action.Install, valueOpts *values.Options } client.Namespace = settings.Namespace() - return client.Run(chartRequested, vals) + + // Create context and prepare the handle of SIGTERM + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + + // Handle SIGTERM + cSignal := make(chan os.Signal) + signal.Notify(cSignal, os.Interrupt, syscall.SIGTERM) + go func() { + <-cSignal + fmt.Fprintf(out, "Release %s has been cancel.\n", args[0]) + cancel() + }() + + return client.RunWithContext(ctx, chartRequested, vals) } // checkIfInstallable validates if a chart can be installed diff --git a/cmd/helm/upgrade.go b/cmd/helm/upgrade.go index 3bd392d1d..862b6e341 100644 --- a/cmd/helm/upgrade.go +++ b/cmd/helm/upgrade.go @@ -17,9 +17,13 @@ limitations under the License. package main import ( + "context" "fmt" "io" "log" + "os" + "os/signal" + "syscall" "time" "github.com/pkg/errors" @@ -174,7 +178,20 @@ func newUpgradeCmd(cfg *action.Configuration, out io.Writer) *cobra.Command { warning("This chart is deprecated") } - rel, err := client.Run(args[0], ch, vals) + // Create context and prepare the handle of SIGTERM + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + + // Handle SIGTERM + cSignal := make(chan os.Signal) + signal.Notify(cSignal, os.Interrupt, syscall.SIGTERM) + go func() { + <-cSignal + fmt.Fprintf(out, "Release %s has been cancel.\n", args[0]) + cancel() + }() + + rel, err := client.RunWithContext(ctx, args[0], ch, vals) if err != nil { return errors.Wrap(err, "UPGRADE FAILED") } diff --git a/pkg/action/install.go b/pkg/action/install.go index a394155ff..03a5cd7f3 100644 --- a/pkg/action/install.go +++ b/pkg/action/install.go @@ -18,16 +18,15 @@ package action import ( "bytes" + "context" "fmt" "io/ioutil" "net/url" "os" - "os/signal" "path" "path/filepath" "strings" "sync" - "syscall" "text/template" "time" @@ -179,7 +178,14 @@ func (i *Install) installCRDs(crds []chart.CRD) error { // Run executes the installation // // If DryRun is set to true, this will prepare the release, but not install it + func (i *Install) Run(chrt *chart.Chart, vals map[string]interface{}) (*release.Release, error) { + ctx := context.Background() + return i.RunWithContext(ctx, chrt, vals) +} + +// Run executes the installation with Context +func (i *Install) RunWithContext(ctx context.Context, chrt *chart.Chart, vals map[string]interface{}) (*release.Release, error) { // Check reachability of cluster unless in client-only mode (e.g. `helm template` without `--validate`) if !i.ClientOnly { if err := i.cfg.KubeClient.IsReachable(); err != nil { @@ -338,7 +344,7 @@ func (i *Install) Run(chrt *chart.Chart, vals map[string]interface{}) (*release. } rChan := make(chan resultMessage) go i.performInstall(rChan, rel, toBeAdopted, resources) - go i.handleSignals(rChan, rel) + go i.handleContext(ctx, rChan, rel) result := <-rChan //start preformInstall go routine return result.r, result.e @@ -409,14 +415,11 @@ func (i *Install) performInstall(c chan<- resultMessage, rel *release.Release, t i.reportToRun(c, rel, nil) } -func (i *Install) handleSignals(c chan<- resultMessage, rel *release.Release) { - // Handle SIGINT - cSignal := make(chan os.Signal) - signal.Notify(cSignal, os.Interrupt, syscall.SIGTERM) +func (i *Install) handleContext(ctx context.Context, c chan<- resultMessage, rel *release.Release) { go func() { - <-cSignal - i.cfg.Log("SIGTERM or SIGINT received") - i.reportToRun(c, rel, fmt.Errorf("SIGTERM or SIGINT received, release failed")) + <-ctx.Done() + err := ctx.Err() + i.reportToRun(c, rel, err) }() } func (i *Install) reportToRun(c chan<- resultMessage, rel *release.Release, err error) { diff --git a/pkg/action/install_test.go b/pkg/action/install_test.go index 46fac72f3..b1844b2ce 100644 --- a/pkg/action/install_test.go +++ b/pkg/action/install_test.go @@ -17,11 +17,11 @@ limitations under the License. package action import ( + "context" "fmt" "io/ioutil" "log" "os" - "os/exec" "path/filepath" "regexp" "strings" @@ -364,60 +364,23 @@ func TestInstallRelease_Wait(t *testing.T) { is.Equal(res.Info.Status, release.StatusFailed) } func TestInstallRelease_Wait_Interrupted(t *testing.T) { - if os.Getenv("HANDLE_SIGINT") == "1" { - t.Run("Execute TestInstallRelease_Wait_Interrupted", func(t *testing.T) { - is := assert.New(t) - instAction := installAction(t) - instAction.ReleaseName = "interrupted-release" - failer := instAction.cfg.KubeClient.(*kubefake.FailingKubeClient) - failer.WaitDuration = 10 * time.Second - instAction.cfg.KubeClient = failer - instAction.Wait = true - vals := map[string]interface{}{} - - res, err := instAction.Run(buildChart(), vals) - is.Error(err) - is.Contains(res.Info.Description, "SIGTERM or SIGINT received, release failed") - is.Equal(res.Info.Status, release.StatusFailed) - }) - return - - } - t.Run("Setup TestInstallRelease_Wait_Interrupted", func(t *testing.T) { - cmd := exec.Command(os.Args[0], "-test.run=TestInstallRelease_Wait_Interrupted") - cmd.Env = append(os.Environ(), "HANDLE_SIGINT=1") - stdout, err := cmd.StdoutPipe() - if err != nil { - t.Fatal(err) - } - stderr, err := cmd.StderrPipe() - if err != nil { - t.Fatal(err) - } - if err := cmd.Start(); err != nil { - t.Fatal(err) - } - go func() { - slurp, _ := ioutil.ReadAll(stdout) - fmt.Printf("%s\n", slurp) - }() - - go func() { - slurp, _ := ioutil.ReadAll(stderr) - fmt.Printf("%s\n", slurp) - }() - - time.Sleep(2 * time.Second) - p, _ := os.FindProcess(cmd.Process.Pid) + is := assert.New(t) + instAction := installAction(t) + instAction.ReleaseName = "interrupted-release" + failer := instAction.cfg.KubeClient.(*kubefake.FailingKubeClient) + failer.WaitDuration = 10 * time.Second + instAction.cfg.KubeClient = failer + instAction.Wait = true + vals := map[string]interface{}{} - if err := p.Signal(os.Interrupt); err != nil { - t.Fatal(err) - } + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + time.AfterFunc(time.Second, cancel) - if err := cmd.Wait(); err != nil { - t.FailNow() - } - }) + res, err := instAction.RunWithContext(ctx, buildChart(), vals) + is.Error(err) + is.Contains(res.Info.Description, "Release \"interrupted-release\" failed: context canceled") + is.Equal(res.Info.Status, release.StatusFailed) } func TestInstallRelease_WaitForJobs(t *testing.T) { is := assert.New(t) @@ -477,67 +440,31 @@ func TestInstallRelease_Atomic(t *testing.T) { }) } func TestInstallRelease_Atomic_Interrupted(t *testing.T) { - if os.Getenv("HANDLE_SIGINT") == "1" { - t.Run("Execute TestInstallRelease_Atomic_Interrupted", func(t *testing.T) { - is := assert.New(t) - instAction := installAction(t) - instAction.ReleaseName = "interrupted-release" - failer := instAction.cfg.KubeClient.(*kubefake.FailingKubeClient) - failer.WaitDuration = 10 * time.Second - instAction.cfg.KubeClient = failer - instAction.Atomic = true - vals := map[string]interface{}{} - - res, err := instAction.Run(buildChart(), vals) - is.Error(err) - is.Contains(err.Error(), "SIGTERM or SIGINT received, release failed") - is.Contains(err.Error(), "atomic") - is.Contains(err.Error(), "uninstalled") - - // Now make sure it isn't in storage any more - _, err = instAction.cfg.Releases.Get(res.Name, res.Version) - is.Error(err) - is.Equal(err, driver.ErrReleaseNotFound) - - }) - return - } - t.Run("Setup TestInstallRelease_Atomic_Interrupted", func(t *testing.T) { - cmd := exec.Command(os.Args[0], "-test.run=TestInstallRelease_Atomic_Interrupted") - cmd.Env = append(os.Environ(), "HANDLE_SIGINT=1") - stdout, err := cmd.StdoutPipe() - if err != nil { - t.Fatal(err) - } - stderr, err := cmd.StderrPipe() - if err != nil { - t.Fatal(err) - } - if err := cmd.Start(); err != nil { - t.Fatal(err) - } - go func() { - slurp, _ := ioutil.ReadAll(stdout) - fmt.Printf("%s\n", slurp) - }() + is := assert.New(t) + instAction := installAction(t) + instAction.ReleaseName = "interrupted-release" + failer := instAction.cfg.KubeClient.(*kubefake.FailingKubeClient) + failer.WaitDuration = 10 * time.Second + instAction.cfg.KubeClient = failer + instAction.Atomic = true + vals := map[string]interface{}{} - go func() { - slurp, _ := ioutil.ReadAll(stderr) - fmt.Printf("%s\n", slurp) - }() + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + time.AfterFunc(time.Second, cancel) - time.Sleep(2 * time.Second) - p, _ := os.FindProcess(cmd.Process.Pid) + res, err := instAction.RunWithContext(ctx, buildChart(), vals) + is.Error(err) + is.Contains(err.Error(), "context canceled") + is.Contains(err.Error(), "atomic") + is.Contains(err.Error(), "uninstalled") - if err := p.Signal(os.Interrupt); err != nil { - t.Fatal(err) - } + // Now make sure it isn't in storage any more + _, err = instAction.cfg.Releases.Get(res.Name, res.Version) + is.Error(err) + is.Equal(err, driver.ErrReleaseNotFound) - if err := cmd.Wait(); err != nil { - t.FailNow() - } - }) } func TestNameTemplate(t *testing.T) { testCases := []nameTemplateTestCase{ diff --git a/pkg/action/upgrade.go b/pkg/action/upgrade.go index f1ca96764..f76dbbc6b 100644 --- a/pkg/action/upgrade.go +++ b/pkg/action/upgrade.go @@ -20,11 +20,8 @@ import ( "bytes" "context" "fmt" - "os" - "os/signal" "strings" "sync" - "syscall" "time" "github.com/pkg/errors" @@ -120,8 +117,14 @@ func NewUpgrade(cfg *Configuration) *Upgrade { } } -// Run executes the upgrade on the given release. +// Run executes the upgrade on the given release func (u *Upgrade) Run(name string, chart *chart.Chart, vals map[string]interface{}) (*release.Release, error) { + ctx := context.Background() + return u.RunWithContext(ctx, name, chart, vals) +} + +// Run executes the upgrade on the given release with context. +func (u *Upgrade) RunWithContext(ctx context.Context, name string, chart *chart.Chart, vals map[string]interface{}) (*release.Release, error) { if err := u.cfg.KubeClient.IsReachable(); err != nil { return nil, err } @@ -142,7 +145,7 @@ func (u *Upgrade) Run(name string, chart *chart.Chart, vals map[string]interface u.cfg.Releases.MaxHistory = u.MaxHistory u.cfg.Log("performing update for %s", name) - res, err := u.performUpgrade(currentRelease, upgradedRelease) + res, err := u.performUpgrade(ctx, currentRelease, upgradedRelease) if err != nil { return res, err } @@ -254,7 +257,7 @@ func (u *Upgrade) prepareUpgrade(name string, chart *chart.Chart, vals map[strin return currentRelease, upgradedRelease, err } -func (u *Upgrade) performUpgrade(originalRelease, upgradedRelease *release.Release) (*release.Release, error) { +func (u *Upgrade) performUpgrade(ctx context.Context, originalRelease, upgradedRelease *release.Release) (*release.Release, error) { current, err := u.cfg.KubeClient.Build(bytes.NewBufferString(originalRelease.Manifest), false) if err != nil { // Checking for removed Kubernetes API error so can provide a more informative error message to the user @@ -319,7 +322,7 @@ func (u *Upgrade) performUpgrade(originalRelease, upgradedRelease *release.Relea } rChan := make(chan resultMessage) go u.releasingUpgrade(rChan, upgradedRelease, current, target, originalRelease) - go u.handleSignals(rChan, upgradedRelease) + go u.handleContext(ctx, rChan, upgradedRelease) result := <-rChan return result.r, result.e @@ -338,14 +341,13 @@ func (u *Upgrade) reportToPerformUpgrade(c chan<- resultMessage, rel *release.Re } // Setup listener for SIGINT and SIGTERM -func (u *Upgrade) handleSignals(c chan<- resultMessage, upgradedRelease *release.Release) { - cSignal := make(chan os.Signal) - signal.Notify(cSignal, os.Interrupt, syscall.SIGTERM) +func (u *Upgrade) handleContext(ctx context.Context, c chan<- resultMessage, upgradedRelease *release.Release) { + go func() { - <-cSignal - u.cfg.Log("SIGTERM or SIGINT received") + <-ctx.Done() + err := ctx.Err() // when the atomic flag is set the ongoing release finish first and doesn't give time for the rollback happens. - u.reportToPerformUpgrade(c, upgradedRelease, kube.ResourceList{}, fmt.Errorf("SIGTERM or SIGINT received, release failed")) + u.reportToPerformUpgrade(c, upgradedRelease, kube.ResourceList{}, err) }() } func (u *Upgrade) releasingUpgrade(c chan<- resultMessage, upgradedRelease *release.Release, current kube.ResourceList, target kube.ResourceList, originalRelease *release.Release) { diff --git a/pkg/action/upgrade_test.go b/pkg/action/upgrade_test.go index 2aad26573..7c286093e 100644 --- a/pkg/action/upgrade_test.go +++ b/pkg/action/upgrade_test.go @@ -17,10 +17,8 @@ limitations under the License. package action import ( + "context" "fmt" - "io/ioutil" - "os" - "os/exec" "testing" "time" @@ -302,132 +300,64 @@ func TestUpgradeRelease_Pending(t *testing.T) { } func TestUpgradeRelease_Interrupted_Wait(t *testing.T) { - if os.Getenv("HANDLE_SIGINT") == "1" { - t.Run("Execute TestUpgradeRelease_Interrupted_Wait", func(t *testing.T) { - is := assert.New(t) - req := require.New(t) - - upAction := upgradeAction(t) - rel := releaseStub() - rel.Name = "interrupted-release" - rel.Info.Status = release.StatusDeployed - upAction.cfg.Releases.Create(rel) - - failer := upAction.cfg.KubeClient.(*kubefake.FailingKubeClient) - failer.WaitDuration = 10 * time.Second - upAction.cfg.KubeClient = failer - upAction.Wait = true - vals := map[string]interface{}{} - res, err := upAction.Run(rel.Name, buildChart(), vals) - - req.Error(err) - is.Contains(res.Info.Description, "Upgrade \"interrupted-release\" failed: SIGTERM or SIGINT received, release failed") - is.Equal(res.Info.Status, release.StatusFailed) - }) - return - - } - t.Run("Setup TestUpgradeRelease_Interrupted_Wait", func(t *testing.T) { - cmd := exec.Command(os.Args[0], "-test.run=TestUpgradeRelease_Interrupted_Wait") - cmd.Env = append(os.Environ(), "HANDLE_SIGINT=1") - stdout, err := cmd.StdoutPipe() - if err != nil { - t.Fatal(err) - } - stderr, err := cmd.StderrPipe() - if err != nil { - t.Fatal(err) - } - if err := cmd.Start(); err != nil { - t.Fatal(err) - } - go func() { - slurp, _ := ioutil.ReadAll(stdout) - fmt.Printf("%s\n", slurp) - }() - go func() { - slurp, _ := ioutil.ReadAll(stderr) - fmt.Printf("%s\n", slurp) - }() + is := assert.New(t) + req := require.New(t) - time.Sleep(2 * time.Second) - p, _ := os.FindProcess(cmd.Process.Pid) + upAction := upgradeAction(t) + rel := releaseStub() + rel.Name = "interrupted-release" + rel.Info.Status = release.StatusDeployed + upAction.cfg.Releases.Create(rel) - if err := p.Signal(os.Interrupt); err != nil { - t.Fatal(err) - } + failer := upAction.cfg.KubeClient.(*kubefake.FailingKubeClient) + failer.WaitDuration = 10 * time.Second + upAction.cfg.KubeClient = failer + upAction.Wait = true + vals := map[string]interface{}{} + + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + time.AfterFunc(time.Second, cancel) + + res, err := upAction.RunWithContext(ctx, rel.Name, buildChart(), vals) + + req.Error(err) + is.Contains(res.Info.Description, "Upgrade \"interrupted-release\" failed: context canceled") + is.Equal(res.Info.Status, release.StatusFailed) - if err := cmd.Wait(); err != nil { - t.FailNow() - } - }) } func TestUpgradeRelease_Interrupted_Atomic(t *testing.T) { - if os.Getenv("HANDLE_SIGINT") == "1" { - t.Run("Execute TestUpgradeRelease_Interrupted_Atomic", func(t *testing.T) { - is := assert.New(t) - req := require.New(t) - - upAction := upgradeAction(t) - rel := releaseStub() - rel.Name = "interrupted-release" - rel.Info.Status = release.StatusDeployed - upAction.cfg.Releases.Create(rel) - - failer := upAction.cfg.KubeClient.(*kubefake.FailingKubeClient) - failer.WaitDuration = 5 * time.Second - upAction.cfg.KubeClient = failer - upAction.Atomic = true - vals := map[string]interface{}{} - res, err := upAction.Run(rel.Name, buildChart(), vals) - - req.Error(err) - is.Contains(err.Error(), "release interrupted-release failed, and has been rolled back due to atomic being set: SIGTERM or SIGINT received, release failed") - - // Now make sure it is actually upgraded - updatedRes, err := upAction.cfg.Releases.Get(res.Name, 3) - is.NoError(err) - // Should have rolled back to the previous - is.Equal(updatedRes.Info.Status, release.StatusDeployed) - }) - return - - } - t.Run("Setup TestUpgradeRelease_Interrupted_Atomic", func(t *testing.T) { - cmd := exec.Command(os.Args[0], "-test.run=TestUpgradeRelease_Interrupted_Atomic") - cmd.Env = append(os.Environ(), "HANDLE_SIGINT=1") - stdout, err := cmd.StdoutPipe() - if err != nil { - t.Fatal(err) - } - stderr, err := cmd.StderrPipe() - if err != nil { - t.Fatal(err) - } - if err := cmd.Start(); err != nil { - t.Fatal(err) - } - go func() { - slurp, _ := ioutil.ReadAll(stdout) - fmt.Printf("%s\n", slurp) - }() - go func() { - slurp, _ := ioutil.ReadAll(stderr) - fmt.Printf("%s\n", slurp) - }() + is := assert.New(t) + req := require.New(t) - time.Sleep(2 * time.Second) - p, _ := os.FindProcess(cmd.Process.Pid) + upAction := upgradeAction(t) + rel := releaseStub() + rel.Name = "interrupted-release" + rel.Info.Status = release.StatusDeployed + upAction.cfg.Releases.Create(rel) - if err := p.Signal(os.Interrupt); err != nil { - t.Fatal(err) - } + failer := upAction.cfg.KubeClient.(*kubefake.FailingKubeClient) + failer.WaitDuration = 5 * time.Second + upAction.cfg.KubeClient = failer + upAction.Atomic = true + vals := map[string]interface{}{} + + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + time.AfterFunc(time.Second, cancel) + + res, err := upAction.RunWithContext(ctx, rel.Name, buildChart(), vals) + + req.Error(err) + is.Contains(err.Error(), "release interrupted-release failed, and has been rolled back due to atomic being set: context canceled") + + // Now make sure it is actually upgraded + updatedRes, err := upAction.cfg.Releases.Get(res.Name, 3) + is.NoError(err) + // Should have rolled back to the previous + is.Equal(updatedRes.Info.Status, release.StatusDeployed) - if err := cmd.Wait(); err != nil { - t.FailNow() - } - }) }