addressed PR feedback

Signed-off-by: Josh Rowley <joshlrowley@gmail.com>
pull/12109/head
Josh Rowley 2 years ago
parent 0f22ab8223
commit ab0f14fe2e

@@ -19,6 +19,7 @@ import (
 	"bytes"
 	"context"
 	"sort"
+	"time"

 	"github.com/pkg/errors"
@@ -28,7 +29,7 @@ import (
 )

 // execHook executes all of the hooks for the given hook event.
-func (cfg *Configuration) execHook(ctx context.Context, rl *release.Release, hook release.HookEvent) error {
+func (cfg *Configuration) execHook(timeout time.Duration, rl *release.Release, hook release.HookEvent) error {
 	executingHooks := []*release.Hook{}
 	for _, h := range rl.Hooks {
@@ -81,12 +82,17 @@ func (cfg *Configuration) execHook(ctx context.Context, rl *release.Release, hook release.HookEvent) error {
 	}

 	// Check if kube.Interface implementation satisfies kube.ContextInterface interface.
-	// If it doesn't log a warning and move on, nothing we can do.
+	// If not, fall back to the time-based watch to maintain backwards compatibility.
 	if kubeClient, ok := cfg.KubeClient.(kube.ContextInterface); ok {
+		// Helm 4 TODO: WatchUntilReady should be replaced with its
+		// context-aware counterpart.
+		ctx, cancel := context.WithTimeout(context.TODO(), timeout)
+		defer cancel()
 		// Watch hook resources until they have completed
 		err = kubeClient.WatchUntilReadyWithContext(ctx, resources)
 	} else {
 		cfg.Log("WARNING: kube.ContextInterface not satisfied")
-		//
+		err = cfg.KubeClient.WatchUntilReady(resources, timeout)
 	}
 	// Note the time of success/failure

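The hooks change above uses Go's comma-ok type assertion to prefer the context-aware watch and fall back to the timeout-based one. A minimal standalone sketch of that pattern, assuming illustrative Watcher/ContextWatcher interfaces rather than Helm's actual types:

package main

import (
	"context"
	"fmt"
	"time"
)

// Watcher and ContextWatcher are illustrative stand-ins for Helm's
// kube.Interface and kube.ContextInterface, used for this sketch only.
type Watcher interface {
	WatchUntilReady(timeout time.Duration) error
}

type ContextWatcher interface {
	WatchUntilReadyWithContext(ctx context.Context) error
}

type modernClient struct{}

func (modernClient) WatchUntilReady(time.Duration) error { return nil }
func (modernClient) WatchUntilReadyWithContext(ctx context.Context) error {
	return ctx.Err()
}

// watch prefers the context-aware method when the client provides it,
// deriving the context from the legacy timeout value.
func watch(c Watcher, timeout time.Duration) error {
	if cw, ok := c.(ContextWatcher); ok {
		ctx, cancel := context.WithTimeout(context.Background(), timeout)
		defer cancel()
		return cw.WatchUntilReadyWithContext(ctx)
	}
	return c.WatchUntilReady(timeout)
}

func main() {
	fmt.Println(watch(modernClient{}, time.Second)) // <nil>
}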
@@ -205,10 +205,7 @@ func (i *Install) installCRDs(crds []chart.CRD) error {
 //
 // If DryRun is set to true, this will prepare the release, but not install it
 func (i *Install) Run(chrt *chart.Chart, vals map[string]interface{}) (*release.Release, error) {
-	ctx, cancel := context.WithTimeout(context.TODO(), i.Timeout)
-	defer cancel()
-	return i.RunWithContext(ctx, chrt, vals)
+	return i.RunWithContext(context.TODO(), chrt, vals)
 }

 // Run executes the installation with Context
@@ -376,8 +373,8 @@ func (i *Install) RunWithContext(ctx context.Context, chrt *chart.Chart, vals map[string]interface{}) (*release.Release, error) {
 func (i *Install) performInstall(ctx context.Context, rel *release.Release, toBeAdopted, resources kube.ResourceList) (*release.Release, error) {
 	// pre-install hooks
 	if !i.DisableHooks {
-		if err := i.cfg.execHook(ctx, rel, release.HookPreInstall); err != nil {
-			return i.failRelease(ctx, rel, fmt.Errorf("failed pre-install: %w", err))
+		if err := i.cfg.execHook(i.Timeout, rel, release.HookPreInstall); err != nil {
+			return i.failRelease(rel, fmt.Errorf("failed pre-install: %w", err))
 		}
 	}
@@ -386,31 +383,42 @@ func (i *Install) performInstall(ctx context.Context, rel *release.Release, toBeAdopted, resources kube.ResourceList) (*release.Release, error) {
 	// to true, since that is basically an upgrade operation.
 	if len(toBeAdopted) == 0 && len(resources) > 0 {
 		if _, err := i.cfg.KubeClient.Create(resources); err != nil {
-			return i.failRelease(ctx, rel, err)
+			return i.failRelease(rel, err)
 		}
 	} else if len(resources) > 0 {
 		if _, err := i.cfg.KubeClient.Update(toBeAdopted, resources, i.Force); err != nil {
-			return i.failRelease(ctx, rel, err)
+			return i.failRelease(rel, err)
 		}
 	}

-	// Check if kube.Interface implementation, also satisfies kube.ContextInterface.
-	kubeClient, ok := i.cfg.KubeClient.(kube.ContextInterface)
-	if i.Wait && ok {
-		if i.WaitForJobs {
-			if err := kubeClient.WaitWithJobsContext(ctx, resources); err != nil {
-				return i.failRelease(ctx, rel, err)
-			}
-		} else {
-			if err := kubeClient.WaitWithContext(ctx, resources); err != nil {
-				return i.failRelease(ctx, rel, err)
-			}
-		}
-	}
+	if i.Wait {
+		var err error
+		ctx, cancel := context.WithTimeout(ctx, i.Timeout)
+		defer cancel()
+		kubeClient, ok := i.cfg.KubeClient.(kube.ContextInterface)
+		// Helm 4 TODO: WaitWithJobs and Wait should be replaced with their
+		// context-aware counterparts.
+		switch {
+		case ok && i.WaitForJobs:
+			err = kubeClient.WaitWithJobsContext(ctx, resources)
+		case ok && !i.WaitForJobs:
+			err = kubeClient.WaitWithContext(ctx, resources)
+		case i.WaitForJobs:
+			err = i.cfg.KubeClient.WaitWithJobs(resources, i.Timeout)
+		case !i.WaitForJobs:
+			err = i.cfg.KubeClient.Wait(resources, i.Timeout)
+		}
+		if err != nil {
+			return i.failRelease(rel, err)
+		}
+	}

 	if !i.DisableHooks {
-		if err := i.cfg.execHook(ctx, rel, release.HookPostInstall); err != nil {
-			return i.failRelease(ctx, rel, fmt.Errorf("failed post-install: %w", err))
+		if err := i.cfg.execHook(i.Timeout, rel, release.HookPostInstall); err != nil {
+			return i.failRelease(rel, fmt.Errorf("failed post-install: %w", err))
 		}
 	}
@@ -434,18 +442,16 @@ func (i *Install) performInstall(ctx context.Context, rel *release.Release, toBeAdopted, resources kube.ResourceList) (*release.Release, error) {
 	return rel, nil
 }

-func (i *Install) failRelease(ctx context.Context, rel *release.Release, err error) (*release.Release, error) {
+func (i *Install) failRelease(rel *release.Release, err error) (*release.Release, error) {
 	rel.SetStatus(release.StatusFailed, fmt.Sprintf("Release %q failed: %s", i.ReleaseName, err.Error()))
 	if i.Atomic {
 		i.cfg.Log("Install failed and atomic is set, uninstalling release")
 		uninstall := NewUninstall(i.cfg)
 		uninstall.DisableHooks = i.DisableHooks
 		uninstall.KeepHistory = false
-		// TODO: Not sure if a new ctx should be created by the timeout field,
-		// because Timeout will be replaced by contexts. Should a background ctx be used
-		// so we don't timeout while uninstalling?
 		uninstall.Timeout = i.Timeout
-		if _, uninstallErr := uninstall.RunWithContext(context.Background(), i.ReleaseName); uninstallErr != nil {
+		// Helm 4 TODO: Uninstalling needs to be handled properly on a failed atomic install.
+		if _, uninstallErr := uninstall.RunWithContext(context.TODO(), i.ReleaseName); uninstallErr != nil {
 			return rel, errors.Wrapf(uninstallErr, "an error occurred while uninstalling the release. original install error: %s", err)
 		}
 		return rel, errors.Wrapf(err, "release %s failed, and has been uninstalled due to atomic being set", i.ReleaseName)

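Note that performInstall now wraps the caller's context with context.WithTimeout(ctx, i.Timeout), so the wait aborts on whichever fires first: the parent's cancellation or the timeout. A small self-contained demonstration of that behavior (not Helm code):

package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	parent, cancel := context.WithCancel(context.Background())
	// Derive a generous timeout from the parent, as performInstall does
	// with i.Timeout.
	ctx, done := context.WithTimeout(parent, time.Minute)
	defer done()

	cancel() // cancelling the parent cancels the derived context too
	<-ctx.Done()
	fmt.Println(ctx.Err()) // context.Canceled, not DeadlineExceeded
}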
@@ -387,15 +387,13 @@ func TestInstallRelease_Wait_Interrupted(t *testing.T) {
 	instAction := installAction(t)
 	instAction.ReleaseName = "interrupted-release"
 	failer := instAction.cfg.KubeClient.(*kubefake.FailingKubeClient)
+	failer.WaitError = context.Canceled
 	instAction.cfg.KubeClient = failer
 	instAction.Wait = true
 	vals := map[string]interface{}{}

-	ctx := context.Background()
-	ctx, cancel := context.WithCancel(ctx)
-	cancel()
-	res, err := instAction.RunWithContext(ctx, buildChart(), vals)
+	res, err := instAction.RunWithContext(context.Background(), buildChart(), vals)
 	is.Error(err)
 	is.Contains(res.Info.Description, "Release \"interrupted-release\" failed: context canceled")
 	is.Equal(res.Info.Status, release.StatusFailed)
@@ -464,15 +462,12 @@ func TestInstallRelease_Atomic_Interrupted(t *testing.T) {
 	instAction := installAction(t)
 	instAction.ReleaseName = "interrupted-release"
 	failer := instAction.cfg.KubeClient.(*kubefake.FailingKubeClient)
 	failer.WaitError = context.Canceled
 	instAction.cfg.KubeClient = failer
 	instAction.Atomic = true
 	vals := map[string]interface{}{}

-	ctx := context.Background()
-	ctx, cancel := context.WithCancel(ctx)
-	cancel()
-	res, err := instAction.RunWithContext(ctx, buildChart(), vals)
+	res, err := instAction.RunWithContext(context.Background(), buildChart(), vals)
 	is.Error(err)
 	is.Contains(err.Error(), "context canceled")
 	is.Contains(err.Error(), "atomic")

@@ -87,7 +87,7 @@ func (r *ReleaseTesting) RunWithContext(ctx context.Context, name string) (*release.Release, error) {
 		rel.Hooks = executingHooks
 	}

-	if err := r.cfg.execHook(ctx, rel, release.HookTest); err != nil {
+	if err := r.cfg.execHook(r.Timeout, rel, release.HookTest); err != nil {
 		rel.Hooks = append(skippedHooks, rel.Hooks...)
 		r.cfg.Releases.Update(rel)
 		return rel, err
@@ -99,10 +99,7 @@ func (r *ReleaseTesting) RunWithContext(ctx context.Context, name string) (*release.Release, error) {
 // Run executes 'helm test' against the given release.
 func (r *ReleaseTesting) Run(name string) (*release.Release, error) {
-	ctx, cancel := context.WithTimeout(context.TODO(), r.Timeout)
-	defer cancel()
-	return r.RunWithContext(ctx, name)
+	return r.RunWithContext(context.TODO(), name)
 }

 // GetPodLogs will write the logs for all test pods in the given release into

@@ -91,10 +91,7 @@ func (r *Rollback) RunWithContext(ctx context.Context, name string) error {
 // Run executes 'helm rollback' against the given release.
 func (r *Rollback) Run(name string) error {
-	ctx, cancel := context.WithTimeout(context.TODO(), r.Timeout)
-	defer cancel()
-	return r.RunWithContext(ctx, name)
+	return r.RunWithContext(context.TODO(), name)
 }

 // prepareRollback finds the previous release and prepares a new release object with
@@ -165,7 +162,7 @@ func (r *Rollback) performRollback(ctx context.Context, currentRelease, targetRelease *release.Release) (*release.Release, error) {
 	// pre-rollback hooks
 	if !r.DisableHooks {
-		if err := r.cfg.execHook(ctx, targetRelease, release.HookPreRollback); err != nil {
+		if err := r.cfg.execHook(r.Timeout, targetRelease, release.HookPreRollback); err != nil {
 			return targetRelease, err
 		}
 	} else {
@@ -232,7 +229,7 @@ func (r *Rollback) performRollback(ctx context.Context, currentRelease, targetRelease *release.Release) (*release.Release, error) {
 	// post-rollback hooks
 	if !r.DisableHooks {
-		if err := r.cfg.execHook(ctx, targetRelease, release.HookPostRollback); err != nil {
+		if err := r.cfg.execHook(r.Timeout, targetRelease, release.HookPostRollback); err != nil {
 			return targetRelease, err
 		}
 	}

@@ -109,7 +109,7 @@ func (u *Uninstall) RunWithContext(ctx context.Context, name string) (*release.UninstallReleaseResponse, error) {
 	res := &release.UninstallReleaseResponse{Release: rel}

 	if !u.DisableHooks {
-		if err := u.cfg.execHook(ctx, rel, release.HookPreDelete); err != nil {
+		if err := u.cfg.execHook(u.Timeout, rel, release.HookPreDelete); err != nil {
 			return res, err
 		}
 	} else {
@@ -134,15 +134,25 @@ func (u *Uninstall) RunWithContext(ctx context.Context, name string) (*release.UninstallReleaseResponse, error) {
 	res.Info = kept

 	if u.Wait {
-		if kubeClient, ok := u.cfg.KubeClient.(kube.ContextInterface); ok {
-			if err := kubeClient.WaitForDeleteWithContext(ctx, deletedResources); err != nil {
-				errs = append(errs, err)
-			}
-		}
+		var err error
+		// Helm 4 TODO: WaitForDelete should be replaced with its
+		// context-aware counterpart.
+		switch kubeClient := u.cfg.KubeClient.(type) {
+		case kube.ContextInterface:
+			err = kubeClient.WaitForDeleteWithContext(ctx, deletedResources)
+		case kube.InterfaceExt:
+			err = kubeClient.WaitForDelete(deletedResources, u.Timeout)
+		default:
+			u.cfg.Log("WARNING: KubeClient does not satisfy ContextInterface or InterfaceExt")
+		}
+		if err != nil {
+			errs = append(errs, err)
+		}
 	}

 	if !u.DisableHooks {
-		if err := u.cfg.execHook(ctx, rel, release.HookPostDelete); err != nil {
+		if err := u.cfg.execHook(u.Timeout, rel, release.HookPostDelete); err != nil {
 			errs = append(errs, err)
 		}
 	}

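The uninstall path switches on the client's concrete capabilities instead of a single assertion. A minimal sketch of the same pattern with illustrative interface and client names (not Helm's); case order matters, since a client implementing both interfaces matches the first case listed:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// InterfaceExt and ContextInterface mimic the shape of Helm's
// interfaces for this example only.
type InterfaceExt interface {
	WaitForDelete(timeout time.Duration) error
}

type ContextInterface interface {
	WaitForDeleteWithContext(ctx context.Context) error
}

type legacyClient struct{}

func (legacyClient) WaitForDelete(time.Duration) error { return nil }

func waitForDelete(ctx context.Context, client any, timeout time.Duration) error {
	switch c := client.(type) {
	case ContextInterface:
		return c.WaitForDeleteWithContext(ctx)
	case InterfaceExt:
		return c.WaitForDelete(timeout)
	default:
		return errors.New("client satisfies neither interface")
	}
}

func main() {
	// legacyClient lacks the context method, so it hits the InterfaceExt case.
	err := waitForDelete(context.Background(), legacyClient{}, time.Second)
	fmt.Println(err) // <nil>
}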
@@ -341,7 +341,7 @@ func (u *Upgrade) releasingUpgrade(ctx context.Context, upgradedRelease *release
 	// pre-upgrade hooks
 	if !u.DisableHooks {
-		if err := u.cfg.execHook(ctx, upgradedRelease, release.HookPreUpgrade); err != nil {
+		if err := u.cfg.execHook(u.Timeout, upgradedRelease, release.HookPreUpgrade); err != nil {
 			return u.failRelease(upgradedRelease, kube.ResourceList{}, fmt.Errorf("pre-upgrade hooks failed: %w", err))
 		}
 	} else {
@@ -364,27 +364,38 @@ func (u *Upgrade) releasingUpgrade(ctx context.Context, upgradedRelease *release
 		}
 	}

-	kubeClient, ok := u.cfg.KubeClient.(kube.ContextInterface)
-	if u.Wait && ok {
+	if u.Wait {
 		u.cfg.Log(
 			"waiting for release %s resources (created: %d updated: %d deleted: %d)",
 			upgradedRelease.Name, len(results.Created), len(results.Updated), len(results.Deleted))
-		if u.WaitForJobs {
-			if err := kubeClient.WaitWithJobsContext(ctx, target); err != nil {
-				u.cfg.recordRelease(originalRelease)
-				return u.failRelease(upgradedRelease, results.Created, err)
-			}
-		} else {
-			if err := kubeClient.WaitWithContext(ctx, target); err != nil {
-				u.cfg.recordRelease(originalRelease)
-				return u.failRelease(upgradedRelease, results.Created, err)
-			}
-		}
+		var err error
+		ctx, cancel := context.WithTimeout(ctx, u.Timeout)
+		defer cancel()
+		kubeClient, ok := u.cfg.KubeClient.(kube.ContextInterface)
+		// Helm 4 TODO: WaitWithJobs and Wait should be replaced with their
+		// context-aware counterparts.
+		switch {
+		case ok && u.WaitForJobs:
+			err = kubeClient.WaitWithJobsContext(ctx, target)
+		case ok && !u.WaitForJobs:
+			err = kubeClient.WaitWithContext(ctx, target)
+		case u.WaitForJobs:
+			err = u.cfg.KubeClient.WaitWithJobs(target, u.Timeout)
+		case !u.WaitForJobs:
+			err = u.cfg.KubeClient.Wait(target, u.Timeout)
+		}
+		if err != nil {
+			u.cfg.recordRelease(originalRelease)
+			return u.failRelease(upgradedRelease, results.Created, err)
+		}
 	}

 	// post-upgrade hooks
 	if !u.DisableHooks {
-		if err := u.cfg.execHook(ctx, upgradedRelease, release.HookPostUpgrade); err != nil {
+		if err := u.cfg.execHook(u.Timeout, upgradedRelease, release.HookPostUpgrade); err != nil {
 			return u.failRelease(upgradedRelease, results.Created, fmt.Errorf("post-upgrade hooks failed: %w", err))
 		}
 	}

@@ -357,15 +357,12 @@ func TestUpgradeRelease_Interrupted_Wait(t *testing.T) {
 	upAction.cfg.Releases.Create(rel)
 	failer := upAction.cfg.KubeClient.(*kubefake.FailingKubeClient)
 	failer.WaitError = context.Canceled
 	upAction.cfg.KubeClient = failer
 	upAction.Wait = true
 	vals := map[string]interface{}{}

-	ctx := context.Background()
-	ctx, cancel := context.WithCancel(ctx)
-	cancel()
-	res, err := upAction.RunWithContext(ctx, rel.Name, buildChart(), vals)
+	res, err := upAction.RunWithContext(context.Background(), rel.Name, buildChart(), vals)
 	req.Error(err)
 	is.Contains(res.Info.Description, "Upgrade \"interrupted-release\" failed: context canceled")
@@ -385,15 +382,17 @@ func TestUpgradeRelease_Interrupted_Atomic(t *testing.T) {
 	upAction.cfg.Releases.Create(rel)
 	failer := upAction.cfg.KubeClient.(*kubefake.FailingKubeClient)
 	failer.WaitError = context.Canceled
+	failer.WaitDuration = 2 * time.Second
 	upAction.cfg.KubeClient = failer
 	upAction.Atomic = true
 	vals := map[string]interface{}{}

-	ctx := context.Background()
-	ctx, cancel := context.WithCancel(ctx)
-	cancel()
+	// After the first Wait error, WaitError needs to be set to nil
+	// so the atomic cleanup passes.
+	time.AfterFunc(failer.WaitDuration, func() { failer.WaitError = nil })

-	res, err := upAction.RunWithContext(ctx, rel.Name, buildChart(), vals)
+	res, err := upAction.RunWithContext(context.Background(), rel.Name, buildChart(), vals)
 	req.Error(err)
 	is.Contains(err.Error(), "release interrupted-release failed, and has been rolled back due to atomic being set: context canceled")

@@ -739,7 +739,7 @@ func (c *Client) watchUntilReady(ctx context.Context, info *resource.Info) error {
 		return nil
 	}

-	c.Log("Watching for changes to %s %s with timeout of %v", kind, info.Name, timeFromCtx(ctx))
+	c.Log("Watching for changes to %s %s with timeout of %q", kind, info.Name, timeFromCtx(ctx))

 	// Use a selector on the name of the resource. This should be unique for the
 	// given version and kind

@@ -77,14 +77,11 @@ func (f *FailingKubeClient) Wait(resources kube.ResourceList, d time.Duration) error {
 // Waits the amount of time defined on f.WaitDuration, then returns the configured error if set or prints.
 func (f *FailingKubeClient) WaitWithContext(ctx context.Context, resources kube.ResourceList) error {
 	time.Sleep(f.WaitDuration)
 	if f.WaitError != nil {
 		return f.WaitError
 	}
-	if err := ctx.Err(); err != nil {
-		return err
-	}
 	return f.PrintingKubeClient.WaitWithContext(ctx, resources)
 }
@@ -101,11 +98,6 @@ func (f *FailingKubeClient) WaitWithJobsContext(ctx context.Context, resources kube.ResourceList) error {
 	if f.WaitError != nil {
 		return f.WaitError
 	}
-	if err := ctx.Err(); err != nil {
-		return err
-	}
 	return f.PrintingKubeClient.WaitWithJobsContext(ctx, resources)
 }
@@ -122,11 +114,6 @@ func (f *FailingKubeClient) WaitForDeleteWithContext(ctx context.Context, resources kube.ResourceList) error {
 	if f.WaitError != nil {
 		return f.WaitError
 	}
-	if err := ctx.Err(); err != nil {
-		return err
-	}
 	return f.PrintingKubeClient.WaitForDeleteWithContext(ctx, resources)
 }
@@ -202,11 +189,6 @@ func (f *FailingKubeClient) WatchUntilReadyWithContext(ctx context.Context, resources kube.ResourceList) error {
 	if f.WatchUntilReadyError != nil {
 		return f.WatchUntilReadyError
 	}
-	if err := ctx.Err(); err != nil {
-		return err
-	}
 	return f.PrintingKubeClient.WatchUntilReady(resources, 0)
 }

@@ -42,17 +42,21 @@ type waiter struct {
 }

 // timeFromCtx extracts time until deadline of ctx.
-func timeFromCtx(ctx context.Context) time.Duration {
-	deadline, _ := ctx.Deadline()
-	return time.Until(deadline)
+func timeFromCtx(ctx context.Context) string {
+	deadline, ok := ctx.Deadline()
+	// No deadline means the context won't time out.
+	if !ok {
+		return "none"
+	}
+	return time.Until(deadline).String()
 }

 // waitForResources polls to get the current status of all pods, PVCs, Services and
 // Jobs(optional) until all are ready or a timeout is reached
 func (w *waiter) waitForResources(ctx context.Context, created ResourceList) error {
-	w.log("beginning wait for %d resources with timeout of %v", len(created), timeFromCtx(ctx))
+	w.log("beginning wait for %d resources with timeout of %q", len(created), timeFromCtx(ctx))
 	return wait.PollUntilContextCancel(ctx, 2*time.Second, true, func(ctx context.Context) (bool, error) {
 		for _, v := range created {
@@ -67,7 +71,7 @@ func (w *waiter) waitForResources(ctx context.Context, created ResourceList) error {
 // waitForDeletedResources polls to check if all the resources are deleted or a timeout is reached
 func (w *waiter) waitForDeletedResources(ctx context.Context, deleted ResourceList) error {
-	w.log("beginning wait for %d resources to be deleted with timeout of %v", len(deleted), timeFromCtx(ctx))
+	w.log("beginning wait for %d resources to be deleted with timeout of %q", len(deleted), timeFromCtx(ctx))
 	return wait.PollUntilContextCancel(ctx, 2*time.Second, true, func(ctx context.Context) (bool, error) {
 		for _, v := range deleted {

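For reference, the reworked timeFromCtx above can be exercised on its own: a context without a deadline reports "none", while a derived timeout context reports the remaining duration. The function body below is copied from the hunk; the surrounding harness is not Helm code:

package main

import (
	"context"
	"fmt"
	"time"
)

func timeFromCtx(ctx context.Context) string {
	deadline, ok := ctx.Deadline()
	// No deadline means the context won't time out.
	if !ok {
		return "none"
	}
	return time.Until(deadline).String()
}

func main() {
	fmt.Println(timeFromCtx(context.Background())) // "none"

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	fmt.Println(timeFromCtx(ctx)) // just under "30s"
}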