pull/12109/merge
Josh Rowley committed 2 years ago via GitHub
commit a35464b3f9

@@ -17,17 +17,19 @@ package action
 import (
 	"bytes"
+	"context"
 	"sort"
 	"time"

 	"github.com/pkg/errors"

+	"helm.sh/helm/v3/pkg/kube"
 	"helm.sh/helm/v3/pkg/release"
 	helmtime "helm.sh/helm/v3/pkg/time"
 )

 // execHook executes all of the hooks for the given hook event.
-func (cfg *Configuration) execHook(rl *release.Release, hook release.HookEvent, timeout time.Duration) error {
+func (cfg *Configuration) execHook(timeout time.Duration, rl *release.Release, hook release.HookEvent) error {
 	executingHooks := []*release.Hook{}

 	for _, h := range rl.Hooks {
@@ -79,8 +81,20 @@ func (cfg *Configuration) execHook(rl *release.Release, hook release.HookEvent, timeout time.Duration) error {
 			return errors.Wrapf(err, "warning: Hook %s %s failed", hook, h.Path)
 		}

-		// Watch hook resources until they have completed
-		err = cfg.KubeClient.WatchUntilReady(resources, timeout)
+		// Check whether the kube.Interface implementation also satisfies the
+		// kube.ContextInterface interface. If not, fall back to the time-based
+		// watch to maintain backwards compatibility.
+		if kubeClient, ok := cfg.KubeClient.(kube.ContextInterface); ok {
+			// Helm 4 TODO: WatchUntilReady should be replaced with its
+			// context-aware counterpart.
+			ctx, cancel := context.WithTimeout(context.TODO(), timeout)
+			defer cancel()
+			// Watch hook resources until they have completed
+			err = kubeClient.WatchUntilReadyWithContext(ctx, resources)
+		} else {
+			err = cfg.KubeClient.WatchUntilReady(resources, timeout)
+		}

 		// Note the time of success/failure
 		h.LastRun.CompletedAt = helmtime.Now()
 		// Mark hook as succeeded or failed
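Note: the comma-ok type assertion above is how this change probes for an optional capability without breaking existing kube.Interface implementations. A minimal, runnable sketch of the same pattern, using illustrative Watcher/ContextWatcher types that are not part of the Helm codebase:

	package main

	import (
		"context"
		"fmt"
		"time"
	)

	// Watcher is a legacy, timeout-based interface (illustrative only).
	type Watcher interface {
		WatchUntilReady(timeout time.Duration) error
	}

	// ContextWatcher is the optional context-aware extension (illustrative only).
	type ContextWatcher interface {
		WatchUntilReadyWithContext(ctx context.Context) error
	}

	// watch prefers the context-aware method when the value provides it,
	// falling back to the legacy signature otherwise.
	func watch(w Watcher, timeout time.Duration) error {
		if cw, ok := w.(ContextWatcher); ok {
			ctx, cancel := context.WithTimeout(context.Background(), timeout)
			defer cancel()
			return cw.WatchUntilReadyWithContext(ctx)
		}
		return w.WatchUntilReady(timeout)
	}

	type noopWatcher struct{}

	func (noopWatcher) WatchUntilReady(time.Duration) error { return nil }

	func main() {
		fmt.Println(watch(noopWatcher{}, time.Second)) // <nil>: legacy fallback path
	}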

@@ -212,10 +212,8 @@ func (i *Install) installCRDs(crds []chart.CRD) error {
 // Run executes the installation
 //
 // If DryRun is set to true, this will prepare the release, but not install it
 func (i *Install) Run(chrt *chart.Chart, vals map[string]interface{}) (*release.Release, error) {
-	ctx := context.Background()
-	return i.RunWithContext(ctx, chrt, vals)
+	return i.RunWithContext(context.TODO(), chrt, vals)
 }

 // Run executes the installation with Context
@@ -389,31 +387,12 @@ func (i *Install) RunWithContext(ctx context.Context, chrt *chart.Chart, vals map[string]interface{}) (*release.Release, error) {
 		return rel, err
 	}

-	rel, err = i.performInstallCtx(ctx, rel, toBeAdopted, resources)
+	rel, err = i.performInstall(ctx, rel, toBeAdopted, resources)
 	if err != nil {
 		rel, err = i.failRelease(rel, err)
 	}
 	return rel, err
 }

-func (i *Install) performInstallCtx(ctx context.Context, rel *release.Release, toBeAdopted kube.ResourceList, resources kube.ResourceList) (*release.Release, error) {
-	type Msg struct {
-		r *release.Release
-		e error
-	}
-	resultChan := make(chan Msg, 1)
-
-	go func() {
-		rel, err := i.performInstall(rel, toBeAdopted, resources)
-		resultChan <- Msg{rel, err}
-	}()
-	select {
-	case <-ctx.Done():
-		err := ctx.Err()
-		return rel, err
-	case msg := <-resultChan:
-		return msg.r, msg.e
-	}
-}
 // isDryRun returns true if Upgrade is set to run as a DryRun
@@ -424,12 +403,12 @@ func (i *Install) isDryRun() bool {
 	return false
 }

-func (i *Install) performInstall(rel *release.Release, toBeAdopted kube.ResourceList, resources kube.ResourceList) (*release.Release, error) {
+func (i *Install) performInstall(ctx context.Context, rel *release.Release, toBeAdopted, resources kube.ResourceList) (*release.Release, error) {
 	var err error
 	// pre-install hooks
 	if !i.DisableHooks {
-		if err := i.cfg.execHook(rel, release.HookPreInstall, i.Timeout); err != nil {
-			return rel, fmt.Errorf("failed pre-install: %s", err)
+		if err := i.cfg.execHook(i.Timeout, rel, release.HookPreInstall); err != nil {
+			return rel, fmt.Errorf("failed pre-install: %w", err)
 		}
 	}

@@ -442,23 +421,37 @@ func (i *Install) performInstall(rel *release.Release, toBeAdopted kube.ResourceList, resources kube.ResourceList) (*release.Release, error) {
 		_, err = i.cfg.KubeClient.Update(toBeAdopted, resources, i.Force)
 	}
 	if err != nil {
-		return rel, err
+		return nil, err
 	}

 	if i.Wait {
-		if i.WaitForJobs {
+		var err error
+
+		ctx, cancel := context.WithTimeout(ctx, i.Timeout)
+		defer cancel()
+
+		kubeClient, ok := i.cfg.KubeClient.(kube.ContextInterface)
+		// Helm 4 TODO: WaitWithJobs and Wait should be replaced with their
+		// context-aware counterparts.
+		switch {
+		case ok && i.WaitForJobs:
+			err = kubeClient.WaitWithJobsContext(ctx, resources)
+		case ok && !i.WaitForJobs:
+			err = kubeClient.WaitWithContext(ctx, resources)
+		case i.WaitForJobs:
 			err = i.cfg.KubeClient.WaitWithJobs(resources, i.Timeout)
-		} else {
+		case !i.WaitForJobs:
 			err = i.cfg.KubeClient.Wait(resources, i.Timeout)
 		}
 		if err != nil {
 			return rel, err
 		}
 	}

 	if !i.DisableHooks {
-		if err := i.cfg.execHook(rel, release.HookPostInstall, i.Timeout); err != nil {
-			return rel, fmt.Errorf("failed post-install: %s", err)
+		if err := i.cfg.execHook(i.Timeout, rel, release.HookPostInstall); err != nil {
+			return rel, fmt.Errorf("failed post-install: %w", err)
 		}
 	}

@@ -490,7 +483,8 @@ func (i *Install) failRelease(rel *release.Release, err error) (*release.Release, error) {
 		uninstall.DisableHooks = i.DisableHooks
 		uninstall.KeepHistory = false
 		uninstall.Timeout = i.Timeout
-		if _, uninstallErr := uninstall.Run(i.ReleaseName); uninstallErr != nil {
+		// Helm 4 TODO: Uninstalling needs to be handled properly on a failed atomic install.
+		if _, uninstallErr := uninstall.RunWithContext(context.TODO(), i.ReleaseName); uninstallErr != nil {
 			return rel, errors.Wrapf(uninstallErr, "an error occurred while uninstalling the release. original install error: %s", err)
 		}
 		return rel, errors.Wrapf(err, "release %s failed, and has been uninstalled due to atomic being set", i.ReleaseName)
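For callers, the reworked flow means cancellation is cooperative rather than goroutine-based. A hedged sketch of driving the new entry point, assuming inst (*action.Install) and chrt (*chart.Chart) are already configured (names here are illustrative):

	// Bound the whole install, including waits and hooks, by one deadline.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()

	rel, err := inst.RunWithContext(ctx, chrt, map[string]interface{}{})
	if err != nil {
		// On cancellation the release is recorded as failed; with Atomic set
		// it is also uninstalled (see failRelease above).
		log.Fatalf("install failed: %v", err)
	}
	log.Printf("installed release %q", rel.Name)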

@@ -104,7 +104,8 @@ func TestInstallReleaseWithValues(t *testing.T) {
 			"simpleKey": "simpleValue",
 		},
 	}

-	res, err := instAction.Run(buildChart(withSampleValues()), userVals)
+	res, err := instAction.RunWithContext(context.Background(), buildChart(withSampleValues()), userVals)
 	if err != nil {
 		t.Fatalf("Failed install: %s", err)
 	}
@@ -130,7 +131,7 @@ func TestInstallReleaseClientOnly(t *testing.T) {
 	is := assert.New(t)
 	instAction := installAction(t)
 	instAction.ClientOnly = true
-	instAction.Run(buildChart(), nil) // disregard output
+	instAction.RunWithContext(context.Background(), buildChart(), nil) // disregard output

 	is.Equal(instAction.cfg.Capabilities, chartutil.DefaultCapabilities)
 	is.Equal(instAction.cfg.KubeClient, &kubefake.PrintingKubeClient{Out: io.Discard})
@@ -140,7 +141,7 @@ func TestInstallRelease_NoName(t *testing.T) {
 	instAction := installAction(t)
 	instAction.ReleaseName = ""
 	vals := map[string]interface{}{}
-	_, err := instAction.Run(buildChart(), vals)
+	_, err := instAction.RunWithContext(context.Background(), buildChart(), vals)
 	if err == nil {
 		t.Fatal("expected failure when no name is specified")
 	}
@@ -152,7 +153,8 @@ func TestInstallRelease_WithNotes(t *testing.T) {
 	instAction := installAction(t)
 	instAction.ReleaseName = "with-notes"
 	vals := map[string]interface{}{}
-	res, err := instAction.Run(buildChart(withNotes("note here")), vals)
+	res, err := instAction.RunWithContext(context.Background(), buildChart(withNotes("note here")), vals)
 	if err != nil {
 		t.Fatalf("Failed install: %s", err)
 	}
@@ -179,7 +181,8 @@ func TestInstallRelease_WithNotesRendered(t *testing.T) {
 	instAction := installAction(t)
 	instAction.ReleaseName = "with-notes"
 	vals := map[string]interface{}{}
-	res, err := instAction.Run(buildChart(withNotes("got-{{.Release.Name}}")), vals)
+	res, err := instAction.RunWithContext(context.Background(), buildChart(withNotes("got-{{.Release.Name}}")), vals)
 	if err != nil {
 		t.Fatalf("Failed install: %s", err)
 	}
@@ -198,7 +201,8 @@ func TestInstallRelease_WithChartAndDependencyParentNotes(t *testing.T) {
 	instAction := installAction(t)
 	instAction.ReleaseName = "with-notes"
 	vals := map[string]interface{}{}
-	res, err := instAction.Run(buildChart(withNotes("parent"), withDependency(withNotes("child"))), vals)
+	res, err := instAction.RunWithContext(context.Background(), buildChart(withNotes("parent"), withDependency(withNotes("child"))), vals)
 	if err != nil {
 		t.Fatalf("Failed install: %s", err)
 	}
@@ -216,8 +220,9 @@ func TestInstallRelease_WithChartAndDependencyAllNotes(t *testing.T) {
 	instAction := installAction(t)
 	instAction.ReleaseName = "with-notes"
 	instAction.SubNotes = true
 	vals := map[string]interface{}{}
-	res, err := instAction.Run(buildChart(withNotes("parent"), withDependency(withNotes("child"))), vals)
+	res, err := instAction.RunWithContext(context.Background(), buildChart(withNotes("parent"), withDependency(withNotes("child"))), vals)
 	if err != nil {
 		t.Fatalf("Failed install: %s", err)
 	}
@@ -237,7 +242,7 @@ func TestInstallRelease_DryRun(t *testing.T) {
 	instAction := installAction(t)
 	instAction.DryRun = true
 	vals := map[string]interface{}{}
-	res, err := instAction.Run(buildChart(withSampleTemplates()), vals)
+	res, err := instAction.RunWithContext(context.Background(), buildChart(withSampleTemplates()), vals)
 	if err != nil {
 		t.Fatalf("Failed install: %s", err)
 	}
@@ -268,7 +273,7 @@ func TestInstallRelease_DryRun_Lookup(t *testing.T) {
 		Data: []byte(`goodbye: {{ lookup "v1" "Namespace" "" "___" }}`),
 	})

-	res, err := instAction.Run(mockChart, vals)
+	res, err := instAction.RunWithContext(context.Background(), mockChart, vals)
 	if err != nil {
 		t.Fatalf("Failed install: %s", err)
 	}
@@ -281,7 +286,7 @@ func TestInstallReleaseIncorrectTemplate_DryRun(t *testing.T) {
 	instAction := installAction(t)
 	instAction.DryRun = true
 	vals := map[string]interface{}{}
-	_, err := instAction.Run(buildChart(withSampleIncludingIncorrectTemplates()), vals)
+	_, err := instAction.RunWithContext(context.Background(), buildChart(withSampleIncludingIncorrectTemplates()), vals)
 	expectedErr := "\"hello/templates/incorrect\" at <.Values.bad.doh>: nil pointer evaluating interface {}.doh"
 	if err == nil {
 		t.Fatalf("Install should fail containing error: %s", expectedErr)
@@ -299,7 +304,7 @@ func TestInstallRelease_NoHooks(t *testing.T) {
 	instAction.cfg.Releases.Create(releaseStub())
 	vals := map[string]interface{}{}

-	res, err := instAction.Run(buildChart(), vals)
+	res, err := instAction.RunWithContext(context.Background(), buildChart(), vals)
 	if err != nil {
 		t.Fatalf("Failed install: %s", err)
 	}
@@ -316,7 +321,7 @@ func TestInstallRelease_FailedHooks(t *testing.T) {
 	instAction.cfg.KubeClient = failer
 	vals := map[string]interface{}{}

-	res, err := instAction.Run(buildChart(), vals)
+	res, err := instAction.RunWithContext(context.Background(), buildChart(), vals)
 	is.Error(err)
 	is.Contains(res.Info.Description, "failed post-install")
 	is.Equal(release.StatusFailed, res.Info.Status)
@@ -333,7 +338,8 @@ func TestInstallRelease_ReplaceRelease(t *testing.T) {
 	instAction.ReleaseName = rel.Name
 	vals := map[string]interface{}{}

-	res, err := instAction.Run(buildChart(), vals)
+	res, err := instAction.RunWithContext(context.Background(), buildChart(), vals)
 	is.NoError(err)

 	// This should have been auto-incremented
@@ -349,13 +355,15 @@ func TestInstallRelease_KubeVersion(t *testing.T) {
 	is := assert.New(t)
 	instAction := installAction(t)
 	vals := map[string]interface{}{}
-	_, err := instAction.Run(buildChart(withKube(">=0.0.0")), vals)
+	_, err := instAction.RunWithContext(context.Background(), buildChart(withKube(">=0.0.0")), vals)
 	is.NoError(err)

 	// This should fail for a few hundred years
 	instAction.ReleaseName = "should-fail"
 	vals = map[string]interface{}{}
-	_, err = instAction.Run(buildChart(withKube(">=99.0.0")), vals)
+	_, err = instAction.RunWithContext(context.Background(), buildChart(withKube(">=99.0.0")), vals)
 	is.Error(err)
 	is.Contains(err.Error(), "chart requires kubeVersion")
 }
@@ -372,7 +380,7 @@ func TestInstallRelease_Wait(t *testing.T) {
 	goroutines := runtime.NumGoroutine()

-	res, err := instAction.Run(buildChart(), vals)
+	res, err := instAction.RunWithContext(context.Background(), buildChart(), vals)
 	is.Error(err)
 	is.Contains(res.Info.Description, "I timed out")
 	is.Equal(res.Info.Status, release.StatusFailed)
@@ -384,25 +392,15 @@ func TestInstallRelease_Wait_Interrupted(t *testing.T) {
 	instAction := installAction(t)
 	instAction.ReleaseName = "interrupted-release"
 	failer := instAction.cfg.KubeClient.(*kubefake.FailingKubeClient)
-	failer.WaitDuration = 10 * time.Second
+	failer.WaitError = context.Canceled
 	instAction.cfg.KubeClient = failer
 	instAction.Wait = true
 	vals := map[string]interface{}{}

-	ctx := context.Background()
-	ctx, cancel := context.WithCancel(ctx)
-	time.AfterFunc(time.Second, cancel)
-
-	goroutines := runtime.NumGoroutine()
-
-	res, err := instAction.RunWithContext(ctx, buildChart(), vals)
+	res, err := instAction.RunWithContext(context.Background(), buildChart(), vals)
 	is.Error(err)
 	is.Contains(res.Info.Description, "Release \"interrupted-release\" failed: context canceled")
 	is.Equal(res.Info.Status, release.StatusFailed)
-	is.Equal(goroutines+1, runtime.NumGoroutine()) // installation goroutine still is in background
-	time.Sleep(10 * time.Second) // wait for goroutine to finish
-	is.Equal(goroutines, runtime.NumGoroutine())
 }

 func TestInstallRelease_WaitForJobs(t *testing.T) {
 	is := assert.New(t)
@@ -415,7 +413,7 @@ func TestInstallRelease_WaitForJobs(t *testing.T) {
 	instAction.WaitForJobs = true
 	vals := map[string]interface{}{}

-	res, err := instAction.Run(buildChart(), vals)
+	res, err := instAction.RunWithContext(context.Background(), buildChart(), vals)
 	is.Error(err)
 	is.Contains(res.Info.Description, "I timed out")
 	is.Equal(res.Info.Status, release.StatusFailed)
@@ -431,9 +429,10 @@ func TestInstallRelease_Atomic(t *testing.T) {
 		failer.WaitError = fmt.Errorf("I timed out")
 		instAction.cfg.KubeClient = failer
 		instAction.Atomic = true
 		vals := map[string]interface{}{}

-		res, err := instAction.Run(buildChart(), vals)
+		res, err := instAction.RunWithContext(context.Background(), buildChart(), vals)
 		is.Error(err)
 		is.Contains(err.Error(), "I timed out")
 		is.Contains(err.Error(), "atomic")
@@ -454,7 +453,7 @@ func TestInstallRelease_Atomic(t *testing.T) {
 		instAction.Atomic = true
 		vals := map[string]interface{}{}

-		_, err := instAction.Run(buildChart(), vals)
+		_, err := instAction.RunWithContext(context.Background(), buildChart(), vals)
 		is.Error(err)
 		is.Contains(err.Error(), "I timed out")
 		is.Contains(err.Error(), "uninstall fail")
@@ -467,16 +466,12 @@ func TestInstallRelease_Atomic_Interrupted(t *testing.T) {
 	instAction := installAction(t)
 	instAction.ReleaseName = "interrupted-release"
 	failer := instAction.cfg.KubeClient.(*kubefake.FailingKubeClient)
-	failer.WaitDuration = 10 * time.Second
+	failer.WaitError = context.Canceled
 	instAction.cfg.KubeClient = failer
 	instAction.Atomic = true
 	vals := map[string]interface{}{}

-	ctx := context.Background()
-	ctx, cancel := context.WithCancel(ctx)
-	time.AfterFunc(time.Second, cancel)
-
-	res, err := instAction.RunWithContext(ctx, buildChart(), vals)
+	res, err := instAction.RunWithContext(context.Background(), buildChart(), vals)
 	is.Error(err)
 	is.Contains(err.Error(), "context canceled")
 	is.Contains(err.Error(), "atomic")
@@ -566,7 +561,7 @@ func TestInstallReleaseOutputDir(t *testing.T) {
 	instAction.OutputDir = dir

-	_, err := instAction.Run(buildChart(withSampleTemplates(), withMultipleManifestTemplate()), vals)
+	_, err := instAction.RunWithContext(context.Background(), buildChart(withSampleTemplates(), withMultipleManifestTemplate()), vals)
 	if err != nil {
 		t.Fatalf("Failed install: %s", err)
 	}
@@ -602,7 +597,7 @@ func TestInstallOutputDirWithReleaseName(t *testing.T) {
 	newDir := filepath.Join(dir, instAction.ReleaseName)

-	_, err := instAction.Run(buildChart(withSampleTemplates(), withMultipleManifestTemplate()), vals)
+	_, err := instAction.RunWithContext(context.Background(), buildChart(withSampleTemplates(), withMultipleManifestTemplate()), vals)
 	if err != nil {
 		t.Fatalf("Failed install: %s", err)
 	}

@@ -54,8 +54,7 @@ func NewReleaseTesting(cfg *Configuration) *ReleaseTesting {
 		}
 	}

-// Run executes 'helm test' against the given release.
-func (r *ReleaseTesting) Run(name string) (*release.Release, error) {
+func (r *ReleaseTesting) RunWithContext(ctx context.Context, name string) (*release.Release, error) {
 	if err := r.cfg.KubeClient.IsReachable(); err != nil {
 		return nil, err
 	}
@@ -94,7 +93,7 @@ func (r *ReleaseTesting) Run(name string) (*release.Release, error) {
 		rel.Hooks = executingHooks
 	}

-	if err := r.cfg.execHook(rel, release.HookTest, r.Timeout); err != nil {
+	if err := r.cfg.execHook(r.Timeout, rel, release.HookTest); err != nil {
 		rel.Hooks = append(skippedHooks, rel.Hooks...)
 		r.cfg.Releases.Update(rel)
 		return rel, err
@@ -104,6 +103,11 @@ func (r *ReleaseTesting) Run(name string) (*release.Release, error) {
 	return rel, r.cfg.Releases.Update(rel)
 }

+// Run executes 'helm test' against the given release.
+func (r *ReleaseTesting) Run(name string) (*release.Release, error) {
+	return r.RunWithContext(context.TODO(), name)
+}
+
 // GetPodLogs will write the logs for all test pods in the given release into
 // the given writer. These can be immediately output to the user or captured for
 // other uses
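Each action follows the same recipe: the logic moves into the context-aware method, and the old signature survives as a thin wrapper so existing callers keep compiling. Callers that want cancellation can target the new method directly; a hedged sketch, assuming cfg (*action.Configuration) and ctx already exist:

	client := action.NewReleaseTesting(cfg)
	// ctx may carry a deadline, or be wired to signal handling by the caller.
	rel, err := client.RunWithContext(ctx, "my-release")
	if err != nil {
		log.Fatalf("tests failed: %v", err)
	}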

@@ -18,6 +18,7 @@ package action
 import (
 	"bytes"
+	"context"
 	"fmt"
 	"strings"
 	"time"
@@ -54,8 +55,7 @@ func NewRollback(cfg *Configuration) *Rollback {
 	}
 }

-// Run executes 'helm rollback' against the given release.
-func (r *Rollback) Run(name string) error {
+func (r *Rollback) RunWithContext(ctx context.Context, name string) error {
 	if err := r.cfg.KubeClient.IsReachable(); err != nil {
 		return err
 	}
@@ -76,7 +76,7 @@ func (r *Rollback) Run(name string) error {
 	}

 	r.cfg.Log("performing rollback of %s", name)
-	if _, err := r.performRollback(currentRelease, targetRelease); err != nil {
+	if _, err := r.performRollback(ctx, currentRelease, targetRelease); err != nil {
 		return err
 	}
@@ -89,6 +89,11 @@ func (r *Rollback) Run(name string) error {
 	return nil
 }

+// Run executes 'helm rollback' against the given release.
+func (r *Rollback) Run(name string) error {
+	return r.RunWithContext(context.TODO(), name)
+}
+
 // prepareRollback finds the previous release and prepares a new release object with
 // the previous release's configuration
 func (r *Rollback) prepareRollback(name string) (*release.Release, *release.Release, error) {
@@ -159,7 +164,7 @@ func (r *Rollback) prepareRollback(name string) (*release.Release, *release.Release, error) {
 	return currentRelease, targetRelease, nil
 }

-func (r *Rollback) performRollback(currentRelease, targetRelease *release.Release) (*release.Release, error) {
+func (r *Rollback) performRollback(ctx context.Context, currentRelease, targetRelease *release.Release) (*release.Release, error) {
 	if r.DryRun {
 		r.cfg.Log("dry run for %s", targetRelease.Name)
 		return targetRelease, nil
@@ -176,7 +181,7 @@ func (r *Rollback) performRollback(currentRelease, targetRelease *release.Release) (*release.Release, error) {
 	// pre-rollback hooks
 	if !r.DisableHooks {
-		if err := r.cfg.execHook(targetRelease, release.HookPreRollback, r.Timeout); err != nil {
+		if err := r.cfg.execHook(r.Timeout, targetRelease, release.HookPreRollback); err != nil {
 			return targetRelease, err
 		}
 	} else {
@@ -243,7 +248,7 @@ func (r *Rollback) performRollback(currentRelease, targetRelease *release.Release) (*release.Release, error) {
 	// post-rollback hooks
 	if !r.DisableHooks {
-		if err := r.cfg.execHook(targetRelease, release.HookPostRollback, r.Timeout); err != nil {
+		if err := r.cfg.execHook(r.Timeout, targetRelease, release.HookPostRollback); err != nil {
 			return targetRelease, err
 		}
 	}

@@ -17,6 +17,7 @@ limitations under the License.
 package action
 import (
+	"context"
 	"strings"
 	"time"
@@ -56,6 +57,13 @@ func NewUninstall(cfg *Configuration) *Uninstall {

 // Run uninstalls the given release.
 func (u *Uninstall) Run(name string) (*release.UninstallReleaseResponse, error) {
+	ctx, cancel := context.WithTimeout(context.TODO(), u.Timeout)
+	defer cancel()
+
+	return u.RunWithContext(ctx, name)
+}
+
+func (u *Uninstall) RunWithContext(ctx context.Context, name string) (*release.UninstallReleaseResponse, error) {
 	if err := u.cfg.KubeClient.IsReachable(); err != nil {
 		return nil, err
 	}
@@ -106,7 +114,7 @@ func (u *Uninstall) Run(name string) (*release.UninstallReleaseResponse, error) {
 	res := &release.UninstallReleaseResponse{Release: rel}

 	if !u.DisableHooks {
-		if err := u.cfg.execHook(rel, release.HookPreDelete, u.Timeout); err != nil {
+		if err := u.cfg.execHook(u.Timeout, rel, release.HookPreDelete); err != nil {
 			return res, err
 		}
 	} else {
@@ -131,15 +139,25 @@ func (u *Uninstall) Run(name string) (*release.UninstallReleaseResponse, error) {
 	res.Info = kept

 	if u.Wait {
-		if kubeClient, ok := u.cfg.KubeClient.(kube.InterfaceExt); ok {
-			if err := kubeClient.WaitForDelete(deletedResources, u.Timeout); err != nil {
-				errs = append(errs, err)
-			}
+		var err error
+		// Helm 4 TODO: WaitForDelete should be replaced with its
+		// context-aware counterpart.
+		switch kubeClient := u.cfg.KubeClient.(type) {
+		case kube.ContextInterface:
+			err = kubeClient.WaitForDeleteWithContext(ctx, deletedResources)
+		case kube.InterfaceExt:
+			err = kubeClient.WaitForDelete(deletedResources, u.Timeout)
+		default:
+			u.cfg.Log("WARNING: KubeClient does not satisfy ContextInterface or InterfaceExt")
+		}
+		if err != nil {
+			errs = append(errs, err)
 		}
 	}

 	if !u.DisableHooks {
-		if err := u.cfg.execHook(rel, release.HookPostDelete, u.Timeout); err != nil {
+		if err := u.cfg.execHook(u.Timeout, rel, release.HookPostDelete); err != nil {
 			errs = append(errs, err)
 		}
 	}
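Case order matters in the type switch above: a client implementing both interfaces matches the first listed case, so kube.ContextInterface must precede kube.InterfaceExt for the context-aware path to win. A runnable sketch with illustrative Legacy/Modern interfaces (not Helm types):

	package main

	import (
		"context"
		"fmt"
	)

	type Legacy interface{ Wait() }
	type Modern interface{ WaitCtx(context.Context) }

	// both implements Legacy and Modern.
	type both struct{}

	func (both) Wait()                   {}
	func (both) WaitCtx(context.Context) {}

	func pick(v interface{}) string {
		switch v.(type) {
		case Modern: // listed first, so a value implementing both lands here
			return "context-aware"
		case Legacy:
			return "legacy"
		default:
			return "neither"
		}
	}

	func main() {
		fmt.Println(pick(both{})) // context-aware
	}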

@@ -17,6 +17,7 @@ limitations under the License.
 package action
 import (
+	"context"
 	"fmt"
 	"testing"
@@ -68,7 +69,10 @@ func TestUninstallRelease_deleteRelease(t *testing.T) {
 	}
 }`
 	unAction.cfg.Releases.Create(rel)
-	res, err := unAction.Run(rel.Name)
+
+	ctx := context.Background()
+	res, err := unAction.RunWithContext(ctx, rel.Name)
 	is.NoError(err)
 	expected := `These resources were kept due to the resource policy:
 [Secret] secret
@@ -101,7 +105,10 @@ func TestUninstallRelease_Wait(t *testing.T) {
 	failer := unAction.cfg.KubeClient.(*kubefake.FailingKubeClient)
 	failer.WaitError = fmt.Errorf("U timed out")
 	unAction.cfg.KubeClient = failer
-	res, err := unAction.Run(rel.Name)
+
+	ctx := context.Background()
+	res, err := unAction.RunWithContext(ctx, rel.Name)
 	is.Error(err)
 	is.Contains(err.Error(), "U timed out")
 	is.Equal(res.Release.Info.Status, release.StatusUninstalled)
@@ -134,7 +141,10 @@ func TestUninstallRelease_Cascade(t *testing.T) {
 	failer.DeleteWithPropagationError = fmt.Errorf("Uninstall with cascade failed")
 	failer.BuildDummy = true
 	unAction.cfg.KubeClient = failer
-	_, err := unAction.Run(rel.Name)
+
+	ctx := context.Background()
+	_, err := unAction.RunWithContext(ctx, rel.Name)
 	is.Error(err)
 	is.Contains(err.Error(), "failed to delete release: come-fail-away")
 }

@@ -134,7 +134,9 @@ func (u *Upgrade) SetRegistryClient(client *registry.Client) {

 // Run executes the upgrade on the given release.
 func (u *Upgrade) Run(name string, chart *chart.Chart, vals map[string]interface{}) (*release.Release, error) {
-	ctx := context.Background()
+	ctx, cancel := context.WithTimeout(context.TODO(), u.Timeout)
+	defer cancel()
+
 	return u.RunWithContext(ctx, name, chart, vals)
 }
@@ -357,51 +359,16 @@ func (u *Upgrade) performUpgrade(ctx context.Context, originalRelease, upgradedRelease *release.Release) (*release.Release, error) {
 	if err := u.cfg.Releases.Create(upgradedRelease); err != nil {
 		return nil, err
 	}
-	rChan := make(chan resultMessage)
-	ctxChan := make(chan resultMessage)
-	doneChan := make(chan interface{})
-	defer close(doneChan)
-	go u.releasingUpgrade(rChan, upgradedRelease, current, target, originalRelease)
-	go u.handleContext(ctx, doneChan, ctxChan, upgradedRelease)
-	select {
-	case result := <-rChan:
-		return result.r, result.e
-	case result := <-ctxChan:
-		return result.r, result.e
-	}
+
+	return u.releasingUpgrade(ctx, upgradedRelease, current, target, originalRelease)
 }

-// Function used to lock the Mutex, this is important for the case when the atomic flag is set.
-// In that case the upgrade will finish before the rollback is finished so it is necessary to wait for the rollback to finish.
-// The rollback will be trigger by the function failRelease
-func (u *Upgrade) reportToPerformUpgrade(c chan<- resultMessage, rel *release.Release, created kube.ResourceList, err error) {
-	u.Lock.Lock()
-	if err != nil {
-		rel, err = u.failRelease(rel, created, err)
-	}
-	c <- resultMessage{r: rel, e: err}
-	u.Lock.Unlock()
-}
-
-// Setup listener for SIGINT and SIGTERM
-func (u *Upgrade) handleContext(ctx context.Context, done chan interface{}, c chan<- resultMessage, upgradedRelease *release.Release) {
-	select {
-	case <-ctx.Done():
-		err := ctx.Err()
-		// when the atomic flag is set the ongoing release finish first and doesn't give time for the rollback happens.
-		u.reportToPerformUpgrade(c, upgradedRelease, kube.ResourceList{}, err)
-	case <-done:
-		return
-	}
-}
-func (u *Upgrade) releasingUpgrade(c chan<- resultMessage, upgradedRelease *release.Release, current kube.ResourceList, target kube.ResourceList, originalRelease *release.Release) {
+func (u *Upgrade) releasingUpgrade(ctx context.Context, upgradedRelease *release.Release, current, target kube.ResourceList, originalRelease *release.Release) (*release.Release, error) {
 	// pre-upgrade hooks
 	if !u.DisableHooks {
-		if err := u.cfg.execHook(upgradedRelease, release.HookPreUpgrade, u.Timeout); err != nil {
-			u.reportToPerformUpgrade(c, upgradedRelease, kube.ResourceList{}, fmt.Errorf("pre-upgrade hooks failed: %s", err))
-			return
+		if err := u.cfg.execHook(u.Timeout, upgradedRelease, release.HookPreUpgrade); err != nil {
+			return u.failRelease(upgradedRelease, kube.ResourceList{}, fmt.Errorf("pre-upgrade hooks failed: %w", err))
 		}
 	} else {
 		u.cfg.Log("upgrade hooks disabled for %s", upgradedRelease.Name)
@@ -410,8 +377,7 @@ func (u *Upgrade) releasingUpgrade(c chan<- resultMessage, upgradedRelease *release.Release, current kube.ResourceList, target kube.ResourceList, originalRelease *release.Release) {
 	results, err := u.cfg.KubeClient.Update(current, target, u.Force)
 	if err != nil {
 		u.cfg.recordRelease(originalRelease)
-		u.reportToPerformUpgrade(c, upgradedRelease, results.Created, err)
-		return
+		return u.failRelease(upgradedRelease, results.Created, err)
 	}

 	if u.Recreate {
@@ -428,26 +394,35 @@ func (u *Upgrade) releasingUpgrade(c chan<- resultMessage, upgradedRelease *release.Release, current kube.ResourceList, target kube.ResourceList, originalRelease *release.Release) {
 		u.cfg.Log(
 			"waiting for release %s resources (created: %d updated: %d deleted: %d)",
 			upgradedRelease.Name, len(results.Created), len(results.Updated), len(results.Deleted))
-		if u.WaitForJobs {
-			if err := u.cfg.KubeClient.WaitWithJobs(target, u.Timeout); err != nil {
-				u.cfg.recordRelease(originalRelease)
-				u.reportToPerformUpgrade(c, upgradedRelease, results.Created, err)
-				return
-			}
-		} else {
-			if err := u.cfg.KubeClient.Wait(target, u.Timeout); err != nil {
-				u.cfg.recordRelease(originalRelease)
-				u.reportToPerformUpgrade(c, upgradedRelease, results.Created, err)
-				return
-			}
+		var err error
+
+		ctx, cancel := context.WithTimeout(ctx, u.Timeout)
+		defer cancel()
+
+		kubeClient, ok := u.cfg.KubeClient.(kube.ContextInterface)
+		// Helm 4 TODO: WaitWithJobs and Wait should be replaced with their
+		// context-aware counterparts.
+		switch {
+		case ok && u.WaitForJobs:
+			err = kubeClient.WaitWithJobsContext(ctx, target)
+		case ok && !u.WaitForJobs:
+			err = kubeClient.WaitWithContext(ctx, target)
+		case u.WaitForJobs:
+			err = u.cfg.KubeClient.WaitWithJobs(target, u.Timeout)
+		case !u.WaitForJobs:
+			err = u.cfg.KubeClient.Wait(target, u.Timeout)
+		}
+		if err != nil {
+			u.cfg.recordRelease(originalRelease)
+			return u.failRelease(upgradedRelease, results.Created, err)
 		}
 	}

 	// post-upgrade hooks
 	if !u.DisableHooks {
-		if err := u.cfg.execHook(upgradedRelease, release.HookPostUpgrade, u.Timeout); err != nil {
-			u.reportToPerformUpgrade(c, upgradedRelease, results.Created, fmt.Errorf("post-upgrade hooks failed: %s", err))
-			return
+		if err := u.cfg.execHook(u.Timeout, upgradedRelease, release.HookPostUpgrade); err != nil {
+			return u.failRelease(upgradedRelease, results.Created, fmt.Errorf("post-upgrade hooks failed: %w", err))
 		}
 	}
@@ -460,7 +435,8 @@ func (u *Upgrade) releasingUpgrade(c chan<- resultMessage, upgradedRelease *release.Release, current kube.ResourceList, target kube.ResourceList, originalRelease *release.Release) {
 	} else {
 		upgradedRelease.Info.Description = "Upgrade complete"
 	}
-	u.reportToPerformUpgrade(c, upgradedRelease, nil, nil)
+
+	return upgradedRelease, nil
 }

 func (u *Upgrade) failRelease(rel *release.Release, created kube.ResourceList, err error) (*release.Release, error) {
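With the channel-and-goroutine plumbing removed, interrupt handling becomes the caller's job. A hedged sketch of wiring SIGINT/SIGTERM into the new entry point via os/signal, assuming upgrade (*action.Upgrade), chrt, and vals are already prepared:

	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer stop()

	// Ctrl-C now cancels ctx, which unwinds the upgrade through failRelease
	// instead of racing a background goroutine.
	rel, err := upgrade.RunWithContext(ctx, "my-release", chrt, vals)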

@@ -56,6 +56,7 @@ func TestUpgradeRelease_Success(t *testing.T) {
 	vals := map[string]interface{}{}

 	ctx, done := context.WithCancel(context.Background())
 	res, err := upAction.RunWithContext(ctx, rel.Name, buildChart(), vals)
 	done()
 	req.NoError(err)
@@ -85,7 +86,9 @@ func TestUpgradeRelease_Wait(t *testing.T) {
 	upAction.Wait = true
 	vals := map[string]interface{}{}

-	res, err := upAction.Run(rel.Name, buildChart(), vals)
+	ctx := context.Background()
+	res, err := upAction.RunWithContext(ctx, rel.Name, buildChart(), vals)
 	req.Error(err)
 	is.Contains(res.Info.Description, "I timed out")
 	is.Equal(res.Info.Status, release.StatusFailed)
@@ -108,7 +111,9 @@ func TestUpgradeRelease_WaitForJobs(t *testing.T) {
 	upAction.WaitForJobs = true
 	vals := map[string]interface{}{}

-	res, err := upAction.Run(rel.Name, buildChart(), vals)
+	ctx := context.Background()
+	res, err := upAction.RunWithContext(ctx, rel.Name, buildChart(), vals)
 	req.Error(err)
 	is.Contains(res.Info.Description, "I timed out")
 	is.Equal(res.Info.Status, release.StatusFailed)
@@ -132,7 +137,9 @@ func TestUpgradeRelease_CleanupOnFail(t *testing.T) {
 	upAction.CleanupOnFail = true
 	vals := map[string]interface{}{}

-	res, err := upAction.Run(rel.Name, buildChart(), vals)
+	ctx := context.Background()
+	res, err := upAction.RunWithContext(ctx, rel.Name, buildChart(), vals)
 	req.Error(err)
 	is.NotContains(err.Error(), "unable to cleanup resources")
 	is.Contains(res.Info.Description, "I timed out")
@@ -158,7 +165,9 @@ func TestUpgradeRelease_Atomic(t *testing.T) {
 	upAction.Atomic = true
 	vals := map[string]interface{}{}

-	res, err := upAction.Run(rel.Name, buildChart(), vals)
+	ctx := context.Background()
+	res, err := upAction.RunWithContext(ctx, rel.Name, buildChart(), vals)
 	req.Error(err)
 	is.Contains(err.Error(), "arming key removed")
 	is.Contains(err.Error(), "atomic")
@@ -183,7 +192,9 @@ func TestUpgradeRelease_Atomic(t *testing.T) {
 	upAction.Atomic = true
 	vals := map[string]interface{}{}

-	_, err := upAction.Run(rel.Name, buildChart(), vals)
+	ctx := context.Background()
+	_, err := upAction.RunWithContext(ctx, rel.Name, buildChart(), vals)
 	req.Error(err)
 	is.Contains(err.Error(), "update fail")
 	is.Contains(err.Error(), "an error occurred while rolling back the release")
@@ -222,8 +233,11 @@ func TestUpgradeRelease_ReuseValues(t *testing.T) {
 	is.NoError(err)

 	upAction.ReuseValues = true
+	ctx := context.Background()
+
 	// setting newValues and upgrading
-	res, err := upAction.Run(rel.Name, buildChart(), newValues)
+	res, err := upAction.RunWithContext(ctx, rel.Name, buildChart(), newValues)
 	is.NoError(err)

 	// Now make sure it is actually upgraded
@@ -284,8 +298,11 @@ func TestUpgradeRelease_ReuseValues(t *testing.T) {
 		withDependency(withName("subchart")),
 		withMetadataDependency(dependency),
 	)
+	ctx := context.Background()
+
 	// reusing values and upgrading
-	res, err := upAction.Run(rel.Name, sampleChartWithSubChart, map[string]interface{}{})
+	res, err := upAction.RunWithContext(ctx, rel.Name, sampleChartWithSubChart, map[string]interface{}{})
 	is.NoError(err)

 	// Now get the upgraded release
@@ -377,7 +394,9 @@ func TestUpgradeRelease_Pending(t *testing.T) {
 	vals := map[string]interface{}{}

-	_, err := upAction.Run(rel.Name, buildChart(), vals)
+	ctx := context.Background()
+	_, err := upAction.RunWithContext(ctx, rel.Name, buildChart(), vals)
 	req.Contains(err.Error(), "progress", err)
 }
@@ -393,16 +412,12 @@ func TestUpgradeRelease_Interrupted_Wait(t *testing.T) {
 	upAction.cfg.Releases.Create(rel)
 	failer := upAction.cfg.KubeClient.(*kubefake.FailingKubeClient)
-	failer.WaitDuration = 10 * time.Second
+	failer.WaitError = context.Canceled
 	upAction.cfg.KubeClient = failer
 	upAction.Wait = true
 	vals := map[string]interface{}{}

-	ctx := context.Background()
-	ctx, cancel := context.WithCancel(ctx)
-	time.AfterFunc(time.Second, cancel)
-
-	res, err := upAction.RunWithContext(ctx, rel.Name, buildChart(), vals)
+	res, err := upAction.RunWithContext(context.Background(), rel.Name, buildChart(), vals)

 	req.Error(err)
 	is.Contains(res.Info.Description, "Upgrade \"interrupted-release\" failed: context canceled")
@@ -422,16 +437,17 @@ func TestUpgradeRelease_Interrupted_Atomic(t *testing.T) {
 	upAction.cfg.Releases.Create(rel)
 	failer := upAction.cfg.KubeClient.(*kubefake.FailingKubeClient)
-	failer.WaitDuration = 5 * time.Second
+	failer.WaitError = context.Canceled
+	failer.WaitDuration = 2 * time.Second
 	upAction.cfg.KubeClient = failer
 	upAction.Atomic = true
 	vals := map[string]interface{}{}

-	ctx := context.Background()
-	ctx, cancel := context.WithCancel(ctx)
-	time.AfterFunc(time.Second, cancel)
-
-	res, err := upAction.RunWithContext(ctx, rel.Name, buildChart(), vals)
+	// After the first Wait error, the error needs to be reset to nil
+	// so the atomic cleanup passes.
+	time.AfterFunc(failer.WaitDuration, func() { failer.WaitError = nil })
+
+	res, err := upAction.RunWithContext(context.Background(), rel.Name, buildChart(), vals)

 	req.Error(err)
 	is.Contains(err.Error(), "release interrupted-release failed, and has been rolled back due to atomic being set: context canceled")

@@ -283,41 +283,67 @@ func getResource(info *resource.Info) (runtime.Object, error) {

 // Wait waits up to the given timeout for the specified resources to be ready.
 func (c *Client) Wait(resources ResourceList, timeout time.Duration) error {
+	// Helm 4 TODO: remove the decorator around WaitWithContext.
+	ctx, cancel := context.WithTimeout(context.TODO(), timeout)
+	defer cancel()
+
+	return c.WaitWithContext(ctx, resources)
+}
+
+// WaitWithContext waits up to the ctx deadline for the specified resources to be ready.
+func (c *Client) WaitWithContext(ctx context.Context, resources ResourceList) error {
 	cs, err := c.getKubeClient()
 	if err != nil {
 		return err
 	}
 	checker := NewReadyChecker(cs, c.Log, PausedAsReady(true))
 	w := waiter{
 		c:   checker,
 		log: c.Log,
-		timeout: timeout,
 	}
-	return w.waitForResources(resources)
+	return w.waitForResources(ctx, resources)
 }

 // WaitWithJobs waits up to the given timeout for the specified resources to be ready, including jobs.
 func (c *Client) WaitWithJobs(resources ResourceList, timeout time.Duration) error {
+	// Helm 4 TODO: remove the decorator around WaitWithJobsContext.
+	ctx, cancel := context.WithTimeout(context.TODO(), timeout)
+	defer cancel()
+
+	return c.WaitWithJobsContext(ctx, resources)
+}
+
+// WaitWithJobsContext waits up to the ctx deadline for the specified resources to be ready, including jobs.
+func (c *Client) WaitWithJobsContext(ctx context.Context, resources ResourceList) error {
 	cs, err := c.getKubeClient()
 	if err != nil {
 		return err
 	}
 	checker := NewReadyChecker(cs, c.Log, PausedAsReady(true), CheckJobs(true))
 	w := waiter{
 		c:   checker,
 		log: c.Log,
-		timeout: timeout,
 	}
-	return w.waitForResources(resources)
+	return w.waitForResources(ctx, resources)
 }

 // WaitForDelete waits up to the given timeout for the specified resources to be deleted.
 func (c *Client) WaitForDelete(resources ResourceList, timeout time.Duration) error {
+	// Helm 4 TODO: remove the decorator around WaitForDeleteWithContext.
+	ctx, cancel := context.WithTimeout(context.TODO(), timeout)
+	defer cancel()
+
+	return c.WaitForDeleteWithContext(ctx, resources)
+}
+
+// WaitForDeleteWithContext waits up to the ctx deadline for the specified resources to be deleted.
+func (c *Client) WaitForDeleteWithContext(ctx context.Context, resources ResourceList) error {
 	w := waiter{
 		log: c.Log,
-		timeout: timeout,
 	}
-	return w.waitForDeletedResources(resources)
+	return w.waitForDeletedResources(ctx, resources)
 }
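A note on these decorators (and the ctx, cancel := context.WithTimeout(ctx, ...) calls in performInstall and releasingUpgrade above): context.WithTimeout never extends a parent's deadline, so layering a timeout over an already-cancellable caller context keeps whichever fires first. A runnable illustration:

	package main

	import (
		"context"
		"fmt"
		"time"
	)

	func main() {
		// Parent carries a short 1s deadline, e.g. from a caller.
		parent, cancel := context.WithTimeout(context.Background(), time.Second)
		defer cancel()

		// Wrapping with a longer timeout does not loosen the parent's deadline.
		child, cancel2 := context.WithTimeout(parent, time.Minute)
		defer cancel2()

		d, _ := child.Deadline()
		fmt.Println(time.Until(d) <= time.Second) // true: the earlier deadline wins
	}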
 func (c *Client) namespace() string {
@@ -512,9 +538,9 @@ func delete(c *Client, resources ResourceList, propagation metav1.DeletionPropagation) (*Result, []error) {
 	return res, nil
 }

-func (c *Client) watchTimeout(t time.Duration) func(*resource.Info) error {
+func (c *Client) watchTimeout(ctx context.Context) func(*resource.Info) error {
 	return func(info *resource.Info) error {
-		return c.watchUntilReady(t, info)
+		return c.watchUntilReady(ctx, info)
 	}
 }

@@ -533,9 +559,18 @@ func (c *Client) watchTimeout(t time.Duration) func(*resource.Info) error {
 //
 // Handling for other kinds will be added as necessary.
 func (c *Client) WatchUntilReady(resources ResourceList, timeout time.Duration) error {
+	// Helm 4 TODO: remove the decorator around WatchUntilReadyWithContext.
+	ctx, cancel := context.WithTimeout(context.TODO(), timeout)
+	defer cancel()
+
 	// For jobs, there's also the option to do poll c.Jobs(namespace).Get():
 	// https://github.com/adamreese/kubernetes/blob/master/test/e2e/job.go#L291-L300
-	return perform(resources, c.watchTimeout(timeout))
+	return c.WatchUntilReadyWithContext(ctx, resources)
+}
+
+// WatchUntilReadyWithContext watches the given resources until they are ready
+// or the context is cancelled.
+func (c *Client) WatchUntilReadyWithContext(ctx context.Context, resources ResourceList) error {
+	return perform(resources, c.watchTimeout(ctx))
 }

 func perform(infos ResourceList, fn func(*resource.Info) error) error {
@@ -600,6 +635,7 @@ func createResource(info *resource.Info) error {
 	if err != nil {
 		return err
 	}
+
 	return info.Refresh(obj, true)
 }
@@ -701,7 +737,7 @@ func updateResource(c *Client, target *resource.Info, currentObj runtime.Object, force bool) error {
 	return nil
 }

-func (c *Client) watchUntilReady(timeout time.Duration, info *resource.Info) error {
+func (c *Client) watchUntilReady(ctx context.Context, info *resource.Info) error {
 	kind := info.Mapping.GroupVersionKind.Kind
 	switch kind {
 	case "Job", "Pod":
@@ -709,7 +745,7 @@ func (c *Client) watchUntilReady(timeout time.Duration, info *resource.Info) error {
 		return nil
 	}

-	c.Log("Watching for changes to %s %s with timeout of %v", kind, info.Name, timeout)
+	c.Log("Watching for changes to %s %s with timeout of %q", kind, info.Name, timeFromCtx(ctx))

 	// Use a selector on the name of the resource. This should be unique for the
 	// given version and kind
@@ -725,8 +761,6 @@ func (c *Client) watchUntilReady(timeout time.Duration, info *resource.Info) error {
 	// In the future, we might want to add some special logic for types
 	// like Ingress, Volume, etc.

-	ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), timeout)
-	defer cancel()
 	_, err = watchtools.UntilWithSync(ctx, lw, &unstructured.Unstructured{}, nil, func(e watch.Event) (bool, error) {
 		// Make sure the incoming object is versioned as we use unstructured
 		// objects when we build manifests
+// WaitAndGetCompletedPodPhaseWithContext watches the named pod until it reaches
+// a completed phase (PodSucceeded or PodFailed) or the context is cancelled.
+func (c *Client) WaitAndGetCompletedPodPhaseWithContext(ctx context.Context, name string) (v1.PodPhase, error) {
+	client, err := c.getKubeClient()
+	if err != nil {
+		return v1.PodUnknown, err
+	}
+
+	watcher, err := client.CoreV1().Pods(c.namespace()).Watch(ctx, metav1.ListOptions{
+		FieldSelector: fmt.Sprintf("metadata.name=%s", name),
+	})
+	if err != nil {
+		return v1.PodUnknown, err
+	}
+
+	for event := range watcher.ResultChan() {
+		p, ok := event.Object.(*v1.Pod)
+		if !ok {
+			return v1.PodUnknown, fmt.Errorf("%s not a pod", name)
+		}
+		switch p.Status.Phase {
+		case v1.PodFailed:
+			return v1.PodFailed, nil
+		case v1.PodSucceeded:
+			return v1.PodSucceeded, nil
+		}
+	}
+
+	return v1.PodUnknown, err
+}
+
 // WaitAndGetCompletedPodPhase waits up to a timeout until a pod enters a completed phase
 // and returns said phase (PodSucceeded or PodFailed qualify).
 func (c *Client) WaitAndGetCompletedPodPhase(name string, timeout time.Duration) (v1.PodPhase, error) {
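A hedged usage sketch for the watcher added above, where the deadline now rides on the context instead of a timeout argument; c is assumed to be a configured *kube.Client:

	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
	defer cancel()

	phase, err := c.WaitAndGetCompletedPodPhaseWithContext(ctx, "test-pod")
	if err != nil {
		log.Fatalf("watch failed: %v", err)
	}
	// Note: if ctx expires, the watch channel closes and the phase comes
	// back as v1.PodUnknown with a nil error, so check the phase too.
	fmt.Println(phase)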

@@ -18,6 +18,7 @@ limitations under the License.
 package fake
 import (
+	"context"
 	"io"
 	"time"
@@ -74,6 +75,16 @@ func (f *FailingKubeClient) Wait(resources kube.ResourceList, d time.Duration) error {
 	return f.PrintingKubeClient.Wait(resources, d)
 }

+// WaitWithContext sleeps for f.WaitDuration, then returns the configured error if set, or prints.
+func (f *FailingKubeClient) WaitWithContext(ctx context.Context, resources kube.ResourceList) error {
+	time.Sleep(f.WaitDuration)
+	if f.WaitError != nil {
+		return f.WaitError
+	}
+	return f.PrintingKubeClient.WaitWithContext(ctx, resources)
+}
+
 // WaitWithJobs returns the configured error if set or prints
 func (f *FailingKubeClient) WaitWithJobs(resources kube.ResourceList, d time.Duration) error {
 	if f.WaitError != nil {
@@ -82,6 +93,14 @@ func (f *FailingKubeClient) WaitWithJobs(resources kube.ResourceList, d time.Duration) error {
 	return f.PrintingKubeClient.WaitWithJobs(resources, d)
 }

+// WaitWithJobsContext returns the configured error if set or prints
+func (f *FailingKubeClient) WaitWithJobsContext(ctx context.Context, resources kube.ResourceList) error {
+	if f.WaitError != nil {
+		return f.WaitError
+	}
+	return f.PrintingKubeClient.WaitWithJobsContext(ctx, resources)
+}
+
 // WaitForDelete returns the configured error if set or prints
 func (f *FailingKubeClient) WaitForDelete(resources kube.ResourceList, d time.Duration) error {
 	if f.WaitError != nil {
@@ -90,6 +109,14 @@ func (f *FailingKubeClient) WaitForDelete(resources kube.ResourceList, d time.Duration) error {
 	return f.PrintingKubeClient.WaitForDelete(resources, d)
 }

+// WaitForDeleteWithContext returns the configured error if set or prints
+func (f *FailingKubeClient) WaitForDeleteWithContext(ctx context.Context, resources kube.ResourceList) error {
+	if f.WaitError != nil {
+		return f.WaitError
+	}
+	return f.PrintingKubeClient.WaitForDeleteWithContext(ctx, resources)
+}
+
 // Delete returns the configured error if set or prints
 func (f *FailingKubeClient) Delete(resources kube.ResourceList) (*kube.Result, []error) {
 	if f.DeleteError != nil {
@@ -141,6 +168,15 @@ func (f *FailingKubeClient) WaitAndGetCompletedPodPhase(s string, d time.Duration) (v1.PodPhase, error) {
 	return f.PrintingKubeClient.WaitAndGetCompletedPodPhase(s, d)
 }

+// WaitAndGetCompletedPodPhaseWithContext returns the configured error if set or prints
+func (f *FailingKubeClient) WaitAndGetCompletedPodPhaseWithContext(ctx context.Context, s string) (v1.PodPhase, error) {
+	if f.WaitAndGetCompletedPodPhaseError != nil {
+		return v1.PodSucceeded, f.WaitAndGetCompletedPodPhaseError
+	}
+	return f.PrintingKubeClient.WaitAndGetCompletedPodPhaseWithContext(ctx, s)
+}
+
 // DeleteWithPropagationPolicy returns the configured error if set or prints
 func (f *FailingKubeClient) DeleteWithPropagationPolicy(resources kube.ResourceList, policy metav1.DeletionPropagation) (*kube.Result, []error) {
 	if f.DeleteWithPropagationError != nil {
@@ -149,6 +185,13 @@ func (f *FailingKubeClient) DeleteWithPropagationPolicy(resources kube.ResourceList, policy metav1.DeletionPropagation) (*kube.Result, []error) {
 	return f.PrintingKubeClient.DeleteWithPropagationPolicy(resources, policy)
 }

+// WatchUntilReadyWithContext returns the configured error if set or prints
+func (f *FailingKubeClient) WatchUntilReadyWithContext(ctx context.Context, resources kube.ResourceList) error {
+	if f.WatchUntilReadyError != nil {
+		return f.WatchUntilReadyError
+	}
+	return f.PrintingKubeClient.WatchUntilReady(resources, 0)
+}
+
 func createDummyResourceList() kube.ResourceList {
 	var resInfo resource.Info
 	resInfo.Name = "dummyName"
@@ -158,3 +201,10 @@ func createDummyResourceList() kube.ResourceList {
 	return resourceList
 }

+// compile-time check that FailingKubeClient satisfies our interfaces.
+var (
+	_ kube.Interface        = &FailingKubeClient{}
+	_ kube.ContextInterface = &FailingKubeClient{}
+	_ kube.InterfaceExt     = &FailingKubeClient{}
+)

@ -17,6 +17,7 @@ limitations under the License.
package fake

import (
	"context"
	"io"
	"strings"
	"time"
@ -62,16 +63,31 @@ func (p *PrintingKubeClient) Wait(resources kube.ResourceList, _ time.Duration)
	return err
}
func (p *PrintingKubeClient) WaitWithContext(ctx context.Context, resources kube.ResourceList) error {
	_, err := io.Copy(p.Out, bufferize(resources))
	return err
}
func (p *PrintingKubeClient) WaitWithJobs(resources kube.ResourceList, _ time.Duration) error {
	_, err := io.Copy(p.Out, bufferize(resources))
	return err
}
func (p *PrintingKubeClient) WaitWithJobsContext(ctx context.Context, resources kube.ResourceList) error {
	_, err := io.Copy(p.Out, bufferize(resources))
	return err
}
func (p *PrintingKubeClient) WaitForDelete(resources kube.ResourceList, _ time.Duration) error {
	_, err := io.Copy(p.Out, bufferize(resources))
	return err
}
func (p *PrintingKubeClient) WaitForDeleteWithContext(ctx context.Context, resources kube.ResourceList) error {
	_, err := io.Copy(p.Out, bufferize(resources))
	return err
}
// Delete implements KubeClient delete.
//
// It only prints out the content to be deleted.
@ -89,6 +105,12 @@ func (p *PrintingKubeClient) WatchUntilReady(resources kube.ResourceList, _ time
	return err
}
// WatchUntilReadyWithContext implements KubeClient WatchUntilReadyWithContext.
func (p *PrintingKubeClient) WatchUntilReadyWithContext(ctx context.Context, resources kube.ResourceList) error {
	_, err := io.Copy(p.Out, bufferize(resources))
	return err
}
// Update implements KubeClient Update.
func (p *PrintingKubeClient) Update(_, modified kube.ResourceList, _ bool) (*kube.Result, error) {
	_, err := io.Copy(p.Out, bufferize(modified))
@ -116,6 +138,11 @@ func (p *PrintingKubeClient) WaitAndGetCompletedPodPhase(_ string, _ time.Durati
	return v1.PodSucceeded, nil
}
// WaitAndGetCompletedPodPhaseWithContext implements KubeClient WaitAndGetCompletedPodPhaseWithContext.
func (p *PrintingKubeClient) WaitAndGetCompletedPodPhaseWithContext(_ context.Context, _ string) (v1.PodPhase, error) {
	return v1.PodSucceeded, nil
}
// DeleteWithPropagationPolicy implements KubeClient delete.
//
// It only prints out the content to be deleted.
@ -134,3 +161,10 @@ func bufferize(resources kube.ResourceList) io.Reader {
	}
	return strings.NewReader(builder.String())
}
// compile time check that PrintingKubeClient satisfies our interfaces.
var (
	_ kube.Interface        = &PrintingKubeClient{}
	_ kube.ContextInterface = &PrintingKubeClient{}
	_ kube.InterfaceExt     = &PrintingKubeClient{}
)
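
// Illustrative sketch (not part of this change): PrintingKubeClient can stand
// in for a real cluster client in tests. The context-aware methods, like their
// duration-based counterparts, only print the resources they are given; the
// bytes.Buffer below is a hypothetical fixture.
//
//	var out bytes.Buffer
//	client := &PrintingKubeClient{Out: &out}
//	_ = client.WatchUntilReadyWithContext(context.Background(), kube.ResourceList{})
//	// out now holds the names of the "watched" resources (none here).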

@ -17,6 +17,7 @@ limitations under the License.
package kube

import (
	"context"
	"io"
	"time"
@ -72,6 +73,31 @@ type Interface interface {
	IsReachable() error
}
// ContextInterface is introduced to avoid breaking backwards compatibility for Interface implementers.
//
// TODO Helm 4: Replace Interface methods that accept a time.Duration as an argument with a context.
type ContextInterface interface {
	// WaitWithContext waits until the context deadline (or cancellation) for the specified resources to be ready.
	WaitWithContext(ctx context.Context, resources ResourceList) error
	// WaitWithJobsContext waits until the context deadline (or cancellation) for the specified resources, including jobs, to be ready.
	WaitWithJobsContext(ctx context.Context, resources ResourceList) error
	// WatchUntilReadyWithContext watches the given resources and waits until they are ready.
	//
	// This method is mainly for hook implementations. It watches for a resource to
	// hit a particular milestone. The milestone depends on the Kind.
	//
	// For Jobs, "ready" means the Job ran to completion (exited without error).
	// For Pods, "ready" means the Pod phase is marked "succeeded".
	// For all other kinds, it means the kind was created or modified without
	// error.
	WatchUntilReadyWithContext(context.Context, ResourceList) error
	// WaitAndGetCompletedPodPhaseWithContext waits until the context deadline for a pod to enter a completed phase,
	// and returns that phase (PodSucceeded or PodFailed qualify).
	WaitAndGetCompletedPodPhaseWithContext(context.Context, string) (v1.PodPhase, error)
	// WaitForDeleteWithContext waits until the context deadline for the specified resources to be deleted.
	WaitForDeleteWithContext(context.Context, ResourceList) error
}
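
// Illustrative sketch (hypothetical helper, not part of this change): callers
// holding a plain Interface can type-assert to ContextInterface and prefer the
// context-aware wait, falling back to the duration-based method otherwise.
// waitReady is an assumed name, not part of this package.
//
//	func waitReady(ctx context.Context, kc Interface, resources ResourceList, timeout time.Duration) error {
//		if ckc, ok := kc.(ContextInterface); ok {
//			ctx, cancel := context.WithTimeout(ctx, timeout)
//			defer cancel()
//			return ckc.WaitWithContext(ctx, resources)
//		}
//		return kc.Wait(resources, timeout)
//	}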
// InterfaceExt is introduced to avoid breaking backwards compatibility for Interface implementers.
//
// TODO Helm 4: Remove InterfaceExt and integrate its method(s) into the Interface.
@ -112,5 +138,6 @@ type InterfaceResources interface {
var _ Interface = (*Client)(nil)
var _ InterfaceExt = (*Client)(nil)
var _ ContextInterface = (*Client)(nil)
var _ InterfaceDeletionPropagation = (*Client)(nil)
var _ InterfaceResources = (*Client)(nil)

@ -37,18 +37,26 @@ import (
)

type waiter struct {
	c   ReadyChecker
	log func(string, ...interface{})
}

// timeFromCtx returns the time remaining until the deadline of ctx, as a string.
func timeFromCtx(ctx context.Context) string {
	deadline, ok := ctx.Deadline()
	// No deadline means the context won't time out.
	if !ok {
		return "none"
	}
	return time.Until(deadline).String()
}
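
// Illustrative sketch (not part of this change): timeFromCtx feeds the wait
// logs below; a context without a deadline is reported as "none".
//
//	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
//	defer cancel()
//	_ = timeFromCtx(ctx)                  // a duration string just under "5s"
//	_ = timeFromCtx(context.Background()) // "none"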
// waitForResources polls to get the current status of all pods, PVCs, Services and
// Jobs (optional) until all are ready or a timeout is reached
func (w *waiter) waitForResources(ctx context.Context, created ResourceList) error {
	w.log("beginning wait for %d resources with timeout of %q", len(created), timeFromCtx(ctx))

	return wait.PollUntilContextCancel(ctx, 2*time.Second, true, func(ctx context.Context) (bool, error) {
		for _, v := range created {
@ -62,11 +70,8 @@ func (w *waiter) waitForResources(created ResourceList) error {
}
// waitForDeletedResources polls to check if all the resources are deleted or a timeout is reached
func (w *waiter) waitForDeletedResources(ctx context.Context, deleted ResourceList) error {
	w.log("beginning wait for %d resources to be deleted with timeout of %q", len(deleted), timeFromCtx(ctx))

	return wait.PollUntilContextCancel(ctx, 2*time.Second, true, func(ctx context.Context) (bool, error) {
		for _, v := range deleted {
