From b8e40a7c31faf094c950be40a0d08ce554bdc7dd Mon Sep 17 00:00:00 2001 From: Morten Torkildsen Date: Thu, 27 Dec 2018 01:56:00 -0800 Subject: [PATCH] fix(helm): Wait for CRDs to reach established state for crd_install hook Makes sure CRDs installed through the crd_install hook reaches the `established` state before the hook is considered complete. Signed-off-by: Morten Torkildsen --- pkg/kube/client.go | 57 ++++++++ pkg/kube/client_test.go | 150 ++++++++++++++++++++- pkg/tiller/environment/environment.go | 7 + pkg/tiller/environment/environment_test.go | 4 + pkg/tiller/release_server.go | 8 +- pkg/tiller/release_server_test.go | 4 + 6 files changed, 225 insertions(+), 5 deletions(-) diff --git a/pkg/kube/client.go b/pkg/kube/client.go index 66c822ebd..ee643fade 100644 --- a/pkg/kube/client.go +++ b/pkg/kube/client.go @@ -37,6 +37,7 @@ import ( batch "k8s.io/api/batch/v1" "k8s.io/api/core/v1" extv1beta1 "k8s.io/api/extensions/v1beta1" + apiextv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" apiequality "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -45,6 +46,7 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/strategicpatch" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" "k8s.io/cli-runtime/pkg/genericclioptions" "k8s.io/cli-runtime/pkg/resource" @@ -76,6 +78,12 @@ func New(getter genericclioptions.RESTClientGetter) *Client { if getter == nil { getter = genericclioptions.NewConfigFlags(true) } + + err := apiextv1beta1.AddToScheme(scheme.Scheme) + if err != nil { + panic(err) + } + return &Client{ Factory: cmdutil.NewFactory(getter), Log: nopLogger, @@ -439,6 +447,55 @@ func (c *Client) WatchUntilReady(namespace string, reader io.Reader, timeout int return perform(infos, c.watchTimeout(time.Duration(timeout)*time.Second)) } +// WatchUntilCRDEstablished polls the given CRD until it reaches the established +// state. A CRD needs to reach the established state before CRs can be created. +// +// If a naming conflict condition is found, this function will return an error. +func (c *Client) WaitUntilCRDEstablished(reader io.Reader, timeout time.Duration) error { + infos, err := c.BuildUnstructured(metav1.NamespaceAll, reader) + if err != nil { + return err + } + + return perform(infos, c.pollCRDEstablished(timeout)) +} + +func (c *Client) pollCRDEstablished(t time.Duration) ResourceActorFunc { + return func(info *resource.Info) error { + return c.pollCRDUntilEstablished(t, info) + } +} + +func (c *Client) pollCRDUntilEstablished(timeout time.Duration, info *resource.Info) error { + return wait.PollImmediate(time.Second, timeout, func() (bool, error) { + err := info.Get() + if err != nil { + return false, fmt.Errorf("unable to get CRD: %v", err) + } + + crd := &apiextv1beta1.CustomResourceDefinition{} + err = scheme.Scheme.Convert(info.Object, crd, nil) + if err != nil { + return false, fmt.Errorf("unable to convert to CRD type: %v", err) + } + + for _, cond := range crd.Status.Conditions { + switch cond.Type { + case apiextv1beta1.Established: + if cond.Status == apiextv1beta1.ConditionTrue { + return true, nil + } + case apiextv1beta1.NamesAccepted: + if cond.Status == apiextv1beta1.ConditionFalse { + return false, fmt.Errorf("naming conflict detected for CRD %s", crd.GetName()) + } + } + } + + return false, nil + }) +} + func perform(infos Result, fn ResourceActorFunc) error { if len(infos) == 0 { return ErrNoObjectsVisited diff --git a/pkg/kube/client_test.go b/pkg/kube/client_test.go index a41490f9b..810abdf17 100644 --- a/pkg/kube/client_test.go +++ b/pkg/kube/client_test.go @@ -24,8 +24,10 @@ import ( "sort" "strings" "testing" + "time" - v1 "k8s.io/api/core/v1" + "k8s.io/api/core/v1" + apiextv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" @@ -33,15 +35,35 @@ import ( "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest/fake" cmdtesting "k8s.io/kubernetes/pkg/kubectl/cmd/testing" + kubectlscheme "k8s.io/kubernetes/pkg/kubectl/scheme" ) +func init() { + err := apiextv1beta1.AddToScheme(scheme.Scheme) + if err != nil { + panic(err) + } + + // Tiller use the scheme from go-client, but the cmdtesting + // package used here is hardcoded to use the scheme from + // kubectl. So for testing, we need to add the CustomResourceDefinition + // type to both schemes. + err = apiextv1beta1.AddToScheme(kubectlscheme.Scheme) + if err != nil { + panic(err) + } +} + var ( - codec = scheme.Codecs.LegacyCodec(scheme.Scheme.PrioritizedVersionsAllGroups()...) unstructuredSerializer = resource.UnstructuredPlusDefaultContentConfig().NegotiatedSerializer ) +func getCodec() runtime.Codec { + return scheme.Codecs.LegacyCodec(scheme.Scheme.PrioritizedVersionsAllGroups()...) +} + func objBody(obj runtime.Object) io.ReadCloser { - return ioutil.NopCloser(bytes.NewReader([]byte(runtime.EncodeOrDie(codec, obj)))) + return ioutil.NopCloser(bytes.NewReader([]byte(runtime.EncodeOrDie(getCodec(), obj)))) } func newPod(name string) v1.Pod { @@ -103,7 +125,7 @@ func notFoundBody() *metav1.Status { func newResponse(code int, obj runtime.Object) (*http.Response, error) { header := http.Header{} header.Set("Content-Type", runtime.ContentTypeJSON) - body := ioutil.NopCloser(bytes.NewReader([]byte(runtime.EncodeOrDie(codec, obj)))) + body := ioutil.NopCloser(bytes.NewReader([]byte(runtime.EncodeOrDie(getCodec(), obj)))) return &http.Response{StatusCode: code, Header: header, Body: body}, nil } @@ -434,6 +456,88 @@ func TestResourceSortOrder(t *testing.T) { } } +func TestWaitUntilCRDEstablished(t *testing.T) { + testCases := map[string]struct { + conditions []apiextv1beta1.CustomResourceDefinitionCondition + returnConditionsAfter int + success bool + }{ + "crd reaches established state after 2 requests": { + conditions: []apiextv1beta1.CustomResourceDefinitionCondition{ + { + Type: apiextv1beta1.Established, + Status: apiextv1beta1.ConditionTrue, + }, + }, + returnConditionsAfter: 2, + success: true, + }, + "crd does not reach established state before timeout": { + conditions: []apiextv1beta1.CustomResourceDefinitionCondition{}, + returnConditionsAfter: 100, + success: false, + }, + "crd name is not accepted": { + conditions: []apiextv1beta1.CustomResourceDefinitionCondition{ + { + Type: apiextv1beta1.NamesAccepted, + Status: apiextv1beta1.ConditionFalse, + }, + }, + returnConditionsAfter: 1, + success: false, + }, + } + + for tn, tc := range testCases { + func(name string) { + c := newTestClient() + defer c.Cleanup() + + crdWithoutConditions := newCrdWithStatus("name", apiextv1beta1.CustomResourceDefinitionStatus{}) + crdWithConditions := newCrdWithStatus("name", apiextv1beta1.CustomResourceDefinitionStatus{ + Conditions: tc.conditions, + }) + + requestCount := 0 + c.TestFactory.UnstructuredClient = &fake.RESTClient{ + GroupVersion: schema.GroupVersion{Version: "v1"}, + NegotiatedSerializer: unstructuredSerializer, + Client: fake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) { + var crd apiextv1beta1.CustomResourceDefinition + if requestCount < tc.returnConditionsAfter { + crd = crdWithoutConditions + } else { + crd = crdWithConditions + } + requestCount += 1 + return newResponse(200, &crd) + }), + } + + err := c.WaitUntilCRDEstablished(strings.NewReader(crdManifest), 5*time.Second) + if err != nil && tc.success { + t.Errorf("%s: expected no error, but got %v", name, err) + } + if err == nil && !tc.success { + t.Errorf("%s: expected error, but didn't get one", name) + } + }(tn) + } +} + +func newCrdWithStatus(name string, status apiextv1beta1.CustomResourceDefinitionStatus) apiextv1beta1.CustomResourceDefinition { + crd := apiextv1beta1.CustomResourceDefinition{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: metav1.NamespaceDefault, + }, + Spec: apiextv1beta1.CustomResourceDefinitionSpec{}, + Status: status, + } + return crd +} + func TestPerform(t *testing.T) { tests := []struct { name string @@ -701,3 +805,41 @@ spec: ports: - containerPort: 80 ` + +const crdManifest = ` +apiVersion: apiextensions.k8s.io/v1beta1 +kind: CustomResourceDefinition +metadata: + creationTimestamp: null + labels: + controller-tools.k8s.io: "1.0" + name: applications.app.k8s.io +spec: + group: app.k8s.io + names: + kind: Application + plural: applications + scope: Namespaced + validation: + openAPIV3Schema: + properties: + apiVersion: + description: 'Description' + type: string + kind: + description: 'Kind' + type: string + metadata: + type: object + spec: + type: object + status: + type: object + version: v1beta1 +status: + acceptedNames: + kind: "" + plural: "" + conditions: [] + storedVersions: [] +` diff --git a/pkg/tiller/environment/environment.go b/pkg/tiller/environment/environment.go index dca7c0756..6063e9c5f 100644 --- a/pkg/tiller/environment/environment.go +++ b/pkg/tiller/environment/environment.go @@ -141,6 +141,8 @@ type KubeClient interface { // WaitAndGetCompletedPodPhase waits up to a timeout until a pod enters a completed phase // and returns said phase (PodSucceeded or PodFailed qualify). WaitAndGetCompletedPodPhase(namespace string, reader io.Reader, timeout time.Duration) (v1.PodPhase, error) + + WaitUntilCRDEstablished(reader io.Reader, timeout time.Duration) error } // PrintingKubeClient implements KubeClient, but simply prints the reader to @@ -197,6 +199,11 @@ func (p *PrintingKubeClient) WaitAndGetCompletedPodPhase(namespace string, reade return v1.PodUnknown, err } +func (p *PrintingKubeClient) WaitUntilCRDEstablished(reader io.Reader, timeout time.Duration) error { + _, err := io.Copy(p.Out, reader) + return err +} + // Environment provides the context for executing a client request. // // All services in a context are concurrency safe. diff --git a/pkg/tiller/environment/environment_test.go b/pkg/tiller/environment/environment_test.go index 9e9b549d4..464cee191 100644 --- a/pkg/tiller/environment/environment_test.go +++ b/pkg/tiller/environment/environment_test.go @@ -69,6 +69,10 @@ func (k *mockKubeClient) WaitAndGetCompletedPodStatus(namespace string, reader i return "", nil } +func (k *mockKubeClient) WaitUntilCRDEstablished(reader io.Reader, timeout time.Duration) error { + return nil +} + var _ Engine = &mockEngine{} var _ KubeClient = &mockKubeClient{} var _ KubeClient = &PrintingKubeClient{} diff --git a/pkg/tiller/release_server.go b/pkg/tiller/release_server.go index d32fd82f6..c95b31477 100644 --- a/pkg/tiller/release_server.go +++ b/pkg/tiller/release_server.go @@ -23,6 +23,7 @@ import ( "path" "regexp" "strings" + "time" "github.com/technosophos/moniker" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -399,7 +400,7 @@ func (s *ReleaseServer) execHook(hs []*release.Hook, name, namespace, hook strin b.Reset() b.WriteString(h.Manifest) - // We can't watch CRDs + // We can't watch CRDs, but need to wait until they reach the established state before continuing if hook != hooks.CRDInstall { if err := kubeCli.WatchUntilReady(namespace, b, timeout, false); err != nil { s.Log("warning: Release %s %s %s could not complete: %s", name, hook, h.Path, err) @@ -410,6 +411,11 @@ func (s *ReleaseServer) execHook(hs []*release.Hook, name, namespace, hook strin } return err } + } else { + if err := kubeCli.WaitUntilCRDEstablished(b, time.Duration(timeout)*time.Second); err != nil { + s.Log("warning: Release %s %s %s could not complete: %s", name, hook, h.Path, err) + return err + } } } diff --git a/pkg/tiller/release_server_test.go b/pkg/tiller/release_server_test.go index ecf5ffd17..4087b2f76 100644 --- a/pkg/tiller/release_server_test.go +++ b/pkg/tiller/release_server_test.go @@ -642,6 +642,10 @@ func (kc *mockHooksKubeClient) WaitAndGetCompletedPodPhase(namespace string, rea return v1.PodUnknown, nil } +func (kc *mockHooksKubeClient) WaitUntilCRDEstablished(reader io.Reader, timeout time.Duration) error { + return nil +} + func deletePolicyStub(kubeClient *mockHooksKubeClient) *ReleaseServer { e := environment.New() e.Releases = storage.Init(driver.NewMemory())