From 96867dcf2993b50615382da4449ab09174126266 Mon Sep 17 00:00:00 2001 From: Vladislav Koriakov Date: Thu, 15 Sep 2022 22:23:58 +0300 Subject: [PATCH] fix(helm): support waiting for jobs with ttlSecondsAfterFinished Rely on job events instead of polling while waiting for job readiness. This way job completion can be observed even if the job was removed because of expired TTL Signed-off-by: Vladislav Koriakov --- pkg/kube/ready.go | 85 ++++++++++++++++++++++ pkg/kube/ready_test.go | 161 +++++++++++++++++++++++++++++++++++++++++ pkg/kube/wait.go | 21 +++++- 3 files changed, 264 insertions(+), 3 deletions(-) diff --git a/pkg/kube/ready.go b/pkg/kube/ready.go index 7172a42bc..33a21cb3d 100644 --- a/pkg/kube/ready.go +++ b/pkg/kube/ready.go @@ -29,8 +29,10 @@ import ( apiextv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" apiextv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/apimachinery/pkg/watch" "k8s.io/cli-runtime/pkg/resource" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" @@ -82,6 +84,89 @@ type ReadyChecker struct { pausedAsReady bool } +// getResourceWatch attempts to get watch.Interface for the supplied resource. +// resource is expected to have a name. 
+func (c *ReadyChecker) getResourceWatch(ctx context.Context, resource *resource.Info) (watch.Interface, error) { + if resource.Name == "" { + return nil, fmt.Errorf("can't get watch for resource with empty name") + } + fieldNameSelector := fmt.Sprintf("metadata.name=%s", fields.EscapeValue(resource.Name)) + listOpts := metav1.ListOptions{ + FieldSelector: fieldNameSelector, + ResourceVersion: resource.ResourceVersion, + } + switch resourceType := AsVersioned(resource).(type) { + case *batchv1.Job: + return c.client.BatchV1().Jobs(resource.Namespace).Watch(ctx, listOpts) + default: + return nil, fmt.Errorf("can't get watch for resource of type %T - not implemented", resourceType) + } +} + +// convert event.Object to *batchv1.Job or return an error. +func eventObjectAsJob(event watch.Event) (*batchv1.Job, error) { + job, ok := event.Object.(*batchv1.Job) + if !ok { + return nil, fmt.Errorf( + "expected runtime.Object of type *batchv1.Job, got %T", + event.Object, + ) + } + return job, nil +} + +// waitJobsReady waits until all jobs from resource list are ready. +// Returns error if context is done or a job wasn't ready. +// If resource list has no jobs it returns true without an error. 
+func (c *ReadyChecker) waitJobsReady(ctx context.Context, resources ResourceList) (bool, error) { + for _, resource := range resources { + switch job := AsVersioned(resource).(type) { + case *batchv1.Job: + lastSeen := job + jobWatch, err := c.getResourceWatch(ctx, resource) + if err != nil { + c.log("Falling back to polling, error watching job events: %v", err) + return c.IsReady(ctx, resource) + } + defer jobWatch.Stop() + isReady, err := c.jobReady(lastSeen) + if err != nil { + return isReady, err + } + for !isReady { + select { + case <-ctx.Done(): + isReady, err = c.jobReady(lastSeen) + if err != nil { + return isReady, err + } + return isReady, ctx.Err() + case event, ok := <-jobWatch.ResultChan(): + if !ok { + return c.jobReady(lastSeen) + } + lastSeen, err = eventObjectAsJob(event) + if err != nil { + return isReady, err + } + isReady, err = c.jobReady(lastSeen) + if err != nil { + return isReady, err + } + if event.Type == watch.Deleted && !isReady { + return false, fmt.Errorf( + "job %v/%v is deleted but wasn't ready", + lastSeen.GetNamespace(), + lastSeen.GetName(), + ) + } + } + } + } + } + return true, nil +} + // IsReady checks if v is ready. 
It supports checking readiness for pods, // deployments, persistent volume claims, services, daemon sets, custom // resource definitions, stateful sets, replication controllers, jobs (optional), diff --git a/pkg/kube/ready_test.go b/pkg/kube/ready_test.go index e8e71d8aa..a2e504e17 100644 --- a/pkg/kube/ready_test.go +++ b/pkg/kube/ready_test.go @@ -17,7 +17,9 @@ package kube // import "helm.sh/helm/v3/pkg/kube" import ( "context" + "fmt" "testing" + "time" appsv1 "k8s.io/api/apps/v1" batchv1 "k8s.io/api/batch/v1" @@ -25,7 +27,10 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/cli-runtime/pkg/resource" "k8s.io/client-go/kubernetes/fake" + k8stest "k8s.io/client-go/testing" ) const defaultNamespace = metav1.NamespaceDefault @@ -264,6 +269,158 @@ func Test_ReadyChecker_podsReadyForObject(t *testing.T) { } } +func Test_ReadyChecker_waitJobReady(t *testing.T) { + type jobEventAction func(job batchv1.Job) (batchv1.Job, watch.EventType) + type args struct { + job *batchv1.Job + actions []jobEventAction + wantErr error + ctxTimeout *time.Duration + } + makeDuration := func(timeout time.Duration) *time.Duration { + return &timeout + } + tests := []struct { + name string + args args + want bool + }{ + { + name: "job is succeeded on deletion", + args: args{ + job: newJob("foo", 1, intToInt32(1), 0, 0), + actions: []jobEventAction{ + func(job batchv1.Job) (batchv1.Job, watch.EventType) { + return job, watch.Added + }, + func(job batchv1.Job) (batchv1.Job, watch.EventType) { + job.Status.Succeeded = 1 + return job, watch.Deleted + }, + }, + }, + want: true, + }, + { + name: "job is succeeded on modification", + args: args{ + job: newJob("foo", 1, intToInt32(1), 0, 0), + actions: []jobEventAction{ + func(job batchv1.Job) (batchv1.Job, watch.EventType) { + return job, watch.Added + }, + func(job batchv1.Job) (batchv1.Job, watch.EventType) { + 
job.Status.Succeeded = 1 + return job, watch.Modified + }, + }, + }, + want: true, + }, + { + name: "job is failed", + args: args{job: newJob("foo", 1, intToInt32(1), 0, 1)}, + want: false, + }, + { + name: "job deleted never succeeded", + args: args{ + job: newJob("foo", 1, intToInt32(1), 0, 0), + actions: []jobEventAction{ + func(job batchv1.Job) (batchv1.Job, watch.EventType) { + return job, watch.Added + }, + func(job batchv1.Job) (batchv1.Job, watch.EventType) { + return job, watch.Deleted + }, + }, + wantErr: fmt.Errorf("job default/foo is deleted but wasn't ready"), + }, + }, + { + name: "job is succeeded and deleted", + args: args{ + job: newJob("foo", 0, intToInt32(1), 0, 0), + actions: []jobEventAction{ + func(job batchv1.Job) (batchv1.Job, watch.EventType) { + job.Status.Succeeded = 1 + return job, watch.Deleted + }, + }, + }, + want: true, + }, + { + name: "job with null completions", + args: args{job: newJob("foo", 0, nil, 1, 0)}, + want: true, + }, + { + name: "job is succeeded", + args: args{job: newJob("foo", 1, intToInt32(1), 1, 0)}, + want: true, + }, + { + name: "wait times out", + args: args{ + job: newJob("foo", 1, intToInt32(1), 0, 0), + ctxTimeout: makeDuration(time.Millisecond * 10), + }, + want: false, + }, + } + for _, tt := range tests { + testArgs := tt.args + t.Run(tt.name, func(t *testing.T) { + fakeClient := fake.NewSimpleClientset(testArgs.job) + watcher := watch.NewFake() + fakeClient.PrependWatchReactor( + "jobs", + k8stest.DefaultWatchReactor(watcher, nil), + ) + + go func(job batchv1.Job, eventActions []jobEventAction) { + defer watcher.Stop() + for _, jobAction := range eventActions { + job, event := jobAction(job) + watcher.Action(event, &job) + } + }(*testArgs.job, testArgs.actions) + + checker := NewReadyChecker(fakeClient, nil, CheckJobs(true)) + res := &resource.Info{ + Object: testArgs.job, + Namespace: testArgs.job.Namespace, + Name: testArgs.job.Name, + } + + var ( + ctx = context.Background() + cancel context.CancelFunc 
+ ) + if testArgs.ctxTimeout != nil { + ctx, cancel = context.WithTimeout(ctx, *testArgs.ctxTimeout) + defer cancel() + } + + got, err := checker.waitJobsReady(ctx, ResourceList{res}) + if err != nil { + if testArgs.wantErr == nil { + t.Errorf("waitJobsReady() wanted no error, got '%v'", err) + } else if testArgs.wantErr.Error() != err.Error() { + t.Errorf("waitJobsReady() wanted error '%v', got '%v'", testArgs.wantErr, err) + } + } else if testArgs.wantErr != nil { + t.Errorf("waitJobsReady() wanted error '%v', got none", testArgs.wantErr) + } + + if got != tt.want { + t.Errorf("waitJobsReady() = %v, want %v", got, tt.want) + } + }) + } +} + func Test_ReadyChecker_jobReady(t *testing.T) { type args struct { job *batchv1.Job @@ -551,6 +708,10 @@ func newPersistentVolumeClaim(name string, phase corev1.PersistentVolumeClaimPha func newJob(name string, backoffLimit int, completions *int32, succeeded int, failed int) *batchv1.Job { return &batchv1.Job{ + TypeMeta: metav1.TypeMeta{ + Kind: "Job", + APIVersion: "batch/v1", + }, ObjectMeta: metav1.ObjectMeta{ Name: name, Namespace: defaultNamespace, diff --git a/pkg/kube/wait.go b/pkg/kube/wait.go index ecdd38940..04bfecf87 100644 --- a/pkg/kube/wait.go +++ b/pkg/kube/wait.go @@ -42,15 +42,15 @@ type waiter struct { log func(string, ...interface{}) } -// waitForResources polls to get the current status of all pods, PVCs, Services and -// Jobs(optional) until all are ready or a timeout is reached +// waitForResources polls to get the current status of all pods, PVCs, Services until all are ready or a timeout is reached. +// For jobs (optional) it watches for events and checks if the job is (or was) ready at the time of event. 
func (w *waiter) waitForResources(created ResourceList) error { w.log("beginning wait for %d resources with timeout of %v", len(created), w.timeout) ctx, cancel := context.WithTimeout(context.Background(), w.timeout) defer cancel() - return wait.PollUntilContextCancel(ctx, 2*time.Second, true, func(ctx context.Context) (bool, error) { + err := wait.PollUntilContextCancel(ctx, 2*time.Second, true, func(ctx context.Context) (bool, error) { for _, v := range created { ready, err := w.c.IsReady(ctx, v) if !ready || err != nil { @@ -59,6 +59,21 @@ func (w *waiter) waitForResources(created ResourceList) error { } return true, nil }) + + if err != nil { + return err + } + + if w.c.checkJobs { + jobsReady, err := w.c.waitJobsReady(ctx, created) + if err != nil { + return err + } + if !jobsReady { + return errors.New("jobs not ready") + } + } + return nil } // waitForDeletedResources polls to check if all the resources are deleted or a timeout is reached