feat(helm): add new update-policy annotation to recreate resources

Add annotation handling of new annotation helm.sh/update-policy with values recreate-on-conflict and recreate-on-invalid values to better deal with the following cases.
The server responds with 'invalid' e.g. if a certain resource contains an immutable field, which cannot be patched or updated by a PUT. In theses cases it can be helpful to delete the resource and recreate it. Examples for theses cases are:

 * roleRef in RoleBinding
 * spec.clusterIP in Service
 * parameters in StorageClass

If a chart developer has made sure that recreating a resource is safe, the annotation can be added to a resource template. Alternatively a user may add the annotation manually to an existing resource to allow Helm to recreate a certain resource.

Adding the new annotation in general by default or changing the default behaviour of Helm is not a good idea, since deletion and recreation of certain resources can lead to an unwanted or undefined state as described in detail in https://github.com/helm/helm/pull/7431#issuecomment-666735299:

"[...] selectively deleting/re-creating volume-backed resources (Secrets, Services, ConfigMaps, PVCs) without touching the Deployment. If a Persistent Volume Claim is deleted and re-created, there is a risk that an application (Deployment) consuming that Persistent Volume Claim will not receive the new volume's mount point. The same can be said for Secrets, Configmaps, Services, and other resources which are deployed and relied upon by another resource. Environment variables populated in a Deployment from a Service won't be updated until the Pods are destroyed."

Closes #7082.

Signed-off-by: Daniel Strobusch <1847260+dastrobu@users.noreply.github.com>
pull/9174/head
Daniel Strobusch 4 years ago
parent 8b4be22a81
commit 0b58239b51
No known key found for this signature in database
GPG Key ID: 305FC1B7491BC270

