diff --git a/pkg/kube/client.go b/pkg/kube/client.go
index 34079e7a0..2565d1832 100644
--- a/pkg/kube/client.go
+++ b/pkg/kube/client.go
@@ -132,12 +132,13 @@ func (c *Client) Wait(resources ResourceList, timeout time.Duration) error {
 	if err != nil {
 		return err
 	}
+	checker := NewReadyChecker(cs, c.Log, PausedAsReady(true))
 	w := waiter{
-		c:       cs,
+		c:       checker,
 		log:     c.Log,
 		timeout: timeout,
 	}
-	return w.waitForResources(resources, false)
+	return w.waitForResources(resources)
 }
 
 // WaitWithJobs wait up to the given timeout for the specified resources to be ready, including jobs.
@@ -146,12 +147,13 @@ func (c *Client) WaitWithJobs(resources ResourceList, timeout time.Duration) err
 	if err != nil {
 		return err
 	}
+	checker := NewReadyChecker(cs, c.Log, PausedAsReady(true), CheckJobs(true))
 	w := waiter{
-		c:       cs,
+		c:       checker,
 		log:     c.Log,
 		timeout: timeout,
 	}
-	return w.waitForResources(resources, true)
+	return w.waitForResources(resources)
 }
 
 func (c *Client) namespace() string {
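The two call sites above now differ only in the options they pass to NewReadyChecker. A minimal sketch of what an external caller of pkg/kube could do with the same constructor; the kubeconfig loading here is illustrative and not part of this change:

```go
package main

import (
	"log"

	"helm.sh/helm/v3/pkg/kube"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Illustrative setup: build a clientset from the default kubeconfig.
	cfg, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		log.Fatal(err)
	}
	cs, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		log.Fatal(err)
	}

	// Mirrors Client.WaitWithJobs above: paused Deployments count as ready,
	// and Jobs are checked for completion. Without CheckJobs(true), Jobs are
	// skipped entirely, which is what Client.Wait does.
	checker := kube.NewReadyChecker(cs, log.Printf, kube.PausedAsReady(true), kube.CheckJobs(true))
	_ = checker // poll checker.IsReady(ctx, info) for each resource.Info
}
```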
diff --git a/pkg/kube/ready.go b/pkg/kube/ready.go
new file mode 100644
index 000000000..19b93e386
--- /dev/null
+++ b/pkg/kube/ready.go
@@ -0,0 +1,397 @@
+/*
+Copyright The Helm Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package kube // import "helm.sh/helm/v3/pkg/kube"
+
+import (
+	"context"
+
+	appsv1 "k8s.io/api/apps/v1"
+	appsv1beta1 "k8s.io/api/apps/v1beta1"
+	appsv1beta2 "k8s.io/api/apps/v1beta2"
+	batchv1 "k8s.io/api/batch/v1"
+	corev1 "k8s.io/api/core/v1"
+	extensionsv1beta1 "k8s.io/api/extensions/v1beta1"
+	apiextv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
+	apiextv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/util/intstr"
+	"k8s.io/cli-runtime/pkg/resource"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/kubernetes/scheme"
+
+	deploymentutil "helm.sh/helm/v3/internal/third_party/k8s.io/kubernetes/deployment/util"
+)
+
+// ReadyCheckerOption is a function that configures a ReadyChecker.
+type ReadyCheckerOption func(*ReadyChecker)
+
+// PausedAsReady returns a ReadyCheckerOption that configures a ReadyChecker
+// to consider paused resources to be ready. For example, a Deployment
+// with spec.paused equal to true would be considered ready.
+func PausedAsReady(pausedAsReady bool) ReadyCheckerOption {
+	return func(c *ReadyChecker) {
+		c.pausedAsReady = pausedAsReady
+	}
+}
+
+// CheckJobs returns a ReadyCheckerOption that configures a ReadyChecker
+// to consider readiness of Job resources.
+func CheckJobs(checkJobs bool) ReadyCheckerOption {
+	return func(c *ReadyChecker) {
+		c.checkJobs = checkJobs
+	}
+}
+
+// NewReadyChecker creates a new checker. Passed ReadyCheckerOptions can
+// be used to override defaults.
+func NewReadyChecker(cl kubernetes.Interface, log func(string, ...interface{}), opts ...ReadyCheckerOption) ReadyChecker {
+	c := ReadyChecker{
+		client: cl,
+		log:    log,
+	}
+	if c.log == nil {
+		c.log = nopLogger
+	}
+	for _, opt := range opts {
+		opt(&c)
+	}
+	return c
+}
+
+// ReadyChecker is a type that can check core Kubernetes types for readiness.
+type ReadyChecker struct {
+	client        kubernetes.Interface
+	log           func(string, ...interface{})
+	checkJobs     bool
+	pausedAsReady bool
+}
+
+// IsReady checks if v is ready. It supports checking readiness for pods,
+// deployments, persistent volume claims, services, daemon sets, custom
+// resource definitions, stateful sets, replication controllers, and replica
+// sets. All other resource kinds are always considered ready.
+//
+// IsReady will fetch the latest state of the object from the server prior to
+// performing readiness checks, and it will return any error encountered.
+func (c *ReadyChecker) IsReady(ctx context.Context, v *resource.Info) (bool, error) {
+	var (
+		// ok defaults to true; otherwise IsReady would always report false
+		// unless v is one of the pod-managing kinds handled in the final
+		// switch case
+		ok  = true
+		err error
+	)
+	switch value := AsVersioned(v).(type) {
+	case *corev1.Pod:
+		pod, err := c.client.CoreV1().Pods(v.Namespace).Get(ctx, v.Name, metav1.GetOptions{})
+		if err != nil || !c.isPodReady(pod) {
+			return false, err
+		}
+	case *batchv1.Job:
+		if c.checkJobs {
+			job, err := c.client.BatchV1().Jobs(v.Namespace).Get(ctx, v.Name, metav1.GetOptions{})
+			if err != nil || !c.jobReady(job) {
+				return false, err
+			}
+		}
+	case *appsv1.Deployment, *appsv1beta1.Deployment, *appsv1beta2.Deployment, *extensionsv1beta1.Deployment:
+		currentDeployment, err := c.client.AppsV1().Deployments(v.Namespace).Get(ctx, v.Name, metav1.GetOptions{})
+		if err != nil {
+			return false, err
+		}
+		// If paused, the deployment will never become ready on its own
+		if currentDeployment.Spec.Paused {
+			return c.pausedAsReady, nil
+		}
+		// Find the ReplicaSet associated with the deployment
+		newReplicaSet, err := deploymentutil.GetNewReplicaSet(currentDeployment, c.client.AppsV1())
+		if err != nil || newReplicaSet == nil {
+			return false, err
+		}
+		if !c.deploymentReady(newReplicaSet, currentDeployment) {
+			return false, nil
+		}
+	case *corev1.PersistentVolumeClaim:
+		claim, err := c.client.CoreV1().PersistentVolumeClaims(v.Namespace).Get(ctx, v.Name, metav1.GetOptions{})
+		if err != nil {
+			return false, err
+		}
+		if !c.volumeReady(claim) {
+			return false, nil
+		}
+	case *corev1.Service:
+		svc, err := c.client.CoreV1().Services(v.Namespace).Get(ctx, v.Name, metav1.GetOptions{})
+		if err != nil {
+			return false, err
+		}
+		if !c.serviceReady(svc) {
+			return false, nil
+		}
+	case *extensionsv1beta1.DaemonSet, *appsv1.DaemonSet, *appsv1beta2.DaemonSet:
+		ds, err := c.client.AppsV1().DaemonSets(v.Namespace).Get(ctx, v.Name, metav1.GetOptions{})
+		if err != nil {
+			return false, err
+		}
+		if !c.daemonSetReady(ds) {
+			return false, nil
+		}
+	case *apiextv1beta1.CustomResourceDefinition:
+		if err := v.Get(); err != nil {
+			return false, err
+		}
+		crd := &apiextv1beta1.CustomResourceDefinition{}
+		if err := scheme.Scheme.Convert(v.Object, crd, nil); err != nil {
+			return false, err
+		}
+		if !c.crdBetaReady(*crd) {
+			return false, nil
+		}
+	case *apiextv1.CustomResourceDefinition:
+		if err := v.Get(); err != nil {
+			return false, err
+		}
+		crd := &apiextv1.CustomResourceDefinition{}
+		if err := scheme.Scheme.Convert(v.Object, crd, nil); err != nil {
+			return false, err
+		}
+		if !c.crdReady(*crd) {
+			return false, nil
+		}
+	case *appsv1.StatefulSet, *appsv1beta1.StatefulSet, *appsv1beta2.StatefulSet:
+		sts, err := c.client.AppsV1().StatefulSets(v.Namespace).Get(ctx, v.Name, metav1.GetOptions{})
+		if err != nil {
+			return false, err
+		}
+		if !c.statefulSetReady(sts) {
+			return false, nil
+		}
+	case *corev1.ReplicationController, *extensionsv1beta1.ReplicaSet, *appsv1beta2.ReplicaSet, *appsv1.ReplicaSet:
+		ok, err = c.podsReadyForObject(ctx, v.Namespace, value)
+	}
+	if !ok || err != nil {
+		return false, err
+	}
+	return true, nil
+}
+
+func (c *ReadyChecker) podsReadyForObject(ctx context.Context, namespace string, obj runtime.Object) (bool, error) {
+	pods, err := c.podsforObject(ctx, namespace, obj)
+	if err != nil {
+		return false, err
+	}
+	for _, pod := range pods {
+		if !c.isPodReady(&pod) {
+			return false, nil
+		}
+	}
+	return true, nil
+}
+
+func (c *ReadyChecker) podsforObject(ctx context.Context, namespace string, obj runtime.Object) ([]corev1.Pod, error) {
+	selector, err := SelectorsForObject(obj)
+	if err != nil {
+		return nil, err
+	}
+	list, err := getPods(ctx, c.client, namespace, selector.String())
+	return list, err
+}
+
+// isPodReady returns true if a pod is ready; false otherwise.
+func (c *ReadyChecker) isPodReady(pod *corev1.Pod) bool {
+	for _, c := range pod.Status.Conditions {
+		if c.Type == corev1.PodReady && c.Status == corev1.ConditionTrue {
+			return true
+		}
+	}
+	c.log("Pod is not ready: %s/%s", pod.GetNamespace(), pod.GetName())
+	return false
+}
+
+func (c *ReadyChecker) jobReady(job *batchv1.Job) bool {
+	if job.Status.Failed > *job.Spec.BackoffLimit {
+		c.log("Job is failed: %s/%s", job.GetNamespace(), job.GetName())
+		return false
+	}
+	if job.Status.Succeeded < *job.Spec.Completions {
+		c.log("Job is not completed: %s/%s", job.GetNamespace(), job.GetName())
+		return false
+	}
+	return true
+}
+
+func (c *ReadyChecker) serviceReady(s *corev1.Service) bool {
+	// ExternalName Services are external to the cluster, so Helm shouldn't
+	// check whether they're 'ready' (i.e. have an IP set)
+	if s.Spec.Type == corev1.ServiceTypeExternalName {
+		return true
+	}
+
+	// Ensure that the service cluster IP is not empty
+	if s.Spec.ClusterIP == "" {
+		c.log("Service does not have cluster IP address: %s/%s", s.GetNamespace(), s.GetName())
+		return false
+	}
+
+	// This checks if the service has a LoadBalancer and that balancer has an Ingress defined
+	if s.Spec.Type == corev1.ServiceTypeLoadBalancer {
+		// do not wait when at least 1 external IP is set
+		if len(s.Spec.ExternalIPs) > 0 {
+			c.log("Service %s/%s has external IP addresses (%v), marking as ready", s.GetNamespace(), s.GetName(), s.Spec.ExternalIPs)
+			return true
+		}
+
+		if s.Status.LoadBalancer.Ingress == nil {
+			c.log("Service does not have load balancer ingress IP address: %s/%s", s.GetNamespace(), s.GetName())
+			return false
+		}
+	}
+
+	return true
+}
+
+func (c *ReadyChecker) volumeReady(v *corev1.PersistentVolumeClaim) bool {
+	if v.Status.Phase != corev1.ClaimBound {
+		c.log("PersistentVolumeClaim is not bound: %s/%s", v.GetNamespace(), v.GetName())
+		return false
+	}
+	return true
+}
+
+func (c *ReadyChecker) deploymentReady(rs *appsv1.ReplicaSet, dep *appsv1.Deployment) bool {
+	expectedReady := *dep.Spec.Replicas - deploymentutil.MaxUnavailable(*dep)
+	if !(rs.Status.ReadyReplicas >= expectedReady) {
+		c.log("Deployment is not ready: %s/%s. %d out of %d expected pods are ready", dep.Namespace, dep.Name, rs.Status.ReadyReplicas, expectedReady)
+		return false
+	}
+	return true
+}
+
+func (c *ReadyChecker) daemonSetReady(ds *appsv1.DaemonSet) bool {
+	// If the update strategy is not a rolling update, there will be nothing to wait for
+	if ds.Spec.UpdateStrategy.Type != appsv1.RollingUpdateDaemonSetStrategyType {
+		return true
+	}
+
+	// Make sure all the updated pods have been scheduled
+	if ds.Status.UpdatedNumberScheduled != ds.Status.DesiredNumberScheduled {
+		c.log("DaemonSet is not ready: %s/%s. %d out of %d expected pods have been scheduled", ds.Namespace, ds.Name, ds.Status.UpdatedNumberScheduled, ds.Status.DesiredNumberScheduled)
+		return false
+	}
+	maxUnavailable, err := intstr.GetValueFromIntOrPercent(ds.Spec.UpdateStrategy.RollingUpdate.MaxUnavailable, int(ds.Status.DesiredNumberScheduled), true)
+	if err != nil {
+		// If for some reason the value is invalid, set max unavailable to the
+		// number of desired replicas. This is the same behavior as the
+		// `MaxUnavailable` function in deploymentutil
+		maxUnavailable = int(ds.Status.DesiredNumberScheduled)
+	}
+
+	expectedReady := int(ds.Status.DesiredNumberScheduled) - maxUnavailable
+	if !(int(ds.Status.NumberReady) >= expectedReady) {
+		c.log("DaemonSet is not ready: %s/%s. %d out of %d expected pods are ready", ds.Namespace, ds.Name, ds.Status.NumberReady, expectedReady)
+		return false
+	}
+	return true
+}
+
+// Because the v1 extensions API is not available on all supported k8s versions
+// yet and because Go doesn't support generics, we need to have a duplicate
+// function to support the v1beta1 types
+func (c *ReadyChecker) crdBetaReady(crd apiextv1beta1.CustomResourceDefinition) bool {
+	for _, cond := range crd.Status.Conditions {
+		switch cond.Type {
+		case apiextv1beta1.Established:
+			if cond.Status == apiextv1beta1.ConditionTrue {
+				return true
+			}
+		case apiextv1beta1.NamesAccepted:
+			if cond.Status == apiextv1beta1.ConditionFalse {
+				// This indicates a naming conflict, but it's probably not the
+				// job of this function to fail because of that. Instead,
+				// we treat it as a success, since the process should be able to
+				// continue.
+				return true
+			}
+		}
+	}
+	return false
+}
+
+func (c *ReadyChecker) crdReady(crd apiextv1.CustomResourceDefinition) bool {
+	for _, cond := range crd.Status.Conditions {
+		switch cond.Type {
+		case apiextv1.Established:
+			if cond.Status == apiextv1.ConditionTrue {
+				return true
+			}
+		case apiextv1.NamesAccepted:
+			if cond.Status == apiextv1.ConditionFalse {
+				// This indicates a naming conflict, but it's probably not the
+				// job of this function to fail because of that. Instead,
+				// we treat it as a success, since the process should be able to
+				// continue.
+				return true
+			}
+		}
+	}
+	return false
+}
+
+func (c *ReadyChecker) statefulSetReady(sts *appsv1.StatefulSet) bool {
+	// If the update strategy is not a rolling update, there will be nothing to wait for
+	if sts.Spec.UpdateStrategy.Type != appsv1.RollingUpdateStatefulSetStrategyType {
+		return true
+	}
+
+	// Dereference all the pointers because StatefulSets like them
+	var partition int
+	// 1 is the default for replicas if not set
+	var replicas = 1
+	// For some reason, even if the update strategy is a rolling update, the
+	// actual rollingUpdate field can be nil. If it is, we can safely assume
+	// there is no partition value
+	if sts.Spec.UpdateStrategy.RollingUpdate != nil && sts.Spec.UpdateStrategy.RollingUpdate.Partition != nil {
+		partition = int(*sts.Spec.UpdateStrategy.RollingUpdate.Partition)
+	}
+	if sts.Spec.Replicas != nil {
+		replicas = int(*sts.Spec.Replicas)
+	}
+
+	// Because an update strategy can use partitioning, we need to calculate the
+	// number of updated replicas we should have. For example, if the replicas
+	// is set to 3 and the partition is 2, we'd expect only one pod to be
+	// updated
+	expectedReplicas := replicas - partition
+
+	// Make sure all the updated pods have been scheduled
+	if int(sts.Status.UpdatedReplicas) != expectedReplicas {
+		c.log("StatefulSet is not ready: %s/%s. %d out of %d expected pods have been scheduled", sts.Namespace, sts.Name, sts.Status.UpdatedReplicas, expectedReplicas)
+		return false
+	}
+
+	if int(sts.Status.ReadyReplicas) != replicas {
+		c.log("StatefulSet is not ready: %s/%s. %d out of %d expected pods are ready", sts.Namespace, sts.Name, sts.Status.ReadyReplicas, replicas)
+		return false
+	}
+	return true
+}
+
+func getPods(ctx context.Context, client kubernetes.Interface, namespace, selector string) ([]corev1.Pod, error) {
+	list, err := client.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{
+		LabelSelector: selector,
+	})
+	return list.Items, err
+}
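The partition arithmetic in statefulSetReady is easy to misread, so here is a standalone restatement with the worked example from the comment. The function name is ours, for illustration only; it is not code from this change:

```go
// expectedUpdated restates the calculation statefulSetReady performs: during
// a partitioned rolling update only pods with ordinal >= partition are
// rolled, so replicas-partition pods should reach the updated revision.
func expectedUpdated(replicas, partition int) int {
	return replicas - partition
}

// expectedUpdated(3, 2) == 1: with replicas=3 and partition=2, only one pod
// is expected on the new revision, while readiness still requires all three
// replicas to report Ready.
```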
diff --git a/pkg/kube/wait_test.go b/pkg/kube/ready_test.go
similarity index 90%
rename from pkg/kube/wait_test.go
rename to pkg/kube/ready_test.go
index 7864f5e00..8fd20bd9f 100644
--- a/pkg/kube/wait_test.go
+++ b/pkg/kube/ready_test.go
@@ -31,7 +31,7 @@ import (
 
 const defaultNamespace = metav1.NamespaceDefault
 
-func Test_waiter_deploymentReady(t *testing.T) {
+func Test_ReadyChecker_deploymentReady(t *testing.T) {
 	type args struct {
 		rs  *appsv1.ReplicaSet
 		dep *appsv1.Deployment
 	}
@@ -68,18 +68,15 @@ func Test_waiter_deploymentReady(t *testing.T) {
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			w := &waiter{
-				c:   fake.NewSimpleClientset(),
-				log: nopLogger,
-			}
-			if got := w.deploymentReady(tt.args.rs, tt.args.dep); got != tt.want {
+			c := NewReadyChecker(fake.NewSimpleClientset(), nil)
+			if got := c.deploymentReady(tt.args.rs, tt.args.dep); got != tt.want {
 				t.Errorf("deploymentReady() = %v, want %v", got, tt.want)
 			}
 		})
 	}
 }
 
-func Test_waiter_daemonSetReady(t *testing.T) {
+func Test_ReadyChecker_daemonSetReady(t *testing.T) {
 	type args struct {
 		ds *appsv1.DaemonSet
 	}
@@ -119,18 +116,15 @@ func Test_waiter_daemonSetReady(t *testing.T) {
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			w := &waiter{
-				c:   fake.NewSimpleClientset(),
-				log: nopLogger,
-			}
-			if got := w.daemonSetReady(tt.args.ds); got != tt.want {
+			c := NewReadyChecker(fake.NewSimpleClientset(), nil)
+			if got := c.daemonSetReady(tt.args.ds); got != tt.want {
 				t.Errorf("daemonSetReady() = %v, want %v", got, tt.want)
 			}
 		})
 	}
 }
 
-func Test_waiter_statefulSetReady(t *testing.T) {
+func Test_ReadyChecker_statefulSetReady(t *testing.T) {
 	type args struct {
 		sts *appsv1.StatefulSet
 	}
@@ -170,18 +164,15 @@ func Test_waiter_statefulSetReady(t *testing.T) {
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			w := &waiter{
-				c:   fake.NewSimpleClientset(),
-				log: nopLogger,
-			}
-			if got := w.statefulSetReady(tt.args.sts); got != tt.want {
+			c := NewReadyChecker(fake.NewSimpleClientset(), nil)
+			if got := c.statefulSetReady(tt.args.sts); got != tt.want {
 				t.Errorf("statefulSetReady() = %v, want %v", got, tt.want)
 			}
 		})
 	}
 }
 
-func Test_waiter_podsReadyForObject(t *testing.T) {
+func Test_ReadyChecker_podsReadyForObject(t *testing.T) {
 	type args struct {
 		namespace string
 		obj       runtime.Object
 	}
@@ -220,17 +211,14 @@ func Test_waiter_podsReadyForObject(t *testing.T) {
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			w := &waiter{
-				c:   fake.NewSimpleClientset(),
-				log: nopLogger,
-			}
+			c := NewReadyChecker(fake.NewSimpleClientset(), nil)
 			for _, pod := range tt.existPods {
-				if _, err := w.c.CoreV1().Pods(defaultNamespace).Create(context.TODO(), &pod, metav1.CreateOptions{}); err != nil {
+				if _, err := c.client.CoreV1().Pods(defaultNamespace).Create(context.TODO(), &pod, metav1.CreateOptions{}); err != nil {
 					t.Errorf("Failed to create Pod error: %v", err)
 					return
 				}
 			}
-			got, err := w.podsReadyForObject(tt.args.namespace, tt.args.obj)
+			got, err := c.podsReadyForObject(context.TODO(), tt.args.namespace, tt.args.obj)
 			if (err != nil) != tt.wantErr {
 				t.Errorf("podsReadyForObject() error = %v, wantErr %v", err, tt.wantErr)
 				return
@@ -242,7 +230,7 @@ func Test_waiter_podsReadyForObject(t *testing.T) {
 	}
 }
 
-func Test_waiter_jobReady(t *testing.T) {
+func Test_ReadyChecker_jobReady(t *testing.T) {
 	type args struct {
 		job *batchv1.Job
 	}
@@ -289,18 +277,15 @@ func Test_waiter_jobReady(t *testing.T) {
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			w := &waiter{
-				c:   fake.NewSimpleClientset(),
-				log: nopLogger,
-			}
-			if got := w.jobReady(tt.args.job); got != tt.want {
+			c := NewReadyChecker(fake.NewSimpleClientset(), nil)
+			if got := c.jobReady(tt.args.job); got != tt.want {
 				t.Errorf("jobReady() = %v, want %v", got, tt.want)
 			}
 		})
 	}
 }
 
-func Test_waiter_volumeReady(t *testing.T) {
+func Test_ReadyChecker_volumeReady(t *testing.T) {
 	type args struct {
 		v *corev1.PersistentVolumeClaim
 	}
@@ -326,11 +311,8 @@ func Test_waiter_volumeReady(t *testing.T) {
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			w := &waiter{
-				c:   fake.NewSimpleClientset(),
-				log: nopLogger,
-			}
-			if got := w.volumeReady(tt.args.v); got != tt.want {
+			c := NewReadyChecker(fake.NewSimpleClientset(), nil)
+			if got := c.volumeReady(tt.args.v); got != tt.want {
 				t.Errorf("volumeReady() = %v, want %v", got, tt.want)
 			}
 		})
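The renamed tests keep their table-driven shape and now build the checker with NewReadyChecker(fake.NewSimpleClientset(), nil), relying on the constructor's nil-log fallback to nopLogger. A hypothetical additional case in the same style, not part of this diff, would exercise isPodReady directly:

```go
func Test_ReadyChecker_isPodReady(t *testing.T) {
	c := NewReadyChecker(fake.NewSimpleClientset(), nil)
	// A pod is ready exactly when its PodReady condition is ConditionTrue.
	pod := &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: "ready-pod", Namespace: defaultNamespace},
		Status: corev1.PodStatus{
			Conditions: []corev1.PodCondition{
				{Type: corev1.PodReady, Status: corev1.ConditionTrue},
			},
		},
	}
	if got := c.isPodReady(pod); !got {
		t.Errorf("isPodReady() = %v, want true", got)
	}
}
```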
diff --git a/pkg/kube/wait.go b/pkg/kube/wait.go
index 489dd8132..f9ea2ea85 100644
--- a/pkg/kube/wait.go
+++ b/pkg/kube/wait.go
@@ -28,339 +28,36 @@ import (
 	batchv1 "k8s.io/api/batch/v1"
 	corev1 "k8s.io/api/core/v1"
 	extensionsv1beta1 "k8s.io/api/extensions/v1beta1"
-	apiextv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
-	apiextv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/runtime"
-	"k8s.io/apimachinery/pkg/util/intstr"
-	"k8s.io/apimachinery/pkg/util/wait"
-	"k8s.io/client-go/kubernetes"
-	"k8s.io/client-go/kubernetes/scheme"
 
-	deploymentutil "helm.sh/helm/v3/internal/third_party/k8s.io/kubernetes/deployment/util"
+	"k8s.io/apimachinery/pkg/util/wait"
 )
 
 type waiter struct {
-	c       kubernetes.Interface
+	c       ReadyChecker
 	timeout time.Duration
 	log     func(string, ...interface{})
 }
 
 // waitForResources polls to get the current status of all pods, PVCs, Services and
 // Jobs(optional) until all are ready or a timeout is reached
-func (w *waiter) waitForResources(created ResourceList, waitForJobsEnabled bool) error {
+func (w *waiter) waitForResources(created ResourceList) error {
 	w.log("beginning wait for %d resources with timeout of %v", len(created), w.timeout)
 
-	return wait.Poll(2*time.Second, w.timeout, func() (bool, error) {
+	ctx, cancel := context.WithTimeout(context.Background(), w.timeout)
+	defer cancel()
+
+	return wait.PollImmediateUntil(2*time.Second, func() (bool, error) {
 		for _, v := range created {
-			var (
-				// This defaults to true, otherwise we get to a point where
-				// things will always return false unless one of the objects
-				// that manages pods has been hit
-				ok  = true
-				err error
-			)
-			switch value := AsVersioned(v).(type) {
-			case *corev1.Pod:
-				pod, err := w.c.CoreV1().Pods(v.Namespace).Get(context.Background(), v.Name, metav1.GetOptions{})
-				if err != nil || !w.isPodReady(pod) {
-					return false, err
-				}
-			case *batchv1.Job:
-				if waitForJobsEnabled {
-					job, err := w.c.BatchV1().Jobs(v.Namespace).Get(context.Background(), v.Name, metav1.GetOptions{})
-					if err != nil || !w.jobReady(job) {
-						return false, err
-					}
-				}
-			case *appsv1.Deployment, *appsv1beta1.Deployment, *appsv1beta2.Deployment, *extensionsv1beta1.Deployment:
-				currentDeployment, err := w.c.AppsV1().Deployments(v.Namespace).Get(context.Background(), v.Name, metav1.GetOptions{})
-				if err != nil {
-					return false, err
-				}
-				// If paused deployment will never be ready
-				if currentDeployment.Spec.Paused {
-					continue
-				}
-				// Find RS associated with deployment
-				newReplicaSet, err := deploymentutil.GetNewReplicaSet(currentDeployment, w.c.AppsV1())
-				if err != nil || newReplicaSet == nil {
-					return false, err
-				}
-				if !w.deploymentReady(newReplicaSet, currentDeployment) {
-					return false, nil
-				}
-			case *corev1.PersistentVolumeClaim:
-				claim, err := w.c.CoreV1().PersistentVolumeClaims(v.Namespace).Get(context.Background(), v.Name, metav1.GetOptions{})
-				if err != nil {
-					return false, err
-				}
-				if !w.volumeReady(claim) {
-					return false, nil
-				}
-			case *corev1.Service:
-				svc, err := w.c.CoreV1().Services(v.Namespace).Get(context.Background(), v.Name, metav1.GetOptions{})
-				if err != nil {
-					return false, err
-				}
-				if !w.serviceReady(svc) {
-					return false, nil
-				}
-			case *extensionsv1beta1.DaemonSet, *appsv1.DaemonSet, *appsv1beta2.DaemonSet:
-				ds, err := w.c.AppsV1().DaemonSets(v.Namespace).Get(context.Background(), v.Name, metav1.GetOptions{})
-				if err != nil {
-					return false, err
-				}
-				if !w.daemonSetReady(ds) {
-					return false, nil
-				}
-			case *apiextv1beta1.CustomResourceDefinition:
-				if err := v.Get(); err != nil {
-					return false, err
-				}
-				crd := &apiextv1beta1.CustomResourceDefinition{}
-				if err := scheme.Scheme.Convert(v.Object, crd, nil); err != nil {
-					return false, err
-				}
-				if !w.crdBetaReady(*crd) {
-					return false, nil
-				}
-			case *apiextv1.CustomResourceDefinition:
-				if err := v.Get(); err != nil {
-					return false, err
-				}
-				crd := &apiextv1.CustomResourceDefinition{}
-				if err := scheme.Scheme.Convert(v.Object, crd, nil); err != nil {
-					return false, err
-				}
-				if !w.crdReady(*crd) {
-					return false, nil
-				}
-			case *appsv1.StatefulSet, *appsv1beta1.StatefulSet, *appsv1beta2.StatefulSet:
-				sts, err := w.c.AppsV1().StatefulSets(v.Namespace).Get(context.Background(), v.Name, metav1.GetOptions{})
-				if err != nil {
-					return false, err
-				}
-				if !w.statefulSetReady(sts) {
-					return false, nil
-				}
-			case *corev1.ReplicationController, *extensionsv1beta1.ReplicaSet, *appsv1beta2.ReplicaSet, *appsv1.ReplicaSet:
-				ok, err = w.podsReadyForObject(v.Namespace, value)
-			}
-			if !ok || err != nil {
+			ready, err := w.c.IsReady(ctx, v)
+			if !ready || err != nil {
 				return false, err
 			}
 		}
 		return true, nil
-	})
-}
-
-func (w *waiter) podsReadyForObject(namespace string, obj runtime.Object) (bool, error) {
-	pods, err := w.podsforObject(namespace, obj)
-	if err != nil {
-		return false, err
-	}
-	for _, pod := range pods {
-		if !w.isPodReady(&pod) {
-			return false, nil
-		}
-	}
-	return true, nil
-}
-
-func (w *waiter) podsforObject(namespace string, obj runtime.Object) ([]corev1.Pod, error) {
-	selector, err := SelectorsForObject(obj)
-	if err != nil {
-		return nil, err
-	}
-	list, err := getPods(w.c, namespace, selector.String())
-	return list, err
-}
-
-// isPodReady returns true if a pod is ready; false otherwise.
-func (w *waiter) isPodReady(pod *corev1.Pod) bool {
-	for _, c := range pod.Status.Conditions {
-		if c.Type == corev1.PodReady && c.Status == corev1.ConditionTrue {
-			return true
-		}
-	}
-	w.log("Pod is not ready: %s/%s", pod.GetNamespace(), pod.GetName())
-	return false
-}
-
-func (w *waiter) jobReady(job *batchv1.Job) bool {
-	if job.Status.Failed > *job.Spec.BackoffLimit {
-		w.log("Job is failed: %s/%s", job.GetNamespace(), job.GetName())
-		return false
-	}
-	if job.Status.Succeeded < *job.Spec.Completions {
-		w.log("Job is not completed: %s/%s", job.GetNamespace(), job.GetName())
-		return false
-	}
-	return true
-}
-
-func (w *waiter) serviceReady(s *corev1.Service) bool {
-	// ExternalName Services are external to cluster so helm shouldn't be checking to see if they're 'ready' (i.e. have an IP Set)
-	if s.Spec.Type == corev1.ServiceTypeExternalName {
-		return true
-	}
-
-	// Ensure that the service cluster IP is not empty
-	if s.Spec.ClusterIP == "" {
-		w.log("Service does not have cluster IP address: %s/%s", s.GetNamespace(), s.GetName())
-		return false
-	}
-
-	// This checks if the service has a LoadBalancer and that balancer has an Ingress defined
-	if s.Spec.Type == corev1.ServiceTypeLoadBalancer {
-		// do not wait when at least 1 external IP is set
-		if len(s.Spec.ExternalIPs) > 0 {
-			w.log("Service %s/%s has external IP addresses (%v), marking as ready", s.GetNamespace(), s.GetName(), s.Spec.ExternalIPs)
-			return true
-		}
-
-		if s.Status.LoadBalancer.Ingress == nil {
-			w.log("Service does not have load balancer ingress IP address: %s/%s", s.GetNamespace(), s.GetName())
-			return false
-		}
-	}
-
-	return true
-}
-
-func (w *waiter) volumeReady(v *corev1.PersistentVolumeClaim) bool {
-	if v.Status.Phase != corev1.ClaimBound {
-		w.log("PersistentVolumeClaim is not bound: %s/%s", v.GetNamespace(), v.GetName())
-		return false
-	}
-	return true
-}
-
-func (w *waiter) deploymentReady(rs *appsv1.ReplicaSet, dep *appsv1.Deployment) bool {
-	expectedReady := *dep.Spec.Replicas - deploymentutil.MaxUnavailable(*dep)
-	if !(rs.Status.ReadyReplicas >= expectedReady) {
-		w.log("Deployment is not ready: %s/%s. %d out of %d expected pods are ready", dep.Namespace, dep.Name, rs.Status.ReadyReplicas, expectedReady)
-		return false
-	}
-	return true
-}
-
-func (w *waiter) daemonSetReady(ds *appsv1.DaemonSet) bool {
-	// If the update strategy is not a rolling update, there will be nothing to wait for
-	if ds.Spec.UpdateStrategy.Type != appsv1.RollingUpdateDaemonSetStrategyType {
-		return true
-	}
-
-	// Make sure all the updated pods have been scheduled
-	if ds.Status.UpdatedNumberScheduled != ds.Status.DesiredNumberScheduled {
-		w.log("DaemonSet is not ready: %s/%s. %d out of %d expected pods have been scheduled", ds.Namespace, ds.Name, ds.Status.UpdatedNumberScheduled, ds.Status.DesiredNumberScheduled)
-		return false
-	}
-	maxUnavailable, err := intstr.GetValueFromIntOrPercent(ds.Spec.UpdateStrategy.RollingUpdate.MaxUnavailable, int(ds.Status.DesiredNumberScheduled), true)
-	if err != nil {
-		// If for some reason the value is invalid, set max unavailable to the
-		// number of desired replicas. This is the same behavior as the
-		// `MaxUnavailable` function in deploymentutil
-		maxUnavailable = int(ds.Status.DesiredNumberScheduled)
-	}
-
-	expectedReady := int(ds.Status.DesiredNumberScheduled) - maxUnavailable
-	if !(int(ds.Status.NumberReady) >= expectedReady) {
-		w.log("DaemonSet is not ready: %s/%s. %d out of %d expected pods are ready", ds.Namespace, ds.Name, ds.Status.NumberReady, expectedReady)
-		return false
-	}
-	return true
-}
-
-// Because the v1 extensions API is not available on all supported k8s versions
-// yet and because Go doesn't support generics, we need to have a duplicate
-// function to support the v1beta1 types
-func (w *waiter) crdBetaReady(crd apiextv1beta1.CustomResourceDefinition) bool {
-	for _, cond := range crd.Status.Conditions {
-		switch cond.Type {
-		case apiextv1beta1.Established:
-			if cond.Status == apiextv1beta1.ConditionTrue {
-				return true
-			}
-		case apiextv1beta1.NamesAccepted:
-			if cond.Status == apiextv1beta1.ConditionFalse {
-				// This indicates a naming conflict, but it's probably not the
-				// job of this function to fail because of that. Instead,
-				// we treat it as a success, since the process should be able to
-				// continue.
-				return true
-			}
-		}
-	}
-	return false
-}
-
-func (w *waiter) crdReady(crd apiextv1.CustomResourceDefinition) bool {
-	for _, cond := range crd.Status.Conditions {
-		switch cond.Type {
-		case apiextv1.Established:
-			if cond.Status == apiextv1.ConditionTrue {
-				return true
-			}
-		case apiextv1.NamesAccepted:
-			if cond.Status == apiextv1.ConditionFalse {
-				// This indicates a naming conflict, but it's probably not the
-				// job of this function to fail because of that. Instead,
-				// we treat it as a success, since the process should be able to
-				// continue.
-				return true
-			}
-		}
-	}
-	return false
-}
-
-func (w *waiter) statefulSetReady(sts *appsv1.StatefulSet) bool {
-	// If the update strategy is not a rolling update, there will be nothing to wait for
-	if sts.Spec.UpdateStrategy.Type != appsv1.RollingUpdateStatefulSetStrategyType {
-		return true
-	}
-
-	// Dereference all the pointers because StatefulSets like them
-	var partition int
-	// 1 is the default for replicas if not set
-	var replicas = 1
-	// For some reason, even if the update strategy is a rolling update, the
-	// actual rollingUpdate field can be nil. If it is, we can safely assume
-	// there is no partition value
-	if sts.Spec.UpdateStrategy.RollingUpdate != nil && sts.Spec.UpdateStrategy.RollingUpdate.Partition != nil {
-		partition = int(*sts.Spec.UpdateStrategy.RollingUpdate.Partition)
-	}
-	if sts.Spec.Replicas != nil {
-		replicas = int(*sts.Spec.Replicas)
-	}
-
-	// Because an update strategy can use partitioning, we need to calculate the
-	// number of updated replicas we should have. For example, if the replicas
-	// is set to 3 and the partition is 2, we'd expect only one pod to be
-	// updated
-	expectedReplicas := replicas - partition
-
-	// Make sure all the updated pods have been scheduled
-	if int(sts.Status.UpdatedReplicas) != expectedReplicas {
-		w.log("StatefulSet is not ready: %s/%s. %d out of %d expected pods have been scheduled", sts.Namespace, sts.Name, sts.Status.UpdatedReplicas, expectedReplicas)
-		return false
-	}
-
-	if int(sts.Status.ReadyReplicas) != replicas {
-		w.log("StatefulSet is not ready: %s/%s. %d out of %d expected pods are ready", sts.Namespace, sts.Name, sts.Status.ReadyReplicas, replicas)
-		return false
-	}
-	return true
-}
-
-func getPods(client kubernetes.Interface, namespace, selector string) ([]corev1.Pod, error) {
-	list, err := client.CoreV1().Pods(namespace).List(context.Background(), metav1.ListOptions{
-		LabelSelector: selector,
-	})
-	return list.Items, err
+	}, ctx.Done())
 }
 
 // SelectorsForObject returns the pod label selector for a given object
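The behavioral change in waitForResources is worth spelling out: wait.Poll waited one full interval before the first check and handled the timeout itself, while wait.PollImmediateUntil checks immediately and stops when the context-derived channel closes, returning wait.ErrWaitTimeout in that case; the per-resource GETs inside IsReady also become cancelable via ctx instead of using context.Background(). A self-contained sketch of the new loop's contract, with our own names standing in for the package-private waiter:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

// waitAll mimics the shape of waitForResources: derive a deadline context
// from the timeout, then poll the aggregate readiness condition every two
// seconds, starting immediately.
func waitAll(timeout time.Duration, isReady func(ctx context.Context) (bool, error)) error {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	return wait.PollImmediateUntil(2*time.Second, func() (bool, error) {
		return isReady(ctx)
	}, ctx.Done())
}

func main() {
	start := time.Now()
	err := waitAll(10*time.Second, func(context.Context) (bool, error) {
		// Stand-in for "every resource's IsReady returned true".
		return time.Since(start) > 3*time.Second, nil
	})
	fmt.Println(err) // <nil>: the condition flipped before the deadline
}
```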