diff --git a/pkg/action/install.go b/pkg/action/install.go
index 7ca40c88a..06b74a62f 100644
--- a/pkg/action/install.go
+++ b/pkg/action/install.go
@@ -455,7 +455,7 @@ func (i *Install) performInstall(rel *release.Release, toBeAdopted kube.Resource
 	if len(toBeAdopted) == 0 && len(resources) > 0 {
 		_, err = i.cfg.KubeClient.Create(resources)
 	} else if len(resources) > 0 {
-		_, err = i.cfg.KubeClient.Update(toBeAdopted, resources, i.Force)
+		_, err = updateWithTimeoutOrFallback(i.cfg.KubeClient, toBeAdopted, resources, i.Force, i.Timeout)
 	}
 	if err != nil {
 		return rel, err
diff --git a/pkg/action/interface_compat.go b/pkg/action/interface_compat.go
new file mode 100644
index 000000000..e985629d6
--- /dev/null
+++ b/pkg/action/interface_compat.go
@@ -0,0 +1,32 @@
+/*
+Copyright The Helm Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package action
+
+import (
+	"time"
+
+	"helm.sh/helm/v3/pkg/kube"
+)
+
+// updateWithTimeoutOrFallback calls UpdateWithTimeout when kubeClient implements kube.UpdateWithTimeout and falls back
+// to kube.Interface.Update, which takes no timeout, otherwise. It can be inlined in Helm 4, when no fallback is needed.
+func updateWithTimeoutOrFallback(kubeClient kube.Interface, original, target kube.ResourceList, force bool, timeout time.Duration) (*kube.Result, error) {
+	if kubeClient, ok := kubeClient.(kube.UpdateWithTimeout); ok {
+		return kubeClient.UpdateWithTimeout(original, target, force, timeout)
+	}
+	return kubeClient.Update(original, target, force)
+}
diff --git a/pkg/action/rollback.go b/pkg/action/rollback.go
index b0be17d13..6e3dcd4aa 100644
--- a/pkg/action/rollback.go
+++ b/pkg/action/rollback.go
@@ -188,8 +188,8 @@ func (r *Rollback) performRollback(currentRelease, targetRelease *release.Releas
 	if err != nil {
 		return targetRelease, errors.Wrap(err, "unable to set metadata visitor from target release")
 	}
-	results, err := r.cfg.KubeClient.Update(current, target, r.Force)
+	results, err := updateWithTimeoutOrFallback(r.cfg.KubeClient, current, target, r.Force, r.Timeout)
 
 	if err != nil {
 		msg := fmt.Sprintf("Rollback %q failed: %s", targetRelease.Name, err)
 		r.cfg.Log("warning: %s", msg)
diff --git a/pkg/action/upgrade.go b/pkg/action/upgrade.go
index a08d68495..88e362685 100644
--- a/pkg/action/upgrade.go
+++ b/pkg/action/upgrade.go
@@ -426,7 +426,7 @@ func (u *Upgrade) releasingUpgrade(c chan<- resultMessage, upgradedRelease *rele
 		u.cfg.Log("upgrade hooks disabled for %s", upgradedRelease.Name)
 	}
 
-	results, err := u.cfg.KubeClient.Update(current, target, u.Force)
+	results, err := updateWithTimeoutOrFallback(u.cfg.KubeClient, current, target, u.Force, u.Timeout)
 	if err != nil {
 		u.cfg.recordRelease(originalRelease)
 		u.reportToPerformUpgrade(c, upgradedRelease, results.Created, err)
diff --git a/pkg/action/upgrade_test.go b/pkg/action/upgrade_test.go
index db8486340..44b421f11 100644
--- a/pkg/action/upgrade_test.go
+++ b/pkg/action/upgrade_test.go
@@ -178,7 +178,7 @@ func TestUpgradeRelease_Atomic(t *testing.T) {
 		upAction.cfg.Releases.Create(rel)
 
 		failer := upAction.cfg.KubeClient.(*kubefake.FailingKubeClient)
-		failer.UpdateError = fmt.Errorf("update fail")
+
failer.UpdateWithTimeoutError = fmt.Errorf("update fail")
 		upAction.cfg.KubeClient = failer
 		upAction.Atomic = true
 		vals := map[string]interface{}{}
diff --git a/pkg/kube/client.go b/pkg/kube/client.go
index 4d93c91b9..f6d3432f7 100644
--- a/pkg/kube/client.go
+++ b/pkg/kube/client.go
@@ -25,6 +25,7 @@ import (
 	"os"
 	"path/filepath"
 	"reflect"
+	"sort"
 	"strings"
 	"sync"
 	"time"
@@ -47,6 +48,7 @@ import (
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/strategicpatch"
+	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/apimachinery/pkg/watch"
 	"k8s.io/cli-runtime/pkg/genericclioptions"
 	"k8s.io/cli-runtime/pkg/resource"
@@ -59,6 +61,17 @@ import (
 	cmdutil "k8s.io/kubectl/pkg/cmd/util"
 )
 
+var accessor = meta.NewAccessor()
+
+// Annotation for resource recreation on update. The two cases "conflict" and "invalid" are the two cases where
+// kubectl apply --force tries a recreate, so those seem to be the most relevant two cases users usually want
+// to control.
+const (
+	updatePolicyAnnotation         = "helm.sh/update-policy"
+	updatePolicyRecreateOnConflict = "recreate-on-conflict"
+	updatePolicyRecreateOnInvalid  = "recreate-on-invalid"
+)
+
 // ErrNoObjectsVisited indicates that during a visit operation, no matching objects were found.
 var ErrNoObjectsVisited = errors.New("no objects visited")
 
@@ -379,14 +392,23 @@ func (c *Client) BuildTable(reader io.Reader, validate bool) (ResourceList, erro
 	return result, scrubValidationError(err)
 }
 
-// Update takes the current list of objects and target list of objects and
+// Update calls UpdateWithTimeout with a zero timeout. It is kept to avoid a breaking API change.
+//
+// Deprecated: use UpdateWithTimeout instead. Update cannot be removed until Helm 4.
+func (c *Client) Update(original, target ResourceList, force bool) (*Result, error) {
+	return c.UpdateWithTimeout(original, target, force, time.Duration(0))
+}
+
+// UpdateWithTimeout takes the current list of objects and target list of objects and
 // creates resources that don't already exist, updates resources that have been
 // modified in the target configuration, and deletes resources from the current
 // configuration that are not present in the target configuration. If an error
 // occurs, a Result will still be returned with the error, containing all
 // resource updates, creations, and deletions that were attempted. These can be
 // used for cleanup or other logging purposes.
-func (c *Client) Update(original, target ResourceList, force bool) (*Result, error) {
+//
+// timeout limits how long to wait for a resource to be deleted when an update policy requires it to be recreated.
+func (c *Client) UpdateWithTimeout(original, target ResourceList, force bool, timeout time.Duration) (*Result, error) {
 	updateErrors := []string{}
 	res := &Result{}
 
@@ -421,7 +440,7 @@ func (c *Client) Update(original, target ResourceList, force bool) (*Result, err
 			return errors.Errorf("no %s with the name %q found", kind, info.Name)
 		}
 
-		if err := updateResource(c, info, originalInfo.Object, force); err != nil {
+		if err := updateResource(c, info, originalInfo.Object, force, timeout); err != nil {
 			c.Log("error updating the resource %q:\n\t %v", info.Name, err)
 			updateErrors = append(updateErrors, err.Error())
 		}
@@ -667,7 +686,7 @@ func createPatch(target *resource.Info, current runtime.Object) ([]byte, types.P
 	return patch, types.StrategicMergePatchType, err
 }
 
-func updateResource(c *Client, target *resource.Info, currentObj runtime.Object, force bool) error {
+func updateResource(c *Client, target *resource.Info, currentObj runtime.Object, force bool, timeout time.Duration) error {
 	var (
 		obj runtime.Object
 		helper = resource.NewHelper(target.Client, target.Mapping).WithFieldManager(getManagedFieldsManager())
@@ -677,9 +696,11 @@
func updateResource(c *Client, target *resource.Info, currentObj runtime.Object,
 	// if --force is applied, attempt to replace the existing resource with the new object.
 	if force {
 		var err error
-		obj, err = helper.Replace(target.Namespace, target.Name, true, target.Object)
+		obj, err = withUpdatePolicy(c, helper, target, timeout, func() (runtime.Object, error) {
+			return helper.Replace(target.Namespace, target.Name, true, target.Object)
+		})
 		if err != nil {
-			return errors.Wrap(err, "failed to replace object")
+			return errors.Wrapf(err, "failed to replace %q with kind %s", target.Name, kind)
 		}
 		c.Log("Replaced %q with kind %s for kind %s", target.Name, currentObj.GetObjectKind().GroupVersionKind().Kind, kind)
 	} else {
@@ -699,7 +720,9 @@ func updateResource(c *Client, target *resource.Info, currentObj runtime.Object,
 		}
 		// send patch to server
 		c.Log("Patch %s %q in namespace %s", kind, target.Name, target.Namespace)
-		obj, err = helper.Patch(target.Namespace, target.Name, patchType, patch, nil)
+		obj, err = withUpdatePolicy(c, helper, target, timeout, func() (runtime.Object, error) {
+			return helper.Patch(target.Namespace, target.Name, patchType, patch, nil)
+		})
 		if err != nil {
 			return errors.Wrapf(err, "cannot patch %q with kind %s", target.Name, kind)
 		}
@@ -709,6 +732,76 @@ func updateResource(c *Client, target *resource.Info, currentObj runtime.Object,
 	return nil
 }
 
+// withUpdatePolicy runs applyUpdate and returns its result when it succeeds. When it fails, the update error is
+// returned as is unless the updatePolicyAnnotation of target lists a policy matching the failure reason
+// (updatePolicyRecreateOnConflict for Conflict, updatePolicyRecreateOnInvalid for Invalid). In that case the object is
+// deleted and created again via deleteAndCreate, with timeout bounding the wait for the deletion.
+func withUpdatePolicy(c *Client, helper *resource.Helper, target *resource.Info, timeout time.Duration, applyUpdate func() (runtime.Object, error)) (runtime.Object, error) {
+	obj, updateErr := applyUpdate()
+	if updateErr == nil {
+		return obj, nil
+	}
+
+	annos, err := accessor.Annotations(target.Object)
+	if err != nil {
+		return nil, err
+	}
+
+	// the annotation holds a comma separated list of policies, spaces are ignored
+	updatePolicies := strings.Split(strings.ReplaceAll(annos[updatePolicyAnnotation], " ", ""), ",")
+	sort.Strings(updatePolicies)
+
+	// sort.SearchStrings returns the insertion index when the value is absent, so the element at that index has to be
+	// compared as well; comparing the index with the length alone reports policies that are not set.
+	hasUpdatePolicy := func(updatePolicyValue string) bool {
+		i := sort.SearchStrings(updatePolicies, updatePolicyValue)
+		return i < len(updatePolicies) && updatePolicies[i] == updatePolicyValue
+	}
+
+	recreate := false
+	switch reason := apierrors.ReasonForError(updateErr); reason {
+	case metav1.StatusReasonConflict:
+		recreate = hasUpdatePolicy(updatePolicyRecreateOnConflict)
+	case metav1.StatusReasonInvalid:
+		recreate = hasUpdatePolicy(updatePolicyRecreateOnInvalid)
+	}
+
+	if recreate {
+		c.Log("Update of %q of kind %s failed, trying to replace resource according to %s: %s", target.Name,
+			target.Mapping.GroupVersionKind.Kind, updatePolicyAnnotation, annos[updatePolicyAnnotation])
+		if err := c.deleteAndCreate(helper, target, timeout); err != nil {
+			return nil, err
+		}
+		// target is refreshed by the create, so hand back target.Object instead of nil; a nil result would wipe the
+		// object if the caller refreshes target from the returned value (TODO confirm against updateResource).
+		return target.Object, nil
+	}
+	return obj, updateErr
+}
+
+// deleteAndCreate deletes target, waits until a Get for it reports NotFound and creates it again. The first check
+// happens right away, later ones every two seconds until timeout elapses. Any Get error other than NotFound, or
+// running into the timeout, aborts the wait and is returned without creating the object.
+func (c *Client) deleteAndCreate(helper *resource.Helper, target *resource.Info, timeout time.Duration) error {
+	if err := deleteResource(target, metav1.DeletePropagationBackground); err != nil {
+		return err
+	}
+
+	ctx, cancel := context.WithTimeout(context.Background(), timeout)
+	defer cancel()
+
+	if err := wait.PollUntilContextCancel(ctx, 2*time.Second, true, func(ctx context.Context) (bool, error) {
+		if _, err := helper.Get(target.Namespace, target.Name); !apierrors.IsNotFound(err) {
+			return false, err
+		}
+		return true, nil
+	}); err != nil {
+		return err
+	}
+
+	return createResource(target)
+}
+
 func (c *Client) watchUntilReady(timeout time.Duration, info *resource.Info) error {
 	kind := info.Mapping.GroupVersionKind.Kind
 	switch kind {
diff --git a/pkg/kube/client_test.go b/pkg/kube/client_test.go
index 4ceb5f4b3..22cce2d27 100644
--- a/pkg/kube/client_test.go
+++ b/pkg/kube/client_test.go
@@ -22,6 +22,9 @@ import (
 	"net/http"
 	"strings"
 	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
 
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -83,6 +86,26 @@ func notFoundBody() *metav1.Status {
 	}
 }
 
+func unprocessableEntityBody() *metav1.Status {
+	return &metav1.Status{
+		Code:    http.StatusUnprocessableEntity,
+		Status:  metav1.StatusFailure,
+		Reason:  metav1.StatusReasonInvalid,
+		Message: "cannot change",
+		Details: &metav1.StatusDetails{},
+	}
+}
+
+func conflictEntityBody() *metav1.Status {
+	return &metav1.Status{
+		Code:    http.StatusConflict,
+		Status:  metav1.StatusFailure,
+		Reason:  metav1.StatusReasonConflict,
+		Message: "conflict",
+		Details: &metav1.StatusDetails{},
+	}
+}
+
 func newResponse(code int, obj runtime.Object) (*http.Response, error) {
 	header := http.Header{}
 	header.Set("Content-Type", runtime.ContentTypeJSON)
@@ -275,7 +298,7 @@ func TestUpdate(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	result, err := c.Update(first, second, false)
+	result, err := c.UpdateWithTimeout(first, second, false, 0)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -324,6 +347,230 @@ func TestUpdate(t *testing.T) {
 	}
 }
 
+func TestClient_UpdateWithPolicy(t *testing.T) {
+
+	is := assert.New(t)
+
+	// setup two pods with one diff that should be patched
+	original := newPodList("starfish")
+	target := newPodList("starfish")
+	target.Items[0].Spec.Containers[0].Ports = []v1.ContainerPort{{Name: "https", ContainerPort: 443}}
+
+	// make sure there is a patchable difference
+	is.NotEqual(original.Items[0].Spec.Containers[0].Ports, target.Items[0].Spec.Containers[0].Ports)
+
+	okResponse := func() *http.Response {
+		res, _ := newResponse(http.StatusOK, &original.Items[0])
+		return res
+	}
+	notFoundResponse := func() *http.Response {
+		res, _ := newResponse(http.StatusNotFound, notFoundBody())
+		return res
+	}
+	unprocessableEntityResponse := func() *http.Response {
+		res, _ := newResponse(http.StatusUnprocessableEntity, unprocessableEntityBody())
+		return res
+	}
+	conflictEntityResponse := func() *http.Response {
+		res, _ :=
newResponse(http.StatusConflict, conflictEntityBody())
+		return res
+	}
+
+	tests := []struct {
+		name                 string
+		force                bool
+		updatePolicy         string
+		responses            []*http.Response
+		finalResponseFactory func() *http.Response
+		expectedActions      []string
+		fail                 bool
+	}{
+		{
+			name:         "update should delete and recreate when invalid on PATCH",
+			force:        false,
+			updatePolicy: updatePolicyRecreateOnInvalid,
+			responses: []*http.Response{
+				okResponse(),                  // GET
+				okResponse(),                  // GET
+				unprocessableEntityResponse(), // PATCH
+				okResponse(),                  // DELETE
+				notFoundResponse(),            // GET
+				okResponse(),                  // POST
+			},
+			expectedActions: []string{
+				"GET",
+				"GET",
+				"PATCH",
+				"DELETE",
+				"GET",
+				"POST",
+			},
+		},
+		{
+			name:         "update should delete and recreate when conflict on PATCH",
+			force:        false,
+			updatePolicy: updatePolicyRecreateOnConflict,
+			responses: []*http.Response{
+				okResponse(),             // GET
+				okResponse(),             // GET
+				conflictEntityResponse(), // PATCH
+				okResponse(),             // DELETE
+				notFoundResponse(),       // GET
+				okResponse(),             // POST
+			},
+			expectedActions: []string{
+				"GET",
+				"GET",
+				"PATCH",
+				"DELETE",
+				"GET",
+				"POST",
+			},
+		},
+		{
+			name:         "update should delete and recreate when invalid on PUT",
+			force:        true,
+			updatePolicy: updatePolicyRecreateOnInvalid,
+			responses: []*http.Response{
+				okResponse(),                  // GET
+				okResponse(),                  // GET
+				unprocessableEntityResponse(), // PUT
+				okResponse(),                  // DELETE
+				notFoundResponse(),            // GET
+				okResponse(),                  // POST
+			},
+			expectedActions: []string{
+				"GET",
+				"GET",
+				"PUT",
+				"DELETE",
+				"GET",
+				"POST",
+			},
+		},
+		{
+			name:         "update should delete and recreate when conflict on PUT",
+			force:        true,
+			updatePolicy: updatePolicyRecreateOnConflict,
+			responses: []*http.Response{
+				okResponse(),             // GET
+				okResponse(),             // GET
+				conflictEntityResponse(), // PUT
+				okResponse(),             // DELETE
+				notFoundResponse(),       // GET
+				okResponse(),             // POST
+			},
+			expectedActions: []string{
+				"GET",
+				"GET",
+				"PUT",
+				"DELETE",
+				"GET",
+				"POST",
+			},
+		},
+		{
+			name:         "update should fail when timing out after DELETE",
+			force:        false,
+			updatePolicy: updatePolicyRecreateOnInvalid,
+			responses: []*http.Response{
+				okResponse(),                  // GET
+				okResponse(),                  // GET
+				unprocessableEntityResponse(), // PATCH
+				okResponse(),                  // DELETE
+			},
+			// infinitely return OK on get, implying the server does not delete the resource within the given timeout
+			finalResponseFactory: okResponse, // GET
+			expectedActions: []string{
+				"GET",
+				"GET",
+				"PATCH",
+				"DELETE",
+			},
+			fail: true,
+		},
+		{
+			name:         "update should fail when no update policy specified",
+			force:        false,
+			updatePolicy: "",
+			responses: []*http.Response{
+				okResponse(),                  // GET
+				okResponse(),                  // GET
+				unprocessableEntityResponse(), // PATCH
+			},
+			expectedActions: []string{
+				"GET",
+				"GET",
+				"PATCH",
+			},
+			fail: true,
+		},
+		{
+			name:         "update should fail when no update policy does not match error",
+			force:        false,
+			updatePolicy: updatePolicyRecreateOnConflict,
+			responses: []*http.Response{
+				okResponse(),                  // GET
+				okResponse(),                  // GET
+				unprocessableEntityResponse(), // PATCH
+			},
+			expectedActions: []string{
+				"GET",
+				"GET",
+				"PATCH",
+			},
+			fail: true,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			var actions []string
+			c := newTestClient(t)
+			c.Factory.(*cmdtesting.TestFactory).UnstructuredClient = &fake.RESTClient{
+				NegotiatedSerializer: unstructuredSerializer,
+				Client: fake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) {
+					p, m := req.URL.Path, req.Method
+					t.Logf("got request %s %s", p, m)
+					if len(tt.responses) == 0 {
+						if tt.finalResponseFactory != nil {
+							return tt.finalResponseFactory(), nil
+						}
+						t.Fatalf("unexpected request: %s %s", req.Method, req.URL.Path)
+						return nil, nil
+					}
+					// final responses are not added to actions, since it is unpredictable how often the final
+					// response will be returned on polling
+					actions = append(actions, m)
+					var response *http.Response
+					response, tt.responses = tt.responses[0], tt.responses[1:]
+					return response, nil
+				}),
+			}
+
+			target.Items[0].Annotations = map[string]string{updatePolicyAnnotation: tt.updatePolicy}
+
+			originalResourceList, err := c.Build(objBody(&original), false)
+			if err != nil {
+				t.Fatal(err)
+			}
+			targetResourceList, err := c.Build(objBody(&target), false)
+			if err != nil {
+				t.Fatal(err)
+			}
+
+			// the timeout ends the "timing out after DELETE" case: the deletion is checked right away and then every two seconds
+			_, err = c.UpdateWithTimeout(originalResourceList, targetResourceList, tt.force, 2*time.Second)
+			if (err != nil) != tt.fail {
+				t.Fatalf("UpdateWithTimeout() error = %v, expected failure: %t", err, tt.fail)
+			}
+			t.Log(tt.expectedActions)
+			t.Log(actions)
+			is.Equal(tt.expectedActions, actions)
+		})
+	}
+}
+
 func TestBuild(t *testing.T) {
 	tests := []struct {
 		name string
diff --git a/pkg/kube/fake/fake.go b/pkg/kube/fake/fake.go
index 267020d57..0f5b58808 100644
--- a/pkg/kube/fake/fake.go
+++ b/pkg/kube/fake/fake.go
@@ -41,6 +41,7 @@ type FailingKubeClient struct {
 	DeleteWithPropagationError error
 	WatchUntilReadyError error
 	UpdateError error
+	UpdateWithTimeoutError error
 	BuildError error
 	BuildTableError error
 	BuildDummy bool
@@ -114,6 +115,14 @@ func (f *FailingKubeClient) Update(r, modified kube.ResourceList, ignoreMe bool)
 	return f.PrintingKubeClient.Update(r, modified, ignoreMe)
 }
 
+// UpdateWithTimeout returns the configured error if set or prints
+func (f *FailingKubeClient) UpdateWithTimeout(r, modified kube.ResourceList, ignoreMe bool, timeout time.Duration) (*kube.Result, error) {
+	if f.UpdateWithTimeoutError != nil {
+		return &kube.Result{}, f.UpdateWithTimeoutError
+	}
+	return f.PrintingKubeClient.UpdateWithTimeout(r, modified, ignoreMe, timeout)
+}
+
 // Build returns the configured error if set or prints
 func (f *FailingKubeClient) Build(r io.Reader, _ bool) (kube.ResourceList, error) {
 	if f.BuildError != nil {
diff --git a/pkg/kube/fake/printer.go b/pkg/kube/fake/printer.go
index cc2c84b40..d082203ef 100644
--- a/pkg/kube/fake/printer.go
+++ b/pkg/kube/fake/printer.go
@@ -90,7 +90,11 @@ func (p
*PrintingKubeClient) WatchUntilReady(resources kube.ResourceList, _ time
 }
 
 // Update implements KubeClient Update.
-func (p *PrintingKubeClient) Update(_, modified kube.ResourceList, _ bool) (*kube.Result, error) {
+func (p *PrintingKubeClient) Update(original, target kube.ResourceList, force bool) (*kube.Result, error) {
+	return p.UpdateWithTimeout(original, target, force, 0)
+}
+
+func (p *PrintingKubeClient) UpdateWithTimeout(_, modified kube.ResourceList, _ bool, _ time.Duration) (*kube.Result, error) {
 	_, err := io.Copy(p.Out, bufferize(modified))
 	if err != nil {
 		return nil, err
diff --git a/pkg/kube/interface.go b/pkg/kube/interface.go
index ce42ed950..fe66040ea 100644
--- a/pkg/kube/interface.go
+++ b/pkg/kube/interface.go
@@ -110,7 +110,17 @@ type InterfaceResources interface {
 	BuildTable(reader io.Reader, validate bool) (ResourceList, error)
 }
 
+// UpdateWithTimeout is introduced to avoid breaking backwards compatibility for Interface and InterfaceExt implementers.
+// It extends Interface.Update by a timeout, which is used to wait until a resource is deleted in case a replacement
+// is necessary due to the "helm.sh/update-policy" annotation.
+//
+// TODO Helm 4: Remove UpdateWithTimeout and integrate its method(s) into the Interface.
+type UpdateWithTimeout interface {
+	UpdateWithTimeout(original, target ResourceList, force bool, timeout time.Duration) (*Result, error)
+}
+
 var _ Interface = (*Client)(nil)
 var _ InterfaceExt = (*Client)(nil)
 var _ InterfaceDeletionPropagation = (*Client)(nil)
 var _ InterfaceResources = (*Client)(nil)
+var _ UpdateWithTimeout = (*Client)(nil)