feat: use Wait() to wait for CRDs to be ready

This forward-ports the CRD wait logic to Helm 3, and then uses that to wait for CRDs to be registered.

Signed-off-by: Matt Butcher <matt.butcher@microsoft.com>
pull/6332/head
Matt Butcher 6 years ago
parent f28da4210c
commit ebfc8d5a5b
No known key found for this signature in database
GPG Key ID: DCD5F5E5EF32C345

@ -103,24 +103,14 @@ func NewInstall(cfg *Configuration) *Install {
} }
} }
// Run executes the installation func (i *Install) installCRDs(crds []*chart.File) error {
//
// If DryRun is set to true, this will prepare the release, but not install it
func (i *Install) Run(chrt *chart.Chart, vals map[string]interface{}) (*release.Release, error) {
if err := i.availableName(); err != nil {
return nil, err
}
// Pre-install anything in the crd/ directory. We do this before Helm
// contacts the upstream server and builds the capabilities object.
if crds := chrt.CRDs(); !i.ClientOnly && !i.SkipCRDs && len(crds) > 0 {
// We do these one at a time in the order they were read. // We do these one at a time in the order they were read.
for _, obj := range crds { for _, obj := range crds {
// Read in the resources // Read in the resources
res, err := i.cfg.KubeClient.Build(bytes.NewBuffer(obj.Data)) res, err := i.cfg.KubeClient.Build(bytes.NewBuffer(obj.Data))
if err != nil { if err != nil {
// We bail out immediately // We bail out immediately
return nil, errors.Wrapf(err, "failed to install CRD %s", obj.Name) return errors.Wrapf(err, "failed to install CRD %s", obj.Name)
} }
// On dry run, bail here // On dry run, bail here
if i.DryRun { if i.DryRun {
@ -135,7 +125,7 @@ func (i *Install) Run(chrt *chart.Chart, vals map[string]interface{}) (*release.
i.cfg.Log("CRD %s is already present. Skipping.", crdName) i.cfg.Log("CRD %s is already present. Skipping.", crdName)
continue continue
} }
return nil, err return errors.Wrapf(err, "failed to instal CRD %s", obj.Name)
} }
// Invalidate the local cache. // Invalidate the local cache.
@ -147,11 +137,30 @@ func (i *Install) Run(chrt *chart.Chart, vals map[string]interface{}) (*release.
i.cfg.Log("Clearing discovery cache") i.cfg.Log("Clearing discovery cache")
discoveryClient.Invalidate() discoveryClient.Invalidate()
// Give time for the CRD to be recognized. // Give time for the CRD to be recognized.
time.Sleep(5 * time.Second) if err := i.cfg.KubeClient.Wait(res, 60*time.Second); err != nil {
i.cfg.Log("Error waiting for CRD. Continuing blindly. %s", err)
}
// Make sure to force a rebuild of the cache. // Make sure to force a rebuild of the cache.
discoveryClient.ServerGroups() discoveryClient.ServerGroups()
} }
} }
return nil
}
// Run executes the installation
//
// If DryRun is set to true, this will prepare the release, but not install it
func (i *Install) Run(chrt *chart.Chart, vals map[string]interface{}) (*release.Release, error) {
if err := i.availableName(); err != nil {
return nil, err
}
// Pre-install anything in the crd/ directory. We do this before Helm
// contacts the upstream server and builds the capabilities object.
if crds := chrt.CRDs(); !i.ClientOnly && !i.SkipCRDs && len(crds) > 0 {
if err := i.installCRDs(crds); err != nil {
return nil, err
}
} }
if i.ClientOnly { if i.ClientOnly {

@ -27,12 +27,14 @@ import (
batchv1 "k8s.io/api/batch/v1" batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1" corev1 "k8s.io/api/core/v1"
extensionsv1beta1 "k8s.io/api/extensions/v1beta1" extensionsv1beta1 "k8s.io/api/extensions/v1beta1"
apiextv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util" deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util"
) )
@ -46,6 +48,9 @@ type waiter struct {
// until all are ready or a timeout is reached // until all are ready or a timeout is reached
func (w *waiter) waitForResources(created ResourceList) error { func (w *waiter) waitForResources(created ResourceList) error {
w.log("beginning wait for %d resources with timeout of %v", len(created), w.timeout) w.log("beginning wait for %d resources with timeout of %v", len(created), w.timeout)
if err := apiextv1beta1.AddToScheme(scheme.Scheme); err != nil {
w.log("error adding CRDs to schema: %s", err)
}
return wait.Poll(2*time.Second, w.timeout, func() (bool, error) { return wait.Poll(2*time.Second, w.timeout, func() (bool, error) {
for _, v := range created { for _, v := range created {
@ -138,6 +143,17 @@ func (w *waiter) waitForResources(created ResourceList) error {
if !w.daemonSetReady(ds) { if !w.daemonSetReady(ds) {
return false, nil return false, nil
} }
case *apiextv1beta1.CustomResourceDefinition:
if err := v.Get(); err != nil {
return false, err
}
crd := &apiextv1beta1.CustomResourceDefinition{}
if err := scheme.Scheme.Convert(v.Object, crd, nil); err != nil {
return false, err
}
if !w.crdReady(*crd) {
return false, nil
}
case *appsv1.StatefulSet, *appsv1beta1.StatefulSet, *appsv1beta2.StatefulSet: case *appsv1.StatefulSet, *appsv1beta1.StatefulSet, *appsv1beta2.StatefulSet:
sts, err := w.c.AppsV1().StatefulSets(v.Namespace).Get(v.Name, metav1.GetOptions{}) sts, err := w.c.AppsV1().StatefulSets(v.Namespace).Get(v.Name, metav1.GetOptions{})
if err != nil { if err != nil {
@ -257,6 +273,26 @@ func (w *waiter) daemonSetReady(ds *appsv1.DaemonSet) bool {
return true return true
} }
func (w *waiter) crdReady(crd apiextv1beta1.CustomResourceDefinition) bool {
for _, cond := range crd.Status.Conditions {
switch cond.Type {
case apiextv1beta1.Established:
if cond.Status == apiextv1beta1.ConditionTrue {
return true
}
case apiextv1beta1.NamesAccepted:
if cond.Status == apiextv1beta1.ConditionFalse {
// This indicates a naming conflict, but it's probably not the
// job of this function to faile because of that. Instead,
// we treat it as a success, since the process should be able to
// continue.
return true
}
}
}
return false
}
func (w *waiter) statefulSetReady(sts *appsv1.StatefulSet) bool { func (w *waiter) statefulSetReady(sts *appsv1.StatefulSet) bool {
// If the update strategy is not a rolling update, there will be nothing to wait for // If the update strategy is not a rolling update, there will be nothing to wait for
if sts.Spec.UpdateStrategy.Type != appsv1.RollingUpdateStatefulSetStrategyType { if sts.Spec.UpdateStrategy.Type != appsv1.RollingUpdateStatefulSetStrategyType {

Loading…
Cancel
Save