update to get waiter instead of set

Signed-off-by: Austin Abro <AustinAbro321@gmail.com>
pull/13604/head
Austin Abro 6 months ago
parent d78b7e401a
commit 386523bdbc
No known key found for this signature in database
GPG Key ID: 92EB5159E403F9D6

@ -375,10 +375,7 @@ func (cfg *Configuration) recordRelease(r *release.Release) {
// Init initializes the action configuration // Init initializes the action configuration
func (cfg *Configuration) Init(getter genericclioptions.RESTClientGetter, namespace, helmDriver string, log DebugLog) error { func (cfg *Configuration) Init(getter genericclioptions.RESTClientGetter, namespace, helmDriver string, log DebugLog) error {
kc, err := kube.New(getter) kc := kube.New(getter)
if err != nil {
return err
}
kc.Log = log kc.Log = log
lazyClient := &lazyClient{ lazyClient := &lazyClient{

@ -35,7 +35,7 @@ import (
) )
// execHook executes all of the hooks for the given hook event. // execHook executes all of the hooks for the given hook event.
func (cfg *Configuration) execHook(rl *release.Release, hook release.HookEvent, timeout time.Duration) error { func (cfg *Configuration) execHook(rl *release.Release, hook release.HookEvent, waitStrategy kube.WaitStrategy, timeout time.Duration) error {
executingHooks := []*release.Hook{} executingHooks := []*release.Hook{}
for _, h := range rl.Hooks { for _, h := range rl.Hooks {
@ -59,7 +59,7 @@ func (cfg *Configuration) execHook(rl *release.Release, hook release.HookEvent,
h.DeletePolicies = []release.HookDeletePolicy{release.HookBeforeHookCreation} h.DeletePolicies = []release.HookDeletePolicy{release.HookBeforeHookCreation}
} }
if err := cfg.deleteHookByPolicy(h, release.HookBeforeHookCreation, timeout); err != nil { if err := cfg.deleteHookByPolicy(h, release.HookBeforeHookCreation, waitStrategy, timeout); err != nil {
return err return err
} }
@ -87,8 +87,12 @@ func (cfg *Configuration) execHook(rl *release.Release, hook release.HookEvent,
return errors.Wrapf(err, "warning: Hook %s %s failed", hook, h.Path) return errors.Wrapf(err, "warning: Hook %s %s failed", hook, h.Path)
} }
waiter, err := cfg.KubeClient.GetWaiter(waitStrategy)
if err != nil {
return errors.Wrapf(err, "unable to get waiter")
}
// Watch hook resources until they have completed // Watch hook resources until they have completed
err = cfg.KubeClient.WatchUntilReady(resources, timeout) err = waiter.WatchUntilReady(resources, timeout)
// Note the time of success/failure // Note the time of success/failure
h.LastRun.CompletedAt = helmtime.Now() h.LastRun.CompletedAt = helmtime.Now()
// Mark hook as succeeded or failed // Mark hook as succeeded or failed
@ -101,7 +105,7 @@ func (cfg *Configuration) execHook(rl *release.Release, hook release.HookEvent,
} }
// If a hook is failed, check the annotation of the hook to determine whether the hook should be deleted // If a hook is failed, check the annotation of the hook to determine whether the hook should be deleted
// under failed condition. If so, then clear the corresponding resource object in the hook // under failed condition. If so, then clear the corresponding resource object in the hook
if errDeleting := cfg.deleteHookByPolicy(h, release.HookFailed, timeout); errDeleting != nil { if errDeleting := cfg.deleteHookByPolicy(h, release.HookFailed, waitStrategy, timeout); errDeleting != nil {
// We log the error here as we want to propagate the hook failure upwards to the release object. // We log the error here as we want to propagate the hook failure upwards to the release object.
log.Printf("error deleting the hook resource on hook failure: %v", errDeleting) log.Printf("error deleting the hook resource on hook failure: %v", errDeleting)
} }
@ -118,7 +122,7 @@ func (cfg *Configuration) execHook(rl *release.Release, hook release.HookEvent,
// We log here as we still want to attempt hook resource deletion even if output logging fails. // We log here as we still want to attempt hook resource deletion even if output logging fails.
log.Printf("error outputting logs for hook failure: %v", err) log.Printf("error outputting logs for hook failure: %v", err)
} }
if err := cfg.deleteHookByPolicy(h, release.HookSucceeded, timeout); err != nil { if err := cfg.deleteHookByPolicy(h, release.HookSucceeded, waitStrategy, timeout); err != nil {
return err return err
} }
} }
@ -139,7 +143,7 @@ func (x hookByWeight) Less(i, j int) bool {
} }
// deleteHookByPolicy deletes a hook if the hook policy instructs it to // deleteHookByPolicy deletes a hook if the hook policy instructs it to
func (cfg *Configuration) deleteHookByPolicy(h *release.Hook, policy release.HookDeletePolicy, timeout time.Duration) error { func (cfg *Configuration) deleteHookByPolicy(h *release.Hook, policy release.HookDeletePolicy, waitStrategy kube.WaitStrategy, timeout time.Duration) error {
// Never delete CustomResourceDefinitions; this could cause lots of // Never delete CustomResourceDefinitions; this could cause lots of
// cascading garbage collection. // cascading garbage collection.
if h.Kind == "CustomResourceDefinition" { if h.Kind == "CustomResourceDefinition" {
@ -155,7 +159,11 @@ func (cfg *Configuration) deleteHookByPolicy(h *release.Hook, policy release.Hoo
return errors.New(joinErrors(errs)) return errors.New(joinErrors(errs))
} }
if err := cfg.KubeClient.WaitForDelete(resources, timeout); err != nil { waiter, err := cfg.KubeClient.GetWaiter(waitStrategy)
if err != nil {
return err
}
if err := waiter.WaitForDelete(resources, timeout); err != nil {
return err return err
} }
} }

@ -79,7 +79,7 @@ type Install struct {
HideSecret bool HideSecret bool
DisableHooks bool DisableHooks bool
Replace bool Replace bool
Wait kube.WaitStrategy WaitStrategy kube.WaitStrategy
WaitForJobs bool WaitForJobs bool
Devel bool Devel bool
DependencyUpdate bool DependencyUpdate bool
@ -180,8 +180,12 @@ func (i *Install) installCRDs(crds []chart.CRD) error {
totalItems = append(totalItems, res...) totalItems = append(totalItems, res...)
} }
if len(totalItems) > 0 { if len(totalItems) > 0 {
waiter, err := i.cfg.KubeClient.GetWaiter(i.WaitStrategy)
if err != nil {
return errors.Wrapf(err, "unable to get waiter")
}
// Give time for the CRD to be recognized. // Give time for the CRD to be recognized.
if err := i.cfg.KubeClient.Wait(totalItems, 60*time.Second); err != nil { if err := waiter.Wait(totalItems, 60*time.Second); err != nil {
return err return err
} }
@ -289,11 +293,8 @@ func (i *Install) RunWithContext(ctx context.Context, chrt *chart.Chart, vals ma
// Make sure if Atomic is set, that wait is set as well. This makes it so // Make sure if Atomic is set, that wait is set as well. This makes it so
// the user doesn't have to specify both // the user doesn't have to specify both
if i.Wait == kube.HookOnlyStrategy && i.Atomic { if i.WaitStrategy == kube.HookOnlyStrategy && i.Atomic {
i.Wait = kube.StatusWatcherStrategy i.WaitStrategy = kube.StatusWatcherStrategy
}
if err := i.cfg.KubeClient.SetWaiter(i.Wait); err != nil {
return nil, fmt.Errorf("failed to set kube client waiter: %w", err)
} }
caps, err := i.cfg.getCapabilities() caps, err := i.cfg.getCapabilities()
@ -453,7 +454,7 @@ func (i *Install) performInstall(rel *release.Release, toBeAdopted kube.Resource
var err error var err error
// pre-install hooks // pre-install hooks
if !i.DisableHooks { if !i.DisableHooks {
if err := i.cfg.execHook(rel, release.HookPreInstall, i.Timeout); err != nil { if err := i.cfg.execHook(rel, release.HookPreInstall, i.WaitStrategy, i.Timeout); err != nil {
return rel, fmt.Errorf("failed pre-install: %s", err) return rel, fmt.Errorf("failed pre-install: %s", err)
} }
} }
@ -470,17 +471,22 @@ func (i *Install) performInstall(rel *release.Release, toBeAdopted kube.Resource
return rel, err return rel, err
} }
waiter, err := i.cfg.KubeClient.GetWaiter(i.WaitStrategy)
if err != nil {
return rel, fmt.Errorf("failed to get waiter: %w", err)
}
if i.WaitForJobs { if i.WaitForJobs {
err = i.cfg.KubeClient.WaitWithJobs(resources, i.Timeout) err = waiter.WaitWithJobs(resources, i.Timeout)
} else { } else {
err = i.cfg.KubeClient.Wait(resources, i.Timeout) err = waiter.Wait(resources, i.Timeout)
} }
if err != nil { if err != nil {
return rel, err return rel, err
} }
if !i.DisableHooks { if !i.DisableHooks {
if err := i.cfg.execHook(rel, release.HookPostInstall, i.Timeout); err != nil { if err := i.cfg.execHook(rel, release.HookPostInstall, i.WaitStrategy, i.Timeout); err != nil {
return rel, fmt.Errorf("failed post-install: %s", err) return rel, fmt.Errorf("failed post-install: %s", err)
} }
} }

@ -412,7 +412,7 @@ func TestInstallRelease_Wait(t *testing.T) {
failer := instAction.cfg.KubeClient.(*kubefake.FailingKubeClient) failer := instAction.cfg.KubeClient.(*kubefake.FailingKubeClient)
failer.WaitError = fmt.Errorf("I timed out") failer.WaitError = fmt.Errorf("I timed out")
instAction.cfg.KubeClient = failer instAction.cfg.KubeClient = failer
instAction.Wait = kube.StatusWatcherStrategy instAction.WaitStrategy = kube.StatusWatcherStrategy
vals := map[string]interface{}{} vals := map[string]interface{}{}
goroutines := runtime.NumGoroutine() goroutines := runtime.NumGoroutine()
@ -431,7 +431,7 @@ func TestInstallRelease_Wait_Interrupted(t *testing.T) {
failer := instAction.cfg.KubeClient.(*kubefake.FailingKubeClient) failer := instAction.cfg.KubeClient.(*kubefake.FailingKubeClient)
failer.WaitDuration = 10 * time.Second failer.WaitDuration = 10 * time.Second
instAction.cfg.KubeClient = failer instAction.cfg.KubeClient = failer
instAction.Wait = kube.StatusWatcherStrategy instAction.WaitStrategy = kube.StatusWatcherStrategy
vals := map[string]interface{}{} vals := map[string]interface{}{}
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
@ -454,7 +454,7 @@ func TestInstallRelease_WaitForJobs(t *testing.T) {
failer := instAction.cfg.KubeClient.(*kubefake.FailingKubeClient) failer := instAction.cfg.KubeClient.(*kubefake.FailingKubeClient)
failer.WaitError = fmt.Errorf("I timed out") failer.WaitError = fmt.Errorf("I timed out")
instAction.cfg.KubeClient = failer instAction.cfg.KubeClient = failer
instAction.Wait = kube.StatusWatcherStrategy instAction.WaitStrategy = kube.StatusWatcherStrategy
instAction.WaitForJobs = true instAction.WaitForJobs = true
vals := map[string]interface{}{} vals := map[string]interface{}{}

@ -28,6 +28,7 @@ import (
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
chartutil "helm.sh/helm/v4/pkg/chart/v2/util" chartutil "helm.sh/helm/v4/pkg/chart/v2/util"
"helm.sh/helm/v4/pkg/kube"
release "helm.sh/helm/v4/pkg/release/v1" release "helm.sh/helm/v4/pkg/release/v1"
) )
@ -96,7 +97,7 @@ func (r *ReleaseTesting) Run(name string) (*release.Release, error) {
rel.Hooks = executingHooks rel.Hooks = executingHooks
} }
if err := r.cfg.execHook(rel, release.HookTest, r.Timeout); err != nil { if err := r.cfg.execHook(rel, release.HookTest, kube.StatusWatcherStrategy, r.Timeout); err != nil {
rel.Hooks = append(skippedHooks, rel.Hooks...) rel.Hooks = append(skippedHooks, rel.Hooks...)
r.cfg.Releases.Update(rel) r.cfg.Releases.Update(rel)
return rel, err return rel, err

@ -38,7 +38,7 @@ type Rollback struct {
Version int Version int
Timeout time.Duration Timeout time.Duration
Wait kube.WaitStrategy WaitStrategy kube.WaitStrategy
WaitForJobs bool WaitForJobs bool
DisableHooks bool DisableHooks bool
DryRun bool DryRun bool
@ -61,10 +61,6 @@ func (r *Rollback) Run(name string) error {
return err return err
} }
if err := r.cfg.KubeClient.SetWaiter(r.Wait); err != nil {
return fmt.Errorf("failed to set kube client waiter: %w", err)
}
r.cfg.Releases.MaxHistory = r.MaxHistory r.cfg.Releases.MaxHistory = r.MaxHistory
r.cfg.Log("preparing rollback of %s", name) r.cfg.Log("preparing rollback of %s", name)
@ -181,7 +177,7 @@ func (r *Rollback) performRollback(currentRelease, targetRelease *release.Releas
// pre-rollback hooks // pre-rollback hooks
if !r.DisableHooks { if !r.DisableHooks {
if err := r.cfg.execHook(targetRelease, release.HookPreRollback, r.Timeout); err != nil { if err := r.cfg.execHook(targetRelease, release.HookPreRollback, r.WaitStrategy, r.Timeout); err != nil {
return targetRelease, err return targetRelease, err
} }
} else { } else {
@ -227,16 +223,19 @@ func (r *Rollback) performRollback(currentRelease, targetRelease *release.Releas
r.cfg.Log(err.Error()) r.cfg.Log(err.Error())
} }
} }
waiter, err := r.cfg.KubeClient.GetWaiter(r.WaitStrategy)
if err != nil {
return nil, errors.Wrap(err, "unable to set metadata visitor from target release")
}
if r.WaitForJobs { if r.WaitForJobs {
if err := r.cfg.KubeClient.WaitWithJobs(target, r.Timeout); err != nil { if err := waiter.WaitWithJobs(target, r.Timeout); err != nil {
targetRelease.SetStatus(release.StatusFailed, fmt.Sprintf("Release %q failed: %s", targetRelease.Name, err.Error())) targetRelease.SetStatus(release.StatusFailed, fmt.Sprintf("Release %q failed: %s", targetRelease.Name, err.Error()))
r.cfg.recordRelease(currentRelease) r.cfg.recordRelease(currentRelease)
r.cfg.recordRelease(targetRelease) r.cfg.recordRelease(targetRelease)
return targetRelease, errors.Wrapf(err, "release %s failed", targetRelease.Name) return targetRelease, errors.Wrapf(err, "release %s failed", targetRelease.Name)
} }
} else { } else {
if err := r.cfg.KubeClient.Wait(target, r.Timeout); err != nil { if err := waiter.Wait(target, r.Timeout); err != nil {
targetRelease.SetStatus(release.StatusFailed, fmt.Sprintf("Release %q failed: %s", targetRelease.Name, err.Error())) targetRelease.SetStatus(release.StatusFailed, fmt.Sprintf("Release %q failed: %s", targetRelease.Name, err.Error()))
r.cfg.recordRelease(currentRelease) r.cfg.recordRelease(currentRelease)
r.cfg.recordRelease(targetRelease) r.cfg.recordRelease(targetRelease)
@ -246,7 +245,7 @@ func (r *Rollback) performRollback(currentRelease, targetRelease *release.Releas
// post-rollback hooks // post-rollback hooks
if !r.DisableHooks { if !r.DisableHooks {
if err := r.cfg.execHook(targetRelease, release.HookPostRollback, r.Timeout); err != nil { if err := r.cfg.execHook(targetRelease, release.HookPostRollback, r.WaitStrategy, r.Timeout); err != nil {
return targetRelease, err return targetRelease, err
} }
} }

@ -17,7 +17,6 @@ limitations under the License.
package action package action
import ( import (
"fmt"
"strings" "strings"
"time" "time"
@ -42,7 +41,7 @@ type Uninstall struct {
DryRun bool DryRun bool
IgnoreNotFound bool IgnoreNotFound bool
KeepHistory bool KeepHistory bool
Wait kube.WaitStrategy WaitStrategy kube.WaitStrategy
DeletionPropagation string DeletionPropagation string
Timeout time.Duration Timeout time.Duration
Description string Description string
@ -61,8 +60,9 @@ func (u *Uninstall) Run(name string) (*release.UninstallReleaseResponse, error)
return nil, err return nil, err
} }
if err := u.cfg.KubeClient.SetWaiter(u.Wait); err != nil { waiter, err := u.cfg.KubeClient.GetWaiter(u.WaitStrategy)
return nil, fmt.Errorf("failed to set kube client waiter: %w", err) if err != nil {
return nil, err
} }
if u.DryRun { if u.DryRun {
@ -111,7 +111,7 @@ func (u *Uninstall) Run(name string) (*release.UninstallReleaseResponse, error)
res := &release.UninstallReleaseResponse{Release: rel} res := &release.UninstallReleaseResponse{Release: rel}
if !u.DisableHooks { if !u.DisableHooks {
if err := u.cfg.execHook(rel, release.HookPreDelete, u.Timeout); err != nil { if err := u.cfg.execHook(rel, release.HookPreDelete, u.WaitStrategy, u.Timeout); err != nil {
return res, err return res, err
} }
} else { } else {
@ -135,12 +135,12 @@ func (u *Uninstall) Run(name string) (*release.UninstallReleaseResponse, error)
} }
res.Info = kept res.Info = kept
if err := u.cfg.KubeClient.WaitForDelete(deletedResources, u.Timeout); err != nil { if err := waiter.WaitForDelete(deletedResources, u.Timeout); err != nil {
errs = append(errs, err) errs = append(errs, err)
} }
if !u.DisableHooks { if !u.DisableHooks {
if err := u.cfg.execHook(rel, release.HookPostDelete, u.Timeout); err != nil { if err := u.cfg.execHook(rel, release.HookPostDelete, u.WaitStrategy, u.Timeout); err != nil {
errs = append(errs, err) errs = append(errs, err)
} }
} }

@ -83,7 +83,7 @@ func TestUninstallRelease_Wait(t *testing.T) {
unAction := uninstallAction(t) unAction := uninstallAction(t)
unAction.DisableHooks = true unAction.DisableHooks = true
unAction.DryRun = false unAction.DryRun = false
unAction.Wait = kube.StatusWatcherStrategy unAction.WaitStrategy = kube.StatusWatcherStrategy
rel := releaseStub() rel := releaseStub()
rel.Name = "come-fail-away" rel.Name = "come-fail-away"
@ -114,7 +114,7 @@ func TestUninstallRelease_Cascade(t *testing.T) {
unAction := uninstallAction(t) unAction := uninstallAction(t)
unAction.DisableHooks = true unAction.DisableHooks = true
unAction.DryRun = false unAction.DryRun = false
unAction.Wait = kube.HookOnlyStrategy unAction.WaitStrategy = kube.HookOnlyStrategy
unAction.DeletionPropagation = "foreground" unAction.DeletionPropagation = "foreground"
rel := releaseStub() rel := releaseStub()

@ -64,8 +64,8 @@ type Upgrade struct {
SkipCRDs bool SkipCRDs bool
// Timeout is the timeout for this operation // Timeout is the timeout for this operation
Timeout time.Duration Timeout time.Duration
// Wait determines whether the wait operation should be performed and what type of wait. // WaitStrategy determines what type of waiting should be done
Wait kube.WaitStrategy WaitStrategy kube.WaitStrategy
// WaitForJobs determines whether the wait operation for the Jobs should be performed after the upgrade is requested. // WaitForJobs determines whether the wait operation for the Jobs should be performed after the upgrade is requested.
WaitForJobs bool WaitForJobs bool
// DisableHooks disables hook processing if set to true. // DisableHooks disables hook processing if set to true.
@ -155,11 +155,8 @@ func (u *Upgrade) RunWithContext(ctx context.Context, name string, chart *chart.
// Make sure if Atomic is set, that wait is set as well. This makes it so // Make sure if Atomic is set, that wait is set as well. This makes it so
// the user doesn't have to specify both // the user doesn't have to specify both
if u.Wait == kube.HookOnlyStrategy && u.Atomic { if u.WaitStrategy == kube.HookOnlyStrategy && u.Atomic {
u.Wait = kube.StatusWatcherStrategy u.WaitStrategy = kube.StatusWatcherStrategy
}
if err := u.cfg.KubeClient.SetWaiter(u.Wait); err != nil {
return nil, fmt.Errorf("failed to set kube client waiter: %w", err)
} }
if err := chartutil.ValidateReleaseName(name); err != nil { if err := chartutil.ValidateReleaseName(name); err != nil {
@ -423,7 +420,7 @@ func (u *Upgrade) releasingUpgrade(c chan<- resultMessage, upgradedRelease *rele
// pre-upgrade hooks // pre-upgrade hooks
if !u.DisableHooks { if !u.DisableHooks {
if err := u.cfg.execHook(upgradedRelease, release.HookPreUpgrade, u.Timeout); err != nil { if err := u.cfg.execHook(upgradedRelease, release.HookPreUpgrade, u.WaitStrategy, u.Timeout); err != nil {
u.reportToPerformUpgrade(c, upgradedRelease, kube.ResourceList{}, fmt.Errorf("pre-upgrade hooks failed: %s", err)) u.reportToPerformUpgrade(c, upgradedRelease, kube.ResourceList{}, fmt.Errorf("pre-upgrade hooks failed: %s", err))
return return
} }
@ -447,15 +444,20 @@ func (u *Upgrade) releasingUpgrade(c chan<- resultMessage, upgradedRelease *rele
u.cfg.Log(err.Error()) u.cfg.Log(err.Error())
} }
} }
waiter, err := u.cfg.KubeClient.GetWaiter(u.WaitStrategy)
if err != nil {
u.cfg.recordRelease(originalRelease)
u.reportToPerformUpgrade(c, upgradedRelease, results.Created, err)
return
}
if u.WaitForJobs { if u.WaitForJobs {
if err := u.cfg.KubeClient.WaitWithJobs(target, u.Timeout); err != nil { if err := waiter.WaitWithJobs(target, u.Timeout); err != nil {
u.cfg.recordRelease(originalRelease) u.cfg.recordRelease(originalRelease)
u.reportToPerformUpgrade(c, upgradedRelease, results.Created, err) u.reportToPerformUpgrade(c, upgradedRelease, results.Created, err)
return return
} }
} else { } else {
if err := u.cfg.KubeClient.Wait(target, u.Timeout); err != nil { if err := waiter.Wait(target, u.Timeout); err != nil {
u.cfg.recordRelease(originalRelease) u.cfg.recordRelease(originalRelease)
u.reportToPerformUpgrade(c, upgradedRelease, results.Created, err) u.reportToPerformUpgrade(c, upgradedRelease, results.Created, err)
return return
@ -464,7 +466,7 @@ func (u *Upgrade) releasingUpgrade(c chan<- resultMessage, upgradedRelease *rele
// post-upgrade hooks // post-upgrade hooks
if !u.DisableHooks { if !u.DisableHooks {
if err := u.cfg.execHook(upgradedRelease, release.HookPostUpgrade, u.Timeout); err != nil { if err := u.cfg.execHook(upgradedRelease, release.HookPostUpgrade, u.WaitStrategy, u.Timeout); err != nil {
u.reportToPerformUpgrade(c, upgradedRelease, results.Created, fmt.Errorf("post-upgrade hooks failed: %s", err)) u.reportToPerformUpgrade(c, upgradedRelease, results.Created, fmt.Errorf("post-upgrade hooks failed: %s", err))
return return
} }
@ -526,13 +528,8 @@ func (u *Upgrade) failRelease(rel *release.Release, created kube.ResourceList, e
rollin := NewRollback(u.cfg) rollin := NewRollback(u.cfg)
rollin.Version = filteredHistory[0].Version rollin.Version = filteredHistory[0].Version
if u.Wait == kube.HookOnlyStrategy { if u.WaitStrategy == kube.HookOnlyStrategy {
rollin.Wait = kube.StatusWatcherStrategy rollin.WaitStrategy = kube.StatusWatcherStrategy
}
// TODO pretty sure this is unnecessary as the waiter is already set if atomic at the start of upgrade
werr := u.cfg.KubeClient.SetWaiter(u.Wait)
if werr != nil {
return rel, errors.Wrapf(herr, "an error occurred while creating the waiter. original upgrade error: %s", err)
} }
rollin.WaitForJobs = u.WaitForJobs rollin.WaitForJobs = u.WaitForJobs
rollin.DisableHooks = u.DisableHooks rollin.DisableHooks = u.DisableHooks

@ -53,7 +53,7 @@ func TestUpgradeRelease_Success(t *testing.T) {
rel.Info.Status = release.StatusDeployed rel.Info.Status = release.StatusDeployed
req.NoError(upAction.cfg.Releases.Create(rel)) req.NoError(upAction.cfg.Releases.Create(rel))
upAction.Wait = kube.StatusWatcherStrategy upAction.WaitStrategy = kube.StatusWatcherStrategy
vals := map[string]interface{}{} vals := map[string]interface{}{}
ctx, done := context.WithCancel(context.Background()) ctx, done := context.WithCancel(context.Background())
@ -83,7 +83,7 @@ func TestUpgradeRelease_Wait(t *testing.T) {
failer := upAction.cfg.KubeClient.(*kubefake.FailingKubeClient) failer := upAction.cfg.KubeClient.(*kubefake.FailingKubeClient)
failer.WaitError = fmt.Errorf("I timed out") failer.WaitError = fmt.Errorf("I timed out")
upAction.cfg.KubeClient = failer upAction.cfg.KubeClient = failer
upAction.Wait = kube.StatusWatcherStrategy upAction.WaitStrategy = kube.StatusWatcherStrategy
vals := map[string]interface{}{} vals := map[string]interface{}{}
res, err := upAction.Run(rel.Name, buildChart(), vals) res, err := upAction.Run(rel.Name, buildChart(), vals)
@ -105,7 +105,7 @@ func TestUpgradeRelease_WaitForJobs(t *testing.T) {
failer := upAction.cfg.KubeClient.(*kubefake.FailingKubeClient) failer := upAction.cfg.KubeClient.(*kubefake.FailingKubeClient)
failer.WaitError = fmt.Errorf("I timed out") failer.WaitError = fmt.Errorf("I timed out")
upAction.cfg.KubeClient = failer upAction.cfg.KubeClient = failer
upAction.Wait = kube.StatusWatcherStrategy upAction.WaitStrategy = kube.StatusWatcherStrategy
upAction.WaitForJobs = true upAction.WaitForJobs = true
vals := map[string]interface{}{} vals := map[string]interface{}{}
@ -129,7 +129,7 @@ func TestUpgradeRelease_CleanupOnFail(t *testing.T) {
failer.WaitError = fmt.Errorf("I timed out") failer.WaitError = fmt.Errorf("I timed out")
failer.DeleteError = fmt.Errorf("I tried to delete nil") failer.DeleteError = fmt.Errorf("I tried to delete nil")
upAction.cfg.KubeClient = failer upAction.cfg.KubeClient = failer
upAction.Wait = kube.StatusWatcherStrategy upAction.WaitStrategy = kube.StatusWatcherStrategy
upAction.CleanupOnFail = true upAction.CleanupOnFail = true
vals := map[string]interface{}{} vals := map[string]interface{}{}
@ -396,7 +396,7 @@ func TestUpgradeRelease_Interrupted_Wait(t *testing.T) {
failer := upAction.cfg.KubeClient.(*kubefake.FailingKubeClient) failer := upAction.cfg.KubeClient.(*kubefake.FailingKubeClient)
failer.WaitDuration = 10 * time.Second failer.WaitDuration = 10 * time.Second
upAction.cfg.KubeClient = failer upAction.cfg.KubeClient = failer
upAction.Wait = kube.StatusWatcherStrategy upAction.WaitStrategy = kube.StatusWatcherStrategy
vals := map[string]interface{}{} vals := map[string]interface{}{}
ctx := context.Background() ctx := context.Background()

@ -211,7 +211,7 @@ func addInstallFlags(cmd *cobra.Command, f *pflag.FlagSet, client *action.Instal
f.BoolVar(&client.TakeOwnership, "take-ownership", false, "if set, install will ignore the check for helm annotations and take ownership of the existing resources") f.BoolVar(&client.TakeOwnership, "take-ownership", false, "if set, install will ignore the check for helm annotations and take ownership of the existing resources")
addValueOptionsFlags(f, valueOpts) addValueOptionsFlags(f, valueOpts)
addChartPathOptionsFlags(f, &client.ChartPathOptions) addChartPathOptionsFlags(f, &client.ChartPathOptions)
AddWaitFlag(cmd, &client.Wait) AddWaitFlag(cmd, &client.WaitStrategy)
err := cmd.RegisterFlagCompletionFunc("version", func(_ *cobra.Command, args []string, toComplete string) ([]string, cobra.ShellCompDirective) { err := cmd.RegisterFlagCompletionFunc("version", func(_ *cobra.Command, args []string, toComplete string) ([]string, cobra.ShellCompDirective) {
requiredArgs := 2 requiredArgs := 2

@ -84,7 +84,7 @@ func newRollbackCmd(cfg *action.Configuration, out io.Writer) *cobra.Command {
f.BoolVar(&client.WaitForJobs, "wait-for-jobs", false, "if set and --wait enabled, will wait until all Jobs have been completed before marking the release as successful. It will wait for as long as --timeout") f.BoolVar(&client.WaitForJobs, "wait-for-jobs", false, "if set and --wait enabled, will wait until all Jobs have been completed before marking the release as successful. It will wait for as long as --timeout")
f.BoolVar(&client.CleanupOnFail, "cleanup-on-fail", false, "allow deletion of new resources created in this rollback when rollback fails") f.BoolVar(&client.CleanupOnFail, "cleanup-on-fail", false, "allow deletion of new resources created in this rollback when rollback fails")
f.IntVar(&client.MaxHistory, "history-max", settings.MaxHistory, "limit the maximum number of revisions saved per release. Use 0 for no limit") f.IntVar(&client.MaxHistory, "history-max", settings.MaxHistory, "limit the maximum number of revisions saved per release. Use 0 for no limit")
AddWaitFlag(cmd, &client.Wait) AddWaitFlag(cmd, &client.WaitStrategy)
return cmd return cmd
} }

@ -79,7 +79,7 @@ func newUninstallCmd(cfg *action.Configuration, out io.Writer) *cobra.Command {
f.StringVar(&client.DeletionPropagation, "cascade", "background", "Must be \"background\", \"orphan\", or \"foreground\". Selects the deletion cascading strategy for the dependents. Defaults to background.") f.StringVar(&client.DeletionPropagation, "cascade", "background", "Must be \"background\", \"orphan\", or \"foreground\". Selects the deletion cascading strategy for the dependents. Defaults to background.")
f.DurationVar(&client.Timeout, "timeout", 300*time.Second, "time to wait for any individual Kubernetes operation (like Jobs for hooks)") f.DurationVar(&client.Timeout, "timeout", 300*time.Second, "time to wait for any individual Kubernetes operation (like Jobs for hooks)")
f.StringVar(&client.Description, "description", "", "add a custom description") f.StringVar(&client.Description, "description", "", "add a custom description")
AddWaitFlag(cmd, &client.Wait) AddWaitFlag(cmd, &client.WaitStrategy)
return cmd return cmd
} }

@ -136,7 +136,7 @@ func newUpgradeCmd(cfg *action.Configuration, out io.Writer) *cobra.Command {
instClient.DisableHooks = client.DisableHooks instClient.DisableHooks = client.DisableHooks
instClient.SkipCRDs = client.SkipCRDs instClient.SkipCRDs = client.SkipCRDs
instClient.Timeout = client.Timeout instClient.Timeout = client.Timeout
instClient.Wait = client.Wait instClient.WaitStrategy = client.WaitStrategy
instClient.WaitForJobs = client.WaitForJobs instClient.WaitForJobs = client.WaitForJobs
instClient.Devel = client.Devel instClient.Devel = client.Devel
instClient.Namespace = client.Namespace instClient.Namespace = client.Namespace
@ -294,7 +294,7 @@ func newUpgradeCmd(cfg *action.Configuration, out io.Writer) *cobra.Command {
addValueOptionsFlags(f, valueOpts) addValueOptionsFlags(f, valueOpts)
bindOutputFlag(cmd, &outfmt) bindOutputFlag(cmd, &outfmt)
bindPostRenderFlag(cmd, &client.PostRenderer) bindPostRenderFlag(cmd, &client.PostRenderer)
AddWaitFlag(cmd, &client.Wait) AddWaitFlag(cmd, &client.WaitStrategy)
err := cmd.RegisterFlagCompletionFunc("version", func(_ *cobra.Command, args []string, toComplete string) ([]string, cobra.ShellCompDirective) { err := cmd.RegisterFlagCompletionFunc("version", func(_ *cobra.Command, args []string, toComplete string) ([]string, cobra.ShellCompDirective) {
if len(args) != 2 { if len(args) != 2 {

@ -125,7 +125,7 @@ func (c *Client) newStatusWatcher() (*statusWaiter, error) {
}, nil }, nil
} }
func (c *Client) newWaiter(strategy WaitStrategy) (Waiter, error) { func (c *Client) GetWaiter(strategy WaitStrategy) (Waiter, error) {
switch strategy { switch strategy {
case LegacyStrategy: case LegacyStrategy:
kc, err := c.Factory.KubernetesClientSet() kc, err := c.Factory.KubernetesClientSet()
@ -148,7 +148,7 @@ func (c *Client) newWaiter(strategy WaitStrategy) (Waiter, error) {
func (c *Client) SetWaiter(ws WaitStrategy) error { func (c *Client) SetWaiter(ws WaitStrategy) error {
var err error var err error
c.Waiter, err = c.newWaiter(ws) c.Waiter, err = c.GetWaiter(ws)
if err != nil { if err != nil {
return err return err
} }
@ -156,7 +156,7 @@ func (c *Client) SetWaiter(ws WaitStrategy) error {
} }
// New creates a new Client. // New creates a new Client.
func New(getter genericclioptions.RESTClientGetter) (*Client, error) { func New(getter genericclioptions.RESTClientGetter) *Client {
if getter == nil { if getter == nil {
getter = genericclioptions.NewConfigFlags(true) getter = genericclioptions.NewConfigFlags(true)
} }
@ -165,12 +165,7 @@ func New(getter genericclioptions.RESTClientGetter) (*Client, error) {
Factory: factory, Factory: factory,
Log: nopLogger, Log: nopLogger,
} }
var err error return c
c.Waiter, err = c.newWaiter(HookOnlyStrategy)
if err != nil {
return nil, err
}
return c, nil
} }
var nopLogger = func(_ string, _ ...interface{}) {} var nopLogger = func(_ string, _ ...interface{}) {}

@ -516,7 +516,7 @@ func TestWait(t *testing.T) {
}), }),
} }
var err error var err error
c.Waiter, err = c.newWaiter(LegacyStrategy) c.Waiter, err = c.GetWaiter(LegacyStrategy)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -573,7 +573,7 @@ func TestWaitJob(t *testing.T) {
}), }),
} }
var err error var err error
c.Waiter, err = c.newWaiter(LegacyStrategy) c.Waiter, err = c.GetWaiter(LegacyStrategy)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -632,7 +632,7 @@ func TestWaitDelete(t *testing.T) {
}), }),
} }
var err error var err error
c.Waiter, err = c.newWaiter(LegacyStrategy) c.Waiter, err = c.GetWaiter(LegacyStrategy)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -662,10 +662,7 @@ func TestWaitDelete(t *testing.T) {
func TestReal(t *testing.T) { func TestReal(t *testing.T) {
t.Skip("This is a live test, comment this line to run") t.Skip("This is a live test, comment this line to run")
c, err := New(nil) c := New(nil)
if err != nil {
t.Fatal(err)
}
resources, err := c.Build(strings.NewReader(guestbookManifest), false) resources, err := c.Build(strings.NewReader(guestbookManifest), false)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
@ -675,10 +672,7 @@ func TestReal(t *testing.T) {
} }
testSvcEndpointManifest := testServiceManifest + "\n---\n" + testEndpointManifest testSvcEndpointManifest := testServiceManifest + "\n---\n" + testEndpointManifest
c, err = New(nil) c = New(nil)
if err != nil {
t.Fatal(err)
}
resources, err = c.Build(strings.NewReader(testSvcEndpointManifest), false) resources, err = c.Build(strings.NewReader(testSvcEndpointManifest), false)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)

@ -35,19 +35,29 @@ type FailingKubeClient struct {
PrintingKubeClient PrintingKubeClient
CreateError error CreateError error
GetError error GetError error
WaitError error
WaitForDeleteError error
DeleteError error DeleteError error
DeleteWithPropagationError error DeleteWithPropagationError error
WatchUntilReadyError error
UpdateError error UpdateError error
BuildError error BuildError error
BuildTableError error BuildTableError error
BuildDummy bool BuildDummy bool
BuildUnstructuredError error BuildUnstructuredError error
WaitError error
WaitForDeleteError error
WatchUntilReadyError error
WaitDuration time.Duration WaitDuration time.Duration
} }
// FailingKubeWaiter implements kube.Waiter for testing purposes.
// It also has additional errors you can set to fail different functions, otherwise it delegates all its calls to `PrintingKubeWaiter`
type FailingKubeWaiter struct {
*PrintingKubeWaiter
waitError error
waitForDeleteError error
watchUntilReadyError error
waitDuration time.Duration
}
// Create returns the configured error if set or prints // Create returns the configured error if set or prints
func (f *FailingKubeClient) Create(resources kube.ResourceList) (*kube.Result, error) { func (f *FailingKubeClient) Create(resources kube.ResourceList) (*kube.Result, error) {
if f.CreateError != nil { if f.CreateError != nil {
@ -65,28 +75,28 @@ func (f *FailingKubeClient) Get(resources kube.ResourceList, related bool) (map[
} }
// Waits the amount of time defined on f.WaitDuration, then returns the configured error if set or prints. // Waits the amount of time defined on f.WaitDuration, then returns the configured error if set or prints.
func (f *FailingKubeClient) Wait(resources kube.ResourceList, d time.Duration) error { func (f *FailingKubeWaiter) Wait(resources kube.ResourceList, d time.Duration) error {
time.Sleep(f.WaitDuration) time.Sleep(f.waitDuration)
if f.WaitError != nil { if f.waitError != nil {
return f.WaitError return f.waitError
} }
return f.PrintingKubeClient.Wait(resources, d) return f.PrintingKubeWaiter.Wait(resources, d)
} }
// WaitWithJobs returns the configured error if set or prints // WaitWithJobs returns the configured error if set or prints
func (f *FailingKubeClient) WaitWithJobs(resources kube.ResourceList, d time.Duration) error { func (f *FailingKubeWaiter) WaitWithJobs(resources kube.ResourceList, d time.Duration) error {
if f.WaitError != nil { if f.waitError != nil {
return f.WaitError return f.waitError
} }
return f.PrintingKubeClient.WaitWithJobs(resources, d) return f.PrintingKubeWaiter.WaitWithJobs(resources, d)
} }
// WaitForDelete returns the configured error if set or prints // WaitForDelete returns the configured error if set or prints
func (f *FailingKubeClient) WaitForDelete(resources kube.ResourceList, d time.Duration) error { func (f *FailingKubeWaiter) WaitForDelete(resources kube.ResourceList, d time.Duration) error {
if f.WaitForDeleteError != nil { if f.waitForDeleteError != nil {
return f.WaitForDeleteError return f.waitForDeleteError
} }
return f.PrintingKubeClient.WaitForDelete(resources, d) return f.PrintingKubeWaiter.WaitForDelete(resources, d)
} }
// Delete returns the configured error if set or prints // Delete returns the configured error if set or prints
@ -98,11 +108,11 @@ func (f *FailingKubeClient) Delete(resources kube.ResourceList) (*kube.Result, [
} }
// WatchUntilReady returns the configured error if set or prints // WatchUntilReady returns the configured error if set or prints
func (f *FailingKubeClient) WatchUntilReady(resources kube.ResourceList, d time.Duration) error { func (f *FailingKubeWaiter) WatchUntilReady(resources kube.ResourceList, d time.Duration) error {
if f.WatchUntilReadyError != nil { if f.watchUntilReadyError != nil {
return f.WatchUntilReadyError return f.watchUntilReadyError
} }
return f.PrintingKubeClient.WatchUntilReady(resources, d) return f.PrintingKubeWaiter.WatchUntilReady(resources, d)
} }
// Update returns the configured error if set or prints // Update returns the configured error if set or prints
@ -140,8 +150,16 @@ func (f *FailingKubeClient) DeleteWithPropagationPolicy(resources kube.ResourceL
return f.PrintingKubeClient.DeleteWithPropagationPolicy(resources, policy) return f.PrintingKubeClient.DeleteWithPropagationPolicy(resources, policy)
} }
func (f *FailingKubeClient) SetWaiter(_ kube.WaitStrategy) error { func (f *FailingKubeClient) GetWaiter(ws kube.WaitStrategy) (kube.Waiter, error) {
return nil waiter, _ := f.PrintingKubeClient.GetWaiter(ws)
printingKubeWaiter, _ := waiter.(*PrintingKubeWaiter)
return &FailingKubeWaiter{
PrintingKubeWaiter: printingKubeWaiter,
waitError: f.WaitError,
waitForDeleteError: f.WaitForDeleteError,
watchUntilReadyError: f.WatchUntilReadyError,
waitDuration: f.WaitDuration,
}, nil
} }
func createDummyResourceList() kube.ResourceList { func createDummyResourceList() kube.ResourceList {
@ -151,5 +169,4 @@ func createDummyResourceList() kube.ResourceList {
var resourceList kube.ResourceList var resourceList kube.ResourceList
resourceList.Append(&resInfo) resourceList.Append(&resInfo)
return resourceList return resourceList
} }

@ -37,6 +37,12 @@ type PrintingKubeClient struct {
LogOutput io.Writer LogOutput io.Writer
} }
// PrintingKubeWaiter implements kube.Waiter, but simply prints the reader to the given output
type PrintingKubeWaiter struct {
Out io.Writer
LogOutput io.Writer
}
// IsReachable checks if the cluster is reachable // IsReachable checks if the cluster is reachable
func (p *PrintingKubeClient) IsReachable() error { func (p *PrintingKubeClient) IsReachable() error {
return nil return nil
@ -59,17 +65,23 @@ func (p *PrintingKubeClient) Get(resources kube.ResourceList, _ bool) (map[strin
return make(map[string][]runtime.Object), nil return make(map[string][]runtime.Object), nil
} }
func (p *PrintingKubeClient) Wait(resources kube.ResourceList, _ time.Duration) error { func (p *PrintingKubeWaiter) Wait(resources kube.ResourceList, _ time.Duration) error {
_, err := io.Copy(p.Out, bufferize(resources)) _, err := io.Copy(p.Out, bufferize(resources))
return err return err
} }
func (p *PrintingKubeClient) WaitWithJobs(resources kube.ResourceList, _ time.Duration) error { func (p *PrintingKubeWaiter) WaitWithJobs(resources kube.ResourceList, _ time.Duration) error {
_, err := io.Copy(p.Out, bufferize(resources)) _, err := io.Copy(p.Out, bufferize(resources))
return err return err
} }
func (p *PrintingKubeClient) WaitForDelete(resources kube.ResourceList, _ time.Duration) error { func (p *PrintingKubeWaiter) WaitForDelete(resources kube.ResourceList, _ time.Duration) error {
_, err := io.Copy(p.Out, bufferize(resources))
return err
}
// WatchUntilReady implements KubeClient WatchUntilReady.
func (p *PrintingKubeWaiter) WatchUntilReady(resources kube.ResourceList, _ time.Duration) error {
_, err := io.Copy(p.Out, bufferize(resources)) _, err := io.Copy(p.Out, bufferize(resources))
return err return err
} }
@ -85,12 +97,6 @@ func (p *PrintingKubeClient) Delete(resources kube.ResourceList) (*kube.Result,
return &kube.Result{Deleted: resources}, nil return &kube.Result{Deleted: resources}, nil
} }
// WatchUntilReady implements KubeClient WatchUntilReady.
func (p *PrintingKubeClient) WatchUntilReady(resources kube.ResourceList, _ time.Duration) error {
_, err := io.Copy(p.Out, bufferize(resources))
return err
}
// Update implements KubeClient Update. // Update implements KubeClient Update.
func (p *PrintingKubeClient) Update(_, modified kube.ResourceList, _ bool) (*kube.Result, error) { func (p *PrintingKubeClient) Update(_, modified kube.ResourceList, _ bool) (*kube.Result, error) {
_, err := io.Copy(p.Out, bufferize(modified)) _, err := io.Copy(p.Out, bufferize(modified))
@ -140,8 +146,8 @@ func (p *PrintingKubeClient) DeleteWithPropagationPolicy(resources kube.Resource
return &kube.Result{Deleted: resources}, nil return &kube.Result{Deleted: resources}, nil
} }
func (p *PrintingKubeClient) SetWaiter(_ kube.WaitStrategy) error { func (p *PrintingKubeClient) GetWaiter(_ kube.WaitStrategy) (kube.Waiter, error) {
return nil return &PrintingKubeWaiter{Out: p.Out, LogOutput: p.LogOutput}, nil
} }
func bufferize(resources kube.ResourceList) io.Reader { func bufferize(resources kube.ResourceList) io.Reader {

@ -48,9 +48,9 @@ type Interface interface {
Build(reader io.Reader, validate bool) (ResourceList, error) Build(reader io.Reader, validate bool) (ResourceList, error)
// IsReachable checks whether the client is able to connect to the cluster. // IsReachable checks whether the client is able to connect to the cluster.
IsReachable() error IsReachable() error
// Set Waiter sets the Kube.Waiter
SetWaiter(ws WaitStrategy) error // Get Waiter gets the Kube.Waiter
Waiter GetWaiter(ws WaitStrategy) (Waiter, error)
} }
// Waiter defines methods related to waiting for resource states. // Waiter defines methods related to waiting for resource states.

Loading…
Cancel
Save