fix: support waiting for jobs with ttlSecondsAfterFinished

Rely on job events instead of polling while waiting for job readiness. This way job completion can be observed even if the job was removed because of expired TTL

Signed-off-by: Vladislav Koriakov <Dkfl12@github.com>
Signed-off-by: Vladislav Koriakov <dkfl12@yahoo.com>
pull/11347/head
Vladislav Koriakov 3 years ago
parent bed23120b0
commit affe47aca9

@ -19,6 +19,8 @@ package kube // import "helm.sh/helm/v3/pkg/kube"
import ( import (
"sync" "sync"
"github.com/pkg/errors"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
"k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/api/meta"
@ -67,3 +69,15 @@ func kubernetesNativeScheme() *runtime.Scheme {
}) })
return k8sNativeScheme return k8sNativeScheme
} }
// ConvertUnstructuredObject converts runtime.Unstructured to concrete resource representation.
func ConvertUnstructuredObject(obj runtime.Object, concrete interface{}) error {
unstructObj, ok := obj.(runtime.Unstructured)
if !ok {
return errors.Errorf("object expected to be runtime.Unstrcutured, but got %T", obj)
}
return runtime.DefaultUnstructuredConverter.FromUnstructured(
unstructObj.UnstructuredContent(),
concrete,
)
}

@ -19,6 +19,7 @@ package kube // import "helm.sh/helm/v3/pkg/kube"
import ( import (
"context" "context"
"github.com/pkg/errors"
appsv1 "k8s.io/api/apps/v1" appsv1 "k8s.io/api/apps/v1"
appsv1beta1 "k8s.io/api/apps/v1beta1" appsv1beta1 "k8s.io/api/apps/v1beta1"
appsv1beta2 "k8s.io/api/apps/v1beta2" appsv1beta2 "k8s.io/api/apps/v1beta2"
@ -30,6 +31,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/cli-runtime/pkg/resource" "k8s.io/cli-runtime/pkg/resource"
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/kubernetes/scheme"
@ -81,6 +83,40 @@ type ReadyChecker struct {
pausedAsReady bool pausedAsReady bool
} }
// waitJobsReady waits until all jobs from resource list are ready.
// Returns error if context is done or a job was deleted without being ready.
// If resource list has no jobs it returns true without error.
func (c *ReadyChecker) waitJobsReady(ctx context.Context, resources ResourceList) (bool, error) {
for _, v := range resources {
switch job := AsVersioned(v).(type) {
case *batchv1.Job:
jobWatch, err := v.Watch(v.ResourceVersion)
if err != nil {
return c.jobReady(job), err
}
defer jobWatch.Stop()
for !c.jobReady(job) {
select {
case <-ctx.Done():
return c.jobReady(job), ctx.Err()
case event, ok := <-jobWatch.ResultChan():
if !ok {
break
}
err = ConvertUnstructuredObject(event.Object, &job)
if err != nil {
return false, err
}
if event.Type == watch.Deleted && !c.jobReady(job) {
return false, errors.Errorf("job %v/%v is deleted but wasn't ready", job.GetNamespace(), job.GetName())
}
}
}
}
}
return true, nil
}
// IsReady checks if v is ready. It supports checking readiness for pods, // IsReady checks if v is ready. It supports checking readiness for pods,
// deployments, persistent volume claims, services, daemon sets, custom // deployments, persistent volume claims, services, daemon sets, custom
// resource definitions, stateful sets, replication controllers, and replica // resource definitions, stateful sets, replication controllers, and replica
@ -102,13 +138,6 @@ func (c *ReadyChecker) IsReady(ctx context.Context, v *resource.Info) (bool, err
if err != nil || !c.isPodReady(pod) { if err != nil || !c.isPodReady(pod) {
return false, err return false, err
} }
case *batchv1.Job:
if c.checkJobs {
job, err := c.client.BatchV1().Jobs(v.Namespace).Get(ctx, v.Name, metav1.GetOptions{})
if err != nil || !c.jobReady(job) {
return false, err
}
}
case *appsv1.Deployment, *appsv1beta1.Deployment, *appsv1beta2.Deployment, *extensionsv1beta1.Deployment: case *appsv1.Deployment, *appsv1beta1.Deployment, *appsv1beta2.Deployment, *extensionsv1beta1.Deployment:
currentDeployment, err := c.client.AppsV1().Deployments(v.Namespace).Get(ctx, v.Name, metav1.GetOptions{}) currentDeployment, err := c.client.AppsV1().Deployments(v.Namespace).Get(ctx, v.Name, metav1.GetOptions{})
if err != nil { if err != nil {

@ -42,15 +42,15 @@ type waiter struct {
log func(string, ...interface{}) log func(string, ...interface{})
} }
// waitForResources polls to get the current status of all pods, PVCs, Services and // waitForResources polls to get the current status of all pods, PVCs, Services and until all are ready or a timeout is reached.
// Jobs(optional) until all are ready or a timeout is reached // For jobs (optional) it watches for events and checks if the job is (or was) ready at the time of event.
func (w *waiter) waitForResources(created ResourceList) error { func (w *waiter) waitForResources(created ResourceList) error {
w.log("beginning wait for %d resources with timeout of %v", len(created), w.timeout) w.log("beginning wait for %d resources with timeout of %v", len(created), w.timeout)
ctx, cancel := context.WithTimeout(context.Background(), w.timeout) ctx, cancel := context.WithTimeout(context.Background(), w.timeout)
defer cancel() defer cancel()
return wait.PollImmediateUntil(2*time.Second, func() (bool, error) { err := wait.PollImmediateUntil(2*time.Second, func() (bool, error) {
for _, v := range created { for _, v := range created {
ready, err := w.c.IsReady(ctx, v) ready, err := w.c.IsReady(ctx, v)
if !ready || err != nil { if !ready || err != nil {
@ -59,6 +59,21 @@ func (w *waiter) waitForResources(created ResourceList) error {
} }
return true, nil return true, nil
}, ctx.Done()) }, ctx.Done())
if err != nil {
return err
}
if w.c.checkJobs {
jobsReady, err := w.c.waitJobsReady(ctx, created)
if err != nil {
return err
}
if !jobsReady {
return errors.New("jobs not ready")
}
}
return nil
} }
// waitForDeletedResources polls to check if all the resources are deleted or a timeout is reached // waitForDeletedResources polls to check if all the resources are deleted or a timeout is reached

Loading…
Cancel
Save