Merge pull request #1648 from nmakhotkin/restarting_pods

Adding pod restart during release upgrade/rollback
pull/1711/head
Matt Butcher 8 years ago committed by GitHub
commit ff9651be75

@ -188,6 +188,8 @@ message UpdateReleaseRequest {
bool dry_run = 4; bool dry_run = 4;
// DisableHooks causes the server to skip running any hooks for the upgrade. // DisableHooks causes the server to skip running any hooks for the upgrade.
bool disable_hooks = 5; bool disable_hooks = 5;
// Performs pods restart for resources if applicable
bool recreate = 6;
} }
// UpdateReleaseResponse is the response to an update request. // UpdateReleaseResponse is the response to an update request.
@ -204,6 +206,8 @@ message RollbackReleaseRequest {
bool disable_hooks = 3; bool disable_hooks = 3;
// Version is the version of the release to deploy. // Version is the version of the release to deploy.
int32 version = 4; int32 version = 4;
// Performs pods restart for resources if applicable
bool recreate = 5;
} }
// RollbackReleaseResponse is the response to an update request. // RollbackReleaseResponse is the response to an update request.

@ -35,6 +35,7 @@ type rollbackCmd struct {
name string name string
revision int32 revision int32
dryRun bool dryRun bool
restart bool
disableHooks bool disableHooks bool
out io.Writer out io.Writer
client helm.Interface client helm.Interface
@ -71,6 +72,7 @@ func newRollbackCmd(c helm.Interface, out io.Writer) *cobra.Command {
f := cmd.Flags() f := cmd.Flags()
f.BoolVar(&rollback.dryRun, "dry-run", false, "simulate a rollback") f.BoolVar(&rollback.dryRun, "dry-run", false, "simulate a rollback")
f.BoolVar(&rollback.restart, "restart", false, "performs pods restart for the resource if applicable")
f.BoolVar(&rollback.disableHooks, "no-hooks", false, "prevent hooks from running during rollback") f.BoolVar(&rollback.disableHooks, "no-hooks", false, "prevent hooks from running during rollback")
return cmd return cmd
@ -80,6 +82,7 @@ func (r *rollbackCmd) run() error {
_, err := r.client.RollbackRelease( _, err := r.client.RollbackRelease(
r.name, r.name,
helm.RollbackDryRun(r.dryRun), helm.RollbackDryRun(r.dryRun),
helm.RollbackRecreate(r.restart),
helm.RollbackDisableHooks(r.disableHooks), helm.RollbackDisableHooks(r.disableHooks),
helm.RollbackVersion(r.revision), helm.RollbackVersion(r.revision),
) )

@ -54,6 +54,7 @@ type upgradeCmd struct {
out io.Writer out io.Writer
client helm.Interface client helm.Interface
dryRun bool dryRun bool
restart bool
disableHooks bool disableHooks bool
valueFiles valueFiles valueFiles valueFiles
values string values string
@ -92,6 +93,7 @@ func newUpgradeCmd(client helm.Interface, out io.Writer) *cobra.Command {
f := cmd.Flags() f := cmd.Flags()
f.VarP(&upgrade.valueFiles, "values", "f", "specify values in a YAML file (can specify multiple)") f.VarP(&upgrade.valueFiles, "values", "f", "specify values in a YAML file (can specify multiple)")
f.BoolVar(&upgrade.dryRun, "dry-run", false, "simulate an upgrade") f.BoolVar(&upgrade.dryRun, "dry-run", false, "simulate an upgrade")
f.BoolVar(&upgrade.restart, "restart", false, "performs pods restart for the resource if applicable")
f.StringVar(&upgrade.values, "set", "", "set values on the command line. Separate values with commas: key1=val1,key2=val2") f.StringVar(&upgrade.values, "set", "", "set values on the command line. Separate values with commas: key1=val1,key2=val2")
f.BoolVar(&upgrade.disableHooks, "disable-hooks", false, "disable pre/post upgrade hooks. DEPRECATED. Use no-hooks") f.BoolVar(&upgrade.disableHooks, "disable-hooks", false, "disable pre/post upgrade hooks. DEPRECATED. Use no-hooks")
f.BoolVar(&upgrade.disableHooks, "no-hooks", false, "disable pre/post upgrade hooks") f.BoolVar(&upgrade.disableHooks, "no-hooks", false, "disable pre/post upgrade hooks")
@ -149,6 +151,7 @@ func (u *upgradeCmd) run() error {
chartPath, chartPath,
helm.UpdateValueOverrides(rawVals), helm.UpdateValueOverrides(rawVals),
helm.UpgradeDryRun(u.dryRun), helm.UpgradeDryRun(u.dryRun),
helm.UpgradeRecreate(u.restart),
helm.UpgradeDisableHooks(u.disableHooks)) helm.UpgradeDisableHooks(u.disableHooks))
if err != nil { if err != nil {
return fmt.Errorf("UPGRADE FAILED: %v", prettyError(err)) return fmt.Errorf("UPGRADE FAILED: %v", prettyError(err))

@ -133,6 +133,7 @@ func (h *Client) UpdateRelease(rlsName string, chstr string, opts ...UpdateOptio
req.DryRun = h.opts.dryRun req.DryRun = h.opts.dryRun
req.Name = rlsName req.Name = rlsName
req.DisableHooks = h.opts.disableHooks req.DisableHooks = h.opts.disableHooks
req.Restart = h.opts.recreate
ctx := NewContext() ctx := NewContext()
if h.opts.before != nil { if h.opts.before != nil {

@ -40,6 +40,8 @@ type options struct {
dryRun bool dryRun bool
// if set, re-use an existing name // if set, re-use an existing name
reuseName bool reuseName bool
// if set, performs pod restart during upgrade/rollback
recreate bool
// if set, skip running hooks // if set, skip running hooks
disableHooks bool disableHooks bool
// name of release // name of release
@ -212,6 +214,13 @@ func RollbackDryRun(dry bool) RollbackOption {
} }
} }
// RollbackRecreate will (if true) recreate pods after rollback.
func RollbackRecreate(recreate bool) RollbackOption {
return func(opts *options) {
opts.recreate = recreate
}
}
// RollbackVersion sets the version of the release to deploy. // RollbackVersion sets the version of the release to deploy.
func RollbackVersion(ver int32) RollbackOption { func RollbackVersion(ver int32) RollbackOption {
return func(opts *options) { return func(opts *options) {
@ -233,6 +242,13 @@ func UpgradeDryRun(dry bool) UpdateOption {
} }
} }
// UpgradeRecreate will (if true) recreate pods after upgrade.
func UpgradeRecreate(recreate bool) UpdateOption {
return func(opts *options) {
opts.recreate = recreate
}
}
// ContentOption allows setting optional attributes when // ContentOption allows setting optional attributes when
// performing a GetReleaseContent tiller rpc. // performing a GetReleaseContent tiller rpc.
type ContentOption func(*options) type ContentOption func(*options)

@ -27,12 +27,18 @@ import (
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/apimachinery/registered" "k8s.io/kubernetes/pkg/apimachinery/registered"
apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1"
"k8s.io/kubernetes/pkg/apis/batch" "k8s.io/kubernetes/pkg/apis/batch"
"k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/client/unversioned/clientcmd" "k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/kubectl" "k8s.io/kubernetes/pkg/kubectl"
cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util" cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
"k8s.io/kubernetes/pkg/kubectl/resource" "k8s.io/kubernetes/pkg/kubectl/resource"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/strategicpatch" "k8s.io/kubernetes/pkg/util/strategicpatch"
"k8s.io/kubernetes/pkg/watch" "k8s.io/kubernetes/pkg/watch"
@ -158,7 +164,7 @@ func (c *Client) Get(namespace string, reader io.Reader) (string, error) {
// not present in the target configuration // not present in the target configuration
// //
// Namespace will set the namespaces // Namespace will set the namespaces
func (c *Client) Update(namespace string, currentReader, targetReader io.Reader) error { func (c *Client) Update(namespace string, currentReader, targetReader io.Reader, recreate bool) error {
currentInfos, err := c.newBuilder(namespace, currentReader).Do().Infos() currentInfos, err := c.newBuilder(namespace, currentReader).Do().Infos()
if err != nil { if err != nil {
return fmt.Errorf("failed decoding reader into objects: %s", err) return fmt.Errorf("failed decoding reader into objects: %s", err)
@ -199,7 +205,7 @@ func (c *Client) Update(namespace string, currentReader, targetReader io.Reader)
return err return err
} }
if err := updateResource(info, currentObj); err != nil { if err := updateResource(c, info, currentObj, recreate); err != nil {
if alreadyExistErr, ok := err.(ErrAlreadyExists); ok { if alreadyExistErr, ok := err.(ErrAlreadyExists); ok {
log.Printf(alreadyExistErr.errorMsg) log.Printf(alreadyExistErr.errorMsg)
} else { } else {
@ -295,7 +301,7 @@ func deleteResource(info *resource.Info) error {
return resource.NewHelper(info.Client, info.Mapping).Delete(info.Namespace, info.Name) return resource.NewHelper(info.Client, info.Mapping).Delete(info.Namespace, info.Name)
} }
func updateResource(target *resource.Info, currentObj runtime.Object) error { func updateResource(c *Client, target *resource.Info, currentObj runtime.Object, recreate bool) error {
encoder := api.Codecs.LegacyCodec(registered.EnabledVersions()...) encoder := api.Codecs.LegacyCodec(registered.EnabledVersions()...)
original, err := runtime.Encode(encoder, currentObj) original, err := runtime.Encode(encoder, currentObj)
if err != nil { if err != nil {
@ -319,9 +325,63 @@ func updateResource(target *resource.Info, currentObj runtime.Object) error {
// send patch to server // send patch to server
helper := resource.NewHelper(target.Client, target.Mapping) helper := resource.NewHelper(target.Client, target.Mapping)
_, err = helper.Patch(target.Namespace, target.Name, api.StrategicMergePatchType, patch) _, err = helper.Patch(target.Namespace, target.Name, api.StrategicMergePatchType, patch)
if err != nil {
return err
}
if recreate {
kind := target.Mapping.GroupVersionKind.Kind
client, _ := c.ClientSet()
switch kind {
case "ReplicationController":
rc := currentObj.(*v1.ReplicationController)
err = recreatePods(client, target.Namespace, rc.Spec.Selector)
case "DaemonSet":
daemonSet := currentObj.(*v1beta1.DaemonSet)
err = recreatePods(client, target.Namespace, daemonSet.Spec.Selector.MatchLabels)
case "StatefulSet":
petSet := currentObj.(*apps.StatefulSet)
err = recreatePods(client, target.Namespace, petSet.Spec.Selector.MatchLabels)
case "ReplicaSet":
replicaSet := currentObj.(*v1beta1.ReplicaSet)
err = recreatePods(client, target.Namespace, replicaSet.Spec.Selector.MatchLabels)
}
}
return err return err
} }
func recreatePods(client *internalclientset.Clientset, namespace string, selector map[string]string) error {
pods, err := client.Pods(namespace).List(api.ListOptions{
FieldSelector: fields.Everything(),
LabelSelector: labels.Set(selector).AsSelector(),
})
if err != nil {
return err
}
// Restart pods
for _, pod := range pods.Items {
log.Printf("Restarting pod: %v/%v", pod.Namespace, pod.Name)
// Delete each pod for get them restarted with changed spec.
err := client.Pods(pod.Namespace).Delete(pod.Name, &api.DeleteOptions{
Preconditions: &api.Preconditions{
UID: &pod.UID,
},
})
if err != nil {
return err
}
}
return nil
}
func watchUntilReady(info *resource.Info) error { func watchUntilReady(info *resource.Info) error {
w, err := resource.NewHelper(info.Client, info.Mapping).WatchSingle(info.Namespace, info.Name, info.ResourceVersion) w, err := resource.NewHelper(info.Client, info.Mapping).WatchSingle(info.Namespace, info.Name, info.ResourceVersion)
if err != nil { if err != nil {

@ -119,7 +119,7 @@ func TestUpdate(t *testing.T) {
} }
c := &Client{Factory: f} c := &Client{Factory: f}
if err := c.Update("test", objBody(codec, &listA), objBody(codec, &listB)); err != nil { if err := c.Update("test", objBody(codec, &listA), objBody(codec, &listB), false); err != nil {
t.Fatal(err) t.Fatal(err)
} }

@ -246,6 +246,8 @@ type UpdateReleaseRequest struct {
DryRun bool `protobuf:"varint,4,opt,name=dry_run,json=dryRun" json:"dry_run,omitempty"` DryRun bool `protobuf:"varint,4,opt,name=dry_run,json=dryRun" json:"dry_run,omitempty"`
// DisableHooks causes the server to skip running any hooks for the upgrade. // DisableHooks causes the server to skip running any hooks for the upgrade.
DisableHooks bool `protobuf:"varint,5,opt,name=disable_hooks,json=disableHooks" json:"disable_hooks,omitempty"` DisableHooks bool `protobuf:"varint,5,opt,name=disable_hooks,json=disableHooks" json:"disable_hooks,omitempty"`
// Performs pods restart for resources if applicable
Restart bool `protobuf:"varint,6,opt,name=restart,json=restart" json:"restart,omitempty"`
} }
func (m *UpdateReleaseRequest) Reset() { *m = UpdateReleaseRequest{} } func (m *UpdateReleaseRequest) Reset() { *m = UpdateReleaseRequest{} }
@ -293,6 +295,8 @@ type RollbackReleaseRequest struct {
DisableHooks bool `protobuf:"varint,3,opt,name=disable_hooks,json=disableHooks" json:"disable_hooks,omitempty"` DisableHooks bool `protobuf:"varint,3,opt,name=disable_hooks,json=disableHooks" json:"disable_hooks,omitempty"`
// Version is the version of the release to deploy. // Version is the version of the release to deploy.
Version int32 `protobuf:"varint,4,opt,name=version" json:"version,omitempty"` Version int32 `protobuf:"varint,4,opt,name=version" json:"version,omitempty"`
// Performs pods restart for resources if applicable
Restart bool `protobuf:"varint,5,opt,name=restart,json=restart" json:"restart,omitempty"`
} }
func (m *RollbackReleaseRequest) Reset() { *m = RollbackReleaseRequest{} } func (m *RollbackReleaseRequest) Reset() { *m = RollbackReleaseRequest{} }

@ -131,7 +131,7 @@ type KubeClient interface {
// //
// reader must contain a YAML stream (one or more YAML documents separated // reader must contain a YAML stream (one or more YAML documents separated
// by "\n---\n"). // by "\n---\n").
Update(namespace string, originalReader, modifiedReader io.Reader) error Update(namespace string, originalReader, modifiedReader io.Reader, recreate bool) error
} }
// PrintingKubeClient implements KubeClient, but simply prints the reader to // PrintingKubeClient implements KubeClient, but simply prints the reader to
@ -167,7 +167,7 @@ func (p *PrintingKubeClient) WatchUntilReady(ns string, r io.Reader) error {
} }
// Update implements KubeClient Update. // Update implements KubeClient Update.
func (p *PrintingKubeClient) Update(ns string, currentReader, modifiedReader io.Reader) error { func (p *PrintingKubeClient) Update(ns string, currentReader, modifiedReader io.Reader, recreate bool) error {
_, err := io.Copy(p.Out, modifiedReader) _, err := io.Copy(p.Out, modifiedReader)
return err return err
} }

@ -44,7 +44,7 @@ func (k *mockKubeClient) Get(ns string, r io.Reader) (string, error) {
func (k *mockKubeClient) Delete(ns string, r io.Reader) error { func (k *mockKubeClient) Delete(ns string, r io.Reader) error {
return nil return nil
} }
func (k *mockKubeClient) Update(ns string, currentReader, modifiedReader io.Reader) error { func (k *mockKubeClient) Update(ns string, currentReader, modifiedReader io.Reader, recreate bool) error {
return nil return nil
} }
func (k *mockKubeClient) WatchUntilReady(ns string, r io.Reader) error { func (k *mockKubeClient) WatchUntilReady(ns string, r io.Reader) error {

@ -340,7 +340,7 @@ func (s *ReleaseServer) performUpdate(originalRelease, updatedRelease *release.R
} }
} }
if err := s.performKubeUpdate(originalRelease, updatedRelease); err != nil { if err := s.performKubeUpdate(originalRelease, updatedRelease, req.Restart); err != nil {
log.Printf("warning: Release Upgrade %q failed: %s", updatedRelease.Name, err) log.Printf("warning: Release Upgrade %q failed: %s", updatedRelease.Name, err)
originalRelease.Info.Status.Code = release.Status_SUPERSEDED originalRelease.Info.Status.Code = release.Status_SUPERSEDED
updatedRelease.Info.Status.Code = release.Status_FAILED updatedRelease.Info.Status.Code = release.Status_FAILED
@ -478,7 +478,7 @@ func (s *ReleaseServer) performRollback(currentRelease, targetRelease *release.R
} }
} }
if err := s.performKubeUpdate(currentRelease, targetRelease); err != nil { if err := s.performKubeUpdate(currentRelease, targetRelease, req.Restart); err != nil {
log.Printf("warning: Release Rollback %q failed: %s", targetRelease.Name, err) log.Printf("warning: Release Rollback %q failed: %s", targetRelease.Name, err)
currentRelease.Info.Status.Code = release.Status_SUPERSEDED currentRelease.Info.Status.Code = release.Status_SUPERSEDED
targetRelease.Info.Status.Code = release.Status_FAILED targetRelease.Info.Status.Code = release.Status_FAILED
@ -502,11 +502,11 @@ func (s *ReleaseServer) performRollback(currentRelease, targetRelease *release.R
return res, nil return res, nil
} }
func (s *ReleaseServer) performKubeUpdate(currentRelease, targetRelease *release.Release) error { func (s *ReleaseServer) performKubeUpdate(currentRelease, targetRelease *release.Release, recreate bool) error {
kubeCli := s.env.KubeClient kubeCli := s.env.KubeClient
current := bytes.NewBufferString(currentRelease.Manifest) current := bytes.NewBufferString(currentRelease.Manifest)
target := bytes.NewBufferString(targetRelease.Manifest) target := bytes.NewBufferString(targetRelease.Manifest)
return kubeCli.Update(targetRelease.Namespace, current, target) return kubeCli.Update(targetRelease.Namespace, current, target, recreate)
} }
// prepareRollback finds the previous release and prepares a new release object with // prepareRollback finds the previous release and prepares a new release object with
@ -831,7 +831,7 @@ func (s *ReleaseServer) performRelease(r *release.Release, req *services.Install
// so as to append to the old release's history // so as to append to the old release's history
r.Version = old.Version + 1 r.Version = old.Version + 1
if err := s.performKubeUpdate(old, r); err != nil { if err := s.performKubeUpdate(old, r, false); err != nil {
log.Printf("warning: Release replace %q failed: %s", r.Name, err) log.Printf("warning: Release replace %q failed: %s", r.Name, err)
old.Info.Status.Code = release.Status_SUPERSEDED old.Info.Status.Code = release.Status_SUPERSEDED
r.Info.Status.Code = release.Status_FAILED r.Info.Status.Code = release.Status_FAILED

@ -1361,7 +1361,7 @@ type updateFailingKubeClient struct {
environment.PrintingKubeClient environment.PrintingKubeClient
} }
func (u *updateFailingKubeClient) Update(namespace string, originalReader, modifiedReader io.Reader) error { func (u *updateFailingKubeClient) Update(namespace string, originalReader, modifiedReader io.Reader, recreate bool) error {
return errors.New("Failed update in kube client") return errors.New("Failed update in kube client")
} }

Loading…
Cancel
Save