@ -439,7 +439,7 @@ func (i *Install) performInstall(rel *release.Release, toBeAdopted kube.Resource
if len(toBeAdopted) == 0 && len(resources) > 0 {
_, err = i.cfg.KubeClient.Create(resources)
} else if len(resources) > 0 {
_, err = i.cfg.KubeClient.Update(toBeAdopted, resources, i.Force)
_, err = updateWithTimeoutOrFallback(i.cfg.KubeClient, toBeAdopted, resources, i.Force, i.Timeout)
}
if err != nil {
return rel, err

@ -0,0 +1,32 @@
/*
Copyright The Helm Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package action
import (
"time"
"helm.sh/helm/v3/pkg/kube"
)
// updateWithTimeoutOrFallback is a compatibility function to fallback for Helm 3 clients (implementors of kube.Interface only)
// this function can be inlined in Helm 4, when there is no fallback necessary anymore.
func updateWithTimeoutOrFallback(kubeClient kube.Interface, original, target kube.ResourceList, force bool, timeout time.Duration) (*kube.Result, error) {
if kubeClient, ok := kubeClient.(kube.UpdateWithTimeout); ok {
return kubeClient.UpdateWithTimeout(original, target, force, timeout)
}
return kubeClient.Update(original, target, force)
}

@ -188,8 +188,8 @@ func (r *Rollback) performRollback(currentRelease, targetRelease *release.Releas
if err != nil {
return targetRelease, errors.Wrap(err, "unable to set metadata visitor from target release")
}
results, err := r.cfg.KubeClient.Update(current, target, r.Force)
results, err := updateWithTimeoutOrFallback(r.cfg.KubeClient, current, target, r.Force, r.Timeout)
if err != nil {
msg := fmt.Sprintf("Rollback %q failed: %s", targetRelease.Name, err)
r.cfg.Log("warning: %s", msg)

@ -407,7 +407,7 @@ func (u *Upgrade) releasingUpgrade(c chan<- resultMessage, upgradedRelease *rele
u.cfg.Log("upgrade hooks disabled for %s", upgradedRelease.Name)
}
results, err := u.cfg.KubeClient.Update(current, target, u.Force)
results, err := updateWithTimeoutOrFallback(u.cfg.KubeClient, current, target, u.Force, u.Timeout)
if err != nil {
u.cfg.recordRelease(originalRelease)
u.reportToPerformUpgrade(c, upgradedRelease, results.Created, err)

@ -178,7 +178,7 @@ func TestUpgradeRelease_Atomic(t *testing.T) {
upAction.cfg.Releases.Create(rel)
failer := upAction.cfg.KubeClient.(*kubefake.FailingKubeClient)
failer.UpdateError = fmt.Errorf("update fail")
failer.UpdateWithTimeoutError = fmt.Errorf("update fail")
upAction.cfg.KubeClient = failer
upAction.Atomic = true
vals := map[string]interface{}{}

@ -25,6 +25,7 @@ import (
"os"
"path/filepath"
"reflect"
"sort"
"strings"
"sync"
"time"
@ -47,6 +48,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/cli-runtime/pkg/genericclioptions"
"k8s.io/cli-runtime/pkg/resource"
@ -58,6 +60,17 @@ import (
cmdutil "k8s.io/kubectl/pkg/cmd/util"
)
var accessor = meta.NewAccessor()
// Annotation for resource recreation on update. The two cases "conflict" and "invalid" are the two cases where
// kubectl apply --force tries a recreate, so those seem to be the most relevant two cases users usually want
// to control.
const (
updatePolicyAnnotation = "helm.sh/update-policy"
updatePolicyRecreateOnConflict = "recreate-on-conflict"
updatePolicyRecreateOnInvalid = "recreate-on-invalid"
)
// ErrNoObjectsVisited indicates that during a visit operation, no matching objects were found.
var ErrNoObjectsVisited = errors.New("no objects visited")
@ -379,14 +392,20 @@ func (c *Client) BuildTable(reader io.Reader, validate bool) (ResourceList, erro
return result, scrubValidationError(err)
}
// Update takes the current list of objects and target list of objects and
// Update is a wrapper for UpdateWithTimeout to avoid a breaking API change.
// Deprecated: prefer UpdateWithTimeout, cannot be removed until Helm 4
func (c *Client) Update(original, target ResourceList, force bool) (*Result, error) {
return c.UpdateWithTimeout(original, target, force, time.Duration(0))
}
// UpdateWithTimeout takes the current list of objects and target list of objects and
// creates resources that don't already exist, updates resources that have been
// modified in the target configuration, and deletes resources from the current
// configuration that are not present in the target configuration. If an error
// occurs, a Result will still be returned with the error, containing all
// resource updates, creations, and deletions that were attempted. These can be
// used for cleanup or other logging purposes.
func (c *Client) Update(original, target ResourceList, force bool) (*Result, error) {
func (c *Client) UpdateWithTimeout(original, target ResourceList, force bool, timeout time.Duration) (*Result, error) {
updateErrors := []string{}
res := &Result{}
@ -421,7 +440,7 @@ func (c *Client) Update(original, target ResourceList, force bool) (*Result, err
return errors.Errorf("no %s with the name %q found", kind, info.Name)
}
if err := updateResource(c, info, originalInfo.Object, force); err != nil {
if err := updateResource(c, info, originalInfo.Object, force, timeout); err != nil {
c.Log("error updating the resource %q:\n\t %v", info.Name, err)
updateErrors = append(updateErrors, err.Error())
}
@ -659,7 +678,7 @@ func createPatch(target *resource.Info, current runtime.Object) ([]byte, types.P
return patch, types.StrategicMergePatchType, err
}
func updateResource(c *Client, target *resource.Info, currentObj runtime.Object, force bool) error {
func updateResource(c *Client, target *resource.Info, currentObj runtime.Object, force bool, timeout time.Duration) error {
var (
obj runtime.Object
helper = resource.NewHelper(target.Client, target.Mapping).WithFieldManager(getManagedFieldsManager())
@ -669,9 +688,11 @@ func updateResource(c *Client, target *resource.Info, currentObj runtime.Object,
// if --force is applied, attempt to replace the existing resource with the new object.
if force {
var err error
obj, err = helper.Replace(target.Namespace, target.Name, true, target.Object)
obj, err = withUpdatePolicy(c, helper, target, timeout, func() (runtime.Object, error) {
return helper.Replace(target.Namespace, target.Name, true, target.Object)
})
if err != nil {
return errors.Wrap(err, "failed to replace object")
return errors.Wrapf(err, "failed to replace %q with kind %s", target.Name, kind)
}
c.Log("Replaced %q with kind %s for kind %s", target.Name, currentObj.GetObjectKind().GroupVersionKind().Kind, kind)
} else {
@ -691,7 +712,9 @@ func updateResource(c *Client, target *resource.Info, currentObj runtime.Object,
}
// send patch to server
c.Log("Patch %s %q in namespace %s", kind, target.Name, target.Namespace)
obj, err = helper.Patch(target.Namespace, target.Name, patchType, patch, nil)
obj, err = withUpdatePolicy(c, helper, target, timeout, func() (runtime.Object, error) {
return helper.Patch(target.Namespace, target.Name, patchType, patch, nil)
})
if err != nil {
return errors.Wrapf(err, "cannot patch %q with kind %s", target.Name, kind)
}
@ -701,6 +724,65 @@ func updateResource(c *Client, target *resource.Info, currentObj runtime.Object,
return nil
}
// withUpdatePolicy applies the update to an object by executing applyUpdate and falls back to recreating the object
// if allowed by the updatePolicyAnnotation
func withUpdatePolicy(c *Client, helper *resource.Helper, target *resource.Info, timeout time.Duration, applyUpdate func() (runtime.Object, error)) (runtime.Object, error) {
obj, updateErr := applyUpdate()
if updateErr == nil {
return obj, updateErr
}
annos, err := accessor.Annotations(target.Object)
if err != nil {
return nil, err
}
updatePolicies := strings.Split(strings.ReplaceAll(annos[updatePolicyAnnotation], " ", ""), ",")
sort.Strings(updatePolicies)
hasUpdatePolicy := func(updatePolicyValue string) bool {
return sort.SearchStrings(updatePolicies, updatePolicyValue) < len(updatePolicies)
}
recreate := false
switch reason := apierrors.ReasonForError(updateErr); reason {
case metav1.StatusReasonConflict:
recreate = hasUpdatePolicy(updatePolicyRecreateOnConflict)
case metav1.StatusReasonInvalid:
recreate = hasUpdatePolicy(updatePolicyRecreateOnInvalid)
}
if recreate {
c.Log("Update of %q of kind %s failed, trying to replace resource according to %s: %s", target.Name,
target.Mapping.GroupVersionKind.Kind, updatePolicyAnnotation, annos[updatePolicyAnnotation])
err = c.deleteAndCreate(helper, target, timeout)
// target is already refreshed so don't return an object.
return nil, err
}
return obj, updateErr
}
// deleteAndCreate deletes an object, polls until successfully deleted (or timeout is exceeded) and recreates it afterwards.
func (c *Client) deleteAndCreate(helper *resource.Helper, target *resource.Info, timeout time.Duration) error {
if err := deleteResource(target, metav1.DeletePropagationBackground); err != nil {
return err
}
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
if err := wait.PollUntilContextCancel(ctx, 2*time.Second, true, func(ctx context.Context) (bool, error) {
if _, err := helper.Get(target.Namespace, target.Name); !apierrors.IsNotFound(err) {
return false, err
}
return true, nil
}); err != nil {
return err
}
return createResource(target)
}
func (c *Client) watchUntilReady(timeout time.Duration, info *resource.Info) error {
kind := info.Mapping.GroupVersionKind.Kind
switch kind {

@ -22,6 +22,9 @@ import (
"net/http"
"strings"
"testing"
"time"
"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -83,6 +86,26 @@ func notFoundBody() *metav1.Status {
}
}
func unprocessableEntityBody() *metav1.Status {
return &metav1.Status{
Code: http.StatusUnprocessableEntity,
Status: metav1.StatusFailure,
Reason: metav1.StatusReasonInvalid,
Message: "cannot change",
Details: &metav1.StatusDetails{},
}
}
func conflictEntityBody() *metav1.Status {
return &metav1.Status{
Code: http.StatusConflict,
Status: metav1.StatusFailure,
Reason: metav1.StatusReasonConflict,
Message: "conflict",
Details: &metav1.StatusDetails{},
}
}
func newResponse(code int, obj runtime.Object) (*http.Response, error) {
header := http.Header{}
header.Set("Content-Type", runtime.ContentTypeJSON)
@ -166,7 +189,7 @@ func TestUpdate(t *testing.T) {
t.Fatal(err)
}
result, err := c.Update(first, second, false)
result, err := c.UpdateWithTimeout(first, second, false, 0)
if err != nil {
t.Fatal(err)
}
@ -213,6 +236,230 @@ func TestUpdate(t *testing.T) {
}
}
func TestClient_UpdateWithPolicy(t *testing.T) {
is := assert.New(t)
// setup two pods with one diff that should be patched
original := newPodList("starfish")
target := newPodList("starfish")
target.Items[0].Spec.Containers[0].Ports = []v1.ContainerPort{{Name: "https", ContainerPort: 443}}
// make sure there is a patchable difference
is.NotEqual(original.Items[0].Spec.Containers[0].Ports, target.Items[0].Spec.Containers[0].Ports)
okResponse := func() *http.Response {
res, _ := newResponse(http.StatusOK, &original.Items[0])
return res
}
notFoundResponse := func() *http.Response {
res, _ := newResponse(http.StatusNotFound, notFoundBody())
return res
}
unprocessableEntityResponse := func() *http.Response {
res, _ := newResponse(http.StatusUnprocessableEntity, unprocessableEntityBody())
return res
}
conflictEntityResponse := func() *http.Response {
res, _ := newResponse(http.StatusConflict, conflictEntityBody())
return res
}
tests := []struct {
name string
force bool
updatePolicy string
responses []*http.Response
finalResponseFactory func() *http.Response
expectedActions []string
fail bool
}{
{
name: "update should delete and recreate when invalid on PATCH",
force: false,
updatePolicy: updatePolicyRecreateOnInvalid,
responses: []*http.Response{
okResponse(), // GET
okResponse(), // GET
unprocessableEntityResponse(), // PATCH
okResponse(), // DELETE
notFoundResponse(), // GET
okResponse(), // POST
},
expectedActions: []string{
"GET",
"GET",
"PATCH",
"DELETE",
"GET",
"POST",
},
},
{
name: "update should delete and recreate when conflict on PATCH",
force: false,
updatePolicy: updatePolicyRecreateOnConflict,
responses: []*http.Response{
okResponse(), // GET
okResponse(), // GET
conflictEntityResponse(), // PATCH
okResponse(), // DELETE
notFoundResponse(), // GET
okResponse(), // POST
},
expectedActions: []string{
"GET",
"GET",
"PATCH",
"DELETE",
"GET",
"POST",
},
},
{
name: "update should delete and recreate when invalid on PUT",
force: true,
updatePolicy: updatePolicyRecreateOnInvalid,
responses: []*http.Response{
okResponse(), // GET
okResponse(), // GET
unprocessableEntityResponse(), // PUT
okResponse(), // DELETE
notFoundResponse(), // GET
okResponse(), // POST
},
expectedActions: []string{
"GET",
"GET",
"PUT",
"DELETE",
"GET",
"POST",
},
},
{
name: "update should delete and recreate when conflict on PUT",
force: true,
updatePolicy: updatePolicyRecreateOnConflict,
responses: []*http.Response{
okResponse(), // GET
okResponse(), // GET
conflictEntityResponse(), // PUT
okResponse(), // DELETE
notFoundResponse(), // GET
okResponse(), // POST
},
expectedActions: []string{
"GET",
"GET",
"PUT",
"DELETE",
"GET",
"POST",
},
},
{
name: "update should fail when timing out after DELETE",
force: false,
updatePolicy: updatePolicyRecreateOnInvalid,
responses: []*http.Response{
okResponse(), // GET
okResponse(), // GET
unprocessableEntityResponse(), // PATCH
okResponse(), // DELETE
},
// infinitely return OK on get, implying the server does not delete the resource within the given timeout
finalResponseFactory: okResponse, // GET
expectedActions: []string{
"GET",
"GET",
"PATCH",
"DELETE",
},
fail: true,
},
{
name: "update should fail when no update policy specified",
force: false,
updatePolicy: "",
responses: []*http.Response{
okResponse(), // GET
okResponse(), // GET
unprocessableEntityResponse(), // PATCH
},
expectedActions: []string{
"GET",
"GET",
"PATCH",
},
fail: true,
},
{
name: "update should fail when no update policy does not match error",
force: false,
updatePolicy: updatePolicyRecreateOnConflict,
responses: []*http.Response{
okResponse(), // GET
okResponse(), // GET
unprocessableEntityResponse(), // PATCH
},
expectedActions: []string{
"GET",
"GET",
"PATCH",
},
fail: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
var actions []string
c := newTestClient(t)
c.Factory.(*cmdtesting.TestFactory).UnstructuredClient = &fake.RESTClient{
NegotiatedSerializer: unstructuredSerializer,
Client: fake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) {
p, m := req.URL.Path, req.Method
t.Logf("got request %s %s", p, m)
if len(tt.responses) == 0 {
if tt.finalResponseFactory != nil {
return tt.finalResponseFactory(), nil
}
t.Fatalf("unexpected request: %s %s", req.Method, req.URL.Path)
return nil, nil
}
// final responses are not added to actions, since it is unpredictable how often the final
// response will be returned on polling
actions = append(actions, m)
var response *http.Response
response, tt.responses = tt.responses[0], tt.responses[1:]
return response, nil
}),
}
target.Items[0].Annotations = map[string]string{updatePolicyAnnotation: tt.updatePolicy}
originalResourceList, err := c.Build(objBody(&original), false)
if err != nil {
t.Fatal(err)
}
targetResourceList, err := c.Build(objBody(&target), false)
if err != nil {
t.Fatal(err)
}
// timeout should be larger than one second, to see actual polling with one second poll interval
_, err = c.UpdateWithTimeout(originalResourceList, targetResourceList, tt.force, 2*time.Second)
if (tt.fail && err == nil) || (!tt.fail && err != nil) {
t.Fatal(err)
}
t.Log(tt.expectedActions)
t.Log(actions)
is.ElementsMatch(tt.expectedActions, actions)
})
}
}
func TestBuild(t *testing.T) {
tests := []struct {
name string

@ -41,6 +41,7 @@ type FailingKubeClient struct {
DeleteWithPropagationError error
WatchUntilReadyError error
UpdateError error
UpdateWithTimeoutError error
BuildError error
BuildTableError error
BuildDummy bool
@ -114,6 +115,13 @@ func (f *FailingKubeClient) Update(r, modified kube.ResourceList, ignoreMe bool)
return f.PrintingKubeClient.Update(r, modified, ignoreMe)
}
func (f *FailingKubeClient) UpdateWithTimeout(r, modified kube.ResourceList, ignoreMe bool, timeout time.Duration) (*kube.Result, error) {
if f.UpdateWithTimeoutError != nil {
return &kube.Result{}, f.UpdateWithTimeoutError
}
return f.PrintingKubeClient.UpdateWithTimeout(r, modified, ignoreMe, 0)
}
// Build returns the configured error if set or prints
func (f *FailingKubeClient) Build(r io.Reader, _ bool) (kube.ResourceList, error) {
if f.BuildError != nil {

@ -90,7 +90,11 @@ func (p *PrintingKubeClient) WatchUntilReady(resources kube.ResourceList, _ time
}
// Update implements KubeClient Update.
func (p *PrintingKubeClient) Update(_, modified kube.ResourceList, _ bool) (*kube.Result, error) {
func (p *PrintingKubeClient) Update(original, target kube.ResourceList, force bool) (*kube.Result, error) {
return p.UpdateWithTimeout(original, target, force, 0)
}
func (p *PrintingKubeClient) UpdateWithTimeout(_, modified kube.ResourceList, _ bool, _ time.Duration) (*kube.Result, error) {
_, err := io.Copy(p.Out, bufferize(modified))
if err != nil {
return nil, err

@ -110,7 +110,17 @@ type InterfaceResources interface {
BuildTable(reader io.Reader, validate bool) (ResourceList, error)
}
// UpdateWithTimeout is introduced to avoid breaking backwards compatibility for Interface and InterfaceExt implementers.
// It extends Interface.Update by a timeout, which is used to wait until a resource is deleted in case the a replacement
// is necessary due to the "helm.sh/update-policy".
//
// TODO Helm 4: Remove UpdateWithTimeout and integrate its method(s) into the Interface.
type UpdateWithTimeout interface {
UpdateWithTimeout(original, target ResourceList, force bool, timeout time.Duration) (*Result, error)
}
var _ Interface = (*Client)(nil)
var _ InterfaceExt = (*Client)(nil)
var _ InterfaceDeletionPropagation = (*Client)(nil)
var _ InterfaceResources = (*Client)(nil)
var _ UpdateWithTimeout = (*Client)(nil)

Loading…
Cancel
Save