diff --git a/pkg/kube/converter.go b/pkg/kube/converter.go
index 3bf0e358c..6cedb4a78 100644
--- a/pkg/kube/converter.go
+++ b/pkg/kube/converter.go
@@ -19,6 +19,8 @@ package kube // import "helm.sh/helm/v3/pkg/kube"
 import (
 	"sync"
 
+	"github.com/pkg/errors"
+
 	apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
 	apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
 	"k8s.io/apimachinery/pkg/api/meta"
@@ -67,3 +69,15 @@ func kubernetesNativeScheme() *runtime.Scheme {
 	})
 	return k8sNativeScheme
 }
+
+// ConvertUnstructuredObject converts a runtime.Unstructured to its concrete resource representation.
+func ConvertUnstructuredObject(obj runtime.Object, concrete interface{}) error {
+	unstructObj, ok := obj.(runtime.Unstructured)
+	if !ok {
+		return errors.Errorf("object expected to be runtime.Unstructured, but got %T", obj)
+	}
+	return runtime.DefaultUnstructuredConverter.FromUnstructured(
+		unstructObj.UnstructuredContent(),
+		concrete,
+	)
+}
diff --git a/pkg/kube/ready.go b/pkg/kube/ready.go
index 0554c1729..e76479311 100644
--- a/pkg/kube/ready.go
+++ b/pkg/kube/ready.go
@@ -19,6 +19,7 @@ package kube // import "helm.sh/helm/v3/pkg/kube"
 import (
 	"context"
 
+	"github.com/pkg/errors"
 	appsv1 "k8s.io/api/apps/v1"
 	appsv1beta1 "k8s.io/api/apps/v1beta1"
 	appsv1beta2 "k8s.io/api/apps/v1beta2"
@@ -30,6 +31,7 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/util/intstr"
+	"k8s.io/apimachinery/pkg/watch"
 	"k8s.io/cli-runtime/pkg/resource"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/kubernetes/scheme"
@@ -81,6 +83,40 @@ type ReadyChecker struct {
 	pausedAsReady bool
 }
 
+// waitJobsReady waits until all jobs from the resource list are ready.
+// It returns an error if the context is done or a job was deleted before becoming ready.
+// If the resource list has no jobs it returns true without error.
+func (c *ReadyChecker) waitJobsReady(ctx context.Context, resources ResourceList) (bool, error) {
+	for _, v := range resources {
+		switch job := AsVersioned(v).(type) {
+		case *batchv1.Job:
+			jobWatch, err := v.Watch(v.ResourceVersion)
+			if err != nil {
+				return c.jobReady(job), err
+			}
+			defer jobWatch.Stop()
+			for !c.jobReady(job) {
+				select {
+				case <-ctx.Done():
+					return c.jobReady(job), ctx.Err()
+				case event, ok := <-jobWatch.ResultChan():
+					if !ok {
+						return false, errors.Errorf("watch channel for job %v/%v closed before the job was ready", job.GetNamespace(), job.GetName())
+					}
+					err = ConvertUnstructuredObject(event.Object, &job)
+					if err != nil {
+						return false, err
+					}
+					if event.Type == watch.Deleted && !c.jobReady(job) {
+						return false, errors.Errorf("job %v/%v was deleted but wasn't ready", job.GetNamespace(), job.GetName())
+					}
+				}
+			}
+		}
+	}
+	return true, nil
+}
+
 // IsReady checks if v is ready. It supports checking readiness for pods,
 // deployments, persistent volume claims, services, daemon sets, custom
 // resource definitions, stateful sets, replication controllers, and replica
@@ -102,13 +138,6 @@ func (c *ReadyChecker) IsReady(ctx context.Context, v *resource.Info) (bool, error) {
 		if err != nil || !c.isPodReady(pod) {
 			return false, err
 		}
-	case *batchv1.Job:
-		if c.checkJobs {
-			job, err := c.client.BatchV1().Jobs(v.Namespace).Get(ctx, v.Name, metav1.GetOptions{})
-			if err != nil || !c.jobReady(job) {
-				return false, err
-			}
-		}
 	case *appsv1.Deployment, *appsv1beta1.Deployment, *appsv1beta2.Deployment, *extensionsv1beta1.Deployment:
 		currentDeployment, err := c.client.AppsV1().Deployments(v.Namespace).Get(ctx, v.Name, metav1.GetOptions{})
 		if err != nil {
diff --git a/pkg/kube/wait.go b/pkg/kube/wait.go
index 8928d6745..2a140f4ad 100644
--- a/pkg/kube/wait.go
+++ b/pkg/kube/wait.go
@@ -42,15 +42,15 @@ type waiter struct {
 	log     func(string, ...interface{})
 }
 
-// waitForResources polls to get the current status of all pods, PVCs, Services and
-// Jobs(optional) until all are ready or a timeout is reached
+// waitForResources polls to get the current status of all pods, PVCs, and Services until all are ready or a timeout is reached.
+// For jobs (optional) it watches for events and checks whether the job is (or was) ready at the time of the event.
 func (w *waiter) waitForResources(created ResourceList) error {
 	w.log("beginning wait for %d resources with timeout of %v", len(created), w.timeout)
 
 	ctx, cancel := context.WithTimeout(context.Background(), w.timeout)
 	defer cancel()
 
-	return wait.PollImmediateUntil(2*time.Second, func() (bool, error) {
+	err := wait.PollImmediateUntil(2*time.Second, func() (bool, error) {
 		for _, v := range created {
 			ready, err := w.c.IsReady(ctx, v)
 			if !ready || err != nil {
@@ -59,6 +59,21 @@ func (w *waiter) waitForResources(created ResourceList) error {
 		}
 		return true, nil
 	}, ctx.Done())
+
+	if err != nil {
+		return err
+	}
+
+	if w.c.checkJobs {
+		jobsReady, err := w.c.waitJobsReady(ctx, created)
+		if err != nil {
+			return err
+		}
+		if !jobsReady {
+			return errors.New("jobs not ready")
+		}
+	}
+	return nil
 }
 
 // waitForDeletedResources polls to check if all the resources are deleted or a timeout is reached
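Usage note (illustrative, not part of the diff): the sketch below shows how the exported ConvertUnstructuredObject helper can decode a watch event into a typed Job, mirroring what waitJobsReady does internally. The handleJobEvent wrapper, its package name, and its error wording are hypothetical.

package example

import (
	"fmt"

	batchv1 "k8s.io/api/batch/v1"
	"k8s.io/apimachinery/pkg/watch"

	"helm.sh/helm/v3/pkg/kube"
)

// handleJobEvent converts the unstructured object carried by a watch event
// into a typed batchv1.Job so its status conditions can be inspected.
func handleJobEvent(event watch.Event) (*batchv1.Job, error) {
	job := &batchv1.Job{}
	// The watch API delivers unstructured objects; convert to the typed Job
	// before inspecting it.
	if err := kube.ConvertUnstructuredObject(event.Object, job); err != nil {
		return nil, err
	}
	if event.Type == watch.Deleted {
		return nil, fmt.Errorf("job %s/%s was deleted", job.GetNamespace(), job.GetName())
	}
	return job, nil
}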