Refactor SIGTERM logic

Use context to handle SIGTERM in the cmd/helm instead of pkg/action

Signed-off-by: Stephane Moser <moser.sts@gmail.com>
pull/9180/head
Stephane Moser 3 years ago
parent 4026190e7b
commit c62ce12bed

@ -17,8 +17,13 @@ limitations under the License.
package main package main
import ( import (
"context"
"fmt"
"io" "io"
"log" "log"
"os"
"os/signal"
"syscall"
"time" "time"
"github.com/pkg/errors" "github.com/pkg/errors"
@ -239,7 +244,21 @@ func runInstall(args []string, client *action.Install, valueOpts *values.Options
} }
client.Namespace = settings.Namespace() client.Namespace = settings.Namespace()
return client.Run(chartRequested, vals)
// Create context and prepare the handle of SIGTERM
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
// Handle SIGTERM
cSignal := make(chan os.Signal)
signal.Notify(cSignal, os.Interrupt, syscall.SIGTERM)
go func() {
<-cSignal
fmt.Fprintf(out, "Release %s has been cancel.\n", args[0])
cancel()
}()
return client.RunWithContext(ctx, chartRequested, vals)
} }
// checkIfInstallable validates if a chart can be installed // checkIfInstallable validates if a chart can be installed

@ -17,9 +17,13 @@ limitations under the License.
package main package main
import ( import (
"context"
"fmt" "fmt"
"io" "io"
"log" "log"
"os"
"os/signal"
"syscall"
"time" "time"
"github.com/pkg/errors" "github.com/pkg/errors"
@ -174,7 +178,20 @@ func newUpgradeCmd(cfg *action.Configuration, out io.Writer) *cobra.Command {
warning("This chart is deprecated") warning("This chart is deprecated")
} }
rel, err := client.Run(args[0], ch, vals) // Create context and prepare the handle of SIGTERM
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
// Handle SIGTERM
cSignal := make(chan os.Signal)
signal.Notify(cSignal, os.Interrupt, syscall.SIGTERM)
go func() {
<-cSignal
fmt.Fprintf(out, "Release %s has been cancel.\n", args[0])
cancel()
}()
rel, err := client.RunWithContext(ctx, args[0], ch, vals)
if err != nil { if err != nil {
return errors.Wrap(err, "UPGRADE FAILED") return errors.Wrap(err, "UPGRADE FAILED")
} }

@ -18,16 +18,15 @@ package action
import ( import (
"bytes" "bytes"
"context"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"net/url" "net/url"
"os" "os"
"os/signal"
"path" "path"
"path/filepath" "path/filepath"
"strings" "strings"
"sync" "sync"
"syscall"
"text/template" "text/template"
"time" "time"
@ -179,7 +178,14 @@ func (i *Install) installCRDs(crds []chart.CRD) error {
// Run executes the installation // Run executes the installation
// //
// If DryRun is set to true, this will prepare the release, but not install it // If DryRun is set to true, this will prepare the release, but not install it
func (i *Install) Run(chrt *chart.Chart, vals map[string]interface{}) (*release.Release, error) { func (i *Install) Run(chrt *chart.Chart, vals map[string]interface{}) (*release.Release, error) {
ctx := context.Background()
return i.RunWithContext(ctx, chrt, vals)
}
// Run executes the installation with Context
func (i *Install) RunWithContext(ctx context.Context, chrt *chart.Chart, vals map[string]interface{}) (*release.Release, error) {
// Check reachability of cluster unless in client-only mode (e.g. `helm template` without `--validate`) // Check reachability of cluster unless in client-only mode (e.g. `helm template` without `--validate`)
if !i.ClientOnly { if !i.ClientOnly {
if err := i.cfg.KubeClient.IsReachable(); err != nil { if err := i.cfg.KubeClient.IsReachable(); err != nil {
@ -338,7 +344,7 @@ func (i *Install) Run(chrt *chart.Chart, vals map[string]interface{}) (*release.
} }
rChan := make(chan resultMessage) rChan := make(chan resultMessage)
go i.performInstall(rChan, rel, toBeAdopted, resources) go i.performInstall(rChan, rel, toBeAdopted, resources)
go i.handleSignals(rChan, rel) go i.handleContext(ctx, rChan, rel)
result := <-rChan result := <-rChan
//start preformInstall go routine //start preformInstall go routine
return result.r, result.e return result.r, result.e
@ -409,14 +415,11 @@ func (i *Install) performInstall(c chan<- resultMessage, rel *release.Release, t
i.reportToRun(c, rel, nil) i.reportToRun(c, rel, nil)
} }
func (i *Install) handleSignals(c chan<- resultMessage, rel *release.Release) { func (i *Install) handleContext(ctx context.Context, c chan<- resultMessage, rel *release.Release) {
// Handle SIGINT
cSignal := make(chan os.Signal)
signal.Notify(cSignal, os.Interrupt, syscall.SIGTERM)
go func() { go func() {
<-cSignal <-ctx.Done()
i.cfg.Log("SIGTERM or SIGINT received") err := ctx.Err()
i.reportToRun(c, rel, fmt.Errorf("SIGTERM or SIGINT received, release failed")) i.reportToRun(c, rel, err)
}() }()
} }
func (i *Install) reportToRun(c chan<- resultMessage, rel *release.Release, err error) { func (i *Install) reportToRun(c chan<- resultMessage, rel *release.Release, err error) {

@ -17,11 +17,11 @@ limitations under the License.
package action package action
import ( import (
"context"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"log" "log"
"os" "os"
"os/exec"
"path/filepath" "path/filepath"
"regexp" "regexp"
"strings" "strings"
@ -364,60 +364,23 @@ func TestInstallRelease_Wait(t *testing.T) {
is.Equal(res.Info.Status, release.StatusFailed) is.Equal(res.Info.Status, release.StatusFailed)
} }
func TestInstallRelease_Wait_Interrupted(t *testing.T) { func TestInstallRelease_Wait_Interrupted(t *testing.T) {
if os.Getenv("HANDLE_SIGINT") == "1" { is := assert.New(t)
t.Run("Execute TestInstallRelease_Wait_Interrupted", func(t *testing.T) { instAction := installAction(t)
is := assert.New(t) instAction.ReleaseName = "interrupted-release"
instAction := installAction(t) failer := instAction.cfg.KubeClient.(*kubefake.FailingKubeClient)
instAction.ReleaseName = "interrupted-release" failer.WaitDuration = 10 * time.Second
failer := instAction.cfg.KubeClient.(*kubefake.FailingKubeClient) instAction.cfg.KubeClient = failer
failer.WaitDuration = 10 * time.Second instAction.Wait = true
instAction.cfg.KubeClient = failer vals := map[string]interface{}{}
instAction.Wait = true
vals := map[string]interface{}{}
res, err := instAction.Run(buildChart(), vals)
is.Error(err)
is.Contains(res.Info.Description, "SIGTERM or SIGINT received, release failed")
is.Equal(res.Info.Status, release.StatusFailed)
})
return
}
t.Run("Setup TestInstallRelease_Wait_Interrupted", func(t *testing.T) {
cmd := exec.Command(os.Args[0], "-test.run=TestInstallRelease_Wait_Interrupted")
cmd.Env = append(os.Environ(), "HANDLE_SIGINT=1")
stdout, err := cmd.StdoutPipe()
if err != nil {
t.Fatal(err)
}
stderr, err := cmd.StderrPipe()
if err != nil {
t.Fatal(err)
}
if err := cmd.Start(); err != nil {
t.Fatal(err)
}
go func() {
slurp, _ := ioutil.ReadAll(stdout)
fmt.Printf("%s\n", slurp)
}()
go func() {
slurp, _ := ioutil.ReadAll(stderr)
fmt.Printf("%s\n", slurp)
}()
time.Sleep(2 * time.Second)
p, _ := os.FindProcess(cmd.Process.Pid)
if err := p.Signal(os.Interrupt); err != nil { ctx := context.Background()
t.Fatal(err) ctx, cancel := context.WithCancel(ctx)
} time.AfterFunc(time.Second, cancel)
if err := cmd.Wait(); err != nil { res, err := instAction.RunWithContext(ctx, buildChart(), vals)
t.FailNow() is.Error(err)
} is.Contains(res.Info.Description, "Release \"interrupted-release\" failed: context canceled")
}) is.Equal(res.Info.Status, release.StatusFailed)
} }
func TestInstallRelease_WaitForJobs(t *testing.T) { func TestInstallRelease_WaitForJobs(t *testing.T) {
is := assert.New(t) is := assert.New(t)
@ -477,67 +440,31 @@ func TestInstallRelease_Atomic(t *testing.T) {
}) })
} }
func TestInstallRelease_Atomic_Interrupted(t *testing.T) { func TestInstallRelease_Atomic_Interrupted(t *testing.T) {
if os.Getenv("HANDLE_SIGINT") == "1" {
t.Run("Execute TestInstallRelease_Atomic_Interrupted", func(t *testing.T) {
is := assert.New(t)
instAction := installAction(t)
instAction.ReleaseName = "interrupted-release"
failer := instAction.cfg.KubeClient.(*kubefake.FailingKubeClient)
failer.WaitDuration = 10 * time.Second
instAction.cfg.KubeClient = failer
instAction.Atomic = true
vals := map[string]interface{}{}
res, err := instAction.Run(buildChart(), vals)
is.Error(err)
is.Contains(err.Error(), "SIGTERM or SIGINT received, release failed")
is.Contains(err.Error(), "atomic")
is.Contains(err.Error(), "uninstalled")
// Now make sure it isn't in storage any more
_, err = instAction.cfg.Releases.Get(res.Name, res.Version)
is.Error(err)
is.Equal(err, driver.ErrReleaseNotFound)
})
return
} is := assert.New(t)
t.Run("Setup TestInstallRelease_Atomic_Interrupted", func(t *testing.T) { instAction := installAction(t)
cmd := exec.Command(os.Args[0], "-test.run=TestInstallRelease_Atomic_Interrupted") instAction.ReleaseName = "interrupted-release"
cmd.Env = append(os.Environ(), "HANDLE_SIGINT=1") failer := instAction.cfg.KubeClient.(*kubefake.FailingKubeClient)
stdout, err := cmd.StdoutPipe() failer.WaitDuration = 10 * time.Second
if err != nil { instAction.cfg.KubeClient = failer
t.Fatal(err) instAction.Atomic = true
} vals := map[string]interface{}{}
stderr, err := cmd.StderrPipe()
if err != nil {
t.Fatal(err)
}
if err := cmd.Start(); err != nil {
t.Fatal(err)
}
go func() {
slurp, _ := ioutil.ReadAll(stdout)
fmt.Printf("%s\n", slurp)
}()
go func() { ctx := context.Background()
slurp, _ := ioutil.ReadAll(stderr) ctx, cancel := context.WithCancel(ctx)
fmt.Printf("%s\n", slurp) time.AfterFunc(time.Second, cancel)
}()
time.Sleep(2 * time.Second) res, err := instAction.RunWithContext(ctx, buildChart(), vals)
p, _ := os.FindProcess(cmd.Process.Pid) is.Error(err)
is.Contains(err.Error(), "context canceled")
is.Contains(err.Error(), "atomic")
is.Contains(err.Error(), "uninstalled")
if err := p.Signal(os.Interrupt); err != nil { // Now make sure it isn't in storage any more
t.Fatal(err) _, err = instAction.cfg.Releases.Get(res.Name, res.Version)
} is.Error(err)
is.Equal(err, driver.ErrReleaseNotFound)
if err := cmd.Wait(); err != nil {
t.FailNow()
}
})
} }
func TestNameTemplate(t *testing.T) { func TestNameTemplate(t *testing.T) {
testCases := []nameTemplateTestCase{ testCases := []nameTemplateTestCase{

@ -20,11 +20,8 @@ import (
"bytes" "bytes"
"context" "context"
"fmt" "fmt"
"os"
"os/signal"
"strings" "strings"
"sync" "sync"
"syscall"
"time" "time"
"github.com/pkg/errors" "github.com/pkg/errors"
@ -120,8 +117,14 @@ func NewUpgrade(cfg *Configuration) *Upgrade {
} }
} }
// Run executes the upgrade on the given release. // Run executes the upgrade on the given release
func (u *Upgrade) Run(name string, chart *chart.Chart, vals map[string]interface{}) (*release.Release, error) { func (u *Upgrade) Run(name string, chart *chart.Chart, vals map[string]interface{}) (*release.Release, error) {
ctx := context.Background()
return u.RunWithContext(ctx, name, chart, vals)
}
// Run executes the upgrade on the given release with context.
func (u *Upgrade) RunWithContext(ctx context.Context, name string, chart *chart.Chart, vals map[string]interface{}) (*release.Release, error) {
if err := u.cfg.KubeClient.IsReachable(); err != nil { if err := u.cfg.KubeClient.IsReachable(); err != nil {
return nil, err return nil, err
} }
@ -142,7 +145,7 @@ func (u *Upgrade) Run(name string, chart *chart.Chart, vals map[string]interface
u.cfg.Releases.MaxHistory = u.MaxHistory u.cfg.Releases.MaxHistory = u.MaxHistory
u.cfg.Log("performing update for %s", name) u.cfg.Log("performing update for %s", name)
res, err := u.performUpgrade(currentRelease, upgradedRelease) res, err := u.performUpgrade(ctx, currentRelease, upgradedRelease)
if err != nil { if err != nil {
return res, err return res, err
} }
@ -254,7 +257,7 @@ func (u *Upgrade) prepareUpgrade(name string, chart *chart.Chart, vals map[strin
return currentRelease, upgradedRelease, err return currentRelease, upgradedRelease, err
} }
func (u *Upgrade) performUpgrade(originalRelease, upgradedRelease *release.Release) (*release.Release, error) { func (u *Upgrade) performUpgrade(ctx context.Context, originalRelease, upgradedRelease *release.Release) (*release.Release, error) {
current, err := u.cfg.KubeClient.Build(bytes.NewBufferString(originalRelease.Manifest), false) current, err := u.cfg.KubeClient.Build(bytes.NewBufferString(originalRelease.Manifest), false)
if err != nil { if err != nil {
// Checking for removed Kubernetes API error so can provide a more informative error message to the user // Checking for removed Kubernetes API error so can provide a more informative error message to the user
@ -319,7 +322,7 @@ func (u *Upgrade) performUpgrade(originalRelease, upgradedRelease *release.Relea
} }
rChan := make(chan resultMessage) rChan := make(chan resultMessage)
go u.releasingUpgrade(rChan, upgradedRelease, current, target, originalRelease) go u.releasingUpgrade(rChan, upgradedRelease, current, target, originalRelease)
go u.handleSignals(rChan, upgradedRelease) go u.handleContext(ctx, rChan, upgradedRelease)
result := <-rChan result := <-rChan
return result.r, result.e return result.r, result.e
@ -338,14 +341,13 @@ func (u *Upgrade) reportToPerformUpgrade(c chan<- resultMessage, rel *release.Re
} }
// Setup listener for SIGINT and SIGTERM // Setup listener for SIGINT and SIGTERM
func (u *Upgrade) handleSignals(c chan<- resultMessage, upgradedRelease *release.Release) { func (u *Upgrade) handleContext(ctx context.Context, c chan<- resultMessage, upgradedRelease *release.Release) {
cSignal := make(chan os.Signal)
signal.Notify(cSignal, os.Interrupt, syscall.SIGTERM)
go func() { go func() {
<-cSignal <-ctx.Done()
u.cfg.Log("SIGTERM or SIGINT received") err := ctx.Err()
// when the atomic flag is set the ongoing release finish first and doesn't give time for the rollback happens. // when the atomic flag is set the ongoing release finish first and doesn't give time for the rollback happens.
u.reportToPerformUpgrade(c, upgradedRelease, kube.ResourceList{}, fmt.Errorf("SIGTERM or SIGINT received, release failed")) u.reportToPerformUpgrade(c, upgradedRelease, kube.ResourceList{}, err)
}() }()
} }
func (u *Upgrade) releasingUpgrade(c chan<- resultMessage, upgradedRelease *release.Release, current kube.ResourceList, target kube.ResourceList, originalRelease *release.Release) { func (u *Upgrade) releasingUpgrade(c chan<- resultMessage, upgradedRelease *release.Release, current kube.ResourceList, target kube.ResourceList, originalRelease *release.Release) {

@ -17,10 +17,8 @@ limitations under the License.
package action package action
import ( import (
"context"
"fmt" "fmt"
"io/ioutil"
"os"
"os/exec"
"testing" "testing"
"time" "time"
@ -302,132 +300,64 @@ func TestUpgradeRelease_Pending(t *testing.T) {
} }
func TestUpgradeRelease_Interrupted_Wait(t *testing.T) { func TestUpgradeRelease_Interrupted_Wait(t *testing.T) {
if os.Getenv("HANDLE_SIGINT") == "1" {
t.Run("Execute TestUpgradeRelease_Interrupted_Wait", func(t *testing.T) {
is := assert.New(t)
req := require.New(t)
upAction := upgradeAction(t)
rel := releaseStub()
rel.Name = "interrupted-release"
rel.Info.Status = release.StatusDeployed
upAction.cfg.Releases.Create(rel)
failer := upAction.cfg.KubeClient.(*kubefake.FailingKubeClient)
failer.WaitDuration = 10 * time.Second
upAction.cfg.KubeClient = failer
upAction.Wait = true
vals := map[string]interface{}{}
res, err := upAction.Run(rel.Name, buildChart(), vals)
req.Error(err)
is.Contains(res.Info.Description, "Upgrade \"interrupted-release\" failed: SIGTERM or SIGINT received, release failed")
is.Equal(res.Info.Status, release.StatusFailed)
})
return
}
t.Run("Setup TestUpgradeRelease_Interrupted_Wait", func(t *testing.T) {
cmd := exec.Command(os.Args[0], "-test.run=TestUpgradeRelease_Interrupted_Wait")
cmd.Env = append(os.Environ(), "HANDLE_SIGINT=1")
stdout, err := cmd.StdoutPipe()
if err != nil {
t.Fatal(err)
}
stderr, err := cmd.StderrPipe()
if err != nil {
t.Fatal(err)
}
if err := cmd.Start(); err != nil {
t.Fatal(err)
}
go func() {
slurp, _ := ioutil.ReadAll(stdout)
fmt.Printf("%s\n", slurp)
}()
go func() { is := assert.New(t)
slurp, _ := ioutil.ReadAll(stderr) req := require.New(t)
fmt.Printf("%s\n", slurp)
}()
time.Sleep(2 * time.Second) upAction := upgradeAction(t)
p, _ := os.FindProcess(cmd.Process.Pid) rel := releaseStub()
rel.Name = "interrupted-release"
rel.Info.Status = release.StatusDeployed
upAction.cfg.Releases.Create(rel)
if err := p.Signal(os.Interrupt); err != nil { failer := upAction.cfg.KubeClient.(*kubefake.FailingKubeClient)
t.Fatal(err) failer.WaitDuration = 10 * time.Second
} upAction.cfg.KubeClient = failer
upAction.Wait = true
vals := map[string]interface{}{}
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
time.AfterFunc(time.Second, cancel)
res, err := upAction.RunWithContext(ctx, rel.Name, buildChart(), vals)
req.Error(err)
is.Contains(res.Info.Description, "Upgrade \"interrupted-release\" failed: context canceled")
is.Equal(res.Info.Status, release.StatusFailed)
if err := cmd.Wait(); err != nil {
t.FailNow()
}
})
} }
func TestUpgradeRelease_Interrupted_Atomic(t *testing.T) { func TestUpgradeRelease_Interrupted_Atomic(t *testing.T) {
if os.Getenv("HANDLE_SIGINT") == "1" {
t.Run("Execute TestUpgradeRelease_Interrupted_Atomic", func(t *testing.T) {
is := assert.New(t)
req := require.New(t)
upAction := upgradeAction(t)
rel := releaseStub()
rel.Name = "interrupted-release"
rel.Info.Status = release.StatusDeployed
upAction.cfg.Releases.Create(rel)
failer := upAction.cfg.KubeClient.(*kubefake.FailingKubeClient)
failer.WaitDuration = 5 * time.Second
upAction.cfg.KubeClient = failer
upAction.Atomic = true
vals := map[string]interface{}{}
res, err := upAction.Run(rel.Name, buildChart(), vals)
req.Error(err)
is.Contains(err.Error(), "release interrupted-release failed, and has been rolled back due to atomic being set: SIGTERM or SIGINT received, release failed")
// Now make sure it is actually upgraded
updatedRes, err := upAction.cfg.Releases.Get(res.Name, 3)
is.NoError(err)
// Should have rolled back to the previous
is.Equal(updatedRes.Info.Status, release.StatusDeployed)
})
return
}
t.Run("Setup TestUpgradeRelease_Interrupted_Atomic", func(t *testing.T) {
cmd := exec.Command(os.Args[0], "-test.run=TestUpgradeRelease_Interrupted_Atomic")
cmd.Env = append(os.Environ(), "HANDLE_SIGINT=1")
stdout, err := cmd.StdoutPipe()
if err != nil {
t.Fatal(err)
}
stderr, err := cmd.StderrPipe()
if err != nil {
t.Fatal(err)
}
if err := cmd.Start(); err != nil {
t.Fatal(err)
}
go func() {
slurp, _ := ioutil.ReadAll(stdout)
fmt.Printf("%s\n", slurp)
}()
go func() { is := assert.New(t)
slurp, _ := ioutil.ReadAll(stderr) req := require.New(t)
fmt.Printf("%s\n", slurp)
}()
time.Sleep(2 * time.Second) upAction := upgradeAction(t)
p, _ := os.FindProcess(cmd.Process.Pid) rel := releaseStub()
rel.Name = "interrupted-release"
rel.Info.Status = release.StatusDeployed
upAction.cfg.Releases.Create(rel)
if err := p.Signal(os.Interrupt); err != nil { failer := upAction.cfg.KubeClient.(*kubefake.FailingKubeClient)
t.Fatal(err) failer.WaitDuration = 5 * time.Second
} upAction.cfg.KubeClient = failer
upAction.Atomic = true
vals := map[string]interface{}{}
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
time.AfterFunc(time.Second, cancel)
res, err := upAction.RunWithContext(ctx, rel.Name, buildChart(), vals)
req.Error(err)
is.Contains(err.Error(), "release interrupted-release failed, and has been rolled back due to atomic being set: context canceled")
// Now make sure it is actually upgraded
updatedRes, err := upAction.cfg.Releases.Get(res.Name, 3)
is.NoError(err)
// Should have rolled back to the previous
is.Equal(updatedRes.Info.Status, release.StatusDeployed)
if err := cmd.Wait(); err != nil {
t.FailNow()
}
})
} }

Loading…
Cancel
Save