feat(helm): add recreate upgrade (rollback) strategy

An additional optional flag --recreate can be passed on upgrade (or rollback) of a release.
In combination with the --force flag the following strategies are employed when updating a resource (which can be directly compared to kubectl):

helm                        kubectl          action                         on 'invalid' or 'conflict'
--------------------------------------------------------------------------------------------------------------

upgrade                     apply            PATCH                          fail

upgrade --force             replace          PUT                            fail

upgrade --recreate          apply --force    PATCH                          DELETE -> GET (poll) -> POST

upgrade --recreate --force  replace --force  DELETE -> GET (poll) ->  POST  fail

The 'on error' column should be interpreted as follows. The server responds with 'invalid' e.g. if a certain resource contains an immutable field, which cannot be patched or updated by a PUT. In theses cases it can be helpful to delete the resource and recreate it. Examples for theses cases are:

 * roleRef in RoleBinding
 * spec.clusterIP in Service
 * parameters in StorageClass

Closes #7082.

Signed-off-by: Daniel Strobusch <1847260+dastrobu@users.noreply.github.com>
pull/9836/head
Daniel Strobusch 6 years ago committed by Mateusz Gozdek
parent 179f90151d
commit 4fdcedc172
No known key found for this signature in database
GPG Key ID: 9F8CD59A18F6E996

