diff --git a/pkg/kube/ready.go b/pkg/kube/ready.go
index 7172a42bc..33a21cb3d 100644
--- a/pkg/kube/ready.go
+++ b/pkg/kube/ready.go
@@ -29,8 +29,10 @@ import (
 	apiextv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
 	apiextv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/util/intstr"
+	"k8s.io/apimachinery/pkg/watch"
 	"k8s.io/cli-runtime/pkg/resource"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/kubernetes/scheme"
@@ -82,6 +84,89 @@ type ReadyChecker struct {
 	pausedAsReady bool
 }
 
+// getResourceWatch attempts to get a watch.Interface for the supplied resource.
+// resource is expected to have a name.
+func (c *ReadyChecker) getResourceWatch(ctx context.Context, resource *resource.Info) (watch.Interface, error) {
+	if resource.Name == "" {
+		return nil, fmt.Errorf("can't get watch for resource with empty name")
+	}
+	fieldNameSelector := fmt.Sprintf("metadata.name=%s", fields.EscapeValue(resource.Name))
+	listOpts := metav1.ListOptions{
+		FieldSelector:   fieldNameSelector,
+		ResourceVersion: resource.ResourceVersion,
+	}
+	switch resourceType := AsVersioned(resource).(type) {
+	case *batchv1.Job:
+		return c.client.BatchV1().Jobs(resource.Namespace).Watch(ctx, listOpts)
+	default:
+		return nil, fmt.Errorf("can't get watch for resource of type %T - not implemented", resourceType)
+	}
+}
+
+// eventObjectAsJob converts event.Object to *batchv1.Job or returns an error.
+func eventObjectAsJob(event watch.Event) (*batchv1.Job, error) {
+	job, ok := event.Object.(*batchv1.Job)
+	if !ok {
+		return nil, fmt.Errorf(
+			"expected runtime.Object of type *batchv1.Job, got %T",
+			event.Object,
+		)
+	}
+	return job, nil
+}
+
+// waitJobsReady waits until all jobs from the resource list are ready.
+// It returns an error if the context is done or a job wasn't ready.
+// If the resource list has no jobs it returns true without an error.
+func (c *ReadyChecker) waitJobsReady(ctx context.Context, resources ResourceList) (bool, error) {
+	for _, resource := range resources {
+		switch job := AsVersioned(resource).(type) {
+		case *batchv1.Job:
+			lastSeen := job
+			jobWatch, err := c.getResourceWatch(ctx, resource)
+			if err != nil {
+				c.log("Falling back to polling, error watching job events: %v", err)
+				return c.IsReady(ctx, resource)
+			}
+			defer jobWatch.Stop()
+			isReady, err := c.jobReady(lastSeen)
+			if err != nil {
+				return isReady, err
+			}
+			for !isReady {
+				select {
+				case <-ctx.Done():
+					isReady, err = c.jobReady(lastSeen)
+					if err != nil {
+						return isReady, err
+					}
+					return isReady, ctx.Err()
+				case event, ok := <-jobWatch.ResultChan():
+					if !ok {
+						return c.jobReady(lastSeen)
+					}
+					lastSeen, err = eventObjectAsJob(event)
+					if err != nil {
+						return isReady, err
+					}
+					isReady, err = c.jobReady(lastSeen)
+					if err != nil {
+						return isReady, err
+					}
+					if event.Type == watch.Deleted && !isReady {
+						return false, fmt.Errorf(
+							"job %v/%v is deleted but wasn't ready",
+							lastSeen.GetNamespace(),
+							lastSeen.GetName(),
+						)
+					}
+				}
+			}
+		}
+	}
+	return true, nil
+}
+
 // IsReady checks if v is ready. It supports checking readiness for pods,
 // deployments, persistent volume claims, services, daemon sets, custom
 // resource definitions, stateful sets, replication controllers, jobs (optional),
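getResourceWatch narrows the watch to a single object by combining a metadata.name field selector with the object's last known ResourceVersion. Below is a minimal standalone sketch of the same pattern against a typed clientset outside Helm; the kubeconfig loading, the "default" namespace, and the job name "my-job" are illustrative assumptions, not part of this change.

package main

import (
	"context"
	"fmt"
	"log"

	batchv1 "k8s.io/api/batch/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Assumption: a reachable cluster via the default kubeconfig.
	cfg, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		log.Fatal(err)
	}
	client, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		log.Fatal(err)
	}

	// Same pattern as getResourceWatch: watch only the named job.
	opts := metav1.ListOptions{
		FieldSelector: fmt.Sprintf("metadata.name=%s", fields.EscapeValue("my-job")),
	}
	w, err := client.BatchV1().Jobs("default").Watch(context.Background(), opts)
	if err != nil {
		log.Fatal(err)
	}
	defer w.Stop()

	// Each event carries the full job object, so its status can be
	// re-evaluated on every Added/Modified/Deleted notification.
	for event := range w.ResultChan() {
		job, ok := event.Object.(*batchv1.Job)
		if !ok {
			continue
		}
		log.Printf("%s: succeeded=%d failed=%d", event.Type, job.Status.Succeeded, job.Status.Failed)
	}
}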
diff --git a/pkg/kube/ready_test.go b/pkg/kube/ready_test.go
index e8e71d8aa..a2e504e17 100644
--- a/pkg/kube/ready_test.go
+++ b/pkg/kube/ready_test.go
@@ -17,7 +17,9 @@ package kube // import "helm.sh/helm/v3/pkg/kube"
 
 import (
 	"context"
+	"fmt"
 	"testing"
+	"time"
 
 	appsv1 "k8s.io/api/apps/v1"
 	batchv1 "k8s.io/api/batch/v1"
@@ -25,7 +27,10 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/util/intstr"
+	"k8s.io/apimachinery/pkg/watch"
+	"k8s.io/cli-runtime/pkg/resource"
 	"k8s.io/client-go/kubernetes/fake"
+	k8stest "k8s.io/client-go/testing"
 )
 
 const defaultNamespace = metav1.NamespaceDefault
@@ -264,6 +269,158 @@ func Test_ReadyChecker_podsReadyForObject(t *testing.T) {
 	}
 }
 
+func Test_ReadyChecker_waitJobReady(t *testing.T) {
+	type jobEventAction func(job batchv1.Job) (batchv1.Job, watch.EventType)
+	type args struct {
+		job        *batchv1.Job
+		actions    []jobEventAction
+		wantErr    error
+		ctxTimeout *time.Duration
+	}
+	makeDuration := func(timeout time.Duration) *time.Duration {
+		return &timeout
+	}
+	tests := []struct {
+		name string
+		args args
+		want bool
+	}{
+		{
+			name: "job is succeeded on deletion",
+			args: args{
+				job: newJob("foo", 1, intToInt32(1), 0, 0),
+				actions: []jobEventAction{
+					func(job batchv1.Job) (batchv1.Job, watch.EventType) {
+						return job, watch.Added
+					},
+					func(job batchv1.Job) (batchv1.Job, watch.EventType) {
+						job.Status.Succeeded = 1
+						return job, watch.Deleted
+					},
+				},
+			},
+			want: true,
+		},
+		{
+			name: "job is succeeded on modification",
+			args: args{
+				job: newJob("foo", 1, intToInt32(1), 0, 0),
+				actions: []jobEventAction{
+					func(job batchv1.Job) (batchv1.Job, watch.EventType) {
+						return job, watch.Added
+					},
+					func(job batchv1.Job) (batchv1.Job, watch.EventType) {
+						job.Status.Succeeded = 1
+						return job, watch.Modified
+					},
+				},
+			},
+			want: true,
+		},
+		{
+			name: "job is failed",
+			args: args{job: newJob("foo", 1, intToInt32(1), 0, 1)},
+			want: false,
+		},
+		{
+			name: "job deleted never succeeded",
+			args: args{
+				job: newJob("foo", 1, intToInt32(1), 0, 0),
+				actions: []jobEventAction{
+					func(job batchv1.Job) (batchv1.Job, watch.EventType) {
+						return job, watch.Added
+					},
+					func(job batchv1.Job) (batchv1.Job, watch.EventType) {
+						return job, watch.Deleted
+					},
+				},
+				wantErr: fmt.Errorf("job default/foo is deleted but wasn't ready"),
+			},
+		},
+		{
+			name: "job is succeeded and deleted",
+			args: args{
+				job: newJob("foo", 0, intToInt32(1), 0, 0),
+				actions: []jobEventAction{
+					func(job batchv1.Job) (batchv1.Job, watch.EventType) {
+						job.Status.Succeeded = 1
+						return job, watch.Deleted
+					},
+				},
+			},
+			want: true,
+		},
+		{
+			name: "job with null completions",
+			args: args{job: newJob("foo", 0, nil, 1, 0)},
+			want: true,
+		},
+		{
+			name: "job is succeeded",
+			args: args{job: newJob("foo", 1, intToInt32(1), 1, 0)},
+			want: true,
+		},
+		{
+			name: "wait times out",
+			args: args{
+				job:        newJob("foo", 1, intToInt32(1), 0, 0),
+				ctxTimeout: makeDuration(time.Millisecond * 10),
+			},
+			want: false,
+		},
+	}
+	for _, tt := range tests {
+		testArgs := tt.args
+		t.Run(tt.name, func(t *testing.T) {
+			fakeClient := fake.NewSimpleClientset(testArgs.job)
+			watcher := watch.NewFake()
+			fakeClient.PrependWatchReactor(
+				"jobs",
+				k8stest.DefaultWatchReactor(watcher, nil),
+			)
+
+			go func(job batchv1.Job, eventActions []jobEventAction) {
+				defer watcher.Stop()
+				for _, jobAction := range eventActions {
+					job, event := jobAction(job)
+					watcher.Action(event, &job)
+				}
+			}(*testArgs.job, testArgs.actions)
+
+			checker := NewReadyChecker(fakeClient, nil, CheckJobs(true))
+			res := &resource.Info{
+				Object:    testArgs.job,
+				Namespace: testArgs.job.Namespace,
+				Name:      testArgs.job.Name,
+			}
+
+			var (
+				ctx    = context.Background()
+				cancel context.CancelFunc
+			)
+			if testArgs.ctxTimeout != nil {
+				ctx, cancel = context.WithTimeout(ctx, *testArgs.ctxTimeout)
+				defer cancel()
+			}
+
+			got, err := checker.waitJobsReady(ctx, ResourceList{res})
+			if err != nil {
+				if testArgs.wantErr == nil {
+					t.Errorf("waitJobsReady() wanted no error, got '%v'", err)
+				} else if testArgs.wantErr.Error() != err.Error() {
+					t.Errorf("waitJobsReady() wanted error '%v', got '%v'", testArgs.wantErr, err)
+				}
+			} else if testArgs.wantErr != nil {
+				t.Errorf("waitJobsReady() wanted error '%v', got none", testArgs.wantErr)
+			}
+
+			if got != tt.want {
+				t.Errorf("waitJobsReady() = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}
+
 func Test_ReadyChecker_jobReady(t *testing.T) {
 	type args struct {
 		job *batchv1.Job
@@ -551,6 +708,10 @@ func newPersistentVolumeClaim(name string, phase corev1.PersistentVolumeClaimPha
 
 func newJob(name string, backoffLimit int, completions *int32, succeeded int, failed int) *batchv1.Job {
 	return &batchv1.Job{
+		TypeMeta: metav1.TypeMeta{
+			Kind:       "Job",
+			APIVersion: "batch/v1",
+		},
 		ObjectMeta: metav1.ObjectMeta{
 			Name:      name,
 			Namespace: defaultNamespace,
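The test drives waitJobsReady through a fake clientset whose watch requests are answered by a watch.FakeWatcher, so job events can be injected deterministically from a goroutine. Below is a condensed, self-contained sketch of that wiring under the same assumptions; the job object and the single Modified event are illustrative.

package main

import (
	"context"
	"fmt"

	batchv1 "k8s.io/api/batch/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/kubernetes/fake"
	k8stest "k8s.io/client-go/testing"
)

func main() {
	job := &batchv1.Job{
		TypeMeta:   metav1.TypeMeta{Kind: "Job", APIVersion: "batch/v1"},
		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "default"},
	}

	// Fake clientset pre-loaded with the job; its Watch calls are
	// redirected to the FakeWatcher below.
	client := fake.NewSimpleClientset(job)
	watcher := watch.NewFake()
	client.PrependWatchReactor("jobs", k8stest.DefaultWatchReactor(watcher, nil))

	// Inject one Modified event in the background, as the test's goroutine does.
	go func() {
		defer watcher.Stop()
		updated := job.DeepCopy()
		updated.Status.Succeeded = 1
		watcher.Action(watch.Modified, updated)
	}()

	// Consume the event through the clientset's watch, like waitJobsReady.
	w, err := client.BatchV1().Jobs("default").Watch(context.Background(), metav1.ListOptions{})
	if err != nil {
		panic(err)
	}
	for event := range w.ResultChan() {
		got := event.Object.(*batchv1.Job)
		fmt.Printf("%s: succeeded=%d\n", event.Type, got.Status.Succeeded)
	}
}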
diff --git a/pkg/kube/wait.go b/pkg/kube/wait.go
index ecdd38940..04bfecf87 100644
--- a/pkg/kube/wait.go
+++ b/pkg/kube/wait.go
@@ -42,15 +42,15 @@ type waiter struct {
 	log     func(string, ...interface{})
 }
 
-// waitForResources polls to get the current status of all pods, PVCs, Services and
-// Jobs(optional) until all are ready or a timeout is reached
+// waitForResources polls to get the current status of all pods, PVCs, and Services until all are ready or a timeout is reached.
+// For jobs (optional) it watches for events and checks if the job is (or was) ready at the time of the event.
 func (w *waiter) waitForResources(created ResourceList) error {
 	w.log("beginning wait for %d resources with timeout of %v", len(created), w.timeout)
 
 	ctx, cancel := context.WithTimeout(context.Background(), w.timeout)
 	defer cancel()
 
-	return wait.PollUntilContextCancel(ctx, 2*time.Second, true, func(ctx context.Context) (bool, error) {
+	err := wait.PollUntilContextCancel(ctx, 2*time.Second, true, func(ctx context.Context) (bool, error) {
 		for _, v := range created {
 			ready, err := w.c.IsReady(ctx, v)
 			if !ready || err != nil {
@@ -59,6 +59,21 @@ func (w *waiter) waitForResources(created ResourceList) error {
 		}
 		return true, nil
 	})
+
+	if err != nil {
+		return err
+	}
+
+	if w.c.checkJobs {
+		jobsReady, err := w.c.waitJobsReady(ctx, created)
+		if err != nil {
+			return err
+		}
+		if !jobsReady {
+			return errors.New("jobs not ready")
+		}
+	}
+	return nil
 }
 
 // waitForDeletedResources polls to check if all the resources are deleted or a timeout is reached
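waitForResources now runs in two phases that share one timeout: the existing 2-second poll loop for all resources, followed by the watch-based job wait on the same context, so the second phase only gets whatever budget the first phase left over. The sketch below illustrates that shared-deadline structure; pollThenWatch and the placeholder condition functions are hypothetical stand-ins for IsReady polling and waitJobsReady, not part of the change.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

// pollThenWatch mirrors the control flow of the patched waitForResources:
// poll first, then run a second (watch-based) wait on the same context so
// both phases share a single timeout budget.
func pollThenWatch(timeout time.Duration, polled, watched func(context.Context) (bool, error)) error {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	// Phase 1: the 2-second readiness poll, as before.
	if err := wait.PollUntilContextCancel(ctx, 2*time.Second, true, polled); err != nil {
		return err
	}

	// Phase 2: the watch-based job wait, using the remaining deadline.
	ready, err := watched(ctx)
	if err != nil {
		return err
	}
	if !ready {
		return errors.New("jobs not ready")
	}
	return nil
}

func main() {
	err := pollThenWatch(10*time.Second,
		func(ctx context.Context) (bool, error) { return true, nil }, // stand-in for the IsReady poll
		func(ctx context.Context) (bool, error) { return true, nil }, // stand-in for waitJobsReady
	)
	fmt.Println("err:", err)
}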