From 957d2a2bf978b06cb148b9429b8dd9258c24b887 Mon Sep 17 00:00:00 2001
From: zh168654
Date: Tue, 3 Nov 2020 19:48:29 +0800
Subject: [PATCH] add wait-for-jobs flag

Signed-off-by: zh168654
---
 cmd/helm/install.go                                |  1 +
 cmd/helm/install_test.go                           |  6 +++++
 cmd/helm/rollback.go                               |  1 +
 cmd/helm/rollback_test.go                          |  5 ++++
 .../output/install-with-wait-for-jobs.txt          |  6 +++++
 .../output/rollback-wait-for-jobs.txt              |  1 +
 .../output/upgrade-with-wait-for-jobs.txt          |  7 ++++++
 cmd/helm/upgrade.go                                |  2 ++
 cmd/helm/upgrade_test.go                           |  6 +++++
 pkg/action/install.go                              |  6 ++---
 pkg/action/install_test.go                         | 17 ++++++++++++++
 pkg/action/rollback.go                             |  3 ++-
 pkg/action/upgrade.go                              |  5 +++-
 pkg/action/upgrade_test.go                         | 23 +++++++++++++++++++
 pkg/kube/client.go                                 |  4 ++--
 pkg/kube/fake/fake.go                              |  4 ++--
 pkg/kube/fake/printer.go                           |  2 +-
 pkg/kube/interface.go                              |  2 +-
 pkg/kube/wait.go                                   | 14 ++++++-----
 19 files changed, 98 insertions(+), 17 deletions(-)
 create mode 100644 cmd/helm/testdata/output/install-with-wait-for-jobs.txt
 create mode 100644 cmd/helm/testdata/output/rollback-wait-for-jobs.txt
 create mode 100644 cmd/helm/testdata/output/upgrade-with-wait-for-jobs.txt

diff --git a/cmd/helm/install.go b/cmd/helm/install.go
index 7edd98091..fac2131c1 100644
--- a/cmd/helm/install.go
+++ b/cmd/helm/install.go
@@ -140,6 +140,7 @@ func addInstallFlags(cmd *cobra.Command, f *pflag.FlagSet, client *action.Instal
 	f.BoolVar(&client.Replace, "replace", false, "re-use the given name, only if that name is a deleted release which remains in the history. This is unsafe in production")
 	f.DurationVar(&client.Timeout, "timeout", 300*time.Second, "time to wait for any individual Kubernetes operation (like Jobs for hooks)")
 	f.BoolVar(&client.Wait, "wait", false, "if set, will wait until all Pods, PVCs, Services, and minimum number of Pods of a Deployment, StatefulSet, or ReplicaSet are in a ready state before marking the release as successful. It will wait for as long as --timeout")
+	f.BoolVar(&client.WaitForJobs, "wait-for-jobs", false, "if set and --wait enabled, will wait until all Jobs have been completed before marking the release as successful. It will wait for as long as --timeout")
 	f.BoolVarP(&client.GenerateName, "generate-name", "g", false, "generate the name (and omit the NAME parameter)")
 	f.StringVar(&client.NameTemplate, "name-template", "", "specify template used to name the release")
 	f.StringVar(&client.Description, "description", "", "add a custom description")
diff --git a/cmd/helm/install_test.go b/cmd/helm/install_test.go
index 6892fcd86..0fae79534 100644
--- a/cmd/helm/install_test.go
+++ b/cmd/helm/install_test.go
@@ -85,6 +85,12 @@ func TestInstall(t *testing.T) {
 			cmd:    "install apollo testdata/testcharts/empty --wait",
 			golden: "output/install-with-wait.txt",
 		},
+		// Install, with wait-for-jobs
+		{
+			name:   "install with wait-for-jobs",
+			cmd:    "install apollo testdata/testcharts/empty --wait --wait-for-jobs",
+			golden: "output/install-with-wait-for-jobs.txt",
+		},
 		// Install, using the name-template
 		{
 			name: "install with name-template",
diff --git a/cmd/helm/rollback.go b/cmd/helm/rollback.go
index 2cd6fa2cb..9699b9c05 100644
--- a/cmd/helm/rollback.go
+++ b/cmd/helm/rollback.go
@@ -82,6 +82,7 @@ func newRollbackCmd(cfg *action.Configuration, out io.Writer) *cobra.Command {
 	f.BoolVar(&client.DisableHooks, "no-hooks", false, "prevent hooks from running during rollback")
 	f.DurationVar(&client.Timeout, "timeout", 300*time.Second, "time to wait for any individual Kubernetes operation (like Jobs for hooks)")
 	f.BoolVar(&client.Wait, "wait", false, "if set, will wait until all Pods, PVCs, Services, and minimum number of Pods of a Deployment, StatefulSet, or ReplicaSet are in a ready state before marking the release as successful. It will wait for as long as --timeout")
+	f.BoolVar(&client.WaitForJobs, "wait-for-jobs", false, "if set and --wait enabled, will wait until all Jobs have been completed before marking the release as successful. It will wait for as long as --timeout")
 	f.BoolVar(&client.CleanupOnFail, "cleanup-on-fail", false, "allow deletion of new resources created in this rollback when rollback fails")
 	f.IntVar(&client.MaxHistory, "history-max", settings.MaxHistory, "limit the maximum number of revisions saved per release. Use 0 for no limit")
diff --git a/cmd/helm/rollback_test.go b/cmd/helm/rollback_test.go
index b39378f92..9ca921557 100644
--- a/cmd/helm/rollback_test.go
+++ b/cmd/helm/rollback_test.go
@@ -54,6 +54,11 @@ func TestRollbackCmd(t *testing.T) {
 		cmd:    "rollback funny-honey 1 --wait",
 		golden: "output/rollback-wait.txt",
 		rels:   rels,
+	}, {
+		name:   "rollback a release with wait-for-jobs",
+		cmd:    "rollback funny-honey 1 --wait --wait-for-jobs",
+		golden: "output/rollback-wait-for-jobs.txt",
+		rels:   rels,
 	}, {
 		name: "rollback a release without revision",
 		cmd:  "rollback funny-honey",
diff --git a/cmd/helm/testdata/output/install-with-wait-for-jobs.txt b/cmd/helm/testdata/output/install-with-wait-for-jobs.txt
new file mode 100644
index 000000000..7ce22d4ec
--- /dev/null
+++ b/cmd/helm/testdata/output/install-with-wait-for-jobs.txt
@@ -0,0 +1,6 @@
+NAME: apollo
+LAST DEPLOYED: Fri Sep  2 22:04:05 1977
+NAMESPACE: default
+STATUS: deployed
+REVISION: 1
+TEST SUITE: None
diff --git a/cmd/helm/testdata/output/rollback-wait-for-jobs.txt b/cmd/helm/testdata/output/rollback-wait-for-jobs.txt
new file mode 100644
index 000000000..ae3c6f1c4
--- /dev/null
+++ b/cmd/helm/testdata/output/rollback-wait-for-jobs.txt
@@ -0,0 +1 @@
+Rollback was a success! Happy Helming!
diff --git a/cmd/helm/testdata/output/upgrade-with-wait-for-jobs.txt b/cmd/helm/testdata/output/upgrade-with-wait-for-jobs.txt
new file mode 100644
index 000000000..500d07a11
--- /dev/null
+++ b/cmd/helm/testdata/output/upgrade-with-wait-for-jobs.txt
@@ -0,0 +1,7 @@
+Release "crazy-bunny" has been upgraded. Happy Helming!
+NAME: crazy-bunny
+LAST DEPLOYED: Fri Sep  2 22:04:05 1977
+NAMESPACE: default
+STATUS: deployed
+REVISION: 3
+TEST SUITE: None
diff --git a/cmd/helm/upgrade.go b/cmd/helm/upgrade.go
index 12d797545..c2e92fb36 100644
--- a/cmd/helm/upgrade.go
+++ b/cmd/helm/upgrade.go
@@ -103,6 +103,7 @@ func newUpgradeCmd(cfg *action.Configuration, out io.Writer) *cobra.Command {
 				instClient.SkipCRDs = client.SkipCRDs
 				instClient.Timeout = client.Timeout
 				instClient.Wait = client.Wait
+				instClient.WaitForJobs = client.WaitForJobs
 				instClient.Devel = client.Devel
 				instClient.Namespace = client.Namespace
 				instClient.Atomic = client.Atomic
@@ -179,6 +180,7 @@ func newUpgradeCmd(cfg *action.Configuration, out io.Writer) *cobra.Command {
 	f.BoolVar(&client.ResetValues, "reset-values", false, "when upgrading, reset the values to the ones built into the chart")
 	f.BoolVar(&client.ReuseValues, "reuse-values", false, "when upgrading, reuse the last release's values and merge in any overrides from the command line via --set and -f. If '--reset-values' is specified, this is ignored")
 	f.BoolVar(&client.Wait, "wait", false, "if set, will wait until all Pods, PVCs, Services, and minimum number of Pods of a Deployment, StatefulSet, or ReplicaSet are in a ready state before marking the release as successful. It will wait for as long as --timeout")
+	f.BoolVar(&client.WaitForJobs, "wait-for-jobs", false, "if set and --wait enabled, will wait until all Jobs have been completed before marking the release as successful. It will wait for as long as --timeout")
 	f.BoolVar(&client.Atomic, "atomic", false, "if set, upgrade process rolls back changes made in case of failed upgrade. The --wait flag will be set automatically if --atomic is used")
 	f.IntVar(&client.MaxHistory, "history-max", settings.MaxHistory, "limit the maximum number of revisions saved per release. Use 0 for no limit")
 	f.BoolVar(&client.CleanupOnFail, "cleanup-on-fail", false, "allow deletion of new resources created in this upgrade when upgrade fails")
diff --git a/cmd/helm/upgrade_test.go b/cmd/helm/upgrade_test.go
index 6fe79ebce..e952a5933 100644
--- a/cmd/helm/upgrade_test.go
+++ b/cmd/helm/upgrade_test.go
@@ -131,6 +131,12 @@ func TestUpgradeCmd(t *testing.T) {
 			golden: "output/upgrade-with-wait.txt",
 			rels:   []*release.Release{relMock("crazy-bunny", 2, ch2)},
 		},
+		{
+			name:   "upgrade a release with wait-for-jobs",
+			cmd:    fmt.Sprintf("upgrade crazy-bunny --wait --wait-for-jobs '%s'", chartPath),
+			golden: "output/upgrade-with-wait-for-jobs.txt",
+			rels:   []*release.Release{relMock("crazy-bunny", 2, ch2)},
+		},
 		{
 			name: "upgrade a release with missing dependencies",
 			cmd:  fmt.Sprintf("upgrade bonkers-bunny %s", missingDepsPath),
diff --git a/pkg/action/install.go b/pkg/action/install.go
index caeefca68..6ef754a45 100644
--- a/pkg/action/install.go
+++ b/pkg/action/install.go
@@ -77,6 +77,7 @@ type Install struct {
 	DisableHooks     bool
 	Replace          bool
 	Wait             bool
+	WaitForJobs      bool
 	Devel            bool
 	DependencyUpdate bool
 	Timeout          time.Duration
@@ -156,7 +157,7 @@ func (i *Install) installCRDs(crds []chart.CRD) error {
 		discoveryClient.Invalidate()
 		// Give time for the CRD to be recognized.
-		if err := i.cfg.KubeClient.Wait(totalItems, 60*time.Second); err != nil {
+		if err := i.cfg.KubeClient.Wait(totalItems, 60*time.Second, false); err != nil {
 			return err
 		}
@@ -345,10 +346,9 @@ func (i *Install) Run(chrt *chart.Chart, vals map[string]interface{}) (*release.
 	}

 	if i.Wait {
-		if err := i.cfg.KubeClient.Wait(resources, i.Timeout); err != nil {
+		if err := i.cfg.KubeClient.Wait(resources, i.Timeout, i.WaitForJobs); err != nil {
 			return i.failRelease(rel, err)
 		}
-
 	}

 	if !i.DisableHooks {
diff --git a/pkg/action/install_test.go b/pkg/action/install_test.go
index 6c4012cfd..466b15c51 100644
--- a/pkg/action/install_test.go
+++ b/pkg/action/install_test.go
@@ -362,6 +362,23 @@ func TestInstallRelease_Wait(t *testing.T) {
 	is.Equal(res.Info.Status, release.StatusFailed)
 }

+func TestInstallRelease_WaitForJobs(t *testing.T) {
+	is := assert.New(t)
+	instAction := installAction(t)
+	instAction.ReleaseName = "come-fail-away"
+	failer := instAction.cfg.KubeClient.(*kubefake.FailingKubeClient)
+	failer.WaitError = fmt.Errorf("I timed out")
+	instAction.cfg.KubeClient = failer
+	instAction.Wait = true
+	instAction.WaitForJobs = true
+	vals := map[string]interface{}{}
+
+	res, err := instAction.Run(buildChart(), vals)
+	is.Error(err)
+	is.Contains(res.Info.Description, "I timed out")
+	is.Equal(res.Info.Status, release.StatusFailed)
+}
+
 func TestInstallRelease_Atomic(t *testing.T) {
 	is := assert.New(t)
diff --git a/pkg/action/rollback.go b/pkg/action/rollback.go
index 542acefae..5c3fabaee 100644
--- a/pkg/action/rollback.go
+++ b/pkg/action/rollback.go
@@ -38,6 +38,7 @@ type Rollback struct {
 	Version      int
 	Timeout      time.Duration
 	Wait         bool
+	WaitForJobs  bool
 	DisableHooks bool
 	DryRun       bool
 	Recreate     bool // will (if true) recreate pods after a rollback.
@@ -199,7 +200,7 @@ func (r *Rollback) performRollback(currentRelease, targetRelease *release.Releas
 	}

 	if r.Wait {
-		if err := r.cfg.KubeClient.Wait(target, r.Timeout); err != nil {
+		if err := r.cfg.KubeClient.Wait(target, r.Timeout, r.WaitForJobs); err != nil {
 			targetRelease.SetStatus(release.StatusFailed, fmt.Sprintf("Release %q failed: %s", targetRelease.Name, err.Error()))
 			r.cfg.recordRelease(currentRelease)
 			r.cfg.recordRelease(targetRelease)
diff --git a/pkg/action/upgrade.go b/pkg/action/upgrade.go
index c439af79d..db74e1ece 100644
--- a/pkg/action/upgrade.go
+++ b/pkg/action/upgrade.go
@@ -64,6 +64,8 @@ type Upgrade struct {
 	Timeout time.Duration
 	// Wait determines whether the wait operation should be performed after the upgrade is requested.
 	Wait bool
+	// WaitForJobs determines whether the wait operation for the Jobs should be performed after the upgrade is requested.
+	WaitForJobs bool
 	// DisableHooks disables hook processing if set to true.
 	DisableHooks bool
 	// DryRun controls whether the operation is prepared, but not executed.
@@ -329,7 +331,7 @@ func (u *Upgrade) performUpgrade(originalRelease, upgradedRelease *release.Relea
 	}

 	if u.Wait {
-		if err := u.cfg.KubeClient.Wait(target, u.Timeout); err != nil {
+		if err := u.cfg.KubeClient.Wait(target, u.Timeout, u.WaitForJobs); err != nil {
 			u.cfg.recordRelease(originalRelease)
 			return u.failRelease(upgradedRelease, results.Created, err)
 		}
@@ -400,6 +402,7 @@ func (u *Upgrade) failRelease(rel *release.Release, created kube.ResourceList, e
 	rollin := NewRollback(u.cfg)
 	rollin.Version = filteredHistory[0].Version
 	rollin.Wait = true
+	rollin.WaitForJobs = u.WaitForJobs
 	rollin.DisableHooks = u.DisableHooks
 	rollin.Recreate = u.Recreate
 	rollin.Force = u.Force
diff --git a/pkg/action/upgrade_test.go b/pkg/action/upgrade_test.go
index f16de6479..5cca7ca1a 100644
--- a/pkg/action/upgrade_test.go
+++ b/pkg/action/upgrade_test.go
@@ -60,6 +60,29 @@ func TestUpgradeRelease_Wait(t *testing.T) {
 	is.Equal(res.Info.Status, release.StatusFailed)
 }

+func TestUpgradeRelease_WaitForJobs(t *testing.T) {
+	is := assert.New(t)
+	req := require.New(t)
+
+	upAction := upgradeAction(t)
+	rel := releaseStub()
+	rel.Name = "come-fail-away"
+	rel.Info.Status = release.StatusDeployed
+	upAction.cfg.Releases.Create(rel)
+
+	failer := upAction.cfg.KubeClient.(*kubefake.FailingKubeClient)
+	failer.WaitError = fmt.Errorf("I timed out")
+	upAction.cfg.KubeClient = failer
+	upAction.Wait = true
+	upAction.WaitForJobs = true
+	vals := map[string]interface{}{}
+
+	res, err := upAction.Run(rel.Name, buildChart(), vals)
+	req.Error(err)
+	is.Contains(res.Info.Description, "I timed out")
+	is.Equal(res.Info.Status, release.StatusFailed)
+}
+
 func TestUpgradeRelease_CleanupOnFail(t *testing.T) {
 	is := assert.New(t)
 	req := require.New(t)
diff --git a/pkg/kube/client.go b/pkg/kube/client.go
index 6fd3336c9..d1681a534 100644
--- a/pkg/kube/client.go
+++ b/pkg/kube/client.go
@@ -127,7 +127,7 @@ func (c *Client) Create(resources ResourceList) (*Result, error) {
 }

 // Wait up to the given timeout for the specified resources to be ready
-func (c *Client) Wait(resources ResourceList, timeout time.Duration) error {
+func (c *Client) Wait(resources ResourceList, timeout time.Duration, waitForJobsEnabled bool) error {
 	cs, err := c.getKubeClient()
 	if err != nil {
 		return err
@@ -137,7 +137,7 @@ func (c *Client) Wait(resources ResourceList, timeout time.Duration) error {
 		log:     c.Log,
 		timeout: timeout,
 	}
-	return w.waitForResources(resources)
+	return w.waitForResources(resources, waitForJobsEnabled)
 }

 func (c *Client) namespace() string {
diff --git a/pkg/kube/fake/fake.go b/pkg/kube/fake/fake.go
index b3f7a393b..55b887ab3 100644
--- a/pkg/kube/fake/fake.go
+++ b/pkg/kube/fake/fake.go
@@ -51,11 +51,11 @@ func (f *FailingKubeClient) Create(resources kube.ResourceList) (*kube.Result, e
 }

 // Wait returns the configured error if set or prints
-func (f *FailingKubeClient) Wait(resources kube.ResourceList, d time.Duration) error {
+func (f *FailingKubeClient) Wait(resources kube.ResourceList, d time.Duration, waitForJobsEnabled bool) error {
 	if f.WaitError != nil {
 		return f.WaitError
 	}
-	return f.PrintingKubeClient.Wait(resources, d)
+	return f.PrintingKubeClient.Wait(resources, d, waitForJobsEnabled)
 }

 // Delete returns the configured error if set or prints
diff --git a/pkg/kube/fake/printer.go b/pkg/kube/fake/printer.go
index 58b389ab5..b5f869c71 100644
--- a/pkg/kube/fake/printer.go
+++ b/pkg/kube/fake/printer.go
@@ -47,7 +47,7 @@ func (p *PrintingKubeClient) Create(resources kube.ResourceList) (*kube.Result,
 	return &kube.Result{Created: resources}, nil
 }

-func (p *PrintingKubeClient) Wait(resources kube.ResourceList, _ time.Duration) error {
+func (p *PrintingKubeClient) Wait(resources kube.ResourceList, _ time.Duration, _ bool) error {
 	_, err := io.Copy(p.Out, bufferize(resources))
 	return err
 }
diff --git a/pkg/kube/interface.go b/pkg/kube/interface.go
index 4bf61211e..d89abed34 100644
--- a/pkg/kube/interface.go
+++ b/pkg/kube/interface.go
@@ -30,7 +30,7 @@ type Interface interface {
 	// Create creates one or more resources.
 	Create(resources ResourceList) (*Result, error)

-	Wait(resources ResourceList, timeout time.Duration) error
+	Wait(resources ResourceList, timeout time.Duration, waitForJobsEnabled bool) error

 	// Delete destroys one or more resources.
 	Delete(resources ResourceList) (*Result, []error)
diff --git a/pkg/kube/wait.go b/pkg/kube/wait.go
index 3381b7881..40f7b7a6e 100644
--- a/pkg/kube/wait.go
+++ b/pkg/kube/wait.go
@@ -47,9 +47,9 @@ type waiter struct {
 	log func(string, ...interface{})
 }

-// waitForResources polls to get the current status of all pods, PVCs, and Services
-// until all are ready or a timeout is reached
-func (w *waiter) waitForResources(created ResourceList) error {
+// waitForResources polls to get the current status of all pods, PVCs, Services and
+// Jobs(optional) until all are ready or a timeout is reached
+func (w *waiter) waitForResources(created ResourceList, waitForJobsEnabled bool) error {
 	w.log("beginning wait for %d resources with timeout of %v", len(created), w.timeout)

 	return wait.Poll(2*time.Second, w.timeout, func() (bool, error) {
@@ -68,9 +68,11 @@ func (w *waiter) waitForResources(created ResourceList) error {
 			return false, err
 		}
 	case *batchv1.Job:
-		job, err := w.c.BatchV1().Jobs(v.Namespace).Get(context.Background(), v.Name, metav1.GetOptions{})
-		if err != nil || !w.jobReady(job) {
-			return false, err
+		if waitForJobsEnabled {
+			job, err := w.c.BatchV1().Jobs(v.Namespace).Get(context.Background(), v.Name, metav1.GetOptions{})
+			if err != nil || !w.jobReady(job) {
+				return false, err
+			}
 		}
 	case *appsv1.Deployment, *appsv1beta1.Deployment, *appsv1beta2.Deployment, *extensionsv1beta1.Deployment:
 		currentDeployment, err := w.c.AppsV1().Deployments(v.Namespace).Get(context.Background(), v.Name, metav1.GetOptions{})
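
Reviewer note (not part of the patch itself): below is a minimal Go sketch, assuming this patch is merged, of how a caller of the Helm SDK would opt in to the new behaviour. The helper name installWithJobWait, the release name, namespace, and timeout are illustrative assumptions; cfg, chrt, and vals stand for an initialized *action.Configuration, a loaded *chart.Chart, and chart values prepared elsewhere.

package example

import (
	"time"

	"helm.sh/helm/v3/pkg/action"
	"helm.sh/helm/v3/pkg/chart"
	"helm.sh/helm/v3/pkg/release"
)

// installWithJobWait is a hypothetical helper showing the new WaitForJobs field:
// with Wait and WaitForJobs both set, the release is only marked successful once
// its Jobs have completed, or the timeout expires.
func installWithJobWait(cfg *action.Configuration, chrt *chart.Chart, vals map[string]interface{}) (*release.Release, error) {
	client := action.NewInstall(cfg)
	client.ReleaseName = "apollo" // illustrative release name
	client.Namespace = "default"
	client.Wait = true            // WaitForJobs is only honoured together with Wait
	client.WaitForJobs = true     // field added by this patch
	client.Timeout = 300 * time.Second
	return client.Run(chrt, vals)
}

On the command line the equivalent is helm install apollo CHART --wait --wait-for-jobs, and the same flag is wired up for helm upgrade and helm rollback above.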