diff --git a/pkg/kube/client.go b/pkg/kube/client.go index e76ed5180..ac188a794 100644 --- a/pkg/kube/client.go +++ b/pkg/kube/client.go @@ -39,6 +39,11 @@ import ( "k8s.io/kubernetes/pkg/util/strategicpatch" "k8s.io/kubernetes/pkg/util/yaml" "k8s.io/kubernetes/pkg/watch" + "k8s.io/kubernetes/pkg/labels" + "k8s.io/kubernetes/pkg/fields" + "k8s.io/kubernetes/pkg/api/v1" + "k8s.io/kubernetes/pkg/apis/apps/v1alpha1" + "k8s.io/kubernetes/pkg/apis/extensions/v1beta1" ) // ErrNoObjectsVisited indicates that during a visit operation, no matching objects were found. @@ -216,7 +221,7 @@ func (c *Client) Update(namespace string, currentReader, targetReader io.Reader) return err } - if err := updateResource(info, currentObj); err != nil { + if err := updateResource(c, info, currentObj); err != nil { if alreadyExistErr, ok := err.(ErrAlreadyExists); ok { log.Printf(alreadyExistErr.errorMsg) } else { @@ -312,7 +317,7 @@ func deleteResource(info *resource.Info) error { return resource.NewHelper(info.Client, info.Mapping).Delete(info.Namespace, info.Name) } -func updateResource(target *resource.Info, currentObj runtime.Object) error { +func updateResource(c *Client, target *resource.Info, currentObj runtime.Object) error { encoder := api.Codecs.LegacyCodec(registered.EnabledVersions()...) originalSerialization, err := runtime.Encode(encoder, currentObj) @@ -347,9 +352,60 @@ func updateResource(target *resource.Info, currentObj runtime.Object) error { // send patch to server helper := resource.NewHelper(target.Client, target.Mapping) _, err = helper.Patch(target.Namespace, target.Name, api.StrategicMergePatchType, patch) + + if err != nil { + return err + } + + kind := target.Mapping.GroupVersionKind.Kind + + client, _ := c.Client() + switch kind { + case "ReplicationController": + rc := currentObj.(*v1.ReplicationController) + err = restartPods(client, target.Namespace, rc.Spec.Selector) + case "DaemonSet": + daemonSet := currentObj.(*v1beta1.DaemonSet) + err = restartPods(client, target.Namespace, daemonSet.Spec.Selector.MatchLabels) + case "PetSet": + petSet := currentObj.(*v1alpha1.PetSet) + err = restartPods(client, target.Namespace, petSet.Spec.Selector.MatchLabels) + } + return err } + +func restartPods(client *unversioned.Client, namespace string, selector map[string]string) error { + pods, err := client.Pods(namespace).List(api.ListOptions{ + FieldSelector: fields.Everything(), + LabelSelector: labels.Set(selector).AsSelector(), + }) + + if err != nil { + return err + } + + // Restart pods + for _, pod := range pods.Items { + log.Printf("Restarting pod: %v/%v", pod.Namespace, pod.Name) + + // Delete each pod for get them restarted with changed spec. + err := client.Pods(pod.Namespace).Delete(pod.Name, &api.DeleteOptions{ + Preconditions: &api.Preconditions{ + UID: &pod.UID, + }, + }) + + if err != nil { + return err + } + } + + return nil +} + + func watchUntilReady(info *resource.Info) error { w, err := resource.NewHelper(info.Client, info.Mapping).WatchSingle(info.Namespace, info.Name, info.ResourceVersion) if err != nil {