diff --git a/cmd/helm/install.go b/cmd/helm/install.go
index 6ffc968ce..ae68be121 100644
--- a/cmd/helm/install.go
+++ b/cmd/helm/install.go
@@ -167,6 +167,7 @@ func addInstallFlags(cmd *cobra.Command, f *pflag.FlagSet, client *action.Instal
 	f.DurationVar(&client.Timeout, "timeout", 300*time.Second, "time to wait for any individual Kubernetes operation (like Jobs for hooks)")
 	f.BoolVar(&client.Wait, "wait", false, "if set, will wait until all Pods, PVCs, Services, and minimum number of Pods of a Deployment, StatefulSet, or ReplicaSet are in a ready state before marking the release as successful. It will wait for as long as --timeout")
 	f.BoolVar(&client.WaitForJobs, "wait-for-jobs", false, "if set and --wait enabled, will wait until all Jobs have been completed before marking the release as successful. It will wait for as long as --timeout")
+	f.IntVar(&client.WaitRetries, "wait-retries", 0, "if set and --wait enabled, will retry any failed check on resource state, except when an HTTP status code below 500 is received, subject to the specified number of retries")
 	f.BoolVarP(&client.GenerateName, "generate-name", "g", false, "generate the name (and omit the NAME parameter)")
 	f.StringVar(&client.NameTemplate, "name-template", "", "specify template used to name the release")
 	f.StringVar(&client.Description, "description", "", "add a custom description")
diff --git a/cmd/helm/upgrade.go b/cmd/helm/upgrade.go
index 145d342b7..8b7634619 100644
--- a/cmd/helm/upgrade.go
+++ b/cmd/helm/upgrade.go
@@ -117,6 +117,7 @@ func newUpgradeCmd(cfg *action.Configuration, out io.Writer) *cobra.Command {
 			instClient.Timeout = client.Timeout
 			instClient.Wait = client.Wait
 			instClient.WaitForJobs = client.WaitForJobs
+			instClient.WaitRetries = client.WaitRetries
 			instClient.Devel = client.Devel
 			instClient.Namespace = client.Namespace
 			instClient.Atomic = client.Atomic
@@ -233,6 +234,7 @@ func newUpgradeCmd(cfg *action.Configuration, out io.Writer) *cobra.Command {
 	f.BoolVar(&client.ReuseValues, "reuse-values", false, "when upgrading, reuse the last release's values and merge in any overrides from the command line via --set and -f. If '--reset-values' is specified, this is ignored")
 	f.BoolVar(&client.Wait, "wait", false, "if set, will wait until all Pods, PVCs, Services, and minimum number of Pods of a Deployment, StatefulSet, or ReplicaSet are in a ready state before marking the release as successful. It will wait for as long as --timeout")
 	f.BoolVar(&client.WaitForJobs, "wait-for-jobs", false, "if set and --wait enabled, will wait until all Jobs have been completed before marking the release as successful. It will wait for as long as --timeout")
+	f.IntVar(&client.WaitRetries, "wait-retries", 0, "if set and --wait enabled, will retry any failed check on resource state, except when an HTTP status code below 500 is received, subject to the specified number of retries")
 	f.BoolVar(&client.Atomic, "atomic", false, "if set, upgrade process rolls back changes made in case of failed upgrade. The --wait flag will be set automatically if --atomic is used")
 	f.IntVar(&client.MaxHistory, "history-max", settings.MaxHistory, "limit the maximum number of revisions saved per release. Use 0 for no limit")
 	f.BoolVar(&client.CleanupOnFail, "cleanup-on-fail", false, "allow deletion of new resources created in this upgrade when upgrade fails")
diff --git a/pkg/action/install.go b/pkg/action/install.go
index 5fb7e6736..63abc7dbd 100644
--- a/pkg/action/install.go
+++ b/pkg/action/install.go
@@ -77,6 +77,7 @@ type Install struct {
 	Replace bool
 	Wait bool
 	WaitForJobs bool
+	WaitRetries int
 	Devel bool
 	DependencyUpdate bool
 	Timeout time.Duration
@@ -413,17 +414,24 @@ func (i *Install) performInstall(c chan<- resultMessage, rel *release.Release, t
 	}

 	if i.Wait {
+		var err error
 		if i.WaitForJobs {
-			if err := i.cfg.KubeClient.WaitWithJobs(resources, i.Timeout); err != nil {
-				i.reportToRun(c, rel, err)
-				return
+			if kubeClient, ok := i.cfg.KubeClient.(kube.InterfaceWithRetry); ok {
+				err = kubeClient.WaitWithJobsWithRetry(resources, i.Timeout, i.WaitRetries)
+			} else {
+				err = i.cfg.KubeClient.WaitWithJobs(resources, i.Timeout)
 			}
 		} else {
-			if err := i.cfg.KubeClient.Wait(resources, i.Timeout); err != nil {
-				i.reportToRun(c, rel, err)
-				return
+			if kubeClient, ok := i.cfg.KubeClient.(kube.InterfaceWithRetry); ok {
+				err = kubeClient.WaitWithRetry(resources, i.Timeout, i.WaitRetries)
+			} else {
+				err = i.cfg.KubeClient.Wait(resources, i.Timeout)
 			}
 		}
+		if err != nil {
+			i.reportToRun(c, rel, err)
+			return
+		}
 	}

 	if !i.DisableHooks {
diff --git a/pkg/action/install_test.go b/pkg/action/install_test.go
index dd0cdb54d..438f12439 100644
--- a/pkg/action/install_test.go
+++ b/pkg/action/install_test.go
@@ -393,6 +393,22 @@ func TestInstallRelease_Wait_Interrupted(t *testing.T) {
 	is.Contains(res.Info.Description, "Release \"interrupted-release\" failed: context canceled")
 	is.Equal(res.Info.Status, release.StatusFailed)
 }
+func TestInstallRelease_Wait_With_Retries(t *testing.T) {
+	is := assert.New(t)
+	instAction := installAction(t)
+	instAction.ReleaseName = "come-fail-away"
+	failer := instAction.cfg.KubeClient.(*kubefake.FailingKubeClient)
+	failer.WaitError = fmt.Errorf("I timed out")
+	instAction.cfg.KubeClient = failer
+	instAction.Wait = true
+	instAction.WaitRetries = 2
+	vals := map[string]interface{}{}
+
+	res, err := instAction.Run(buildChart(), vals)
+	is.Error(err)
+	is.Contains(res.Info.Description, "I timed out")
+	is.Equal(res.Info.Status, release.StatusFailed)
+}
 func TestInstallRelease_WaitForJobs(t *testing.T) {
 	is := assert.New(t)
 	instAction := installAction(t)
diff --git a/pkg/action/upgrade.go b/pkg/action/upgrade.go
index 784df4013..a35fe90ff 100644
--- a/pkg/action/upgrade.go
+++ b/pkg/action/upgrade.go
@@ -68,6 +68,8 @@ type Upgrade struct {
 	Wait bool
 	// WaitForJobs determines whether the wait operation for the Jobs should be performed after the upgrade is requested.
 	WaitForJobs bool
+	// WaitRetries sets the maximum number of times a failed resource state check is retried during the wait operation.
+	WaitRetries int
 	// DisableHooks disables hook processing if set to true.
 	DisableHooks bool
 	// DryRun controls whether the operation is prepared, but not executed.
@@ -402,19 +404,25 @@ func (u *Upgrade) releasingUpgrade(c chan<- resultMessage, upgradedRelease *rele
 		u.cfg.Log(
 			"waiting for release %s resources (created: %d updated: %d deleted: %d)",
 			upgradedRelease.Name, len(results.Created), len(results.Updated), len(results.Deleted))
+		var err error
 		if u.WaitForJobs {
-			if err := u.cfg.KubeClient.WaitWithJobs(target, u.Timeout); err != nil {
-				u.cfg.recordRelease(originalRelease)
-				u.reportToPerformUpgrade(c, upgradedRelease, results.Created, err)
-				return
+			if kubeClient, ok := u.cfg.KubeClient.(kube.InterfaceWithRetry); ok {
+				err = kubeClient.WaitWithJobsWithRetry(target, u.Timeout, u.WaitRetries)
+			} else {
+				err = u.cfg.KubeClient.WaitWithJobs(target, u.Timeout)
 			}
 		} else {
-			if err := u.cfg.KubeClient.Wait(target, u.Timeout); err != nil {
-				u.cfg.recordRelease(originalRelease)
-				u.reportToPerformUpgrade(c, upgradedRelease, results.Created, err)
-				return
+			if kubeClient, ok := u.cfg.KubeClient.(kube.InterfaceWithRetry); ok {
+				err = kubeClient.WaitWithRetry(target, u.Timeout, u.WaitRetries)
+			} else {
+				err = u.cfg.KubeClient.Wait(target, u.Timeout)
 			}
 		}
+		if err != nil {
+			u.cfg.recordRelease(originalRelease)
+			u.reportToPerformUpgrade(c, upgradedRelease, results.Created, err)
+			return
+		}
 	}

 	// post-upgrade hooks
diff --git a/pkg/action/upgrade_test.go b/pkg/action/upgrade_test.go
index 62922b373..62e4fa411 100644
--- a/pkg/action/upgrade_test.go
+++ b/pkg/action/upgrade_test.go
@@ -88,6 +88,28 @@ func TestUpgradeRelease_Wait(t *testing.T) {
 	is.Contains(res.Info.Description, "I timed out")
 	is.Equal(res.Info.Status, release.StatusFailed)
 }
+func TestUpgradeRelease_Wait_With_Retries(t *testing.T) {
+	is := assert.New(t)
+	req := require.New(t)
+
+	upAction := upgradeAction(t)
+	rel := releaseStub()
+	rel.Name = "come-fail-away"
+	rel.Info.Status = release.StatusDeployed
+	upAction.cfg.Releases.Create(rel)
+
+	failer := upAction.cfg.KubeClient.(*kubefake.FailingKubeClient)
+	failer.WaitError = fmt.Errorf("I timed out")
+	upAction.cfg.KubeClient = failer
+	upAction.Wait = true
+	upAction.WaitRetries = 2
+	vals := map[string]interface{}{}
+
+	res, err := upAction.Run(rel.Name, buildChart(), vals)
+	req.Error(err)
+	is.Contains(res.Info.Description, "I timed out")
+	is.Equal(res.Info.Status, release.StatusFailed)
+}

 func TestUpgradeRelease_WaitForJobs(t *testing.T) {
 	is := assert.New(t)
@@ -112,6 +134,30 @@ func TestUpgradeRelease_WaitForJobs(t *testing.T) {
 	is.Equal(res.Info.Status, release.StatusFailed)
 }

+func TestUpgradeRelease_WaitForJobs_With_Retries(t *testing.T) {
+	is := assert.New(t)
+	req := require.New(t)
+
+	upAction := upgradeAction(t)
+	rel := releaseStub()
+	rel.Name = "come-fail-away"
+	rel.Info.Status = release.StatusDeployed
+	upAction.cfg.Releases.Create(rel)
+
+	failer := upAction.cfg.KubeClient.(*kubefake.FailingKubeClient)
+	failer.WaitError = fmt.Errorf("I timed out")
+	upAction.cfg.KubeClient = failer
+	upAction.Wait = true
+	upAction.WaitForJobs = true
+	upAction.WaitRetries = 2
+	vals := map[string]interface{}{}
+
+	res, err := upAction.Run(rel.Name, buildChart(), vals)
+	req.Error(err)
+	is.Contains(res.Info.Description, "I timed out")
+	is.Equal(res.Info.Status, release.StatusFailed)
+}
+
 func TestUpgradeRelease_CleanupOnFail(t *testing.T) {
 	is := assert.New(t)
 	req := require.New(t)
diff --git a/pkg/kube/client.go b/pkg/kube/client.go
index 7b3c803f9..a2c1a7a40 100644
--- a/pkg/kube/client.go
+++ b/pkg/kube/client.go
@@ -275,6 +275,13 @@ func getResource(info *resource.Info) (runtime.Object, error) {

 // Wait waits up to the given timeout for the specified resources to be ready.
 func (c *Client) Wait(resources ResourceList, timeout time.Duration) error {
+	return c.WaitWithRetry(resources, timeout, 0)
+}
+
+// WaitWithRetry waits up to the given timeout for the specified resources to be ready.
+// If an error is encountered when checking the status of a resource, the check is retried,
+// subject to the given maximum number of retries.
+func (c *Client) WaitWithRetry(resources ResourceList, timeout time.Duration, waitRetries int) error {
 	cs, err := c.getKubeClient()
 	if err != nil {
 		return err
@@ -285,11 +292,18 @@ func (c *Client) Wait(resources ResourceList, timeout time.Duration) error {
 		log:     c.Log,
 		timeout: timeout,
 	}
-	return w.waitForResources(resources)
+	return w.waitForResources(resources, waitRetries)
 }

 // WaitWithJobs wait up to the given timeout for the specified resources to be ready, including jobs.
 func (c *Client) WaitWithJobs(resources ResourceList, timeout time.Duration) error {
+	return c.WaitWithJobsWithRetry(resources, timeout, 0)
+}
+
+// WaitWithJobsWithRetry waits up to the given timeout for the specified resources to be ready,
+// including jobs. If an error is encountered when checking the status of a resource, the check
+// is retried, subject to the given maximum number of retries.
+func (c *Client) WaitWithJobsWithRetry(resources ResourceList, timeout time.Duration, waitRetries int) error {
 	cs, err := c.getKubeClient()
 	if err != nil {
 		return err
@@ -300,7 +314,7 @@ func (c *Client) WaitWithJobs(resources ResourceList, timeout time.Duration) err
 		log:     c.Log,
 		timeout: timeout,
 	}
-	return w.waitForResources(resources)
+	return w.waitForResources(resources, waitRetries)
 }

 // WaitForDelete wait up to the given timeout for the specified resources to be deleted.
diff --git a/pkg/kube/fake/fake.go b/pkg/kube/fake/fake.go
index 267020d57..4b351fa70 100644
--- a/pkg/kube/fake/fake.go
+++ b/pkg/kube/fake/fake.go
@@ -74,6 +74,15 @@ func (f *FailingKubeClient) Wait(resources kube.ResourceList, d time.Duration) e
 	return f.PrintingKubeClient.Wait(resources, d)
 }

+// WaitWithRetry waits the amount of time defined by f.WaitDuration, then returns the configured error if set or prints.
+func (f *FailingKubeClient) WaitWithRetry(resources kube.ResourceList, d time.Duration, waitRetries int) error {
+	time.Sleep(f.WaitDuration)
+	if f.WaitError != nil {
+		return f.WaitError
+	}
+	return f.PrintingKubeClient.WaitWithRetry(resources, d, waitRetries)
+}
+
 // WaitWithJobs returns the configured error if set or prints
 func (f *FailingKubeClient) WaitWithJobs(resources kube.ResourceList, d time.Duration) error {
 	if f.WaitError != nil {
@@ -82,6 +91,14 @@ func (f *FailingKubeClient) WaitWithJobs(resources kube.ResourceList, d time.Dur
 	return f.PrintingKubeClient.WaitWithJobs(resources, d)
 }

+// WaitWithJobsWithRetry returns the configured error if set or prints
+func (f *FailingKubeClient) WaitWithJobsWithRetry(resources kube.ResourceList, d time.Duration, waitRetries int) error {
+	if f.WaitError != nil {
+		return f.WaitError
+	}
+	return f.PrintingKubeClient.WaitWithJobsWithRetry(resources, d, waitRetries)
+}
+
 // WaitForDelete returns the configured error if set or prints
 func (f *FailingKubeClient) WaitForDelete(resources kube.ResourceList, d time.Duration) error {
 	if f.WaitError != nil {
diff --git a/pkg/kube/fake/printer.go b/pkg/kube/fake/printer.go
index e6c4b6207..23c746686 100644
--- a/pkg/kube/fake/printer.go
+++ b/pkg/kube/fake/printer.go
@@ -62,11 +62,21 @@ func (p *PrintingKubeClient) Wait(resources kube.ResourceList, _ time.Duration)
 	return err
 }

+func (p *PrintingKubeClient) WaitWithRetry(resources kube.ResourceList, _ time.Duration, _ int) error {
+	_, err := io.Copy(p.Out, bufferize(resources))
+	return err
+}
+
 func (p *PrintingKubeClient) WaitWithJobs(resources kube.ResourceList, _ time.Duration) error {
 	_, err := io.Copy(p.Out, bufferize(resources))
 	return err
 }

+func (p *PrintingKubeClient) WaitWithJobsWithRetry(resources kube.ResourceList, _ time.Duration, _ int) error {
+	_, err := io.Copy(p.Out, bufferize(resources))
+	return err
+}
+
 func (p *PrintingKubeClient) WaitForDelete(resources kube.ResourceList, _ time.Duration) error {
 	_, err := io.Copy(p.Out, bufferize(resources))
 	return err
 }
diff --git a/pkg/kube/interface.go b/pkg/kube/interface.go
index ce42ed950..8c799985f 100644
--- a/pkg/kube/interface.go
+++ b/pkg/kube/interface.go
@@ -110,7 +110,19 @@ type InterfaceResources interface {
 	BuildTable(reader io.Reader, validate bool) (ResourceList, error)
 }

+// InterfaceWithRetry is introduced to avoid breaking backwards compatibility for Interface implementers.
+//
+// TODO Helm 4: Remove InterfaceWithRetry and integrate its method(s) into the Interface.
+type InterfaceWithRetry interface {
+	// WaitWithRetry waits up to the given timeout for the specified resources to be ready, retrying failed state checks up to waitRetries times.
+	WaitWithRetry(resources ResourceList, timeout time.Duration, waitRetries int) error
+
+	// WaitWithJobsWithRetry waits up to the given timeout for the specified resources to be ready, including jobs, retrying failed state checks up to waitRetries times.
+	WaitWithJobsWithRetry(resources ResourceList, timeout time.Duration, waitRetries int) error
+}
+
 var _ Interface = (*Client)(nil)
 var _ InterfaceExt = (*Client)(nil)
 var _ InterfaceDeletionPropagation = (*Client)(nil)
 var _ InterfaceResources = (*Client)(nil)
+var _ InterfaceWithRetry = (*Client)(nil)
diff --git a/pkg/kube/wait.go b/pkg/kube/wait.go
index ecdd38940..a392c99c9 100644
--- a/pkg/kube/wait.go
+++ b/pkg/kube/wait.go
@@ -32,6 +32,7 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/cli-runtime/pkg/resource"
 )

@@ -42,18 +43,35 @@ type waiter struct {
 	log func(string, ...interface{})
 }

 // waitForResources polls to get the current status of all pods, PVCs, Services and
-// Jobs(optional) until all are ready or a timeout is reached
-func (w *waiter) waitForResources(created ResourceList) error {
-	w.log("beginning wait for %d resources with timeout of %v", len(created), w.timeout)
+// Jobs(optional) until all are ready or a timeout is reached.
+// If an error is encountered when checking the status of a resource, the check is retried,
+// subject to the given maximum number of retries.
+func (w *waiter) waitForResources(created ResourceList, waitRetries int) error {
+	w.log("beginning wait for %d resources with timeout of %v, maximum retries %d", len(created), w.timeout, waitRetries)

 	ctx, cancel := context.WithTimeout(context.Background(), w.timeout)
 	defer cancel()

+	numberOfErrors := make([]int, len(created))
+	for i := range numberOfErrors {
+		numberOfErrors[i] = 0
+	}
+
 	return wait.PollUntilContextCancel(ctx, 2*time.Second, true, func(ctx context.Context) (bool, error) {
-		for _, v := range created {
+		for i, v := range created {
 			ready, err := w.c.IsReady(ctx, v)
-			if !ready || err != nil {
+			if waitRetries > 0 && w.isRetryableError(err, v) {
+				numberOfErrors[i]++
+				if numberOfErrors[i] > waitRetries {
+					w.log("Max number of retries reached")
+					return false, err
+				}
+				w.log("Retrying as current number of retries %d is less than the maximum of %d", numberOfErrors[i]-1, waitRetries)
+				return false, nil
+			}
+			numberOfErrors[i] = 0
+			if !ready {
 				return false, err
 			}
 		}
@@ -61,6 +79,21 @@ func (w *waiter) waitForResources(created ResourceList) error {
 		return true, nil
 	})
 }

+func (w *waiter) isRetryableError(err error, resource *resource.Info) bool {
+	if err == nil {
+		return false
+	}
+	w.log("Error received when checking status of resource %s. Error: '%s', Resource details: '%s'", resource.Name, err, resource)
+	if ev, ok := err.(*apierrors.StatusError); ok {
+		statusCode := ev.Status().Code
+		retryable := statusCode >= 500
+		w.log("Status code received: %d. Retryable error? %t", statusCode, retryable)
+		return retryable
+	}
+	w.log("Retryable error? %t", true)
+	return true
+}
+
 // waitForDeletedResources polls to check if all the resources are deleted or a timeout is reached
 func (w *waiter) waitForDeletedResources(deleted ResourceList) error {
 	w.log("beginning wait for %d resources to be deleted with timeout of %v", len(deleted), w.timeout)
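
A note on the compatibility pattern above: `kube.Interface` is a public interface, so adding `WaitWithRetry`/`WaitWithJobsWithRetry` to it directly would break every third-party implementer. The diff instead introduces the `InterfaceWithRetry` extension interface, and callers type-assert for it, falling back to the plain `Wait`/`WaitWithJobs` methods when it is absent. Below is a minimal sketch of that pattern from a caller's point of view; `waitForRelease` is a hypothetical helper, not part of this change:

```go
package main

import (
	"io"
	"time"

	"helm.sh/helm/v3/pkg/kube"
	"helm.sh/helm/v3/pkg/kube/fake"
)

// waitForRelease mirrors the opt-in logic in performInstall/releasingUpgrade:
// implementations that know about retries get them; older ones keep working
// unchanged, since the extension interface is only reached via type assertion.
func waitForRelease(kc kube.Interface, resources kube.ResourceList, timeout time.Duration, retries int) error {
	if kcr, ok := kc.(kube.InterfaceWithRetry); ok {
		return kcr.WaitWithRetry(resources, timeout, retries)
	}
	// Implementation predates InterfaceWithRetry: wait without retry support.
	return kc.Wait(resources, timeout)
}

func main() {
	// The fake printing client satisfies kube.Interface (and, with this patch
	// applied, kube.InterfaceWithRetry), so it exercises the assertion path.
	kc := &fake.PrintingKubeClient{Out: io.Discard}
	_ = waitForRelease(kc, kube.ResourceList{}, 30*time.Second, 3)
}
```

From the CLI, the same plumbing would be reached with something like `helm upgrade wordpress ./wordpress --wait --wait-retries 3`, which tolerates up to three consecutive failed status checks per resource before giving up.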
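It is also worth spelling out what the waiter considers retryable, per `isRetryableError` above: a `*apierrors.StatusError` is retried only when its HTTP status code is 500 or higher, while any other non-nil error (a dropped connection, for example) is always treated as transient. Error counts are kept per resource (`numberOfErrors[i]`) and reset after every successful check, so `--wait-retries` bounds consecutive failures rather than total failures; and with the default of 0, the `waitRetries > 0` guard skips the retry branch entirely, preserving current behavior. A self-contained sketch of the classification rule follows; the `retryable` helper restates the decision without the logging and is not part of the change:

```go
package main

import (
	"fmt"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/runtime/schema"
)

// retryable restates isRetryableError's decision: a StatusError is retried
// on 5xx only; every other non-nil error is assumed transient and retried.
func retryable(err error) bool {
	if err == nil {
		return false
	}
	if se, ok := err.(*apierrors.StatusError); ok {
		return se.Status().Code >= 500
	}
	return true
}

func main() {
	pods := schema.GroupResource{Resource: "pods"}
	fmt.Println(retryable(apierrors.NewNotFound(pods, "web-0")))         // false: 404 is terminal
	fmt.Println(retryable(apierrors.NewServiceUnavailable("apiserver"))) // true: 503 is transient
	fmt.Println(retryable(fmt.Errorf("connection refused")))             // true: non-Status errors retry
}
```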