From 0b58239b51bd86dd862dac8765d27a607da7edfb Mon Sep 17 00:00:00 2001 From: Daniel Strobusch <1847260+dastrobu@users.noreply.github.com> Date: Wed, 23 Dec 2020 11:46:26 +0100 Subject: [PATCH] feat(helm): add new update-policy annotation to recreate resources Add handling of the new annotation helm.sh/update-policy with the values recreate-on-conflict and recreate-on-invalid to better deal with the following cases. The server responds with 'invalid' e.g. if a certain resource contains an immutable field, which cannot be patched or updated by a PUT. In these cases it can be helpful to delete the resource and recreate it. Examples of these cases are: * roleRef in RoleBinding * spec.clusterIP in Service * parameters in StorageClass If a chart developer has made sure that recreating a resource is safe, the annotation can be added to a resource template. Alternatively a user may add the annotation manually to an existing resource to allow Helm to recreate a certain resource. Adding the new annotation in general by default or changing the default behaviour of Helm is not a good idea, since deletion and recreation of certain resources can lead to an unwanted or undefined state as described in detail in https://github.com/helm/helm/pull/7431#issuecomment-666735299: "[...] selectively deleting/re-creating volume-backed resources (Secrets, Services, ConfigMaps, PVCs) without touching the Deployment. If a Persistent Volume Claim is deleted and re-created, there is a risk that an application (Deployment) consuming that Persistent Volume Claim will not receive the new volume's mount point. The same can be said for Secrets, Configmaps, Services, and other resources which are deployed and relied upon by another resource. Environment variables populated in a Deployment from a Service won't be updated until the Pods are destroyed." Closes #7082. 
Signed-off-by: Daniel Strobusch <1847260+dastrobu@users.noreply.github.com> --- pkg/action/install.go | 2 +- pkg/action/interface_compat.go | 32 +++++ pkg/action/rollback.go | 2 +- pkg/action/upgrade.go | 2 +- pkg/action/upgrade_test.go | 2 +- pkg/kube/client.go | 96 ++++++++++++- pkg/kube/client_test.go | 249 ++++++++++++++++++++++++++++++++- pkg/kube/fake/fake.go | 8 ++ pkg/kube/fake/printer.go | 6 +- pkg/kube/interface.go | 10 ++ 10 files changed, 396 insertions(+), 13 deletions(-) create mode 100644 pkg/action/interface_compat.go diff --git a/pkg/action/install.go b/pkg/action/install.go index e3538a4f5..9927ab9c0 100644 --- a/pkg/action/install.go +++ b/pkg/action/install.go @@ -439,7 +439,7 @@ func (i *Install) performInstall(rel *release.Release, toBeAdopted kube.Resource if len(toBeAdopted) == 0 && len(resources) > 0 { _, err = i.cfg.KubeClient.Create(resources) } else if len(resources) > 0 { - _, err = i.cfg.KubeClient.Update(toBeAdopted, resources, i.Force) + _, err = updateWithTimeoutOrFallback(i.cfg.KubeClient, toBeAdopted, resources, i.Force, i.Timeout) } if err != nil { return rel, err diff --git a/pkg/action/interface_compat.go b/pkg/action/interface_compat.go new file mode 100644 index 000000000..e985629d6 --- /dev/null +++ b/pkg/action/interface_compat.go @@ -0,0 +1,32 @@ +/* +Copyright The Helm Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package action + +import ( + "time" + + "helm.sh/helm/v3/pkg/kube" +) + +// updateWithTimeoutOrFallback is a compatibility function that falls back to Update for Helm 3 clients (implementors of kube.Interface only). +// This function can be inlined in Helm 4, when no fallback is necessary anymore. +func updateWithTimeoutOrFallback(kubeClient kube.Interface, original, target kube.ResourceList, force bool, timeout time.Duration) (*kube.Result, error) { + if kubeClient, ok := kubeClient.(kube.UpdateWithTimeout); ok { + return kubeClient.UpdateWithTimeout(original, target, force, timeout) + } + return kubeClient.Update(original, target, force) +} diff --git a/pkg/action/rollback.go b/pkg/action/rollback.go index b0be17d13..6e3dcd4aa 100644 --- a/pkg/action/rollback.go +++ b/pkg/action/rollback.go @@ -188,8 +188,8 @@ func (r *Rollback) performRollback(currentRelease, targetRelease *release.Releas if err != nil { return targetRelease, errors.Wrap(err, "unable to set metadata visitor from target release") } - results, err := r.cfg.KubeClient.Update(current, target, r.Force) + results, err := updateWithTimeoutOrFallback(r.cfg.KubeClient, current, target, r.Force, r.Timeout) if err != nil { msg := fmt.Sprintf("Rollback %q failed: %s", targetRelease.Name, err) r.cfg.Log("warning: %s", msg) diff --git a/pkg/action/upgrade.go b/pkg/action/upgrade.go index ffb7538a6..a5400be34 100644 --- a/pkg/action/upgrade.go +++ b/pkg/action/upgrade.go @@ -407,7 +407,7 @@ func (u *Upgrade) releasingUpgrade(c chan<- resultMessage, upgradedRelease *rele u.cfg.Log("upgrade hooks disabled for %s", upgradedRelease.Name) } - results, err := u.cfg.KubeClient.Update(current, target, u.Force) + results, err := updateWithTimeoutOrFallback(u.cfg.KubeClient, current, target, u.Force, u.Timeout) if err != nil { u.cfg.recordRelease(originalRelease) u.reportToPerformUpgrade(c, upgradedRelease, results.Created, err) diff --git a/pkg/action/upgrade_test.go b/pkg/action/upgrade_test.go index 
e259605ce..289c623e5 100644 --- a/pkg/action/upgrade_test.go +++ b/pkg/action/upgrade_test.go @@ -178,7 +178,7 @@ func TestUpgradeRelease_Atomic(t *testing.T) { upAction.cfg.Releases.Create(rel) failer := upAction.cfg.KubeClient.(*kubefake.FailingKubeClient) - failer.UpdateError = fmt.Errorf("update fail") + failer.UpdateWithTimeoutError = fmt.Errorf("update fail") upAction.cfg.KubeClient = failer upAction.Atomic = true vals := map[string]interface{}{} diff --git a/pkg/kube/client.go b/pkg/kube/client.go index 0772678d1..8fa3a6a1d 100644 --- a/pkg/kube/client.go +++ b/pkg/kube/client.go @@ -25,6 +25,7 @@ import ( "os" "path/filepath" "reflect" + "sort" "strings" "sync" "time" @@ -47,6 +48,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/strategicpatch" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" "k8s.io/cli-runtime/pkg/genericclioptions" "k8s.io/cli-runtime/pkg/resource" @@ -58,6 +60,17 @@ import ( cmdutil "k8s.io/kubectl/pkg/cmd/util" ) +var accessor = meta.NewAccessor() + +// Annotation for resource recreation on update. The two cases "conflict" and "invalid" are the two cases where +// kubectl apply --force tries a recreate, so those seem to be the most relevant two cases users usually want +// to control. +const ( + updatePolicyAnnotation = "helm.sh/update-policy" + updatePolicyRecreateOnConflict = "recreate-on-conflict" + updatePolicyRecreateOnInvalid = "recreate-on-invalid" +) + // ErrNoObjectsVisited indicates that during a visit operation, no matching objects were found. var ErrNoObjectsVisited = errors.New("no objects visited") @@ -379,14 +392,20 @@ func (c *Client) BuildTable(reader io.Reader, validate bool) (ResourceList, erro return result, scrubValidationError(err) } -// Update takes the current list of objects and target list of objects and +// Update is a wrapper for UpdateWithTimeout to avoid a breaking API change. 
+// Deprecated: prefer UpdateWithTimeout, cannot be removed until Helm 4 +func (c *Client) Update(original, target ResourceList, force bool) (*Result, error) { + return c.UpdateWithTimeout(original, target, force, time.Duration(0)) +} + +// UpdateWithTimeout takes the current list of objects and target list of objects and // creates resources that don't already exist, updates resources that have been // modified in the target configuration, and deletes resources from the current // configuration that are not present in the target configuration. If an error // occurs, a Result will still be returned with the error, containing all // resource updates, creations, and deletions that were attempted. These can be // used for cleanup or other logging purposes. -func (c *Client) Update(original, target ResourceList, force bool) (*Result, error) { +func (c *Client) UpdateWithTimeout(original, target ResourceList, force bool, timeout time.Duration) (*Result, error) { updateErrors := []string{} res := &Result{} @@ -421,7 +440,7 @@ func (c *Client) Update(original, target ResourceList, force bool) (*Result, err return errors.Errorf("no %s with the name %q found", kind, info.Name) } - if err := updateResource(c, info, originalInfo.Object, force); err != nil { + if err := updateResource(c, info, originalInfo.Object, force, timeout); err != nil { c.Log("error updating the resource %q:\n\t %v", info.Name, err) updateErrors = append(updateErrors, err.Error()) } @@ -659,7 +678,7 @@ func createPatch(target *resource.Info, current runtime.Object) ([]byte, types.P return patch, types.StrategicMergePatchType, err } -func updateResource(c *Client, target *resource.Info, currentObj runtime.Object, force bool) error { +func updateResource(c *Client, target *resource.Info, currentObj runtime.Object, force bool, timeout time.Duration) error { var ( obj runtime.Object helper = resource.NewHelper(target.Client, target.Mapping).WithFieldManager(getManagedFieldsManager()) @@ -669,9 +688,11 @@ 
func updateResource(c *Client, target *resource.Info, currentObj runtime.Object, // if --force is applied, attempt to replace the existing resource with the new object. if force { var err error - obj, err = helper.Replace(target.Namespace, target.Name, true, target.Object) + obj, err = withUpdatePolicy(c, helper, target, timeout, func() (runtime.Object, error) { + return helper.Replace(target.Namespace, target.Name, true, target.Object) + }) if err != nil { - return errors.Wrap(err, "failed to replace object") + return errors.Wrapf(err, "failed to replace %q with kind %s", target.Name, kind) } c.Log("Replaced %q with kind %s for kind %s", target.Name, currentObj.GetObjectKind().GroupVersionKind().Kind, kind) } else { @@ -691,7 +712,9 @@ func updateResource(c *Client, target *resource.Info, currentObj runtime.Object, } // send patch to server c.Log("Patch %s %q in namespace %s", kind, target.Name, target.Namespace) - obj, err = helper.Patch(target.Namespace, target.Name, patchType, patch, nil) + obj, err = withUpdatePolicy(c, helper, target, timeout, func() (runtime.Object, error) { + return helper.Patch(target.Namespace, target.Name, patchType, patch, nil) + }) if err != nil { return errors.Wrapf(err, "cannot patch %q with kind %s", target.Name, kind) } @@ -701,6 +724,65 @@ func updateResource(c *Client, target *resource.Info, currentObj runtime.Object, return nil } +// withUpdatePolicy applies the update to an object by executing applyUpdate and falls back to recreating the object +// if allowed by the updatePolicyAnnotation +func withUpdatePolicy(c *Client, helper *resource.Helper, target *resource.Info, timeout time.Duration, applyUpdate func() (runtime.Object, error)) (runtime.Object, error) { + obj, updateErr := applyUpdate() + if updateErr == nil { + return obj, updateErr + } + + annos, err := accessor.Annotations(target.Object) + if err != nil { + return nil, err + } + + updatePolicies := strings.Split(strings.ReplaceAll(annos[updatePolicyAnnotation], " ", 
""), ",") + sort.Strings(updatePolicies) + + hasUpdatePolicy := func(updatePolicyValue string) bool { + return sort.SearchStrings(updatePolicies, updatePolicyValue) < len(updatePolicies) + } + + recreate := false + switch reason := apierrors.ReasonForError(updateErr); reason { + case metav1.StatusReasonConflict: + recreate = hasUpdatePolicy(updatePolicyRecreateOnConflict) + case metav1.StatusReasonInvalid: + recreate = hasUpdatePolicy(updatePolicyRecreateOnInvalid) + } + + if recreate { + c.Log("Update of %q of kind %s failed, trying to replace resource according to %s: %s", target.Name, + target.Mapping.GroupVersionKind.Kind, updatePolicyAnnotation, annos[updatePolicyAnnotation]) + err = c.deleteAndCreate(helper, target, timeout) + // target is already refreshed so don't return an object. + return nil, err + } + return obj, updateErr +} + +// deleteAndCreate deletes an object, polls until successfully deleted (or timeout is exceeded) and recreates it afterwards. +func (c *Client) deleteAndCreate(helper *resource.Helper, target *resource.Info, timeout time.Duration) error { + if err := deleteResource(target, metav1.DeletePropagationBackground); err != nil { + return err + } + + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + if err := wait.PollUntilContextCancel(ctx, 2*time.Second, true, func(ctx context.Context) (bool, error) { + if _, err := helper.Get(target.Namespace, target.Name); !apierrors.IsNotFound(err) { + return false, err + } + return true, nil + }); err != nil { + return err + } + + return createResource(target) +} + func (c *Client) watchUntilReady(timeout time.Duration, info *resource.Info) error { kind := info.Mapping.GroupVersionKind.Kind switch kind { diff --git a/pkg/kube/client_test.go b/pkg/kube/client_test.go index 55aa5d8ed..25cc9abb0 100644 --- a/pkg/kube/client_test.go +++ b/pkg/kube/client_test.go @@ -22,6 +22,9 @@ import ( "net/http" "strings" "testing" + "time" + + 
"github.com/stretchr/testify/assert" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -83,6 +86,26 @@ func notFoundBody() *metav1.Status { } } +func unprocessableEntityBody() *metav1.Status { + return &metav1.Status{ + Code: http.StatusUnprocessableEntity, + Status: metav1.StatusFailure, + Reason: metav1.StatusReasonInvalid, + Message: "cannot change", + Details: &metav1.StatusDetails{}, + } +} + +func conflictEntityBody() *metav1.Status { + return &metav1.Status{ + Code: http.StatusConflict, + Status: metav1.StatusFailure, + Reason: metav1.StatusReasonConflict, + Message: "conflict", + Details: &metav1.StatusDetails{}, + } +} + func newResponse(code int, obj runtime.Object) (*http.Response, error) { header := http.Header{} header.Set("Content-Type", runtime.ContentTypeJSON) @@ -166,7 +189,7 @@ func TestUpdate(t *testing.T) { t.Fatal(err) } - result, err := c.Update(first, second, false) + result, err := c.UpdateWithTimeout(first, second, false, 0) if err != nil { t.Fatal(err) } @@ -213,6 +236,230 @@ func TestUpdate(t *testing.T) { } } +func TestClient_UpdateWithPolicy(t *testing.T) { + + is := assert.New(t) + + // setup two pods with one diff that should be patched + original := newPodList("starfish") + target := newPodList("starfish") + target.Items[0].Spec.Containers[0].Ports = []v1.ContainerPort{{Name: "https", ContainerPort: 443}} + + // make sure there is a patchable difference + is.NotEqual(original.Items[0].Spec.Containers[0].Ports, target.Items[0].Spec.Containers[0].Ports) + + okResponse := func() *http.Response { + res, _ := newResponse(http.StatusOK, &original.Items[0]) + return res + } + notFoundResponse := func() *http.Response { + res, _ := newResponse(http.StatusNotFound, notFoundBody()) + return res + } + unprocessableEntityResponse := func() *http.Response { + res, _ := newResponse(http.StatusUnprocessableEntity, unprocessableEntityBody()) + return res + } + conflictEntityResponse := func() *http.Response { + res, _ := 
newResponse(http.StatusConflict, conflictEntityBody()) + return res + } + + tests := []struct { + name string + force bool + updatePolicy string + responses []*http.Response + finalResponseFactory func() *http.Response + expectedActions []string + fail bool + }{ + { + name: "update should delete and recreate when invalid on PATCH", + force: false, + updatePolicy: updatePolicyRecreateOnInvalid, + responses: []*http.Response{ + okResponse(), // GET + okResponse(), // GET + unprocessableEntityResponse(), // PATCH + okResponse(), // DELETE + notFoundResponse(), // GET + okResponse(), // POST + }, + expectedActions: []string{ + "GET", + "GET", + "PATCH", + "DELETE", + "GET", + "POST", + }, + }, + { + name: "update should delete and recreate when conflict on PATCH", + force: false, + updatePolicy: updatePolicyRecreateOnConflict, + responses: []*http.Response{ + okResponse(), // GET + okResponse(), // GET + conflictEntityResponse(), // PATCH + okResponse(), // DELETE + notFoundResponse(), // GET + okResponse(), // POST + }, + expectedActions: []string{ + "GET", + "GET", + "PATCH", + "DELETE", + "GET", + "POST", + }, + }, + { + name: "update should delete and recreate when invalid on PUT", + force: true, + updatePolicy: updatePolicyRecreateOnInvalid, + responses: []*http.Response{ + okResponse(), // GET + okResponse(), // GET + unprocessableEntityResponse(), // PUT + okResponse(), // DELETE + notFoundResponse(), // GET + okResponse(), // POST + }, + expectedActions: []string{ + "GET", + "GET", + "PUT", + "DELETE", + "GET", + "POST", + }, + }, + { + name: "update should delete and recreate when conflict on PUT", + force: true, + updatePolicy: updatePolicyRecreateOnConflict, + responses: []*http.Response{ + okResponse(), // GET + okResponse(), // GET + conflictEntityResponse(), // PUT + okResponse(), // DELETE + notFoundResponse(), // GET + okResponse(), // POST + }, + expectedActions: []string{ + "GET", + "GET", + "PUT", + "DELETE", + "GET", + "POST", + }, + }, + { + name: 
"update should fail when timing out after DELETE", + force: false, + updatePolicy: updatePolicyRecreateOnInvalid, + responses: []*http.Response{ + okResponse(), // GET + okResponse(), // GET + unprocessableEntityResponse(), // PATCH + okResponse(), // DELETE + }, + // infinitely return OK on get, implying the server does not delete the resource within the given timeout + finalResponseFactory: okResponse, // GET + expectedActions: []string{ + "GET", + "GET", + "PATCH", + "DELETE", + }, + fail: true, + }, + { + name: "update should fail when no update policy specified", + force: false, + updatePolicy: "", + responses: []*http.Response{ + okResponse(), // GET + okResponse(), // GET + unprocessableEntityResponse(), // PATCH + }, + expectedActions: []string{ + "GET", + "GET", + "PATCH", + }, + fail: true, + }, + { + name: "update should fail when no update policy does not match error", + force: false, + updatePolicy: updatePolicyRecreateOnConflict, + responses: []*http.Response{ + okResponse(), // GET + okResponse(), // GET + unprocessableEntityResponse(), // PATCH + }, + expectedActions: []string{ + "GET", + "GET", + "PATCH", + }, + fail: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var actions []string + c := newTestClient(t) + c.Factory.(*cmdtesting.TestFactory).UnstructuredClient = &fake.RESTClient{ + NegotiatedSerializer: unstructuredSerializer, + Client: fake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) { + p, m := req.URL.Path, req.Method + t.Logf("got request %s %s", p, m) + if len(tt.responses) == 0 { + if tt.finalResponseFactory != nil { + return tt.finalResponseFactory(), nil + } + t.Fatalf("unexpected request: %s %s", req.Method, req.URL.Path) + return nil, nil + } + // final responses are not added to actions, since it is unpredictable how often the final + // response will be returned on polling + actions = append(actions, m) + var response *http.Response + response, tt.responses = 
tt.responses[0], tt.responses[1:] + return response, nil + }), + } + + target.Items[0].Annotations = map[string]string{updatePolicyAnnotation: tt.updatePolicy} + + originalResourceList, err := c.Build(objBody(&original), false) + if err != nil { + t.Fatal(err) + } + targetResourceList, err := c.Build(objBody(&target), false) + if err != nil { + t.Fatal(err) + } + + // deleteAndCreate polls with a two second interval (first poll immediate); this timeout bounds how long the deletion is awaited + _, err = c.UpdateWithTimeout(originalResourceList, targetResourceList, tt.force, 2*time.Second) + if (tt.fail && err == nil) || (!tt.fail && err != nil) { + t.Fatal(err) + } + t.Log(tt.expectedActions) + t.Log(actions) + is.ElementsMatch(tt.expectedActions, actions) + }) + } +} + func TestBuild(t *testing.T) { tests := []struct { name string diff --git a/pkg/kube/fake/fake.go b/pkg/kube/fake/fake.go index 267020d57..0f5b58808 100644 --- a/pkg/kube/fake/fake.go +++ b/pkg/kube/fake/fake.go @@ -41,6 +41,7 @@ type FailingKubeClient struct { DeleteWithPropagationError error WatchUntilReadyError error UpdateError error + UpdateWithTimeoutError error BuildError error BuildTableError error BuildDummy bool @@ -114,6 +115,13 @@ func (f *FailingKubeClient) Update(r, modified kube.ResourceList, ignoreMe bool) return f.PrintingKubeClient.Update(r, modified, ignoreMe) } +func (f *FailingKubeClient) UpdateWithTimeout(r, modified kube.ResourceList, ignoreMe bool, timeout time.Duration) (*kube.Result, error) { + if f.UpdateWithTimeoutError != nil { + return &kube.Result{}, f.UpdateWithTimeoutError + } + return f.PrintingKubeClient.UpdateWithTimeout(r, modified, ignoreMe, 0) +} + // Build returns the configured error if set or prints func (f *FailingKubeClient) Build(r io.Reader, _ bool) (kube.ResourceList, error) { if f.BuildError != nil { diff --git a/pkg/kube/fake/printer.go b/pkg/kube/fake/printer.go index e6c4b6207..3bfd276fa 100644 --- a/pkg/kube/fake/printer.go +++ b/pkg/kube/fake/printer.go @@ -90,7 +90,11 @@ func (p 
*PrintingKubeClient) WatchUntilReady(resources kube.ResourceList, _ time } // Update implements KubeClient Update. -func (p *PrintingKubeClient) Update(_, modified kube.ResourceList, _ bool) (*kube.Result, error) { +func (p *PrintingKubeClient) Update(original, target kube.ResourceList, force bool) (*kube.Result, error) { + return p.UpdateWithTimeout(original, target, force, 0) +} + +func (p *PrintingKubeClient) UpdateWithTimeout(_, modified kube.ResourceList, _ bool, _ time.Duration) (*kube.Result, error) { _, err := io.Copy(p.Out, bufferize(modified)) if err != nil { return nil, err diff --git a/pkg/kube/interface.go b/pkg/kube/interface.go index ce42ed950..fe66040ea 100644 --- a/pkg/kube/interface.go +++ b/pkg/kube/interface.go @@ -110,7 +110,17 @@ type InterfaceResources interface { BuildTable(reader io.Reader, validate bool) (ResourceList, error) } +// UpdateWithTimeout is introduced to avoid breaking backwards compatibility for Interface and InterfaceExt implementers. +// It extends Interface.Update by a timeout, which is used to wait until a resource is deleted in case a replacement +// is necessary due to the "helm.sh/update-policy" annotation. +// +// TODO Helm 4: Remove UpdateWithTimeout and integrate its method(s) into the Interface. +type UpdateWithTimeout interface { + UpdateWithTimeout(original, target ResourceList, force bool, timeout time.Duration) (*Result, error) +} + var _ Interface = (*Client)(nil) var _ InterfaceExt = (*Client)(nil) var _ InterfaceDeletionPropagation = (*Client)(nil) var _ InterfaceResources = (*Client)(nil) +var _ UpdateWithTimeout = (*Client)(nil)