@ -78,7 +78,8 @@ func newRollbackCmd(cfg *action.Configuration, out io.Writer) *cobra.Command {
f := cmd.Flags() f := cmd.Flags()
f.BoolVar(&client.DryRun, "dry-run", false, "simulate a rollback") f.BoolVar(&client.DryRun, "dry-run", false, "simulate a rollback")
f.BoolVar(&client.Recreate, "recreate-pods", false, "performs pods restart for the resource if applicable") f.BoolVar(&client.Recreate, "recreate-pods", false, "performs pods restart for the resource if applicable")
f.BoolVar(&client.Force, "force", false, "force resource update through delete/recreate if needed") f.BoolVar(&client.Force, "force", false, "force resources to be replaced by rendered templates on update")
f.BoolVar(&client.RecreateResources, "recreate", false, "force resources to be replaced by rendered templates on update")
f.BoolVar(&client.DisableHooks, "no-hooks", false, "prevent hooks from running during rollback") f.BoolVar(&client.DisableHooks, "no-hooks", false, "prevent hooks from running during rollback")
f.DurationVar(&client.Timeout, "timeout", 300*time.Second, "time to wait for any individual Kubernetes operation (like Jobs for hooks)") f.DurationVar(&client.Timeout, "timeout", 300*time.Second, "time to wait for any individual Kubernetes operation (like Jobs for hooks)")
f.BoolVar(&client.Wait, "wait", false, "if set, will wait until all Pods, PVCs, Services, and minimum number of Pods of a Deployment, StatefulSet, or ReplicaSet are in a ready state before marking the release as successful. It will wait for as long as --timeout") f.BoolVar(&client.Wait, "wait", false, "if set, will wait until all Pods, PVCs, Services, and minimum number of Pods of a Deployment, StatefulSet, or ReplicaSet are in a ready state before marking the release as successful. It will wait for as long as --timeout")

@ -194,7 +194,8 @@ func newUpgradeCmd(cfg *action.Configuration, out io.Writer) *cobra.Command {
f.BoolVar(&client.DryRun, "dry-run", false, "simulate an upgrade") f.BoolVar(&client.DryRun, "dry-run", false, "simulate an upgrade")
f.BoolVar(&client.Recreate, "recreate-pods", false, "performs pods restart for the resource if applicable") f.BoolVar(&client.Recreate, "recreate-pods", false, "performs pods restart for the resource if applicable")
f.MarkDeprecated("recreate-pods", "functionality will no longer be updated. Consult the documentation for other methods to recreate pods") f.MarkDeprecated("recreate-pods", "functionality will no longer be updated. Consult the documentation for other methods to recreate pods")
f.BoolVar(&client.Force, "force", false, "force resource updates through a replacement strategy") f.BoolVar(&client.Force, "force", false, "force resources to be replaced by rendered templates on update")
f.BoolVar(&client.RecreateResources, "recreate", false, "force resources to be replaced by rendered templates on update")
f.BoolVar(&client.DisableHooks, "no-hooks", false, "disable pre/post upgrade hooks") f.BoolVar(&client.DisableHooks, "no-hooks", false, "disable pre/post upgrade hooks")
f.BoolVar(&client.DisableOpenAPIValidation, "disable-openapi-validation", false, "if set, the upgrade process will not validate rendered templates against the Kubernetes OpenAPI Schema") f.BoolVar(&client.DisableOpenAPIValidation, "disable-openapi-validation", false, "if set, the upgrade process will not validate rendered templates against the Kubernetes OpenAPI Schema")
f.BoolVar(&client.SkipCRDs, "skip-crds", false, "if set, no CRDs will be installed when an upgrade is performed with install flag enabled. By default, CRDs are installed if not already present, when an upgrade is performed with install flag enabled") f.BoolVar(&client.SkipCRDs, "skip-crds", false, "if set, no CRDs will be installed when an upgrade is performed with install flag enabled. By default, CRDs are installed if not already present, when an upgrade is performed with install flag enabled")

@ -88,6 +88,9 @@ type Configuration struct {
// KubeClient is a Kubernetes API client. // KubeClient is a Kubernetes API client.
KubeClient kube.Interface KubeClient kube.Interface
// KubeClient is a Kubernetes API client (version 2)
KubeClientV2 kube.InterfaceV2
// RegistryClient is a client for working with registries // RegistryClient is a client for working with registries
RegistryClient *registry.Client RegistryClient *registry.Client
@ -413,6 +416,7 @@ func (cfg *Configuration) Init(getter genericclioptions.RESTClientGetter, namesp
cfg.RESTClientGetter = getter cfg.RESTClientGetter = getter
cfg.KubeClient = kc cfg.KubeClient = kc
cfg.KubeClientV2 = kc
cfg.Releases = store cfg.Releases = store
cfg.Log = log cfg.Log = log

@ -24,7 +24,10 @@ import (
"path/filepath" "path/filepath"
"testing" "testing"
"helm.sh/helm/v3/pkg/cli"
dockerauth "github.com/deislabs/oras/pkg/auth/docker" dockerauth "github.com/deislabs/oras/pkg/auth/docker"
"github.com/stretchr/testify/assert"
fakeclientset "k8s.io/client-go/kubernetes/fake" fakeclientset "k8s.io/client-go/kubernetes/fake"
"helm.sh/helm/v3/internal/experimental/registry" "helm.sh/helm/v3/internal/experimental/registry"
@ -80,9 +83,11 @@ func actionConfigFixture(t *testing.T) *Configuration {
t.Fatal(err) t.Fatal(err)
} }
kubeClient := kubefake.FailingKubeClient{PrintingKubeClient: kubefake.PrintingKubeClient{Out: ioutil.Discard}}
return &Configuration{ return &Configuration{
Releases: storage.Init(driver.NewMemory()), Releases: storage.Init(driver.NewMemory()),
KubeClient: &kubefake.FailingKubeClient{PrintingKubeClient: kubefake.PrintingKubeClient{Out: ioutil.Discard}}, KubeClient: &kubeClient,
KubeClientV2: &kubeClient,
Capabilities: chartutil.DefaultCapabilities, Capabilities: chartutil.DefaultCapabilities,
RegistryClient: registryClient, RegistryClient: registryClient,
Log: func(format string, v ...interface{}) { Log: func(format string, v ...interface{}) {
@ -319,3 +324,15 @@ func TestGetVersionSet(t *testing.T) {
t.Error("Non-existent version is reported found.") t.Error("Non-existent version is reported found.")
} }
} }
func TestKubeClientSet(t *testing.T) {
is := assert.New(t)
config := new(Configuration)
settings := cli.New()
if err := config.Init(settings.RESTClientGetter(), settings.Namespace(), os.Getenv("HELM_DRIVER"), func(_ string, v ...interface{}) {}); err != nil {
t.Error(err)
}
is.NotNil(config.KubeClient, "KubeClient not set")
is.NotNil(config.KubeClientV2, "KubeClientV2 not set")
is.Equal(config.KubeClient, config.KubeClientV2, "KubeClientV2 not set")
}

@ -22,6 +22,8 @@ import (
"strings" "strings"
"time" "time"
"helm.sh/helm/v3/pkg/kube"
"github.com/pkg/errors" "github.com/pkg/errors"
"helm.sh/helm/v3/pkg/chartutil" "helm.sh/helm/v3/pkg/chartutil"
@ -35,16 +37,20 @@ import (
type Rollback struct { type Rollback struct {
cfg *Configuration cfg *Configuration
Version int Version int
Timeout time.Duration Timeout time.Duration
Wait bool Wait bool
WaitForJobs bool WaitForJobs bool
DisableHooks bool DisableHooks bool
DryRun bool DryRun bool
Recreate bool // will (if true) recreate pods after a rollback. // Recreate will (if true) recreate pods after a rollback. (not to be confused with RecreateResources)
Force bool // will (if true) force resource upgrade through uninstall/recreate if needed Recreate bool
CleanupOnFail bool // recreate resources on update
MaxHistory int // MaxHistory limits the maximum number of revisions saved per release // for compatibility reasons this field cannot be named "Recreate", since "Recreate" is referring to the "recreate-pods" flag.
RecreateResources bool
Force bool // will (if true) force resource upgrade through uninstall/recreate if needed
CleanupOnFail bool
MaxHistory int // MaxHistory limits the maximum number of revisions saved per release
} }
// NewRollback creates a new Rollback object with the given configuration. // NewRollback creates a new Rollback object with the given configuration.
@ -164,9 +170,18 @@ func (r *Rollback) performRollback(currentRelease, targetRelease *release.Releas
r.cfg.Log("rollback hooks disabled for %s", targetRelease.Name) r.cfg.Log("rollback hooks disabled for %s", targetRelease.Name)
} }
results, err := r.cfg.KubeClient.Update(current, target, r.Force) var result *kube.Result
kubeClientV2 := r.cfg.KubeClientV2
if err != nil { switch {
case r.RecreateResources && kubeClientV2 != nil:
result, err = kubeClientV2.UpdateRecreate(current, target, r.Force, r.Timeout)
case r.RecreateResources:
r.cfg.Log("warning: kubeClient does not support recreate flag, ignoring it.")
fallthrough
default:
result, err = r.cfg.KubeClient.Update(current, target, r.Force)
}
if err != nil && result != nil {
msg := fmt.Sprintf("Rollback %q failed: %s", targetRelease.Name, err) msg := fmt.Sprintf("Rollback %q failed: %s", targetRelease.Name, err)
r.cfg.Log("warning: %s", msg) r.cfg.Log("warning: %s", msg)
currentRelease.Info.Status = release.StatusSuperseded currentRelease.Info.Status = release.StatusSuperseded
@ -175,8 +190,8 @@ func (r *Rollback) performRollback(currentRelease, targetRelease *release.Releas
r.cfg.recordRelease(currentRelease) r.cfg.recordRelease(currentRelease)
r.cfg.recordRelease(targetRelease) r.cfg.recordRelease(targetRelease)
if r.CleanupOnFail { if r.CleanupOnFail {
r.cfg.Log("Cleanup on fail set, cleaning up %d resources", len(results.Created)) r.cfg.Log("Cleanup on fail set, cleaning up %d resources", len(result.Created))
_, errs := r.cfg.KubeClient.Delete(results.Created) _, errs := r.cfg.KubeClient.Delete(result.Created)
if errs != nil { if errs != nil {
var errorList []string var errorList []string
for _, e := range errs { for _, e := range errs {
@ -194,7 +209,7 @@ func (r *Rollback) performRollback(currentRelease, targetRelease *release.Releas
// log if an error occurs and continue onward. If we ever introduce log // log if an error occurs and continue onward. If we ever introduce log
// levels, we should make these error level logs so users are notified // levels, we should make these error level logs so users are notified
// that they'll need to go do the cleanup on their own // that they'll need to go do the cleanup on their own
if err := recreate(r.cfg, results.Updated); err != nil { if err := recreate(r.cfg, result.Updated); err != nil {
r.cfg.Log(err.Error()) r.cfg.Log(err.Error())
} }
} }

@ -0,0 +1,80 @@
/*
Copyright The Helm Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package action
import (
"testing"
"github.com/stretchr/testify/assert"
kubefake "helm.sh/helm/v3/pkg/kube/fake"
"helm.sh/helm/v3/pkg/release"
)
func rollbackAction(t *testing.T) *Rollback {
config := actionConfigFixture(t)
rbAction := NewRollback(config)
return rbAction
}
func TestRollbackRelease_RecreateResources(t *testing.T) {
is := assert.New(t)
rbAction := rollbackAction(t)
rbAction.Timeout = 0
rbAction.RecreateResources = true
cfg := rbAction.cfg
kubeClient := cfg.KubeClientV2
prevRelease := releaseStub()
prevRelease.Info.Status = release.StatusSuperseded
prevRelease.Name = "my-release"
prevRelease.Version = 0
err := cfg.Releases.Create(prevRelease)
is.NoError(err)
currRelease := releaseStub()
currRelease.Info.Status = release.StatusDeployed
currRelease.Name = "my-release"
currRelease.Version = 1
err = cfg.Releases.Create(currRelease)
is.NoError(err)
t.Run("recreate should work when kubeClient and kubeClientV2 is set", func(t *testing.T) {
verifiableKubeClient := kubefake.NewKubeClientSpy(kubeClient)
cfg.KubeClient = verifiableKubeClient
cfg.KubeClientV2 = verifiableKubeClient
err := rbAction.Run(currRelease.Name)
is.NoError(err)
is.Equal(verifiableKubeClient.Calls["Update"], 0)
is.Equal(verifiableKubeClient.Calls["UpdateRecreate"], 1)
})
t.Run("recreate should fallback to Update when only kubeClient is set", func(t *testing.T) {
kubeClientSpy := kubefake.NewKubeClientSpy(kubeClient)
cfg.KubeClient = kubeClientSpy
cfg.KubeClientV2 = nil
err := rbAction.Run(currRelease.Name)
is.NoError(err)
is.Equal(kubeClientSpy.Calls["Update"], 1)
is.Equal(kubeClientSpy.Calls["UpdateRecreate"], 0)
})
}

@ -75,11 +75,14 @@ type Upgrade struct {
// //
// This should be used with caution. // This should be used with caution.
Force bool Force bool
// recreate resources on update
// for compatibility reasons this field cannot be named "Recreate", since "Recreate" is referring to the "recreate-pods" flag.
RecreateResources bool
// ResetValues will reset the values to the chart's built-ins rather than merging with existing. // ResetValues will reset the values to the chart's built-ins rather than merging with existing.
ResetValues bool ResetValues bool
// ReuseValues will re-use the user's last supplied values. // ReuseValues will re-use the user's last supplied values.
ReuseValues bool ReuseValues bool
// Recreate will (if true) recreate pods after a rollback. // Recreate will (if true) recreate pods after a upgrade. (not to be confused with RecreateResources).
Recreate bool Recreate bool
// MaxHistory limits the maximum number of revisions saved per release // MaxHistory limits the maximum number of revisions saved per release
MaxHistory int MaxHistory int
@ -316,10 +319,20 @@ func (u *Upgrade) performUpgrade(originalRelease, upgradedRelease *release.Relea
u.cfg.Log("upgrade hooks disabled for %s", upgradedRelease.Name) u.cfg.Log("upgrade hooks disabled for %s", upgradedRelease.Name)
} }
results, err := u.cfg.KubeClient.Update(current, target, u.Force) var result *kube.Result
if err != nil { kubeClientV2 := u.cfg.KubeClientV2
switch {
case u.RecreateResources && kubeClientV2 != nil:
result, err = kubeClientV2.UpdateRecreate(current, target, u.Force, u.Timeout)
case u.RecreateResources:
u.cfg.Log("warning: kubeClient does not support recreate flag, ignoring it.")
fallthrough
default:
result, err = u.cfg.KubeClient.Update(current, target, u.Force)
}
if err != nil && result != nil {
u.cfg.recordRelease(originalRelease) u.cfg.recordRelease(originalRelease)
return u.failRelease(upgradedRelease, results.Created, err) return u.failRelease(upgradedRelease, result.Created, err)
} }
if u.Recreate { if u.Recreate {
@ -327,7 +340,7 @@ func (u *Upgrade) performUpgrade(originalRelease, upgradedRelease *release.Relea
// log if an error occurs and continue onward. If we ever introduce log // log if an error occurs and continue onward. If we ever introduce log
// levels, we should make these error level logs so users are notified // levels, we should make these error level logs so users are notified
// that they'll need to go do the cleanup on their own // that they'll need to go do the cleanup on their own
if err := recreate(u.cfg, results.Updated); err != nil { if err := recreate(u.cfg, result.Updated); err != nil {
u.cfg.Log(err.Error()) u.cfg.Log(err.Error())
} }
} }
@ -336,12 +349,12 @@ func (u *Upgrade) performUpgrade(originalRelease, upgradedRelease *release.Relea
if u.WaitForJobs { if u.WaitForJobs {
if err := u.cfg.KubeClient.WaitWithJobs(target, u.Timeout); err != nil { if err := u.cfg.KubeClient.WaitWithJobs(target, u.Timeout); err != nil {
u.cfg.recordRelease(originalRelease) u.cfg.recordRelease(originalRelease)
return u.failRelease(upgradedRelease, results.Created, err) return u.failRelease(upgradedRelease, result.Created, err)
} }
} else { } else {
if err := u.cfg.KubeClient.Wait(target, u.Timeout); err != nil { if err := u.cfg.KubeClient.Wait(target, u.Timeout); err != nil {
u.cfg.recordRelease(originalRelease) u.cfg.recordRelease(originalRelease)
return u.failRelease(upgradedRelease, results.Created, err) return u.failRelease(upgradedRelease, result.Created, err)
} }
} }
} }
@ -349,7 +362,7 @@ func (u *Upgrade) performUpgrade(originalRelease, upgradedRelease *release.Relea
// post-upgrade hooks // post-upgrade hooks
if !u.DisableHooks { if !u.DisableHooks {
if err := u.cfg.execHook(upgradedRelease, release.HookPostUpgrade, u.Timeout); err != nil { if err := u.cfg.execHook(upgradedRelease, release.HookPostUpgrade, u.Timeout); err != nil {
return u.failRelease(upgradedRelease, results.Created, fmt.Errorf("post-upgrade hooks failed: %s", err)) return u.failRelease(upgradedRelease, result.Created, fmt.Errorf("post-upgrade hooks failed: %s", err))
} }
} }

@ -296,3 +296,42 @@ func TestUpgradeRelease_Pending(t *testing.T) {
_, err := upAction.Run(rel.Name, buildChart(), vals) _, err := upAction.Run(rel.Name, buildChart(), vals)
req.Contains(err.Error(), "progress", err) req.Contains(err.Error(), "progress", err)
} }
func TestUpgradeRelease_RecreateResources(t *testing.T) {
is := assert.New(t)
upAction := upgradeAction(t)
upAction.Timeout = 0
upAction.RecreateResources = true
cfg := upAction.cfg
kubeClient := cfg.KubeClientV2
rel := releaseStub()
rel.Info.Status = release.StatusDeployed
rel.Name = "my-release"
err := cfg.Releases.Create(rel)
is.NoError(err)
t.Run("recreate should work when kubeClient and kubeClientV2 is set", func(t *testing.T) {
verifiableKubeClient := kubefake.NewKubeClientSpy(kubeClient)
cfg.KubeClient = verifiableKubeClient
cfg.KubeClientV2 = verifiableKubeClient
_, err := upAction.Run(rel.Name, buildChart(), map[string]interface{}{})
is.NoError(err)
is.Equal(verifiableKubeClient.Calls["Update"], 0)
is.Equal(verifiableKubeClient.Calls["UpdateRecreate"], 1)
})
t.Run("recreate should fallback to Update when only kubeClient is set", func(t *testing.T) {
kubeClientSpy := kubefake.NewKubeClientSpy(kubeClient)
cfg.KubeClient = kubeClientSpy
cfg.KubeClientV2 = nil
_, err := upAction.Run(rel.Name, buildChart(), map[string]interface{}{})
is.NoError(err)
is.Equal(kubeClientSpy.Calls["Update"], 1)
is.Equal(kubeClientSpy.Calls["UpdateRecreate"], 0)
})
}

@ -40,6 +40,7 @@ import (
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/strategicpatch" "k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch" "k8s.io/apimachinery/pkg/watch"
"k8s.io/cli-runtime/pkg/genericclioptions" "k8s.io/cli-runtime/pkg/genericclioptions"
"k8s.io/cli-runtime/pkg/resource" "k8s.io/cli-runtime/pkg/resource"
@ -197,7 +198,16 @@ func (c *Client) Build(reader io.Reader, validate bool) (ResourceList, error) {
// resource updates, creations, and deletions that were attempted. These can be // resource updates, creations, and deletions that were attempted. These can be
// used for cleanup or other logging purposes. // used for cleanup or other logging purposes.
func (c *Client) Update(original, target ResourceList, force bool) (*Result, error) { func (c *Client) Update(original, target ResourceList, force bool) (*Result, error) {
updateErrors := []string{} return c.update(target, original, force, false, 0)
}
// Like Update but using the recreate strategy on updating a resource
func (c *Client) UpdateRecreate(original, target ResourceList, force bool, timeout time.Duration) (*Result, error) {
return c.update(target, original, force, true, timeout)
}
func (c *Client) update(target ResourceList, original ResourceList, force bool, recreate bool, timeout time.Duration) (*Result, error) {
var updateErrors []string
res := &Result{} res := &Result{}
c.Log("checking %d resources for changes", len(target)) c.Log("checking %d resources for changes", len(target))
@ -231,7 +241,7 @@ func (c *Client) Update(original, target ResourceList, force bool) (*Result, err
return errors.Errorf("no %s with the name %q found", kind, info.Name) return errors.Errorf("no %s with the name %q found", kind, info.Name)
} }
if err := updateResource(c, info, originalInfo.Object, force); err != nil { if err := updateResource(c, info, originalInfo.Object, force, recreate, timeout); err != nil {
c.Log("error updating the resource %q:\n\t %v", info.Name, err) c.Log("error updating the resource %q:\n\t %v", info.Name, err)
updateErrors = append(updateErrors, err.Error()) updateErrors = append(updateErrors, err.Error())
} }
@ -441,47 +451,83 @@ func createPatch(target *resource.Info, current runtime.Object) ([]byte, types.P
return patch, types.StrategicMergePatchType, err return patch, types.StrategicMergePatchType, err
} }
func updateResource(c *Client, target *resource.Info, currentObj runtime.Object, force bool) error { func updateResource(c *Client, target *resource.Info, currentObj runtime.Object, force bool, recreate bool, timeout time.Duration) error {
var ( var (
obj runtime.Object obj runtime.Object
helper = resource.NewHelper(target.Client, target.Mapping) helper = resource.NewHelper(target.Client, target.Mapping)
kind = target.Mapping.GroupVersionKind.Kind kind = target.Mapping.GroupVersionKind.Kind
) )
// if --force is applied, attempt to replace the existing resource with the new object. patch, patchType, err := createPatch(target, currentObj)
if force { if err != nil {
var err error return errors.Wrap(err, "failed to create patch")
obj, err = helper.Replace(target.Namespace, target.Name, true, target.Object) }
if err != nil {
return errors.Wrap(err, "failed to replace object") if patch == nil || string(patch) == "{}" {
c.Log("Looks like there are no changes for %s %q", target.Mapping.GroupVersionKind.Kind, target.Name)
// This needs to happen to make sure that tiller has the latest info from the API
// Otherwise there will be no labels and other functions that use labels will panic
if err := target.Get(); err != nil {
return errors.Wrap(err, "failed to refresh resource information")
} }
c.Log("Replaced %q with kind %s for kind %s", target.Name, currentObj.GetObjectKind().GroupVersionKind().Kind, kind) return nil
} else { }
patch, patchType, err := createPatch(target, currentObj)
// update strategies:
// default PATCH
// --force PUT
// --recreate PATCH, on failure DELETE, POST
// --recreate --force DELETE, POST
switch {
case recreate && force:
err = c.deleteAndCreate(helper, target, timeout)
if err != nil { if err != nil {
return errors.Wrap(err, "failed to create patch") return errors.Wrapf(err, "failed to recreate %q with kind %s", target.Name, kind)
} }
case recreate:
if patch == nil || string(patch) == "{}" { obj, err = helper.Patch(target.Namespace, target.Name, patchType, patch, nil)
c.Log("Looks like there are no changes for %s %q", target.Mapping.GroupVersionKind.Kind, target.Name) if err != nil {
// This needs to happen to make sure that Helm has the latest info from the API if apierrors.IsConflict(err) || apierrors.IsInvalid(err) {
// Otherwise there will be no labels and other functions that use labels will panic err = c.deleteAndCreate(helper, target, timeout)
if err := target.Get(); err != nil {
return errors.Wrap(err, "failed to refresh resource information")
} }
return nil return errors.Wrapf(err, "failed to recreate %q with kind %s", target.Name, kind)
}
case force:
obj, err = helper.Replace(target.Namespace, target.Name, true, target.Object)
if err != nil {
return errors.Wrapf(err, "failed to replace %q with kind %s", target.Name, kind)
} }
// send patch to server c.Log("Replaced %q with kind %s for kind %s\n", target.Name, currentObj.GetObjectKind().GroupVersionKind().Kind, kind)
default:
obj, err = helper.Patch(target.Namespace, target.Name, patchType, patch, nil) obj, err = helper.Patch(target.Namespace, target.Name, patchType, patch, nil)
if err != nil { if err != nil {
return errors.Wrapf(err, "cannot patch %q with kind %s", target.Name, kind) return errors.Wrapf(err, "cannot patch %q with kind %s", target.Name, kind)
} }
} }
target.Refresh(obj, true) if obj != nil {
return target.Refresh(obj, true)
}
return nil return nil
} }
func (c *Client) deleteAndCreate(helper *resource.Helper, target *resource.Info, timeout time.Duration) error {
if err := deleteResource(target); err != nil {
return err
}
if err := wait.PollImmediate(1*time.Second, timeout, func() (bool, error) {
if _, err := helper.Get(target.Namespace, target.Name); !apierrors.IsNotFound(err) {
return false, err
}
return true, nil
}); err != nil {
return err
}
return createResource(target)
}
func (c *Client) watchUntilReady(timeout time.Duration, info *resource.Info) error { func (c *Client) watchUntilReady(timeout time.Duration, info *resource.Info) error {
kind := info.Mapping.GroupVersionKind.Kind kind := info.Mapping.GroupVersionKind.Kind
switch kind { switch kind {

@ -23,7 +23,9 @@ import (
"net/http" "net/http"
"strings" "strings"
"testing" "testing"
"time"
"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
@ -84,6 +86,36 @@ func notFoundBody() *metav1.Status {
} }
} }
func unprocessableEntityBody() *metav1.Status {
return &metav1.Status{
Code: http.StatusUnprocessableEntity,
Status: metav1.StatusFailure,
Reason: metav1.StatusReasonInvalid,
Message: "cannot change",
Details: &metav1.StatusDetails{},
}
}
func conflictEntityBody() *metav1.Status {
return &metav1.Status{
Code: http.StatusConflict,
Status: metav1.StatusFailure,
Reason: metav1.StatusReasonConflict,
Message: "conflict",
Details: &metav1.StatusDetails{},
}
}
func gatewayTimeoutEntityBody() *metav1.Status {
return &metav1.Status{
Code: http.StatusGatewayTimeout,
Status: metav1.StatusFailure,
Reason: metav1.StatusReasonTimeout,
Message: "timeout",
Details: &metav1.StatusDetails{},
}
}
func newResponse(code int, obj runtime.Object) (*http.Response, error) { func newResponse(code int, obj runtime.Object) (*http.Response, error) {
header := http.Header{} header := http.Header{}
header.Set("Content-Type", runtime.ContentTypeJSON) header.Set("Content-Type", runtime.ContentTypeJSON)
@ -303,6 +335,203 @@ func TestPerform(t *testing.T) {
} }
} }
func TestClient_UpdateRecreate(t *testing.T) {
is := assert.New(t)
// setup two pods with one diff that should be patched
listA := newPodList("starfish")
listB := newPodList("starfish")
listB.Items[0].Spec.Containers[0].Ports = []v1.ContainerPort{{Name: "https", ContainerPort: 443}}
// make sure there is a patchable difference
is.NotEqual(listA.Items[0].Spec.Containers[0].Ports, listB.Items[0].Spec.Containers[0].Ports)
okResponse := func() *http.Response {
res, _ := newResponse(http.StatusOK, &listA.Items[0])
return res
}
notFoundResponse := func() *http.Response {
res, _ := newResponse(http.StatusNotFound, notFoundBody())
return res
}
unprocessableEntityResponse := func() *http.Response {
res, _ := newResponse(http.StatusUnprocessableEntity, unprocessableEntityBody())
return res
}
conflictEntityResponse := func() *http.Response {
res, _ := newResponse(http.StatusConflict, conflictEntityBody())
return res
}
gatewayTimeoutResponse := func() *http.Response {
res, _ := newResponse(http.StatusGatewayTimeout, gatewayTimeoutEntityBody())
return res
}
tests := []struct {
name string
force bool
responses []*http.Response
finalResponseFactory func() *http.Response
expectedActions []string
fail bool
}{
{
name: "update recreate should delete and recreate when invalid on PATCH",
force: false,
responses: []*http.Response{
okResponse(), // GET
okResponse(), // GET
unprocessableEntityResponse(), // PATCH
okResponse(), // DELETE
notFoundResponse(), // GET
okResponse(), // POST
},
expectedActions: []string{
"GET",
"GET",
"PATCH",
"DELETE",
"GET",
"POST",
},
},
{
name: "update recreate should delete and recreate when conflict on PATCH",
force: false,
responses: []*http.Response{
okResponse(), // GET
okResponse(), // GET
conflictEntityResponse(), // PATCH
okResponse(), // DELETE
notFoundResponse(), // GET
okResponse(), // POST
},
expectedActions: []string{
"GET",
"GET",
"PATCH",
"DELETE",
"GET",
"POST",
},
},
{
name: "update recreate should delete and recreate when force flag is true",
force: true,
responses: []*http.Response{
okResponse(), // GET
okResponse(), // GET
okResponse(), // DELETE
notFoundResponse(), // GET
okResponse(), // POST
},
expectedActions: []string{
"GET",
"GET",
"DELETE",
"GET",
"POST",
},
},
{
name: "update recreate should fail when timeout on PATCH",
force: false,
responses: []*http.Response{
okResponse(), // GET
okResponse(), // GET
gatewayTimeoutResponse(), // PATCH
},
expectedActions: []string{
"GET",
"GET",
"PATCH",
},
fail: true,
},
{
name: "update recreate should fail when timing out after DELETE",
force: false,
responses: []*http.Response{
okResponse(), // GET
okResponse(), // GET
unprocessableEntityResponse(), // PATCH
okResponse(), // DELETE
},
// infinitely return OK on get, implying the server does not delete the resource within the given timeout
finalResponseFactory: okResponse, // GET
expectedActions: []string{
"GET",
"GET",
"PATCH",
"DELETE",
},
fail: true,
},
{
name: "update recreate force should fail when timing out after DELETE",
force: true,
responses: []*http.Response{
okResponse(), // GET
okResponse(), // GET
okResponse(), // DELETE
},
// infinitely return OK on get, implying the server does not delete the resource within the given timeout
finalResponseFactory: okResponse, // GET
expectedActions: []string{
"GET",
"GET",
"DELETE",
},
fail: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
var actions []string
c := newTestClient(t)
c.Factory.(*cmdtesting.TestFactory).UnstructuredClient = &fake.RESTClient{
NegotiatedSerializer: unstructuredSerializer,
Client: fake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) {
p, m := req.URL.Path, req.Method
t.Logf("got request %s %s", p, m)
if len(tt.responses) == 0 {
if tt.finalResponseFactory != nil {
return tt.finalResponseFactory(), nil
}
t.Fatalf("unexpected request: %s %s", req.Method, req.URL.Path)
return nil, nil
}
// final responses are not added to actions, since it is unpredictable how often the final
// response will be returned on polling
actions = append(actions, m)
var response *http.Response
response, tt.responses = tt.responses[0], tt.responses[1:]
return response, nil
}),
}
first, err := c.Build(objBody(&listA), false)
if err != nil {
t.Fatal(err)
}
second, err := c.Build(objBody(&listB), false)
if err != nil {
t.Fatal(err)
}
// timeout should be larger than one second, to see actual polling with one second poll interval
_, err = c.UpdateRecreate(first, second, tt.force, 2*time.Second)
if (tt.fail && err == nil) || (!tt.fail && err != nil) {
t.Fatal(err)
}
t.Log(tt.expectedActions)
t.Log(actions)
is.ElementsMatch(tt.expectedActions, actions)
})
}
}
func TestReal(t *testing.T) { func TestReal(t *testing.T) {
t.Skip("This is a live test, comment this line to run") t.Skip("This is a live test, comment this line to run")
c := New(nil) c := New(nil)

@ -105,3 +105,13 @@ func (f *FailingKubeClient) WaitAndGetCompletedPodPhase(s string, d time.Duratio
} }
return f.PrintingKubeClient.WaitAndGetCompletedPodPhase(s, d) return f.PrintingKubeClient.WaitAndGetCompletedPodPhase(s, d)
} }
// UpdateRecreate returns the configured error if set or prints
func (f *FailingKubeClient) UpdateRecreate(r, modified kube.ResourceList, force bool, timeout time.Duration) (*kube.Result, error) {
if f.UpdateError != nil {
return &kube.Result{}, f.UpdateError
}
return f.PrintingKubeClient.UpdateRecreate(r, modified, force, timeout)
}
var _ kube.InterfaceV2 = (*FailingKubeClient)(nil)

@ -86,6 +86,11 @@ func (p *PrintingKubeClient) Update(_, modified kube.ResourceList, _ bool) (*kub
return &kube.Result{Updated: modified}, nil return &kube.Result{Updated: modified}, nil
} }
// UpdateRecreate falls back to Update
func (p *PrintingKubeClient) UpdateRecreate(original, target kube.ResourceList, force bool, _ time.Duration) (*kube.Result, error) {
return p.Update(original, target, force)
}
// Build implements KubeClient Build. // Build implements KubeClient Build.
func (p *PrintingKubeClient) Build(_ io.Reader, _ bool) (kube.ResourceList, error) { func (p *PrintingKubeClient) Build(_ io.Reader, _ bool) (kube.ResourceList, error) {
return []*resource.Info{}, nil return []*resource.Info{}, nil
@ -103,3 +108,5 @@ func bufferize(resources kube.ResourceList) io.Reader {
} }
return strings.NewReader(builder.String()) return strings.NewReader(builder.String())
} }
var _ kube.InterfaceV2 = (*PrintingKubeClient)(nil)

@ -0,0 +1,102 @@
/*
Copyright The Helm Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package fake
import (
"io"
"runtime"
"strings"
"time"
v1 "k8s.io/api/core/v1"
"helm.sh/helm/v3/pkg/kube"
)
// KubeClient wrapper which can be used for testing to verify that a certain method has been called a certain
// number of times
type KubeClientSpy struct {
KubeClientV2 kube.InterfaceV2
// map with function names as keys and number of times it was called as names
Calls map[string]int
}
func NewKubeClientSpy(kubeClient kube.InterfaceV2) KubeClientSpy {
return KubeClientSpy{
KubeClientV2: kubeClient,
Calls: make(map[string]int),
}
}
func functionName() string {
pc := make([]uintptr, 15)
n := runtime.Callers(2, pc)
frames := runtime.CallersFrames(pc[:n])
frame, _ := frames.Next()
pathSegments := strings.Split(frame.Function, ".")
return pathSegments[len(pathSegments)-1]
}
func (v KubeClientSpy) Create(resources kube.ResourceList) (*kube.Result, error) {
v.Calls[functionName()]++
return v.KubeClientV2.Create(resources)
}
func (v KubeClientSpy) Wait(resources kube.ResourceList, timeout time.Duration) error {
v.Calls[functionName()]++
return v.KubeClientV2.Wait(resources, timeout)
}
func (v KubeClientSpy) Delete(resources kube.ResourceList) (*kube.Result, []error) {
v.Calls[functionName()]++
return v.KubeClientV2.Delete(resources)
}
func (v KubeClientSpy) WatchUntilReady(resources kube.ResourceList, timeout time.Duration) error {
v.Calls[functionName()]++
return v.KubeClientV2.WatchUntilReady(resources, timeout)
}
func (v KubeClientSpy) Update(original, target kube.ResourceList, force bool) (*kube.Result, error) {
v.Calls[functionName()]++
return v.KubeClientV2.Update(original, target, force)
}
func (v KubeClientSpy) Build(reader io.Reader, validate bool) (kube.ResourceList, error) {
v.Calls[functionName()]++
return v.KubeClientV2.Build(reader, validate)
}
func (v KubeClientSpy) WaitAndGetCompletedPodPhase(name string, timeout time.Duration) (v1.PodPhase, error) {
v.Calls[functionName()]++
return v.KubeClientV2.WaitAndGetCompletedPodPhase(name, timeout)
}
func (v KubeClientSpy) IsReachable() error {
v.Calls[functionName()]++
return v.KubeClientV2.IsReachable()
}
func (v KubeClientSpy) UpdateRecreate(original, target kube.ResourceList, force bool, timeout time.Duration) (*kube.Result, error) {
v.Calls[functionName()]++
return v.KubeClientV2.UpdateRecreate(original, target, force, timeout)
}
func (v KubeClientSpy) WaitWithJobs(resources kube.ResourceList, timeout time.Duration) error {
v.Calls[functionName()]++
return v.KubeClientV2.WaitWithJobs(resources, timeout)
}

@ -65,4 +65,25 @@ type Interface interface {
IsReachable() error IsReachable() error
} }
var _ Interface = (*Client)(nil) // Extended Kubernetes client interface
//
// Version 2 interface adds new methods in a backward compatible way.
// In the next API breaking release it could be merged with the base interface.
type InterfaceV2 interface {
Interface
// Update updates one or more resources or creates the resource if it doesn't exist.
//
// Force controls how to perform the update of a resource:
//
// force: false
// Patch a resource, if that fails due to an StatusReasonInvalid or StatusReasonConflict error,
// delete it and recreate it afterwards.
// force: true
// Delete and recreated without trying to patch it first.
//
// After deleting a resource poll and wait until resource was deleted, fails if server does not delete resource within timeout.
UpdateRecreate(original, target ResourceList, force bool, timeout time.Duration) (*Result, error)
}
var _ InterfaceV2 = (*Client)(nil)

File diff suppressed because it is too large Load Diff
Loading…
Cancel
Save