From affe47aca9350441ff0a76b71ee9f719a5da6e3f Mon Sep 17 00:00:00 2001 From: Vladislav Koriakov Date: Thu, 15 Sep 2022 22:08:53 +0300 Subject: [PATCH] fix: support waiting for jobs with ttlSecondsAfterFinished Rely on job events instead of polling while waiting for job readiness. This way job completion can be observed even if the job was removed because of expired TTL Signed-off-by: Vladislav Koriakov Signed-off-by: Vladislav Koriakov --- pkg/kube/converter.go | 14 ++++++++++++++ pkg/kube/ready.go | 43 ++++++++++++++++++++++++++++++++++++------- pkg/kube/wait.go | 21 ++++++++++++++++++--- 3 files changed, 68 insertions(+), 10 deletions(-) diff --git a/pkg/kube/converter.go b/pkg/kube/converter.go index 3bf0e358c..6cedb4a78 100644 --- a/pkg/kube/converter.go +++ b/pkg/kube/converter.go @@ -19,6 +19,8 @@ package kube // import "helm.sh/helm/v3/pkg/kube" import ( "sync" + "github.com/pkg/errors" + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" "k8s.io/apimachinery/pkg/api/meta" @@ -67,3 +69,15 @@ func kubernetesNativeScheme() *runtime.Scheme { }) return k8sNativeScheme } + +// ConvertUnstructuredObject converts runtime.Unstructured to concrete resource representation. 
+func ConvertUnstructuredObject(obj runtime.Object, concrete interface{}) error { + unstructObj, ok := obj.(runtime.Unstructured) + if !ok { + return errors.Errorf("object expected to be runtime.Unstructured, but got %T", obj) + } + return runtime.DefaultUnstructuredConverter.FromUnstructured( + unstructObj.UnstructuredContent(), + concrete, + ) +} diff --git a/pkg/kube/ready.go b/pkg/kube/ready.go index 0554c1729..e76479311 100644 --- a/pkg/kube/ready.go +++ b/pkg/kube/ready.go @@ -19,6 +19,7 @@ package kube // import "helm.sh/helm/v3/pkg/kube" import ( "context" + "github.com/pkg/errors" appsv1 "k8s.io/api/apps/v1" appsv1beta1 "k8s.io/api/apps/v1beta1" appsv1beta2 "k8s.io/api/apps/v1beta2" @@ -30,6 +31,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/apimachinery/pkg/watch" "k8s.io/cli-runtime/pkg/resource" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" @@ -81,6 +83,40 @@ type ReadyChecker struct { pausedAsReady bool } +// waitJobsReady waits until all jobs from resource list are ready. +// Returns error if context is done or a job was deleted without being ready. +// If resource list has no jobs it returns true without error. 
+func (c *ReadyChecker) waitJobsReady(ctx context.Context, resources ResourceList) (bool, error) { + for _, v := range resources { + switch job := AsVersioned(v).(type) { + case *batchv1.Job: + jobWatch, err := v.Watch(v.ResourceVersion) + if err != nil { + return c.jobReady(job), err + } + defer jobWatch.Stop() + for !c.jobReady(job) { + select { + case <-ctx.Done(): + return c.jobReady(job), ctx.Err() + case event, ok := <-jobWatch.ResultChan(): + if !ok { + return false, errors.Errorf("watch for job %v/%v closed before the job became ready", job.GetNamespace(), job.GetName()) + } + err = ConvertUnstructuredObject(event.Object, &job) + if err != nil { + return false, err + } + if event.Type == watch.Deleted && !c.jobReady(job) { + return false, errors.Errorf("job %v/%v is deleted but wasn't ready", job.GetNamespace(), job.GetName()) + } + } + } + } + } + return true, nil +} + // IsReady checks if v is ready. It supports checking readiness for pods, // deployments, persistent volume claims, services, daemon sets, custom // resource definitions, stateful sets, replication controllers, and replica @@ -102,13 +138,6 @@ func (c *ReadyChecker) IsReady(ctx context.Context, v *resource.Info) (bool, err if err != nil || !c.isPodReady(pod) { return false, err } - case *batchv1.Job: - if c.checkJobs { - job, err := c.client.BatchV1().Jobs(v.Namespace).Get(ctx, v.Name, metav1.GetOptions{}) - if err != nil || !c.jobReady(job) { - return false, err - } - } case *appsv1.Deployment, *appsv1beta1.Deployment, *appsv1beta2.Deployment, *extensionsv1beta1.Deployment: currentDeployment, err := c.client.AppsV1().Deployments(v.Namespace).Get(ctx, v.Name, metav1.GetOptions{}) if err != nil { diff --git a/pkg/kube/wait.go b/pkg/kube/wait.go index 8928d6745..2a140f4ad 100644 --- a/pkg/kube/wait.go +++ b/pkg/kube/wait.go @@ -42,15 +42,15 @@ type waiter struct { log func(string, ...interface{}) } -// waitForResources polls to get the current status of all pods, PVCs, Services and -// Jobs(optional) until all are ready or a timeout is reached +// waitForResources polls to get the current status of all 
pods, PVCs and Services until all are ready or a timeout is reached. +// For jobs (optional) it watches for events and checks if the job is (or was) ready at the time of the event. func (w *waiter) waitForResources(created ResourceList) error { w.log("beginning wait for %d resources with timeout of %v", len(created), w.timeout) ctx, cancel := context.WithTimeout(context.Background(), w.timeout) defer cancel() - return wait.PollImmediateUntil(2*time.Second, func() (bool, error) { + err := wait.PollImmediateUntil(2*time.Second, func() (bool, error) { for _, v := range created { ready, err := w.c.IsReady(ctx, v) if !ready || err != nil { @@ -59,6 +59,21 @@ func (w *waiter) waitForResources(created ResourceList) error { } return true, nil }, ctx.Done()) + + if err != nil { + return err + } + + if w.c.checkJobs { + jobsReady, err := w.c.waitJobsReady(ctx, created) + if err != nil { + return err + } + if !jobsReady { + return errors.New("jobs not ready") + } + } + return nil } // waitForDeletedResources polls to check if all the resources are deleted or a timeout is reached