From 462d8c6bd3720814853733a91de34f34361da29a Mon Sep 17 00:00:00 2001 From: Daniel Strobusch <1847260+dastrobu@users.noreply.github.com> Date: Fri, 17 Jan 2020 19:20:17 +0100 Subject: [PATCH] feat(helm): add recreate upgrade (rollback) strategy An additional optional flag --recreate can be passed on upgrade (or rollback) of a release. In combination with the --force flag the following strategies are employed when updating a resource (which can be directly compared to kubectl): helm kubectl action on 'invalid' or 'conflict' -------------------------------------------------------------------------------------------------------------- upgrade apply PATCH fail upgrade --force replace PUT fail upgrade --recreate apply --force PATCH DELETE -> GET (poll) -> POST upgrade --recreate --force replace --force DELETE -> GET (poll) -> POST fail The 'on error' column should be interpreted as follows. The server responds with 'invalid' e.g. if a certain resource contains an immutable field, which cannot be patched or updated by a PUT. In theses cases it can be helpful to delete the resource and recreate it. Examples for theses cases are: * roleRef in RoleBinding * spec.clusterIP in Service * parameters in StorageClass Closes #7082. Signed-off-by: Daniel Strobusch <1847260+dastrobu@users.noreply.github.com> --- cmd/helm/rollback.go | 3 +- cmd/helm/upgrade.go | 3 +- pkg/action/action.go | 4 + pkg/action/action_test.go | 20 +++- pkg/action/rollback.go | 43 ++++--- pkg/action/rollback_test.go | 80 +++++++++++++ pkg/action/upgrade.go | 27 +++-- pkg/action/upgrade_test.go | 39 ++++++ pkg/kube/client.go | 98 +++++++++++---- pkg/kube/client_test.go | 229 ++++++++++++++++++++++++++++++++++++ pkg/kube/fake/fake.go | 10 ++ pkg/kube/fake/printer.go | 7 ++ pkg/kube/fake/spy.go | 97 +++++++++++++++ pkg/kube/interface.go | 23 +++- scripts/completions.bash | 0 15 files changed, 634 insertions(+), 49 deletions(-) create mode 100644 pkg/action/rollback_test.go create mode 100644 pkg/kube/fake/spy.go create mode 100644 scripts/completions.bash diff --git a/cmd/helm/rollback.go b/cmd/helm/rollback.go index 3b336d0ff..34f8841d6 100644 --- a/cmd/helm/rollback.go +++ b/cmd/helm/rollback.go @@ -73,7 +73,8 @@ func newRollbackCmd(cfg *action.Configuration, out io.Writer) *cobra.Command { f := cmd.Flags() f.BoolVar(&client.DryRun, "dry-run", false, "simulate a rollback") f.BoolVar(&client.Recreate, "recreate-pods", false, "performs pods restart for the resource if applicable") - f.BoolVar(&client.Force, "force", false, "force resource update through delete/recreate if needed") + f.BoolVar(&client.Force, "force", false, "force resources to be replaced by rendered templates on rollback. In combination with --recreate resources will be deleted before recreation.") + f.BoolVar(&client.RecreateResources, "recreate", false, "patch resources, on error delete and recreate resources. In combination with --force always delete resources before recreation.") f.BoolVar(&client.DisableHooks, "no-hooks", false, "prevent hooks from running during rollback") f.DurationVar(&client.Timeout, "timeout", 300*time.Second, "time to wait for any individual Kubernetes operation (like Jobs for hooks)") f.BoolVar(&client.Wait, "wait", false, "if set, will wait until all Pods, PVCs, Services, and minimum number of Pods of a Deployment, StatefulSet, or ReplicaSet are in a ready state before marking the release as successful. It will wait for as long as --timeout") diff --git a/cmd/helm/upgrade.go b/cmd/helm/upgrade.go index dc3bc9a58..d4daf5b1b 100644 --- a/cmd/helm/upgrade.go +++ b/cmd/helm/upgrade.go @@ -171,7 +171,8 @@ func newUpgradeCmd(cfg *action.Configuration, out io.Writer) *cobra.Command { f.BoolVar(&client.DryRun, "dry-run", false, "simulate an upgrade") f.BoolVar(&client.Recreate, "recreate-pods", false, "performs pods restart for the resource if applicable") f.MarkDeprecated("recreate-pods", "functionality will no longer be updated. Consult the documentation for other methods to recreate pods") - f.BoolVar(&client.Force, "force", false, "force resource updates through a replacement strategy") + f.BoolVar(&client.Force, "force", false, "force resources to be replaced by rendered templates on update. In combination with --recreate resources will be deleted before recreation.") + f.BoolVar(&client.RecreateResources, "recreate", false, "patch resources, on error delete and recreate resources. In combination with --force always delete resourced before recreation.") f.BoolVar(&client.DisableHooks, "no-hooks", false, "disable pre/post upgrade hooks") f.BoolVar(&client.DisableOpenAPIValidation, "disable-openapi-validation", false, "if set, the upgrade process will not validate rendered templates against the Kubernetes OpenAPI Schema") f.BoolVar(&client.SkipCRDs, "skip-crds", false, "if set, no CRDs will be installed when an upgrade is performed with install flag enabled. By default, CRDs are installed if not already present, when an upgrade is performed with install flag enabled") diff --git a/pkg/action/action.go b/pkg/action/action.go index bb9ef5f71..4339cd084 100644 --- a/pkg/action/action.go +++ b/pkg/action/action.go @@ -85,6 +85,9 @@ type Configuration struct { // KubeClient is a Kubernetes API client. KubeClient kube.Interface + // KubeClient is a Kubernetes API client (version 2) + KubeClientV2 kube.InterfaceV2 + // RegistryClient is a client for working with registries RegistryClient *registry.Client @@ -408,6 +411,7 @@ func (c *Configuration) Init(getter genericclioptions.RESTClientGetter, namespac c.RESTClientGetter = getter c.KubeClient = kc + c.KubeClientV2 = kc c.Releases = store c.Log = log diff --git a/pkg/action/action_test.go b/pkg/action/action_test.go index 0cbdb162b..d0c35c495 100644 --- a/pkg/action/action_test.go +++ b/pkg/action/action_test.go @@ -20,10 +20,14 @@ import ( "flag" "io/ioutil" "net/http" + "os" "path/filepath" "testing" + "helm.sh/helm/v3/pkg/cli" + dockerauth "github.com/deislabs/oras/pkg/auth/docker" + "github.com/stretchr/testify/assert" fakeclientset "k8s.io/client-go/kubernetes/fake" "helm.sh/helm/v3/internal/experimental/registry" @@ -77,9 +81,11 @@ func actionConfigFixture(t *testing.T) *Configuration { t.Fatal(err) } + kubeClient := kubefake.FailingKubeClient{PrintingKubeClient: kubefake.PrintingKubeClient{Out: ioutil.Discard}} return &Configuration{ Releases: storage.Init(driver.NewMemory()), - KubeClient: &kubefake.FailingKubeClient{PrintingKubeClient: kubefake.PrintingKubeClient{Out: ioutil.Discard}}, + KubeClient: &kubeClient, + KubeClientV2: &kubeClient, Capabilities: chartutil.DefaultCapabilities, RegistryClient: registryClient, Log: func(format string, v ...interface{}) { @@ -317,6 +323,18 @@ func TestGetVersionSet(t *testing.T) { } } +func TestKubeClientSet(t *testing.T) { + is := assert.New(t) + config := new(Configuration) + settings := cli.New() + if err := config.Init(settings.RESTClientGetter(), settings.Namespace(), os.Getenv("HELM_DRIVER"), func(_ string, v ...interface{}) {}); err != nil { + t.Error(err) + } + is.NotNil(config.KubeClient, "KubeClient not set") + is.NotNil(config.KubeClientV2, "KubeClientV2 not set") + is.Equal(config.KubeClient, config.KubeClientV2, "KubeClientV2 not set") +} + // TestValidName is a regression test for ValidName // // Kubernetes has strict naming conventions for resource names. This test represents diff --git a/pkg/action/rollback.go b/pkg/action/rollback.go index 81812983f..81ad5936e 100644 --- a/pkg/action/rollback.go +++ b/pkg/action/rollback.go @@ -22,6 +22,8 @@ import ( "strings" "time" + "helm.sh/helm/v3/pkg/kube" + "github.com/pkg/errors" "helm.sh/helm/v3/pkg/release" @@ -34,14 +36,18 @@ import ( type Rollback struct { cfg *Configuration - Version int - Timeout time.Duration - Wait bool - DisableHooks bool - DryRun bool - Recreate bool // will (if true) recreate pods after a rollback. - Force bool // will (if true) force resource upgrade through uninstall/recreate if needed - CleanupOnFail bool + Version int + Timeout time.Duration + Wait bool + DisableHooks bool + DryRun bool + // Recreate will (if true) recreate pods after a rollback. (not to be confused with RecreateResources) + Recreate bool + // recreate resources on update + // for compatibility reasons this field cannot be named "Recreate", since "Recreate" is referring to the "recreate-pods" flag. + RecreateResources bool + Force bool // will (if true) force resource upgrade through uninstall/recreate if needed + CleanupOnFail bool } // NewRollback creates a new Rollback object with the given configuration. @@ -159,9 +165,18 @@ func (r *Rollback) performRollback(currentRelease, targetRelease *release.Releas r.cfg.Log("rollback hooks disabled for %s", targetRelease.Name) } - results, err := r.cfg.KubeClient.Update(current, target, r.Force) - - if err != nil { + var result *kube.Result + kubeClientV2 := r.cfg.KubeClientV2 + switch { + case r.RecreateResources && kubeClientV2 != nil: + result, err = kubeClientV2.UpdateRecreate(current, target, r.Force, r.Timeout) + case r.RecreateResources: + r.cfg.Log("warning: kubeClient does not support recreate flag, ignoring it.") + fallthrough + default: + result, err = r.cfg.KubeClient.Update(current, target, r.Force) + } + if err != nil && result != nil { msg := fmt.Sprintf("Rollback %q failed: %s", targetRelease.Name, err) r.cfg.Log("warning: %s", msg) currentRelease.Info.Status = release.StatusSuperseded @@ -170,8 +185,8 @@ func (r *Rollback) performRollback(currentRelease, targetRelease *release.Releas r.cfg.recordRelease(currentRelease) r.cfg.recordRelease(targetRelease) if r.CleanupOnFail { - r.cfg.Log("Cleanup on fail set, cleaning up %d resources", len(results.Created)) - _, errs := r.cfg.KubeClient.Delete(results.Created) + r.cfg.Log("Cleanup on fail set, cleaning up %d resources", len(result.Created)) + _, errs := r.cfg.KubeClient.Delete(result.Created) if errs != nil { var errorList []string for _, e := range errs { @@ -189,7 +204,7 @@ func (r *Rollback) performRollback(currentRelease, targetRelease *release.Releas // log if an error occurs and continue onward. If we ever introduce log // levels, we should make these error level logs so users are notified // that they'll need to go do the cleanup on their own - if err := recreate(r.cfg, results.Updated); err != nil { + if err := recreate(r.cfg, result.Updated); err != nil { r.cfg.Log(err.Error()) } } diff --git a/pkg/action/rollback_test.go b/pkg/action/rollback_test.go new file mode 100644 index 000000000..8d4924b8c --- /dev/null +++ b/pkg/action/rollback_test.go @@ -0,0 +1,80 @@ +/* +Copyright The Helm Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package action + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + kubefake "helm.sh/helm/v3/pkg/kube/fake" + "helm.sh/helm/v3/pkg/release" +) + +func rollbackAction(t *testing.T) *Rollback { + config := actionConfigFixture(t) + rbAction := NewRollback(config) + + return rbAction +} + +func TestRollbackRelease_RecreateResources(t *testing.T) { + is := assert.New(t) + + rbAction := rollbackAction(t) + rbAction.Timeout = 0 + rbAction.RecreateResources = true + + cfg := rbAction.cfg + kubeClient := cfg.KubeClientV2 + + prevRelease := releaseStub() + prevRelease.Info.Status = release.StatusSuperseded + prevRelease.Name = "my-release" + prevRelease.Version = 0 + err := cfg.Releases.Create(prevRelease) + is.NoError(err) + + currRelease := releaseStub() + currRelease.Info.Status = release.StatusDeployed + currRelease.Name = "my-release" + currRelease.Version = 1 + err = cfg.Releases.Create(currRelease) + is.NoError(err) + + t.Run("recreate should work when kubeClient and kubeClientV2 is set", func(t *testing.T) { + verifiableKubeClient := kubefake.NewKubeClientSpy(kubeClient) + cfg.KubeClient = verifiableKubeClient + cfg.KubeClientV2 = verifiableKubeClient + + err := rbAction.Run(currRelease.Name) + is.NoError(err) + is.Equal(verifiableKubeClient.Calls["Update"], 0) + is.Equal(verifiableKubeClient.Calls["UpdateRecreate"], 1) + }) + + t.Run("recreate should fallback to Update when only kubeClient is set", func(t *testing.T) { + kubeClientSpy := kubefake.NewKubeClientSpy(kubeClient) + cfg.KubeClient = kubeClientSpy + cfg.KubeClientV2 = nil + + err := rbAction.Run(currRelease.Name) + is.NoError(err) + is.Equal(kubeClientSpy.Calls["Update"], 1) + is.Equal(kubeClientSpy.Calls["UpdateRecreate"], 0) + }) +} diff --git a/pkg/action/upgrade.go b/pkg/action/upgrade.go index fc289dbab..75c76fe07 100644 --- a/pkg/action/upgrade.go +++ b/pkg/action/upgrade.go @@ -73,11 +73,14 @@ type Upgrade struct { // // This should be used with caution. Force bool + // recreate resources on update + // for compatibility reasons this field cannot be named "Recreate", since "Recreate" is referring to the "recreate-pods" flag. + RecreateResources bool // ResetValues will reset the values to the chart's built-ins rather than merging with existing. ResetValues bool // ReuseValues will re-use the user's last supplied values. ReuseValues bool - // Recreate will (if true) recreate pods after a rollback. + // Recreate will (if true) recreate pods after a upgrade. (not to be confused with RecreateResources) Recreate bool // MaxHistory limits the maximum number of revisions saved per release MaxHistory int @@ -319,10 +322,20 @@ func (u *Upgrade) performUpgrade(originalRelease, upgradedRelease *release.Relea u.cfg.Log("upgrade hooks disabled for %s", upgradedRelease.Name) } - results, err := u.cfg.KubeClient.Update(current, target, u.Force) - if err != nil { + var result *kube.Result + kubeClientV2 := u.cfg.KubeClientV2 + switch { + case u.RecreateResources && kubeClientV2 != nil: + result, err = kubeClientV2.UpdateRecreate(current, target, u.Force, u.Timeout) + case u.RecreateResources: + u.cfg.Log("warning: kubeClient does not support recreate flag, ignoring it.") + fallthrough + default: + result, err = u.cfg.KubeClient.Update(current, target, u.Force) + } + if err != nil && result != nil { u.cfg.recordRelease(originalRelease) - return u.failRelease(upgradedRelease, results.Created, err) + return u.failRelease(upgradedRelease, result.Created, err) } if u.Recreate { @@ -330,7 +343,7 @@ func (u *Upgrade) performUpgrade(originalRelease, upgradedRelease *release.Relea // log if an error occurs and continue onward. If we ever introduce log // levels, we should make these error level logs so users are notified // that they'll need to go do the cleanup on their own - if err := recreate(u.cfg, results.Updated); err != nil { + if err := recreate(u.cfg, result.Updated); err != nil { u.cfg.Log(err.Error()) } } @@ -338,14 +351,14 @@ func (u *Upgrade) performUpgrade(originalRelease, upgradedRelease *release.Relea if u.Wait { if err := u.cfg.KubeClient.Wait(target, u.Timeout); err != nil { u.cfg.recordRelease(originalRelease) - return u.failRelease(upgradedRelease, results.Created, err) + return u.failRelease(upgradedRelease, result.Created, err) } } // post-upgrade hooks if !u.DisableHooks { if err := u.cfg.execHook(upgradedRelease, release.HookPostUpgrade, u.Timeout); err != nil { - return u.failRelease(upgradedRelease, results.Created, fmt.Errorf("post-upgrade hooks failed: %s", err)) + return u.failRelease(upgradedRelease, result.Created, fmt.Errorf("post-upgrade hooks failed: %s", err)) } } diff --git a/pkg/action/upgrade_test.go b/pkg/action/upgrade_test.go index f25d115c4..dd989b242 100644 --- a/pkg/action/upgrade_test.go +++ b/pkg/action/upgrade_test.go @@ -253,3 +253,42 @@ func TestUpgradeRelease_ReuseValues(t *testing.T) { is.Equal(expectedValues, updatedRes.Config) }) } + +func TestUpgradeRelease_RecreateResources(t *testing.T) { + is := assert.New(t) + + upAction := upgradeAction(t) + upAction.Timeout = 0 + upAction.RecreateResources = true + + cfg := upAction.cfg + kubeClient := cfg.KubeClientV2 + + rel := releaseStub() + rel.Info.Status = release.StatusDeployed + rel.Name = "my-release" + err := cfg.Releases.Create(rel) + is.NoError(err) + + t.Run("recreate should work when kubeClient and kubeClientV2 is set", func(t *testing.T) { + verifiableKubeClient := kubefake.NewKubeClientSpy(kubeClient) + cfg.KubeClient = verifiableKubeClient + cfg.KubeClientV2 = verifiableKubeClient + + _, err := upAction.Run(rel.Name, buildChart(), map[string]interface{}{}) + is.NoError(err) + is.Equal(verifiableKubeClient.Calls["Update"], 0) + is.Equal(verifiableKubeClient.Calls["UpdateRecreate"], 1) + }) + + t.Run("recreate should fallback to Update when only kubeClient is set", func(t *testing.T) { + kubeClientSpy := kubefake.NewKubeClientSpy(kubeClient) + cfg.KubeClient = kubeClientSpy + cfg.KubeClientV2 = nil + + _, err := upAction.Run(rel.Name, buildChart(), map[string]interface{}{}) + is.NoError(err) + is.Equal(kubeClientSpy.Calls["Update"], 1) + is.Equal(kubeClientSpy.Calls["UpdateRecreate"], 0) + }) +} diff --git a/pkg/kube/client.go b/pkg/kube/client.go index f908611db..d8351758d 100644 --- a/pkg/kube/client.go +++ b/pkg/kube/client.go @@ -39,6 +39,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/strategicpatch" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" "k8s.io/cli-runtime/pkg/genericclioptions" "k8s.io/cli-runtime/pkg/resource" @@ -167,7 +168,16 @@ func (c *Client) Build(reader io.Reader, validate bool) (ResourceList, error) { // resource updates, creations, and deletions that were attempted. These can be // used for cleanup or other logging purposes. func (c *Client) Update(original, target ResourceList, force bool) (*Result, error) { - updateErrors := []string{} + return c.update(target, original, force, false, 0) +} + +// Like Update but using the recreate strategy on updating a resource +func (c *Client) UpdateRecreate(original, target ResourceList, force bool, timeout time.Duration) (*Result, error) { + return c.update(target, original, force, true, timeout) +} + +func (c *Client) update(target ResourceList, original ResourceList, force bool, recreate bool, timeout time.Duration) (*Result, error) { + var updateErrors []string res := &Result{} c.Log("checking %d resources for changes", len(target)) @@ -201,7 +211,7 @@ func (c *Client) Update(original, target ResourceList, force bool) (*Result, err return errors.Errorf("no %s with the name %q found", kind, info.Name) } - if err := updateResource(c, info, originalInfo.Object, force); err != nil { + if err := updateResource(c, info, originalInfo.Object, force, recreate, timeout); err != nil { c.Log("error updating the resource %q:\n\t %v", info.Name, err) updateErrors = append(updateErrors, err.Error()) } @@ -411,47 +421,87 @@ func createPatch(target *resource.Info, current runtime.Object) ([]byte, types.P return patch, types.StrategicMergePatchType, err } -func updateResource(c *Client, target *resource.Info, currentObj runtime.Object, force bool) error { +func updateResource(c *Client, target *resource.Info, currentObj runtime.Object, force bool, recreate bool, timeout time.Duration) error { var ( - obj runtime.Object helper = resource.NewHelper(target.Client, target.Mapping) kind = target.Mapping.GroupVersionKind.Kind ) - // if --force is applied, attempt to replace the existing resource with the new object. - if force { - var err error - obj, err = helper.Replace(target.Namespace, target.Name, true, target.Object) + // update strategies: + // default PATCH + // --force PUT + // --recreate PATCH, on failure DELETE, POST + // --recreate --force DELETE, POST + switch { + case recreate && force: + err := c.deleteAndCreate(helper, target, timeout) if err != nil { - return errors.Wrap(err, "failed to replace object") + return errors.Wrapf(err, "failed to recreate %q with kind %s", target.Name, kind) } - c.Log("Replaced %q with kind %s for kind %s", target.Name, currentObj.GetObjectKind().GroupVersionKind().Kind, kind) - } else { - patch, patchType, err := createPatch(target, currentObj) + case recreate: + err := c.patch(helper, target, currentObj) if err != nil { - return errors.Wrap(err, "failed to create patch") - } - - if patch == nil || string(patch) == "{}" { - c.Log("Looks like there are no changes for %s %q", target.Mapping.GroupVersionKind.Kind, target.Name) - // This needs to happen to make sure that Helm has the latest info from the API - // Otherwise there will be no labels and other functions that use labels will panic - if err := target.Get(); err != nil { - return errors.Wrap(err, "failed to refresh resource information") + if apierrors.IsConflict(err) || apierrors.IsInvalid(err) { + err = c.deleteAndCreate(helper, target, timeout) } - return nil + return errors.Wrapf(err, "failed to recreate %q with kind %s", target.Name, kind) } - // send patch to server - obj, err = helper.Patch(target.Namespace, target.Name, patchType, patch, nil) + case force: + obj, err := helper.Replace(target.Namespace, target.Name, true, target.Object) + if err != nil { + return errors.Wrapf(err, "failed to replace %q with kind %s", target.Name, kind) + } + target.Refresh(obj, true) + c.Log("Replaced %q with kind %s for kind %s\n", target.Name, currentObj.GetObjectKind().GroupVersionKind().Kind, kind) + default: + err := c.patch(helper, target, currentObj) if err != nil { return errors.Wrapf(err, "cannot patch %q with kind %s", target.Name, kind) } } + return nil +} + +func (c *Client) patch(helper *resource.Helper, target *resource.Info, currentObj runtime.Object) error { + patch, patchType, err := createPatch(target, currentObj) + if err != nil { + return errors.Wrap(err, "failed to create patch") + } + if patch == nil || string(patch) == "{}" { + c.Log("Looks like there are no changes for %s %q", target.Mapping.GroupVersionKind.Kind, target.Name) + // This needs to happen to make sure that Helm has the latest info from the API + // Otherwise there will be no labels and other functions that use labels will panic + if err := target.Get(); err != nil { + return errors.Wrap(err, "failed to refresh resource information") + } + return nil + } + obj, err := helper.Patch(target.Namespace, target.Name, patchType, patch, nil) + if err != nil { + return err + } target.Refresh(obj, true) return nil } +func (c *Client) deleteAndCreate(helper *resource.Helper, target *resource.Info, timeout time.Duration) error { + if err := deleteResource(target); err != nil { + return err + } + + if err := wait.PollImmediate(1*time.Second, timeout, func() (bool, error) { + if _, err := helper.Get(target.Namespace, target.Name, false); !apierrors.IsNotFound(err) { + return false, err + } + return true, nil + }); err != nil { + return err + } + + return createResource(target) +} + func (c *Client) watchUntilReady(timeout time.Duration, info *resource.Info) error { kind := info.Mapping.GroupVersionKind.Kind switch kind { diff --git a/pkg/kube/client_test.go b/pkg/kube/client_test.go index 568afa094..9203ef3e9 100644 --- a/pkg/kube/client_test.go +++ b/pkg/kube/client_test.go @@ -23,7 +23,9 @@ import ( "net/http" "strings" "testing" + "time" + "github.com/stretchr/testify/assert" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -84,6 +86,36 @@ func notFoundBody() *metav1.Status { } } +func unprocessableEntityBody() *metav1.Status { + return &metav1.Status{ + Code: http.StatusUnprocessableEntity, + Status: metav1.StatusFailure, + Reason: metav1.StatusReasonInvalid, + Message: "cannot change", + Details: &metav1.StatusDetails{}, + } +} + +func conflictEntityBody() *metav1.Status { + return &metav1.Status{ + Code: http.StatusConflict, + Status: metav1.StatusFailure, + Reason: metav1.StatusReasonConflict, + Message: "conflict", + Details: &metav1.StatusDetails{}, + } +} + +func gatewayTimeoutEntityBody() *metav1.Status { + return &metav1.Status{ + Code: http.StatusGatewayTimeout, + Status: metav1.StatusFailure, + Reason: metav1.StatusReasonTimeout, + Message: "timeout", + Details: &metav1.StatusDetails{}, + } +} + func newResponse(code int, obj runtime.Object) (*http.Response, error) { header := http.Header{} header.Set("Content-Type", runtime.ContentTypeJSON) @@ -300,6 +332,203 @@ func TestPerform(t *testing.T) { } } +func TestClient_UpdateRecreate(t *testing.T) { + + is := assert.New(t) + + // setup two pods with one diff that should be patched + listA := newPodList("starfish") + listB := newPodList("starfish") + listB.Items[0].Spec.Containers[0].Ports = []v1.ContainerPort{{Name: "https", ContainerPort: 443}} + + // make sure there is a patchable difference + is.NotEqual(listA.Items[0].Spec.Containers[0].Ports, listB.Items[0].Spec.Containers[0].Ports) + + okResponse := func() *http.Response { + res, _ := newResponse(http.StatusOK, &listA.Items[0]) + return res + } + notFoundResponse := func() *http.Response { + res, _ := newResponse(http.StatusNotFound, notFoundBody()) + return res + } + unprocessableEntityResponse := func() *http.Response { + res, _ := newResponse(http.StatusUnprocessableEntity, unprocessableEntityBody()) + return res + } + conflictEntityResponse := func() *http.Response { + res, _ := newResponse(http.StatusConflict, conflictEntityBody()) + return res + } + gatewayTimeoutResponse := func() *http.Response { + res, _ := newResponse(http.StatusGatewayTimeout, gatewayTimeoutEntityBody()) + return res + } + + tests := []struct { + name string + force bool + responses []*http.Response + finalResponseFactory func() *http.Response + expectedActions []string + fail bool + }{ + { + name: "update recreate should delete and recreate when invalid on PATCH", + force: false, + responses: []*http.Response{ + okResponse(), // GET + okResponse(), // GET + unprocessableEntityResponse(), // PATCH + okResponse(), // DELETE + notFoundResponse(), // GET + okResponse(), // POST + }, + expectedActions: []string{ + "GET", + "GET", + "PATCH", + "DELETE", + "GET", + "POST", + }, + }, + { + name: "update recreate should delete and recreate when conflict on PATCH", + force: false, + responses: []*http.Response{ + okResponse(), // GET + okResponse(), // GET + conflictEntityResponse(), // PATCH + okResponse(), // DELETE + notFoundResponse(), // GET + okResponse(), // POST + }, + expectedActions: []string{ + "GET", + "GET", + "PATCH", + "DELETE", + "GET", + "POST", + }, + }, + { + name: "update recreate should delete and recreate when force flag is true", + force: true, + responses: []*http.Response{ + okResponse(), // GET + okResponse(), // GET + okResponse(), // DELETE + notFoundResponse(), // GET + okResponse(), // POST + }, + expectedActions: []string{ + "GET", + "GET", + "DELETE", + "GET", + "POST", + }, + }, + { + name: "update recreate should fail when timeout on PATCH", + force: false, + responses: []*http.Response{ + okResponse(), // GET + okResponse(), // GET + gatewayTimeoutResponse(), // PATCH + }, + expectedActions: []string{ + "GET", + "GET", + "PATCH", + }, + fail: true, + }, + { + name: "update recreate should fail when timing out after DELETE", + force: false, + responses: []*http.Response{ + okResponse(), // GET + okResponse(), // GET + unprocessableEntityResponse(), // PATCH + okResponse(), // DELETE + }, + // infinitely return OK on get, implying the server does not delete the resource within the given timeout + finalResponseFactory: okResponse, // GET + expectedActions: []string{ + "GET", + "GET", + "PATCH", + "DELETE", + }, + fail: true, + }, + { + name: "update recreate force should fail when timing out after DELETE", + force: true, + responses: []*http.Response{ + okResponse(), // GET + okResponse(), // GET + okResponse(), // DELETE + }, + // infinitely return OK on get, implying the server does not delete the resource within the given timeout + finalResponseFactory: okResponse, // GET + expectedActions: []string{ + "GET", + "GET", + "DELETE", + }, + fail: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var actions []string + c := newTestClient() + c.Factory.(*cmdtesting.TestFactory).UnstructuredClient = &fake.RESTClient{ + NegotiatedSerializer: unstructuredSerializer, + Client: fake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) { + p, m := req.URL.Path, req.Method + t.Logf("got request %s %s", p, m) + if len(tt.responses) == 0 { + if tt.finalResponseFactory != nil { + return tt.finalResponseFactory(), nil + } + t.Fatalf("unexpected request: %s %s", req.Method, req.URL.Path) + return nil, nil + } + // final responses are not added to actions, since it is unpredictable how often the final + // response will be returned on polling + actions = append(actions, m) + var response *http.Response + response, tt.responses = tt.responses[0], tt.responses[1:] + return response, nil + }), + } + first, err := c.Build(objBody(&listA), false) + if err != nil { + t.Fatal(err) + } + second, err := c.Build(objBody(&listB), false) + if err != nil { + t.Fatal(err) + } + + // timeout should be larger than one second, to see actual polling with one second poll interval + _, err = c.UpdateRecreate(first, second, tt.force, 2*time.Second) + if (tt.fail && err == nil) || (!tt.fail && err != nil) { + t.Fatal(err) + } + t.Log(tt.expectedActions) + t.Log(actions) + is.ElementsMatch(tt.expectedActions, actions) + }) + } +} + func TestReal(t *testing.T) { t.Skip("This is a live test, comment this line to run") c := New(nil) diff --git a/pkg/kube/fake/fake.go b/pkg/kube/fake/fake.go index b3f7a393b..29f655916 100644 --- a/pkg/kube/fake/fake.go +++ b/pkg/kube/fake/fake.go @@ -97,3 +97,13 @@ func (f *FailingKubeClient) WaitAndGetCompletedPodPhase(s string, d time.Duratio } return f.PrintingKubeClient.WaitAndGetCompletedPodPhase(s, d) } + +// UpdateRecreate returns the configured error if set or prints +func (f *FailingKubeClient) UpdateRecreate(r, modified kube.ResourceList, force bool, timeout time.Duration) (*kube.Result, error) { + if f.UpdateError != nil { + return &kube.Result{}, f.UpdateError + } + return f.PrintingKubeClient.UpdateRecreate(r, modified, force, timeout) +} + +var _ kube.InterfaceV2 = (*FailingKubeClient)(nil) diff --git a/pkg/kube/fake/printer.go b/pkg/kube/fake/printer.go index 58b389ab5..df32ac5a1 100644 --- a/pkg/kube/fake/printer.go +++ b/pkg/kube/fake/printer.go @@ -81,6 +81,11 @@ func (p *PrintingKubeClient) Update(_, modified kube.ResourceList, _ bool) (*kub return &kube.Result{Updated: modified}, nil } +// UpdateRecreate falls back to Update +func (p *PrintingKubeClient) UpdateRecreate(original, target kube.ResourceList, force bool, _ time.Duration) (*kube.Result, error) { + return p.Update(original, target, force) +} + // Build implements KubeClient Build. func (p *PrintingKubeClient) Build(_ io.Reader, _ bool) (kube.ResourceList, error) { return []*resource.Info{}, nil @@ -98,3 +103,5 @@ func bufferize(resources kube.ResourceList) io.Reader { } return strings.NewReader(builder.String()) } + +var _ kube.InterfaceV2 = (*PrintingKubeClient)(nil) diff --git a/pkg/kube/fake/spy.go b/pkg/kube/fake/spy.go new file mode 100644 index 000000000..c1a7e7b88 --- /dev/null +++ b/pkg/kube/fake/spy.go @@ -0,0 +1,97 @@ +/* +Copyright The Helm Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package fake + +import ( + "io" + "runtime" + "strings" + "time" + + v1 "k8s.io/api/core/v1" + + "helm.sh/helm/v3/pkg/kube" +) + +// KubeClient wrapper which can be used for testing to verify that a certain method has been called a certain +// number of times +type KubeClientSpy struct { + KubeClientV2 kube.InterfaceV2 + // map with function names as keys and number of times it was called as names + Calls map[string]int +} + +func NewKubeClientSpy(kubeClient kube.InterfaceV2) KubeClientSpy { + return KubeClientSpy{ + KubeClientV2: kubeClient, + Calls: make(map[string]int), + } +} + +func functionName() string { + pc := make([]uintptr, 15) + n := runtime.Callers(2, pc) + frames := runtime.CallersFrames(pc[:n]) + frame, _ := frames.Next() + pathSegments := strings.Split(frame.Function, ".") + return pathSegments[len(pathSegments)-1] +} + +func (v KubeClientSpy) Create(resources kube.ResourceList) (*kube.Result, error) { + v.Calls[functionName()]++ + return v.KubeClientV2.Create(resources) +} + +func (v KubeClientSpy) Wait(resources kube.ResourceList, timeout time.Duration) error { + v.Calls[functionName()]++ + return v.KubeClientV2.Wait(resources, timeout) +} + +func (v KubeClientSpy) Delete(resources kube.ResourceList) (*kube.Result, []error) { + v.Calls[functionName()]++ + return v.KubeClientV2.Delete(resources) +} + +func (v KubeClientSpy) WatchUntilReady(resources kube.ResourceList, timeout time.Duration) error { + v.Calls[functionName()]++ + return v.KubeClientV2.WatchUntilReady(resources, timeout) +} + +func (v KubeClientSpy) Update(original, target kube.ResourceList, force bool) (*kube.Result, error) { + v.Calls[functionName()]++ + return v.KubeClientV2.Update(original, target, force) +} + +func (v KubeClientSpy) Build(reader io.Reader, validate bool) (kube.ResourceList, error) { + v.Calls[functionName()]++ + return v.KubeClientV2.Build(reader, validate) +} + +func (v KubeClientSpy) WaitAndGetCompletedPodPhase(name string, timeout time.Duration) (v1.PodPhase, error) { + v.Calls[functionName()]++ + return v.KubeClientV2.WaitAndGetCompletedPodPhase(name, timeout) +} + +func (v KubeClientSpy) IsReachable() error { + v.Calls[functionName()]++ + return v.KubeClientV2.IsReachable() +} + +func (v KubeClientSpy) UpdateRecreate(original, target kube.ResourceList, force bool, timeout time.Duration) (*kube.Result, error) { + v.Calls[functionName()]++ + return v.KubeClientV2.UpdateRecreate(original, target, force, timeout) +} diff --git a/pkg/kube/interface.go b/pkg/kube/interface.go index 4bf61211e..af24f3a48 100644 --- a/pkg/kube/interface.go +++ b/pkg/kube/interface.go @@ -63,4 +63,25 @@ type Interface interface { IsReachable() error } -var _ Interface = (*Client)(nil) +// Extended Kubernetes client interface +// +// Version 2 interface adds new methods in a backward compatible way. +// In the next API breaking release it could be merged with the base interface. +type InterfaceV2 interface { + Interface + + // Update updates one or more resources or creates the resource if it doesn't exist. + // + // Force controls how to perform the update of a resource: + // + // force: false + // Patch a resource, if that fails due to an StatusReasonInvalid or StatusReasonConflict error, + // delete it and recreate it afterwards. + // force: true + // Delete and recreated without trying to patch it first. + // + // After deleting a resource poll and wait until resource was deleted, fails if server does not delete resource within timeout. + UpdateRecreate(original, target ResourceList, force bool, timeout time.Duration) (*Result, error) +} + +var _ InterfaceV2 = (*Client)(nil) diff --git a/scripts/completions.bash b/scripts/completions.bash new file mode 100644 index 000000000..e69de29bb