mirror of https://github.com/helm/helm
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
418 lines
15 KiB
418 lines
15 KiB
/*
|
|
Copyright The Helm Authors.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package kube // import "helm.sh/helm/v3/pkg/kube"
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
|
|
appsv1 "k8s.io/api/apps/v1"
|
|
appsv1beta1 "k8s.io/api/apps/v1beta1"
|
|
appsv1beta2 "k8s.io/api/apps/v1beta2"
|
|
batchv1 "k8s.io/api/batch/v1"
|
|
corev1 "k8s.io/api/core/v1"
|
|
extensionsv1beta1 "k8s.io/api/extensions/v1beta1"
|
|
apiextv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
|
|
apiextv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/apimachinery/pkg/runtime"
|
|
"k8s.io/apimachinery/pkg/util/intstr"
|
|
"k8s.io/cli-runtime/pkg/resource"
|
|
"k8s.io/client-go/kubernetes"
|
|
"k8s.io/client-go/kubernetes/scheme"
|
|
|
|
deploymentutil "helm.sh/helm/v3/internal/third_party/k8s.io/kubernetes/deployment/util"
|
|
)
|
|
|
|
// ReadyCheckerOption is a function that configures a ReadyChecker.
|
|
type ReadyCheckerOption func(*ReadyChecker)
|
|
|
|
// PausedAsReady returns a ReadyCheckerOption that configures a ReadyChecker
|
|
// to consider paused resources to be ready. For example a Deployment
|
|
// with spec.paused equal to true would be considered ready.
|
|
func PausedAsReady(pausedAsReady bool) ReadyCheckerOption {
|
|
return func(c *ReadyChecker) {
|
|
c.pausedAsReady = pausedAsReady
|
|
}
|
|
}
|
|
|
|
// CheckJobs returns a ReadyCheckerOption that configures a ReadyChecker
|
|
// to consider readiness of Job resources.
|
|
func CheckJobs(checkJobs bool) ReadyCheckerOption {
|
|
return func(c *ReadyChecker) {
|
|
c.checkJobs = checkJobs
|
|
}
|
|
}
|
|
|
|
// NewReadyChecker creates a new checker. Passed ReadyCheckerOptions can
|
|
// be used to override defaults.
|
|
func NewReadyChecker(cl kubernetes.Interface, log func(string, ...interface{}), opts ...ReadyCheckerOption) ReadyChecker {
|
|
c := ReadyChecker{
|
|
client: cl,
|
|
log: log,
|
|
}
|
|
if c.log == nil {
|
|
c.log = nopLogger
|
|
}
|
|
for _, opt := range opts {
|
|
opt(&c)
|
|
}
|
|
return c
|
|
}
|
|
|
|
// ReadyChecker is a type that can check core Kubernetes types for readiness.
|
|
type ReadyChecker struct {
|
|
client kubernetes.Interface
|
|
log func(string, ...interface{})
|
|
checkJobs bool
|
|
pausedAsReady bool
|
|
}
|
|
|
|
// IsReady checks if v is ready. It supports checking readiness for pods,
|
|
// deployments, persistent volume claims, services, daemon sets, custom
|
|
// resource definitions, stateful sets, replication controllers, jobs (optional),
|
|
// and replica sets. All other resource kinds are always considered ready.
|
|
//
|
|
// IsReady will fetch the latest state of the object from the server prior to
|
|
// performing readiness checks, and it will return any error encountered.
|
|
func (c *ReadyChecker) IsReady(ctx context.Context, v *resource.Info) (bool, error) {
|
|
var (
|
|
// This defaults to true, otherwise we get to a point where
|
|
// things will always return false unless one of the objects
|
|
// that manages pods has been hit
|
|
ok = true
|
|
err error
|
|
)
|
|
switch value := AsVersioned(v).(type) {
|
|
case *corev1.Pod:
|
|
pod, err := c.client.CoreV1().Pods(v.Namespace).Get(ctx, v.Name, metav1.GetOptions{})
|
|
if err != nil || !c.isPodReady(pod) {
|
|
return false, err
|
|
}
|
|
case *batchv1.Job:
|
|
if c.checkJobs {
|
|
job, err := c.client.BatchV1().Jobs(v.Namespace).Get(ctx, v.Name, metav1.GetOptions{})
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
ready, err := c.jobReady(job)
|
|
return ready, err
|
|
}
|
|
case *appsv1.Deployment, *appsv1beta1.Deployment, *appsv1beta2.Deployment, *extensionsv1beta1.Deployment:
|
|
currentDeployment, err := c.client.AppsV1().Deployments(v.Namespace).Get(ctx, v.Name, metav1.GetOptions{})
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
// If paused deployment will never be ready
|
|
if currentDeployment.Spec.Paused {
|
|
return c.pausedAsReady, nil
|
|
}
|
|
// Find RS associated with deployment
|
|
newReplicaSet, err := deploymentutil.GetNewReplicaSet(currentDeployment, c.client.AppsV1())
|
|
if err != nil || newReplicaSet == nil {
|
|
return false, err
|
|
}
|
|
if !c.deploymentReady(newReplicaSet, currentDeployment) {
|
|
return false, nil
|
|
}
|
|
case *corev1.PersistentVolumeClaim:
|
|
claim, err := c.client.CoreV1().PersistentVolumeClaims(v.Namespace).Get(ctx, v.Name, metav1.GetOptions{})
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
if !c.volumeReady(claim) {
|
|
return false, nil
|
|
}
|
|
case *corev1.Service:
|
|
svc, err := c.client.CoreV1().Services(v.Namespace).Get(ctx, v.Name, metav1.GetOptions{})
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
if !c.serviceReady(svc) {
|
|
return false, nil
|
|
}
|
|
case *extensionsv1beta1.DaemonSet, *appsv1.DaemonSet, *appsv1beta2.DaemonSet:
|
|
ds, err := c.client.AppsV1().DaemonSets(v.Namespace).Get(ctx, v.Name, metav1.GetOptions{})
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
if !c.daemonSetReady(ds) {
|
|
return false, nil
|
|
}
|
|
case *apiextv1beta1.CustomResourceDefinition:
|
|
if err := v.Get(); err != nil {
|
|
return false, err
|
|
}
|
|
crd := &apiextv1beta1.CustomResourceDefinition{}
|
|
if err := scheme.Scheme.Convert(v.Object, crd, nil); err != nil {
|
|
return false, err
|
|
}
|
|
if !c.crdBetaReady(*crd) {
|
|
return false, nil
|
|
}
|
|
case *apiextv1.CustomResourceDefinition:
|
|
if err := v.Get(); err != nil {
|
|
return false, err
|
|
}
|
|
crd := &apiextv1.CustomResourceDefinition{}
|
|
if err := scheme.Scheme.Convert(v.Object, crd, nil); err != nil {
|
|
return false, err
|
|
}
|
|
if !c.crdReady(*crd) {
|
|
return false, nil
|
|
}
|
|
case *appsv1.StatefulSet, *appsv1beta1.StatefulSet, *appsv1beta2.StatefulSet:
|
|
sts, err := c.client.AppsV1().StatefulSets(v.Namespace).Get(ctx, v.Name, metav1.GetOptions{})
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
if !c.statefulSetReady(sts) {
|
|
return false, nil
|
|
}
|
|
case *corev1.ReplicationController, *extensionsv1beta1.ReplicaSet, *appsv1beta2.ReplicaSet, *appsv1.ReplicaSet:
|
|
ok, err = c.podsReadyForObject(ctx, v.Namespace, value)
|
|
}
|
|
if !ok || err != nil {
|
|
return false, err
|
|
}
|
|
return true, nil
|
|
}
|
|
|
|
func (c *ReadyChecker) podsReadyForObject(ctx context.Context, namespace string, obj runtime.Object) (bool, error) {
|
|
pods, err := c.podsforObject(ctx, namespace, obj)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
for _, pod := range pods {
|
|
if !c.isPodReady(&pod) {
|
|
return false, nil
|
|
}
|
|
}
|
|
return true, nil
|
|
}
|
|
|
|
func (c *ReadyChecker) podsforObject(ctx context.Context, namespace string, obj runtime.Object) ([]corev1.Pod, error) {
|
|
selector, err := SelectorsForObject(obj)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
list, err := getPods(ctx, c.client, namespace, selector.String())
|
|
return list, err
|
|
}
|
|
|
|
// isPodReady returns true if a pod is ready; false otherwise.
|
|
func (c *ReadyChecker) isPodReady(pod *corev1.Pod) bool {
|
|
for _, c := range pod.Status.Conditions {
|
|
if c.Type == corev1.PodReady && c.Status == corev1.ConditionTrue {
|
|
return true
|
|
}
|
|
}
|
|
c.log("Pod is not ready: %s/%s", pod.GetNamespace(), pod.GetName())
|
|
return false
|
|
}
|
|
|
|
func (c *ReadyChecker) jobReady(job *batchv1.Job) (bool, error) {
|
|
if job.Status.Failed > *job.Spec.BackoffLimit {
|
|
c.log("Job is failed: %s/%s", job.GetNamespace(), job.GetName())
|
|
// If a job is failed, it can't recover, so throw an error
|
|
return false, fmt.Errorf("job is failed: %s/%s", job.GetNamespace(), job.GetName())
|
|
}
|
|
if job.Spec.Completions != nil && job.Status.Succeeded < *job.Spec.Completions {
|
|
c.log("Job is not completed: %s/%s", job.GetNamespace(), job.GetName())
|
|
return false, nil
|
|
}
|
|
return true, nil
|
|
}
|
|
|
|
func (c *ReadyChecker) serviceReady(s *corev1.Service) bool {
|
|
// ExternalName Services are external to cluster so helm shouldn't be checking to see if they're 'ready' (i.e. have an IP Set)
|
|
if s.Spec.Type == corev1.ServiceTypeExternalName {
|
|
return true
|
|
}
|
|
|
|
// Ensure that the service cluster IP is not empty
|
|
if s.Spec.ClusterIP == "" {
|
|
c.log("Service does not have cluster IP address: %s/%s", s.GetNamespace(), s.GetName())
|
|
return false
|
|
}
|
|
|
|
// This checks if the service has a LoadBalancer and that balancer has an Ingress defined
|
|
if s.Spec.Type == corev1.ServiceTypeLoadBalancer {
|
|
// do not wait when at least 1 external IP is set
|
|
if len(s.Spec.ExternalIPs) > 0 {
|
|
c.log("Service %s/%s has external IP addresses (%v), marking as ready", s.GetNamespace(), s.GetName(), s.Spec.ExternalIPs)
|
|
return true
|
|
}
|
|
|
|
if s.Status.LoadBalancer.Ingress == nil {
|
|
c.log("Service does not have load balancer ingress IP address: %s/%s", s.GetNamespace(), s.GetName())
|
|
return false
|
|
}
|
|
}
|
|
|
|
return true
|
|
}
|
|
|
|
func (c *ReadyChecker) volumeReady(v *corev1.PersistentVolumeClaim) bool {
|
|
if v.Status.Phase != corev1.ClaimBound {
|
|
c.log("PersistentVolumeClaim is not bound: %s/%s", v.GetNamespace(), v.GetName())
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
|
|
func (c *ReadyChecker) deploymentReady(rs *appsv1.ReplicaSet, dep *appsv1.Deployment) bool {
|
|
expectedReady := *dep.Spec.Replicas - deploymentutil.MaxUnavailable(*dep)
|
|
if !(rs.Status.ReadyReplicas >= expectedReady) {
|
|
c.log("Deployment is not ready: %s/%s. %d out of %d expected pods are ready", dep.Namespace, dep.Name, rs.Status.ReadyReplicas, expectedReady)
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
|
|
func (c *ReadyChecker) daemonSetReady(ds *appsv1.DaemonSet) bool {
|
|
// If the update strategy is not a rolling update, there will be nothing to wait for
|
|
if ds.Spec.UpdateStrategy.Type != appsv1.RollingUpdateDaemonSetStrategyType {
|
|
return true
|
|
}
|
|
|
|
// Make sure all the updated pods have been scheduled
|
|
if ds.Status.UpdatedNumberScheduled != ds.Status.DesiredNumberScheduled {
|
|
c.log("DaemonSet is not ready: %s/%s. %d out of %d expected pods have been scheduled", ds.Namespace, ds.Name, ds.Status.UpdatedNumberScheduled, ds.Status.DesiredNumberScheduled)
|
|
return false
|
|
}
|
|
maxUnavailable, err := intstr.GetScaledValueFromIntOrPercent(ds.Spec.UpdateStrategy.RollingUpdate.MaxUnavailable, int(ds.Status.DesiredNumberScheduled), true)
|
|
if err != nil {
|
|
// If for some reason the value is invalid, set max unavailable to the
|
|
// number of desired replicas. This is the same behavior as the
|
|
// `MaxUnavailable` function in deploymentutil
|
|
maxUnavailable = int(ds.Status.DesiredNumberScheduled)
|
|
}
|
|
|
|
expectedReady := int(ds.Status.DesiredNumberScheduled) - maxUnavailable
|
|
if !(int(ds.Status.NumberReady) >= expectedReady) {
|
|
c.log("DaemonSet is not ready: %s/%s. %d out of %d expected pods are ready", ds.Namespace, ds.Name, ds.Status.NumberReady, expectedReady)
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
|
|
// Because the v1 extensions API is not available on all supported k8s versions
|
|
// yet and because Go doesn't support generics, we need to have a duplicate
|
|
// function to support the v1beta1 types
|
|
func (c *ReadyChecker) crdBetaReady(crd apiextv1beta1.CustomResourceDefinition) bool {
|
|
for _, cond := range crd.Status.Conditions {
|
|
switch cond.Type {
|
|
case apiextv1beta1.Established:
|
|
if cond.Status == apiextv1beta1.ConditionTrue {
|
|
return true
|
|
}
|
|
case apiextv1beta1.NamesAccepted:
|
|
if cond.Status == apiextv1beta1.ConditionFalse {
|
|
// This indicates a naming conflict, but it's probably not the
|
|
// job of this function to fail because of that. Instead,
|
|
// we treat it as a success, since the process should be able to
|
|
// continue.
|
|
return true
|
|
}
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (c *ReadyChecker) crdReady(crd apiextv1.CustomResourceDefinition) bool {
|
|
for _, cond := range crd.Status.Conditions {
|
|
switch cond.Type {
|
|
case apiextv1.Established:
|
|
if cond.Status == apiextv1.ConditionTrue {
|
|
return true
|
|
}
|
|
case apiextv1.NamesAccepted:
|
|
if cond.Status == apiextv1.ConditionFalse {
|
|
// This indicates a naming conflict, but it's probably not the
|
|
// job of this function to fail because of that. Instead,
|
|
// we treat it as a success, since the process should be able to
|
|
// continue.
|
|
return true
|
|
}
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (c *ReadyChecker) statefulSetReady(sts *appsv1.StatefulSet) bool {
|
|
// If the update strategy is not a rolling update, there will be nothing to wait for
|
|
if sts.Spec.UpdateStrategy.Type != appsv1.RollingUpdateStatefulSetStrategyType {
|
|
c.log("StatefulSet skipped ready check: %s/%s. updateStrategy is %v", sts.Namespace, sts.Name, sts.Spec.UpdateStrategy.Type)
|
|
return true
|
|
}
|
|
|
|
// Make sure the status is up-to-date with the StatefulSet changes
|
|
if sts.Status.ObservedGeneration < sts.Generation {
|
|
c.log("StatefulSet is not ready: %s/%s. update has not yet been observed", sts.Namespace, sts.Name)
|
|
return false
|
|
}
|
|
|
|
// Dereference all the pointers because StatefulSets like them
|
|
var partition int
|
|
// 1 is the default for replicas if not set
|
|
var replicas = 1
|
|
// For some reason, even if the update strategy is a rolling update, the
|
|
// actual rollingUpdate field can be nil. If it is, we can safely assume
|
|
// there is no partition value
|
|
if sts.Spec.UpdateStrategy.RollingUpdate != nil && sts.Spec.UpdateStrategy.RollingUpdate.Partition != nil {
|
|
partition = int(*sts.Spec.UpdateStrategy.RollingUpdate.Partition)
|
|
}
|
|
if sts.Spec.Replicas != nil {
|
|
replicas = int(*sts.Spec.Replicas)
|
|
}
|
|
|
|
// Because an update strategy can use partitioning, we need to calculate the
|
|
// number of updated replicas we should have. For example, if the replicas
|
|
// is set to 3 and the partition is 2, we'd expect only one pod to be
|
|
// updated
|
|
expectedReplicas := replicas - partition
|
|
|
|
// Make sure all the updated pods have been scheduled
|
|
if int(sts.Status.UpdatedReplicas) < expectedReplicas {
|
|
c.log("StatefulSet is not ready: %s/%s. %d out of %d expected pods have been scheduled", sts.Namespace, sts.Name, sts.Status.UpdatedReplicas, expectedReplicas)
|
|
return false
|
|
}
|
|
|
|
if int(sts.Status.ReadyReplicas) != replicas {
|
|
c.log("StatefulSet is not ready: %s/%s. %d out of %d expected pods are ready", sts.Namespace, sts.Name, sts.Status.ReadyReplicas, replicas)
|
|
return false
|
|
}
|
|
// This check only makes sense when all partitions are being upgraded otherwise during a
|
|
// partioned rolling upgrade, this condition will never evaluate to true, leading to
|
|
// error.
|
|
if partition == 0 && sts.Status.CurrentRevision != sts.Status.UpdateRevision {
|
|
c.log("StatefulSet is not ready: %s/%s. currentRevision %s does not yet match updateRevision %s", sts.Namespace, sts.Name, sts.Status.CurrentRevision, sts.Status.UpdateRevision)
|
|
return false
|
|
}
|
|
|
|
c.log("StatefulSet is ready: %s/%s. %d out of %d expected pods are ready", sts.Namespace, sts.Name, sts.Status.ReadyReplicas, replicas)
|
|
return true
|
|
}
|
|
|
|
func getPods(ctx context.Context, client kubernetes.Interface, namespace, selector string) ([]corev1.Pod, error) {
|
|
list, err := client.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{
|
|
LabelSelector: selector,
|
|
})
|
|
return list.Items, err
|
|
}
|