From 5825112a8b385ca48e67c4782ba77656c6f4fba5 Mon Sep 17 00:00:00 2001
From: zh168654
Date: Mon, 29 Jun 2020 01:53:43 +0800
Subject: [PATCH 1/6] helm upgrade with --wait: wait for jobs in the manifest to complete

Signed-off-by: zh168654
---
 pkg/kube/wait.go | 17 +++++++++++++++++
 1 file changed, 17 insertions(+)

diff --git a/pkg/kube/wait.go b/pkg/kube/wait.go
index c3beb232d..3381b7881 100644
--- a/pkg/kube/wait.go
+++ b/pkg/kube/wait.go
@@ -67,6 +67,11 @@ func (w *waiter) waitForResources(created ResourceList) error {
 			if err != nil || !w.isPodReady(pod) {
 				return false, err
 			}
+		case *batchv1.Job:
+			job, err := w.c.BatchV1().Jobs(v.Namespace).Get(context.Background(), v.Name, metav1.GetOptions{})
+			if err != nil || !w.jobReady(job) {
+				return false, err
+			}
 		case *appsv1.Deployment, *appsv1beta1.Deployment, *appsv1beta2.Deployment, *extensionsv1beta1.Deployment:
 			currentDeployment, err := w.c.AppsV1().Deployments(v.Namespace).Get(context.Background(), v.Name, metav1.GetOptions{})
 			if err != nil {
@@ -182,6 +187,18 @@ func (w *waiter) isPodReady(pod *corev1.Pod) bool {
 	return false
 }
 
+func (w *waiter) jobReady(job *batchv1.Job) bool {
+	if job.Status.Failed > *job.Spec.BackoffLimit {
+		w.log("Job is failed: %s/%s", job.GetNamespace(), job.GetName())
+		return false
+	}
+	if job.Spec.Completions != nil && job.Status.Succeeded < *job.Spec.Completions {
+		w.log("Job is not completed: %s/%s", job.GetNamespace(), job.GetName())
+		return false
+	}
+	return true
+}
+
 func (w *waiter) serviceReady(s *corev1.Service) bool {
 	// ExternalName Services are external to cluster so helm shouldn't be checking to see if they're 'ready' (i.e. have an IP Set)
 	if s.Spec.Type == corev1.ServiceTypeExternalName {

From 8d498d58e7b383b6c50d43cc5a55eae4955d354e Mon Sep 17 00:00:00 2001
From: zh168654
Date: Thu, 9 Jul 2020 01:10:24 +0800
Subject: [PATCH 2/6] add test cases

Signed-off-by: zh168654
---
 pkg/kube/wait_test.go | 514 ++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 514 insertions(+)
 create mode 100644 pkg/kube/wait_test.go

diff --git a/pkg/kube/wait_test.go b/pkg/kube/wait_test.go
new file mode 100644
index 000000000..e84924f2e
--- /dev/null
+++ b/pkg/kube/wait_test.go
@@ -0,0 +1,514 @@
+/*
+Copyright The Helm Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/ + +package kube + +import ( + "context" + appsv1 "k8s.io/api/apps/v1" + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/client-go/kubernetes/fake" + "testing" +) + +const defaultNamespace = metav1.NamespaceDefault + +func Test_waiter_deploymentReady(t *testing.T) { + type args struct { + rs *appsv1.ReplicaSet + dep *appsv1.Deployment + } + tests := []struct { + name string + args args + want bool + }{ + { + name: "deployment is ready", + args: args{ + rs: newReplicaSet("foo", 1, 1), + dep: newDeployment("foo", 1, 1, 0), + }, + want: true, + }, + { + name: "deployment is not ready", + args: args{ + rs: newReplicaSet("foo", 0, 0), + dep: newDeployment("foo", 1, 1, 0), + }, + want: false, + }, + { + name: "deployment is ready when maxUnavailable is set", + args: args{ + rs: newReplicaSet("foo", 2, 1), + dep: newDeployment("foo", 2, 1, 1), + }, + want: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + w := &waiter{ + c: fake.NewSimpleClientset(), + log: nopLogger, + } + if got := w.deploymentReady(tt.args.rs, tt.args.dep); got != tt.want { + t.Errorf("deploymentReady() = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_waiter_daemonSetReady(t *testing.T) { + type args struct { + ds *appsv1.DaemonSet + } + tests := []struct { + name string + args args + want bool + }{ + { + name: "daemonset is ready", + args: args{ + ds: newDaemonSet("foo", 0, 1, 1, 1), + }, + want: true, + }, + { + name: "daemonset is not ready", + args: args{ + ds: newDaemonSet("foo", 0, 0, 1, 1), + }, + want: false, + }, + { + name: "daemonset pods have not been scheduled successfully", + args: args{ + ds: newDaemonSet("foo", 0, 0, 1, 0), + }, + want: false, + }, + { + name: "daemonset is ready when maxUnavailable is set", + args: args{ + ds: newDaemonSet("foo", 1, 1, 2, 2), + }, + want: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + w := &waiter{ + c: fake.NewSimpleClientset(), + log: nopLogger, + } + if got := w.daemonSetReady(tt.args.ds); got != tt.want { + t.Errorf("daemonSetReady() = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_waiter_statefulSetReady(t *testing.T) { + type args struct { + sts *appsv1.StatefulSet + } + tests := []struct { + name string + args args + want bool + }{ + { + name: "statefulset is ready", + args: args{ + sts: newStatefulSet("foo", 1, 0, 1, 1), + }, + want: true, + }, + { + name: "statefulset is not ready", + args: args{ + sts: newStatefulSet("foo", 1, 0, 0, 1), + }, + want: false, + }, + { + name: "statefulset is ready when partition is specified", + args: args{ + sts: newStatefulSet("foo", 2, 1, 2, 1), + }, + want: true, + }, + { + name: "statefulset is not ready when partition is set", + args: args{ + sts: newStatefulSet("foo", 1, 1, 1, 1), + }, + want: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + w := &waiter{ + c: fake.NewSimpleClientset(), + log: nopLogger, + } + if got := w.statefulSetReady(tt.args.sts); got != tt.want { + t.Errorf("statefulSetReady() = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_waiter_podsReadyForObject(t *testing.T) { + type args struct { + namespace string + obj runtime.Object + } + tests := []struct { + name string + args args + existPods []corev1.Pod + want bool + wantErr bool + }{ + { + name: "pods ready for a replicaset", + args: args{ + namespace: defaultNamespace, + obj: 
newReplicaSet("foo", 1, 1), + }, + existPods: []corev1.Pod{ + *newPodWithCondition("foo", corev1.ConditionTrue), + }, + want: true, + wantErr: false, + }, + { + name: "pods not ready for a replicaset", + args: args{ + namespace: defaultNamespace, + obj: newReplicaSet("foo", 1, 1), + }, + existPods: []corev1.Pod{ + *newPodWithCondition("foo", corev1.ConditionFalse), + }, + want: false, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + w := &waiter{ + c: fake.NewSimpleClientset(), + log: nopLogger, + } + for _, pod := range tt.existPods { + if _, err := w.c.CoreV1().Pods(defaultNamespace).Create(context.TODO(), &pod, metav1.CreateOptions{}); err != nil { + t.Errorf("Failed to create Pod error: %v", err) + return + } + } + got, err := w.podsReadyForObject(tt.args.namespace, tt.args.obj) + if (err != nil) != tt.wantErr { + t.Errorf("podsReadyForObject() error = %v, wantErr %v", err, tt.wantErr) + return + } + if got != tt.want { + t.Errorf("podsReadyForObject() got = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_waiter_jobReady(t *testing.T) { + type args struct { + job *batchv1.Job + } + tests := []struct { + name string + args args + want bool + }{ + { + name: "job is completed", + args: args{job: newJob("foo", 1, 1, 1, 0)}, + want: true, + }, + { + name: "job is incomplete", + args: args{job: newJob("foo", 1, 1, 0, 0)}, + want: false, + }, + { + name: "job is failed", + args: args{job: newJob("foo", 1, 1, 0, 1)}, + want: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + w := &waiter{ + c: fake.NewSimpleClientset(), + log: nopLogger, + } + if got := w.jobReady(tt.args.job); got != tt.want { + t.Errorf("jobReady() = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_waiter_volumeReady(t *testing.T) { + type args struct { + v *corev1.PersistentVolumeClaim + } + tests := []struct { + name string + args args + want bool + }{ + { + name: "pvc is bound", + args: args{ + v: newPersistentVolumeClaim("foo", corev1.ClaimBound), + }, + want: true, + }, + { + name: "pvc is not ready", + args: args{ + v: newPersistentVolumeClaim("foo", corev1.ClaimPending), + }, + want: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + w := &waiter{ + c: fake.NewSimpleClientset(), + log: nopLogger, + } + if got := w.volumeReady(tt.args.v); got != tt.want { + t.Errorf("volumeReady() = %v, want %v", got, tt.want) + } + }) + } +} + +func newDaemonSet(name string, maxUnavailable, numberReady, desiredNumberScheduled, updatedNumberScheduled int) *appsv1.DaemonSet { + return &appsv1.DaemonSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: defaultNamespace, + }, + Spec: appsv1.DaemonSetSpec{ + UpdateStrategy: appsv1.DaemonSetUpdateStrategy{ + Type: appsv1.RollingUpdateDaemonSetStrategyType, + RollingUpdate: &appsv1.RollingUpdateDaemonSet{ + MaxUnavailable: func() *intstr.IntOrString { i := intstr.FromInt(maxUnavailable); return &i }(), + }, + }, + Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"name": name}}, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Labels: map[string]string{"name": name}, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Image: "nginx", + }, + }, + }, + }, + }, + Status: appsv1.DaemonSetStatus{ + DesiredNumberScheduled: int32(desiredNumberScheduled), + NumberReady: int32(numberReady), + UpdatedNumberScheduled: int32(updatedNumberScheduled), + }, + } +} + +func newStatefulSet(name string, 
replicas, partition, readyReplicas, updatedReplicas int) *appsv1.StatefulSet { + return &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: defaultNamespace, + }, + Spec: appsv1.StatefulSetSpec{ + UpdateStrategy: appsv1.StatefulSetUpdateStrategy{ + Type: appsv1.RollingUpdateStatefulSetStrategyType, + RollingUpdate: &appsv1.RollingUpdateStatefulSetStrategy{ + Partition: intToInt32(partition), + }, + }, + Replicas: intToInt32(replicas), + Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"name": name}}, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Labels: map[string]string{"name": name}, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Image: "nginx", + }, + }, + }, + }, + }, + Status: appsv1.StatefulSetStatus{ + UpdatedReplicas: int32(updatedReplicas), + ReadyReplicas: int32(readyReplicas), + }, + } +} + +func newDeployment(name string, replicas, maxSurge, maxUnavailable int) *appsv1.Deployment { + return &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: defaultNamespace, + }, + Spec: appsv1.DeploymentSpec{ + Strategy: appsv1.DeploymentStrategy{ + Type: appsv1.RollingUpdateDeploymentStrategyType, + RollingUpdate: &appsv1.RollingUpdateDeployment{ + MaxUnavailable: func() *intstr.IntOrString { i := intstr.FromInt(maxUnavailable); return &i }(), + MaxSurge: func() *intstr.IntOrString { i := intstr.FromInt(maxSurge); return &i }(), + }, + }, + Replicas: intToInt32(replicas), + Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"name": name}}, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Labels: map[string]string{"name": name}, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Image: "nginx", + }, + }, + }, + }, + }, + } +} + +func newReplicaSet(name string, replicas int, readyReplicas int) *appsv1.ReplicaSet { + d := newDeployment(name, replicas, 0, 0) + return &appsv1.ReplicaSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: defaultNamespace, + Labels: d.Spec.Selector.MatchLabels, + OwnerReferences: []metav1.OwnerReference{*metav1.NewControllerRef(d, d.GroupVersionKind())}, + }, + Spec: appsv1.ReplicaSetSpec{ + Selector: d.Spec.Selector, + Replicas: intToInt32(replicas), + Template: d.Spec.Template, + }, + Status: appsv1.ReplicaSetStatus{ + ReadyReplicas: int32(readyReplicas), + }, + } +} + +func newPodWithCondition(name string, podReadyCondition corev1.ConditionStatus) *corev1.Pod { + return &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: defaultNamespace, + Labels: map[string]string{"name": name}, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Image: "nginx", + }, + }, + }, + Status: corev1.PodStatus{ + Conditions: []corev1.PodCondition{ + { + Type: corev1.PodReady, + Status: podReadyCondition, + }, + }, + }, + } +} + +func newPersistentVolumeClaim(name string, phase corev1.PersistentVolumeClaimPhase) *corev1.PersistentVolumeClaim { + return &corev1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: defaultNamespace, + }, + Status: corev1.PersistentVolumeClaimStatus{ + Phase: phase, + }, + } +} + +func newJob(name string, backoffLimit, completions, succeeded, failed int) *batchv1.Job { + return &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: defaultNamespace, + }, + Spec: batchv1.JobSpec{ + BackoffLimit: intToInt32(backoffLimit), + Completions: intToInt32(completions), + 
Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Labels: map[string]string{"name": name}, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Image: "nginx", + }, + }, + }, + }, + }, + Status: batchv1.JobStatus{ + Succeeded: int32(succeeded), + Failed: int32(failed), + }, + } +} + +func intToInt32(i int) *int32 { + i32 := int32(i) + return &i32 +} From c96dc48f21adbc79e410fafc63f6f6daa221c424 Mon Sep 17 00:00:00 2001 From: zhangye15 Date: Thu, 9 Jul 2020 01:44:18 +0800 Subject: [PATCH 3/6] fix test-style error Signed-off-by: zhangye15 --- pkg/kube/wait_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/kube/wait_test.go b/pkg/kube/wait_test.go index e84924f2e..c6ce494c3 100644 --- a/pkg/kube/wait_test.go +++ b/pkg/kube/wait_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package kube +package kube // import "helm.sh/helm/v3/pkg/kube" import ( "context" From bd03e1b5c70cffd13e740f40ef1c0e8c3a49e092 Mon Sep 17 00:00:00 2001 From: zhangye15 Date: Thu, 9 Jul 2020 01:52:24 +0800 Subject: [PATCH 4/6] fix style conformance Signed-off-by: zhangye15 --- pkg/kube/wait_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/kube/wait_test.go b/pkg/kube/wait_test.go index c6ce494c3..3f7b86710 100644 --- a/pkg/kube/wait_test.go +++ b/pkg/kube/wait_test.go @@ -18,6 +18,8 @@ package kube // import "helm.sh/helm/v3/pkg/kube" import ( "context" + "testing" + appsv1 "k8s.io/api/apps/v1" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" @@ -25,7 +27,6 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/client-go/kubernetes/fake" - "testing" ) const defaultNamespace = metav1.NamespaceDefault From 957d2a2bf978b06cb148b9429b8dd9258c24b887 Mon Sep 17 00:00:00 2001 From: zh168654 Date: Tue, 3 Nov 2020 19:48:29 +0800 Subject: [PATCH 5/6] add wait-for-jobs flag Signed-off-by: zh168654 --- cmd/helm/install.go | 1 + cmd/helm/install_test.go | 6 +++++ cmd/helm/rollback.go | 1 + cmd/helm/rollback_test.go | 5 ++++ .../output/install-with-wait-for-jobs.txt | 6 +++++ .../output/rollback-wait-for-jobs.txt | 1 + .../output/upgrade-with-wait-for-jobs.txt | 7 ++++++ cmd/helm/upgrade.go | 2 ++ cmd/helm/upgrade_test.go | 6 +++++ pkg/action/install.go | 6 ++--- pkg/action/install_test.go | 17 ++++++++++++++ pkg/action/rollback.go | 3 ++- pkg/action/upgrade.go | 5 +++- pkg/action/upgrade_test.go | 23 +++++++++++++++++++ pkg/kube/client.go | 4 ++-- pkg/kube/fake/fake.go | 4 ++-- pkg/kube/fake/printer.go | 2 +- pkg/kube/interface.go | 2 +- pkg/kube/wait.go | 14 ++++++----- 19 files changed, 98 insertions(+), 17 deletions(-) create mode 100644 cmd/helm/testdata/output/install-with-wait-for-jobs.txt create mode 100644 cmd/helm/testdata/output/rollback-wait-for-jobs.txt create mode 100644 cmd/helm/testdata/output/upgrade-with-wait-for-jobs.txt diff --git a/cmd/helm/install.go b/cmd/helm/install.go index 7edd98091..fac2131c1 100644 --- a/cmd/helm/install.go +++ b/cmd/helm/install.go @@ -140,6 +140,7 @@ func addInstallFlags(cmd *cobra.Command, f *pflag.FlagSet, client *action.Instal f.BoolVar(&client.Replace, "replace", false, "re-use the given name, only if that name is a deleted release which remains in the history. 
This is unsafe in production") f.DurationVar(&client.Timeout, "timeout", 300*time.Second, "time to wait for any individual Kubernetes operation (like Jobs for hooks)") f.BoolVar(&client.Wait, "wait", false, "if set, will wait until all Pods, PVCs, Services, and minimum number of Pods of a Deployment, StatefulSet, or ReplicaSet are in a ready state before marking the release as successful. It will wait for as long as --timeout") + f.BoolVar(&client.WaitForJobs, "wait-for-jobs", false, "if set and --wait enabled, will wait until all Jobs have been completed before marking the release as successful. It will wait for as long as --timeout") f.BoolVarP(&client.GenerateName, "generate-name", "g", false, "generate the name (and omit the NAME parameter)") f.StringVar(&client.NameTemplate, "name-template", "", "specify template used to name the release") f.StringVar(&client.Description, "description", "", "add a custom description") diff --git a/cmd/helm/install_test.go b/cmd/helm/install_test.go index 6892fcd86..0fae79534 100644 --- a/cmd/helm/install_test.go +++ b/cmd/helm/install_test.go @@ -85,6 +85,12 @@ func TestInstall(t *testing.T) { cmd: "install apollo testdata/testcharts/empty --wait", golden: "output/install-with-wait.txt", }, + // Install, with wait-for-jobs + { + name: "install with wait-for-jobs", + cmd: "install apollo testdata/testcharts/empty --wait --wait-for-jobs", + golden: "output/install-with-wait-for-jobs.txt", + }, // Install, using the name-template { name: "install with name-template", diff --git a/cmd/helm/rollback.go b/cmd/helm/rollback.go index 2cd6fa2cb..9699b9c05 100644 --- a/cmd/helm/rollback.go +++ b/cmd/helm/rollback.go @@ -82,6 +82,7 @@ func newRollbackCmd(cfg *action.Configuration, out io.Writer) *cobra.Command { f.BoolVar(&client.DisableHooks, "no-hooks", false, "prevent hooks from running during rollback") f.DurationVar(&client.Timeout, "timeout", 300*time.Second, "time to wait for any individual Kubernetes operation (like Jobs for hooks)") f.BoolVar(&client.Wait, "wait", false, "if set, will wait until all Pods, PVCs, Services, and minimum number of Pods of a Deployment, StatefulSet, or ReplicaSet are in a ready state before marking the release as successful. It will wait for as long as --timeout") + f.BoolVar(&client.WaitForJobs, "wait-for-jobs", false, "if set and --wait enabled, will wait until all Jobs have been completed before marking the release as successful. It will wait for as long as --timeout") f.BoolVar(&client.CleanupOnFail, "cleanup-on-fail", false, "allow deletion of new resources created in this rollback when rollback fails") f.IntVar(&client.MaxHistory, "history-max", settings.MaxHistory, "limit the maximum number of revisions saved per release. 
Use 0 for no limit") diff --git a/cmd/helm/rollback_test.go b/cmd/helm/rollback_test.go index b39378f92..9ca921557 100644 --- a/cmd/helm/rollback_test.go +++ b/cmd/helm/rollback_test.go @@ -54,6 +54,11 @@ func TestRollbackCmd(t *testing.T) { cmd: "rollback funny-honey 1 --wait", golden: "output/rollback-wait.txt", rels: rels, + }, { + name: "rollback a release with wait-for-jobs", + cmd: "rollback funny-honey 1 --wait --wait-for-jobs", + golden: "output/rollback-wait-for-jobs.txt", + rels: rels, }, { name: "rollback a release without revision", cmd: "rollback funny-honey", diff --git a/cmd/helm/testdata/output/install-with-wait-for-jobs.txt b/cmd/helm/testdata/output/install-with-wait-for-jobs.txt new file mode 100644 index 000000000..7ce22d4ec --- /dev/null +++ b/cmd/helm/testdata/output/install-with-wait-for-jobs.txt @@ -0,0 +1,6 @@ +NAME: apollo +LAST DEPLOYED: Fri Sep 2 22:04:05 1977 +NAMESPACE: default +STATUS: deployed +REVISION: 1 +TEST SUITE: None diff --git a/cmd/helm/testdata/output/rollback-wait-for-jobs.txt b/cmd/helm/testdata/output/rollback-wait-for-jobs.txt new file mode 100644 index 000000000..ae3c6f1c4 --- /dev/null +++ b/cmd/helm/testdata/output/rollback-wait-for-jobs.txt @@ -0,0 +1 @@ +Rollback was a success! Happy Helming! diff --git a/cmd/helm/testdata/output/upgrade-with-wait-for-jobs.txt b/cmd/helm/testdata/output/upgrade-with-wait-for-jobs.txt new file mode 100644 index 000000000..500d07a11 --- /dev/null +++ b/cmd/helm/testdata/output/upgrade-with-wait-for-jobs.txt @@ -0,0 +1,7 @@ +Release "crazy-bunny" has been upgraded. Happy Helming! +NAME: crazy-bunny +LAST DEPLOYED: Fri Sep 2 22:04:05 1977 +NAMESPACE: default +STATUS: deployed +REVISION: 3 +TEST SUITE: None diff --git a/cmd/helm/upgrade.go b/cmd/helm/upgrade.go index 12d797545..c2e92fb36 100644 --- a/cmd/helm/upgrade.go +++ b/cmd/helm/upgrade.go @@ -103,6 +103,7 @@ func newUpgradeCmd(cfg *action.Configuration, out io.Writer) *cobra.Command { instClient.SkipCRDs = client.SkipCRDs instClient.Timeout = client.Timeout instClient.Wait = client.Wait + instClient.WaitForJobs = client.WaitForJobs instClient.Devel = client.Devel instClient.Namespace = client.Namespace instClient.Atomic = client.Atomic @@ -179,6 +180,7 @@ func newUpgradeCmd(cfg *action.Configuration, out io.Writer) *cobra.Command { f.BoolVar(&client.ResetValues, "reset-values", false, "when upgrading, reset the values to the ones built into the chart") f.BoolVar(&client.ReuseValues, "reuse-values", false, "when upgrading, reuse the last release's values and merge in any overrides from the command line via --set and -f. If '--reset-values' is specified, this is ignored") f.BoolVar(&client.Wait, "wait", false, "if set, will wait until all Pods, PVCs, Services, and minimum number of Pods of a Deployment, StatefulSet, or ReplicaSet are in a ready state before marking the release as successful. It will wait for as long as --timeout") + f.BoolVar(&client.WaitForJobs, "wait-for-jobs", false, "if set and --wait enabled, will wait until all Jobs have been completed before marking the release as successful. It will wait for as long as --timeout") f.BoolVar(&client.Atomic, "atomic", false, "if set, upgrade process rolls back changes made in case of failed upgrade. The --wait flag will be set automatically if --atomic is used") f.IntVar(&client.MaxHistory, "history-max", settings.MaxHistory, "limit the maximum number of revisions saved per release. 
Use 0 for no limit") f.BoolVar(&client.CleanupOnFail, "cleanup-on-fail", false, "allow deletion of new resources created in this upgrade when upgrade fails") diff --git a/cmd/helm/upgrade_test.go b/cmd/helm/upgrade_test.go index 6fe79ebce..e952a5933 100644 --- a/cmd/helm/upgrade_test.go +++ b/cmd/helm/upgrade_test.go @@ -131,6 +131,12 @@ func TestUpgradeCmd(t *testing.T) { golden: "output/upgrade-with-wait.txt", rels: []*release.Release{relMock("crazy-bunny", 2, ch2)}, }, + { + name: "upgrade a release with wait-for-jobs", + cmd: fmt.Sprintf("upgrade crazy-bunny --wait --wait-for-jobs '%s'", chartPath), + golden: "output/upgrade-with-wait-for-jobs.txt", + rels: []*release.Release{relMock("crazy-bunny", 2, ch2)}, + }, { name: "upgrade a release with missing dependencies", cmd: fmt.Sprintf("upgrade bonkers-bunny %s", missingDepsPath), diff --git a/pkg/action/install.go b/pkg/action/install.go index caeefca68..6ef754a45 100644 --- a/pkg/action/install.go +++ b/pkg/action/install.go @@ -77,6 +77,7 @@ type Install struct { DisableHooks bool Replace bool Wait bool + WaitForJobs bool Devel bool DependencyUpdate bool Timeout time.Duration @@ -156,7 +157,7 @@ func (i *Install) installCRDs(crds []chart.CRD) error { discoveryClient.Invalidate() // Give time for the CRD to be recognized. - if err := i.cfg.KubeClient.Wait(totalItems, 60*time.Second); err != nil { + if err := i.cfg.KubeClient.Wait(totalItems, 60*time.Second, false); err != nil { return err } @@ -345,10 +346,9 @@ func (i *Install) Run(chrt *chart.Chart, vals map[string]interface{}) (*release. } if i.Wait { - if err := i.cfg.KubeClient.Wait(resources, i.Timeout); err != nil { + if err := i.cfg.KubeClient.Wait(resources, i.Timeout, i.WaitForJobs); err != nil { return i.failRelease(rel, err) } - } if !i.DisableHooks { diff --git a/pkg/action/install_test.go b/pkg/action/install_test.go index 6c4012cfd..466b15c51 100644 --- a/pkg/action/install_test.go +++ b/pkg/action/install_test.go @@ -362,6 +362,23 @@ func TestInstallRelease_Wait(t *testing.T) { is.Equal(res.Info.Status, release.StatusFailed) } +func TestInstallRelease_WaitForJobs(t *testing.T) { + is := assert.New(t) + instAction := installAction(t) + instAction.ReleaseName = "come-fail-away" + failer := instAction.cfg.KubeClient.(*kubefake.FailingKubeClient) + failer.WaitError = fmt.Errorf("I timed out") + instAction.cfg.KubeClient = failer + instAction.Wait = true + instAction.WaitForJobs = true + vals := map[string]interface{}{} + + res, err := instAction.Run(buildChart(), vals) + is.Error(err) + is.Contains(res.Info.Description, "I timed out") + is.Equal(res.Info.Status, release.StatusFailed) +} + func TestInstallRelease_Atomic(t *testing.T) { is := assert.New(t) diff --git a/pkg/action/rollback.go b/pkg/action/rollback.go index 542acefae..5c3fabaee 100644 --- a/pkg/action/rollback.go +++ b/pkg/action/rollback.go @@ -38,6 +38,7 @@ type Rollback struct { Version int Timeout time.Duration Wait bool + WaitForJobs bool DisableHooks bool DryRun bool Recreate bool // will (if true) recreate pods after a rollback. 
@@ -199,7 +200,7 @@ func (r *Rollback) performRollback(currentRelease, targetRelease *release.Releas } if r.Wait { - if err := r.cfg.KubeClient.Wait(target, r.Timeout); err != nil { + if err := r.cfg.KubeClient.Wait(target, r.Timeout, r.WaitForJobs); err != nil { targetRelease.SetStatus(release.StatusFailed, fmt.Sprintf("Release %q failed: %s", targetRelease.Name, err.Error())) r.cfg.recordRelease(currentRelease) r.cfg.recordRelease(targetRelease) diff --git a/pkg/action/upgrade.go b/pkg/action/upgrade.go index c439af79d..db74e1ece 100644 --- a/pkg/action/upgrade.go +++ b/pkg/action/upgrade.go @@ -64,6 +64,8 @@ type Upgrade struct { Timeout time.Duration // Wait determines whether the wait operation should be performed after the upgrade is requested. Wait bool + // WaitForJobs determines whether the wait operation for the Jobs should be performed after the upgrade is requested. + WaitForJobs bool // DisableHooks disables hook processing if set to true. DisableHooks bool // DryRun controls whether the operation is prepared, but not executed. @@ -329,7 +331,7 @@ func (u *Upgrade) performUpgrade(originalRelease, upgradedRelease *release.Relea } if u.Wait { - if err := u.cfg.KubeClient.Wait(target, u.Timeout); err != nil { + if err := u.cfg.KubeClient.Wait(target, u.Timeout, u.WaitForJobs); err != nil { u.cfg.recordRelease(originalRelease) return u.failRelease(upgradedRelease, results.Created, err) } @@ -400,6 +402,7 @@ func (u *Upgrade) failRelease(rel *release.Release, created kube.ResourceList, e rollin := NewRollback(u.cfg) rollin.Version = filteredHistory[0].Version rollin.Wait = true + rollin.WaitForJobs = u.WaitForJobs rollin.DisableHooks = u.DisableHooks rollin.Recreate = u.Recreate rollin.Force = u.Force diff --git a/pkg/action/upgrade_test.go b/pkg/action/upgrade_test.go index f16de6479..5cca7ca1a 100644 --- a/pkg/action/upgrade_test.go +++ b/pkg/action/upgrade_test.go @@ -60,6 +60,29 @@ func TestUpgradeRelease_Wait(t *testing.T) { is.Equal(res.Info.Status, release.StatusFailed) } +func TestUpgradeRelease_WaitForJobs(t *testing.T) { + is := assert.New(t) + req := require.New(t) + + upAction := upgradeAction(t) + rel := releaseStub() + rel.Name = "come-fail-away" + rel.Info.Status = release.StatusDeployed + upAction.cfg.Releases.Create(rel) + + failer := upAction.cfg.KubeClient.(*kubefake.FailingKubeClient) + failer.WaitError = fmt.Errorf("I timed out") + upAction.cfg.KubeClient = failer + upAction.Wait = true + upAction.WaitForJobs = true + vals := map[string]interface{}{} + + res, err := upAction.Run(rel.Name, buildChart(), vals) + req.Error(err) + is.Contains(res.Info.Description, "I timed out") + is.Equal(res.Info.Status, release.StatusFailed) +} + func TestUpgradeRelease_CleanupOnFail(t *testing.T) { is := assert.New(t) req := require.New(t) diff --git a/pkg/kube/client.go b/pkg/kube/client.go index 6fd3336c9..d1681a534 100644 --- a/pkg/kube/client.go +++ b/pkg/kube/client.go @@ -127,7 +127,7 @@ func (c *Client) Create(resources ResourceList) (*Result, error) { } // Wait up to the given timeout for the specified resources to be ready -func (c *Client) Wait(resources ResourceList, timeout time.Duration) error { +func (c *Client) Wait(resources ResourceList, timeout time.Duration, waitForJobsEnabled bool) error { cs, err := c.getKubeClient() if err != nil { return err @@ -137,7 +137,7 @@ func (c *Client) Wait(resources ResourceList, timeout time.Duration) error { log: c.Log, timeout: timeout, } - return w.waitForResources(resources) + return w.waitForResources(resources, 
waitForJobsEnabled) } func (c *Client) namespace() string { diff --git a/pkg/kube/fake/fake.go b/pkg/kube/fake/fake.go index b3f7a393b..55b887ab3 100644 --- a/pkg/kube/fake/fake.go +++ b/pkg/kube/fake/fake.go @@ -51,11 +51,11 @@ func (f *FailingKubeClient) Create(resources kube.ResourceList) (*kube.Result, e } // Wait returns the configured error if set or prints -func (f *FailingKubeClient) Wait(resources kube.ResourceList, d time.Duration) error { +func (f *FailingKubeClient) Wait(resources kube.ResourceList, d time.Duration, waitForJobsEnabled bool) error { if f.WaitError != nil { return f.WaitError } - return f.PrintingKubeClient.Wait(resources, d) + return f.PrintingKubeClient.Wait(resources, d, waitForJobsEnabled) } // Delete returns the configured error if set or prints diff --git a/pkg/kube/fake/printer.go b/pkg/kube/fake/printer.go index 58b389ab5..b5f869c71 100644 --- a/pkg/kube/fake/printer.go +++ b/pkg/kube/fake/printer.go @@ -47,7 +47,7 @@ func (p *PrintingKubeClient) Create(resources kube.ResourceList) (*kube.Result, return &kube.Result{Created: resources}, nil } -func (p *PrintingKubeClient) Wait(resources kube.ResourceList, _ time.Duration) error { +func (p *PrintingKubeClient) Wait(resources kube.ResourceList, _ time.Duration, _ bool) error { _, err := io.Copy(p.Out, bufferize(resources)) return err } diff --git a/pkg/kube/interface.go b/pkg/kube/interface.go index 4bf61211e..d89abed34 100644 --- a/pkg/kube/interface.go +++ b/pkg/kube/interface.go @@ -30,7 +30,7 @@ type Interface interface { // Create creates one or more resources. Create(resources ResourceList) (*Result, error) - Wait(resources ResourceList, timeout time.Duration) error + Wait(resources ResourceList, timeout time.Duration, waitForJobsEnabled bool) error // Delete destroys one or more resources. 
 	Delete(resources ResourceList) (*Result, []error)
diff --git a/pkg/kube/wait.go b/pkg/kube/wait.go
index 3381b7881..40f7b7a6e 100644
--- a/pkg/kube/wait.go
+++ b/pkg/kube/wait.go
@@ -47,9 +47,9 @@ type waiter struct {
 	log     func(string, ...interface{})
 }
 
-// waitForResources polls to get the current status of all pods, PVCs, and Services
-// until all are ready or a timeout is reached
-func (w *waiter) waitForResources(created ResourceList) error {
+// waitForResources polls to get the current status of all pods, PVCs, Services and
+// Jobs (optional) until all are ready or a timeout is reached
+func (w *waiter) waitForResources(created ResourceList, waitForJobsEnabled bool) error {
 	w.log("beginning wait for %d resources with timeout of %v", len(created), w.timeout)
 
 	return wait.Poll(2*time.Second, w.timeout, func() (bool, error) {
@@ -68,9 +68,11 @@
 				return false, err
 			}
 		case *batchv1.Job:
-			job, err := w.c.BatchV1().Jobs(v.Namespace).Get(context.Background(), v.Name, metav1.GetOptions{})
-			if err != nil || !w.jobReady(job) {
-				return false, err
+			if waitForJobsEnabled {
+				job, err := w.c.BatchV1().Jobs(v.Namespace).Get(context.Background(), v.Name, metav1.GetOptions{})
+				if err != nil || !w.jobReady(job) {
+					return false, err
+				}
 			}
 		case *appsv1.Deployment, *appsv1beta1.Deployment, *appsv1beta2.Deployment, *extensionsv1beta1.Deployment:
 			currentDeployment, err := w.c.AppsV1().Deployments(v.Namespace).Get(context.Background(), v.Name, metav1.GetOptions{})

From bfc575dec2f6ed5ce897c38d0d89b0fe936606c0 Mon Sep 17 00:00:00 2001
From: zh168654
Date: Thu, 5 Nov 2020 17:13:15 +0800
Subject: [PATCH 6/6] add WaitWithJobs instead of changing Wait API

Signed-off-by: zh168654
---
 pkg/action/install.go    | 12 +++++++++---
 pkg/action/rollback.go   | 19 ++++++++++++++-----
 pkg/action/upgrade.go    | 13 ++++++++++---
 pkg/kube/client.go       | 18 ++++++++++++++++--
 pkg/kube/fake/fake.go    | 12 ++++++++++--
 pkg/kube/fake/printer.go |  7 ++++++-
 pkg/kube/interface.go    |  4 +++-
 7 files changed, 68 insertions(+), 17 deletions(-)

diff --git a/pkg/action/install.go b/pkg/action/install.go
index 6ef754a45..4de0b64e6 100644
--- a/pkg/action/install.go
+++ b/pkg/action/install.go
@@ -157,7 +157,7 @@ func (i *Install) installCRDs(crds []chart.CRD) error {
 	discoveryClient.Invalidate()
 
 	// Give time for the CRD to be recognized.
-	if err := i.cfg.KubeClient.Wait(totalItems, 60*time.Second, false); err != nil {
+	if err := i.cfg.KubeClient.Wait(totalItems, 60*time.Second); err != nil {
 		return err
 	}
 
@@ -346,8 +346,14 @@ func (i *Install) Run(chrt *chart.Chart, vals map[string]interface{}) (*release.
 	}
 
 	if i.Wait {
-		if err := i.cfg.KubeClient.Wait(resources, i.Timeout, i.WaitForJobs); err != nil {
-			return i.failRelease(rel, err)
+		if i.WaitForJobs {
+			if err := i.cfg.KubeClient.WaitWithJobs(resources, i.Timeout); err != nil {
+				return i.failRelease(rel, err)
+			}
+		} else {
+			if err := i.cfg.KubeClient.Wait(resources, i.Timeout); err != nil {
+				return i.failRelease(rel, err)
+			}
 		}
 	}
 
diff --git a/pkg/action/rollback.go b/pkg/action/rollback.go
index 5c3fabaee..f3f958f3d 100644
--- a/pkg/action/rollback.go
+++ b/pkg/action/rollback.go
@@ -200,11 +200,20 @@ func (r *Rollback) performRollback(currentRelease, targetRelease *release.Releas
 	}
 
 	if r.Wait {
-		if err := r.cfg.KubeClient.Wait(target, r.Timeout, r.WaitForJobs); err != nil {
-			targetRelease.SetStatus(release.StatusFailed, fmt.Sprintf("Release %q failed: %s", targetRelease.Name, err.Error()))
-			r.cfg.recordRelease(currentRelease)
-			r.cfg.recordRelease(targetRelease)
-			return targetRelease, errors.Wrapf(err, "release %s failed", targetRelease.Name)
+		if r.WaitForJobs {
+			if err := r.cfg.KubeClient.WaitWithJobs(target, r.Timeout); err != nil {
+				targetRelease.SetStatus(release.StatusFailed, fmt.Sprintf("Release %q failed: %s", targetRelease.Name, err.Error()))
+				r.cfg.recordRelease(currentRelease)
+				r.cfg.recordRelease(targetRelease)
+				return targetRelease, errors.Wrapf(err, "release %s failed", targetRelease.Name)
+			}
+		} else {
+			if err := r.cfg.KubeClient.Wait(target, r.Timeout); err != nil {
+				targetRelease.SetStatus(release.StatusFailed, fmt.Sprintf("Release %q failed: %s", targetRelease.Name, err.Error()))
+				r.cfg.recordRelease(currentRelease)
+				r.cfg.recordRelease(targetRelease)
+				return targetRelease, errors.Wrapf(err, "release %s failed", targetRelease.Name)
+			}
 		}
 	}
 
diff --git a/pkg/action/upgrade.go b/pkg/action/upgrade.go
index db74e1ece..b0f294cae 100644
--- a/pkg/action/upgrade.go
+++ b/pkg/action/upgrade.go
@@ -331,9 +331,16 @@ func (u *Upgrade) performUpgrade(originalRelease, upgradedRelease *release.Relea
 	}
 
 	if u.Wait {
-		if err := u.cfg.KubeClient.Wait(target, u.Timeout, u.WaitForJobs); err != nil {
-			u.cfg.recordRelease(originalRelease)
-			return u.failRelease(upgradedRelease, results.Created, err)
+		if u.WaitForJobs {
+			if err := u.cfg.KubeClient.WaitWithJobs(target, u.Timeout); err != nil {
+				u.cfg.recordRelease(originalRelease)
+				return u.failRelease(upgradedRelease, results.Created, err)
+			}
+		} else {
+			if err := u.cfg.KubeClient.Wait(target, u.Timeout); err != nil {
+				u.cfg.recordRelease(originalRelease)
+				return u.failRelease(upgradedRelease, results.Created, err)
+			}
 		}
 	}
 
diff --git a/pkg/kube/client.go b/pkg/kube/client.go
index d1681a534..afa0cb5b3 100644
--- a/pkg/kube/client.go
+++ b/pkg/kube/client.go
@@ -127,7 +127,7 @@ func (c *Client) Create(resources ResourceList) (*Result, error) {
 }
 
 // Wait up to the given timeout for the specified resources to be ready
-func (c *Client) Wait(resources ResourceList, timeout time.Duration, waitForJobsEnabled bool) error {
+func (c *Client) Wait(resources ResourceList, timeout time.Duration) error {
 	cs, err := c.getKubeClient()
 	if err != nil {
 		return err
@@ -137,7 +137,21 @@ func (c *Client) Wait(resources ResourceList, timeout time.Duration, waitForJobs
 	w := waiter{
 		c:       cs,
 		log:     c.Log,
 		timeout: timeout,
 	}
-	return w.waitForResources(resources, waitForJobsEnabled)
+	return w.waitForResources(resources, false)
+}
+
+// WaitWithJobs waits up to the given timeout for the specified resources to be ready, including jobs.
+func (c *Client) WaitWithJobs(resources ResourceList, timeout time.Duration) error {
+	cs, err := c.getKubeClient()
+	if err != nil {
+		return err
+	}
+	w := waiter{
+		c:       cs,
+		log:     c.Log,
+		timeout: timeout,
+	}
+	return w.waitForResources(resources, true)
 }
 
 func (c *Client) namespace() string {
diff --git a/pkg/kube/fake/fake.go b/pkg/kube/fake/fake.go
index 55b887ab3..ff800864c 100644
--- a/pkg/kube/fake/fake.go
+++ b/pkg/kube/fake/fake.go
@@ -51,11 +51,19 @@ func (f *FailingKubeClient) Create(resources kube.ResourceList) (*kube.Result, e
 }
 
 // Wait returns the configured error if set or prints
-func (f *FailingKubeClient) Wait(resources kube.ResourceList, d time.Duration, waitForJobsEnabled bool) error {
+func (f *FailingKubeClient) Wait(resources kube.ResourceList, d time.Duration) error {
 	if f.WaitError != nil {
 		return f.WaitError
 	}
-	return f.PrintingKubeClient.Wait(resources, d, waitForJobsEnabled)
+	return f.PrintingKubeClient.Wait(resources, d)
+}
+
+// WaitWithJobs returns the configured error if set or prints
+func (f *FailingKubeClient) WaitWithJobs(resources kube.ResourceList, d time.Duration) error {
+	if f.WaitError != nil {
+		return f.WaitError
+	}
+	return f.PrintingKubeClient.WaitWithJobs(resources, d)
 }
 
 // Delete returns the configured error if set or prints
diff --git a/pkg/kube/fake/printer.go b/pkg/kube/fake/printer.go
index b5f869c71..e8bd1845b 100644
--- a/pkg/kube/fake/printer.go
+++ b/pkg/kube/fake/printer.go
@@ -47,7 +47,12 @@ func (p *PrintingKubeClient) Create(resources kube.ResourceList) (*kube.Result,
 	return &kube.Result{Created: resources}, nil
 }
 
-func (p *PrintingKubeClient) Wait(resources kube.ResourceList, _ time.Duration, _ bool) error {
+func (p *PrintingKubeClient) Wait(resources kube.ResourceList, _ time.Duration) error {
+	_, err := io.Copy(p.Out, bufferize(resources))
+	return err
+}
+
+func (p *PrintingKubeClient) WaitWithJobs(resources kube.ResourceList, _ time.Duration) error {
 	_, err := io.Copy(p.Out, bufferize(resources))
 	return err
 }
diff --git a/pkg/kube/interface.go b/pkg/kube/interface.go
index d89abed34..545985996 100644
--- a/pkg/kube/interface.go
+++ b/pkg/kube/interface.go
@@ -30,7 +30,9 @@ type Interface interface {
 	// Create creates one or more resources.
 	Create(resources ResourceList) (*Result, error)
 
-	Wait(resources ResourceList, timeout time.Duration, waitForJobsEnabled bool) error
+	Wait(resources ResourceList, timeout time.Duration) error
+
+	WaitWithJobs(resources ResourceList, timeout time.Duration) error
 
 	// Delete destroys one or more resources.
 	Delete(resources ResourceList) (*Result, []error)
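
Usage note: the new flag is opt-in and only takes effect together with --wait, e.g.:

    helm upgrade crazy-bunny ./mychart --wait --wait-for-jobs --timeout 10m

For SDK callers, the series leaves Wait job-free and adds WaitWithJobs, with the action
layer (patches 5 and 6) choosing between them via the WaitForJobs field. Below is a
minimal sketch of opting in through the install action. It assumes an already-initialized
*action.Configuration — the initActionConfig helper and the ./mychart path are
hypothetical — while the fields and methods used are the ones introduced in this series:

    package main

    import (
    	"log"
    	"time"

    	"helm.sh/helm/v3/pkg/action"
    	"helm.sh/helm/v3/pkg/chart/loader"
    )

    func main() {
    	// Hypothetical helper: returns a ready *action.Configuration
    	// (kubeconfig, namespace, and storage driver wired up).
    	cfg := initActionConfig()

    	// Load a chart that contains a batch/v1 Job among its manifests.
    	chrt, err := loader.Load("./mychart")
    	if err != nil {
    		log.Fatal(err)
    	}

    	inst := action.NewInstall(cfg)
    	inst.ReleaseName = "demo"
    	inst.Namespace = "default"
    	inst.Wait = true        // wait for Pods, PVCs, Services, workloads...
    	inst.WaitForJobs = true // ...and also for Jobs to run to completion
    	inst.Timeout = 5 * time.Minute

    	// Run blocks until the resources are ready (Jobs included) or Timeout
    	// elapses; on timeout the release is marked failed.
    	rel, err := inst.Run(chrt, map[string]interface{}{})
    	if err != nil {
    		log.Fatalf("install failed or timed out: %v", err)
    	}
    	log.Printf("release %q deployed", rel.Name)
    }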