From f600b30c7ac71d7d5bae47fb9322b4c2ab0eb18b Mon Sep 17 00:00:00 2001 From: Michelle Noorali Date: Fri, 5 Aug 2016 09:56:02 -0600 Subject: [PATCH] feat(kube): add update logic to kube client This commit adds an Update function to the client. The Update function takes in the modified manifests and the original manifests. It then iterates through the modified objects, creates objects not found in kubernetes, and updates objects that exists but have been modified. Finally, it iterates through the original resources and checks to see if they have been deleted in the modified configuration and then proceeds to delete them. #690 --- cmd/tiller/environment/environment.go | 15 ++ cmd/tiller/environment/environment_test.go | 3 + pkg/kube/client.go | 157 +++++++++++++++++++++ pkg/kube/client_test.go | 90 ++++++++++++ 4 files changed, 265 insertions(+) diff --git a/cmd/tiller/environment/environment.go b/cmd/tiller/environment/environment.go index 9a966ca8a..2a9d85370 100644 --- a/cmd/tiller/environment/environment.go +++ b/cmd/tiller/environment/environment.go @@ -161,6 +161,15 @@ type KubeClient interface { // For all other kinds, it means the kind was created or modified without // error. WatchUntilReady(namespace string, reader io.Reader) error + + // Update updates one or more resources or creates the resource + // if it doesn't exist + // + // namespace must contain a valid existing namespace + // + // reader must contain a YAML stream (one or more YAML documents separated + // by "\n---\n"). + Update(namespace string, originalReader, modifiedReader io.Reader) error } // PrintingKubeClient implements KubeClient, but simply prints the reader to @@ -189,6 +198,12 @@ func (p *PrintingKubeClient) WatchUntilReady(ns string, r io.Reader) error { return err } +// Update implements KubeClient Update. +func (p *PrintingKubeClient) Update(ns string, currentReader, modifiedReader io.Reader) error { + _, err := io.Copy(p.Out, modifiedReader) + return err +} + // Environment provides the context for executing a client request. // // All services in a context are concurrency safe. diff --git a/cmd/tiller/environment/environment_test.go b/cmd/tiller/environment/environment_test.go index cfcbc8ca4..ff82e2cc1 100644 --- a/cmd/tiller/environment/environment_test.go +++ b/cmd/tiller/environment/environment_test.go @@ -83,6 +83,9 @@ func (k *mockKubeClient) Create(ns string, r io.Reader) error { func (k *mockKubeClient) Delete(ns string, r io.Reader) error { return nil } +func (k *mockKubeClient) Update(ns string, currentReader, modifiedReader io.Reader) error { + return nil +} func (k *mockKubeClient) WatchUntilReady(ns string, r io.Reader) error { return nil } diff --git a/pkg/kube/client.go b/pkg/kube/client.go index 4e0c51908..5fde74344 100644 --- a/pkg/kube/client.go +++ b/pkg/kube/client.go @@ -20,15 +20,21 @@ import ( "fmt" "io" "log" + "reflect" "time" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/errors" + "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/apimachinery/registered" "k8s.io/kubernetes/pkg/apis/batch" "k8s.io/kubernetes/pkg/client/unversioned/clientcmd" "k8s.io/kubernetes/pkg/kubectl" cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util" "k8s.io/kubernetes/pkg/kubectl/resource" + "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/util/strategicpatch" + "k8s.io/kubernetes/pkg/util/yaml" "k8s.io/kubernetes/pkg/watch" ) @@ -57,6 +63,77 @@ func (c *Client) Create(namespace string, reader io.Reader) error { return perform(c, namespace, reader, createResource) } +// Update reads in the current configuration and a modified configuration from io.reader +// and creates resources that don't already exists, updates resources that have been modified +// and deletes resources from the current configuration that are not present in the +// modified configuration +// +// Namespace will set the namespaces +func (c *Client) Update(namespace string, currentReader, modifiedReader io.Reader) error { + current := c.NewBuilder(includeThirdPartyAPIs). + ContinueOnError(). + NamespaceParam(namespace). + DefaultNamespace(). + Stream(currentReader, ""). + Flatten(). + Do() + + modified := c.NewBuilder(includeThirdPartyAPIs). + ContinueOnError(). + NamespaceParam(namespace). + DefaultNamespace(). + Stream(modifiedReader, ""). + Flatten(). + Do() + + currentInfos, err := current.Infos() + if err != nil { + return err + } + + modifiedInfos := []*resource.Info{} + + modified.Visit(func(info *resource.Info, err error) error { + modifiedInfos = append(modifiedInfos, info) + if err != nil { + return err + } + resourceName := info.Name + + helper := resource.NewHelper(info.Client, info.Mapping) + if _, err := helper.Get(info.Namespace, resourceName, info.Export); err != nil { + if !errors.IsNotFound(err) { + return fmt.Errorf("Could not get information about the resource: err: %s", err) + } + + // Since the resource does not exist, create it. + if err := createResource(info); err != nil { + return err + } + + kind := info.Mapping.GroupVersionKind.Kind + log.Printf("Created a new %s called %s\n", kind, resourceName) + return nil + } + + currentObj, err := getCurrentObject(resourceName, currentInfos) + if err != nil { + return err + } + + if err := updateResource(info, currentObj); err != nil { + log.Printf("error updating the resource %s:\n\t %v", resourceName, err) + return err + } + + return err + }) + + deleteUnwantedResources(currentInfos, modifiedInfos) + + return nil +} + // Delete deletes kubernetes resources from an io.reader // // Namespace will set the namespace @@ -136,6 +213,52 @@ func createResource(info *resource.Info) error { return err } +func deleteResource(info *resource.Info) error { + return resource.NewHelper(info.Client, info.Mapping).Delete(info.Namespace, info.Name) +} + +func updateResource(modified *resource.Info, currentObj runtime.Object) error { + + encoder := api.Codecs.LegacyCodec(registered.EnabledVersions()...) + originalSerialization, err := runtime.Encode(encoder, currentObj) + if err != nil { + return err + } + + editedSerialization, err := runtime.Encode(encoder, modified.Object) + if err != nil { + return err + } + + originalJS, err := yaml.ToJSON(originalSerialization) + if err != nil { + return err + } + + editedJS, err := yaml.ToJSON(editedSerialization) + if err != nil { + return err + } + + if reflect.DeepEqual(originalJS, editedJS) { + return fmt.Errorf("Looks like there are no changes for %s", modified.Name) + } + + patch, err := strategicpatch.CreateStrategicMergePatch(originalJS, editedJS, currentObj) + if err != nil { + return err + } + + // send patch to server + helper := resource.NewHelper(modified.Client, modified.Mapping) + _, err = helper.Patch(modified.Namespace, modified.Name, api.StrategicMergePatchType, patch) + if err != nil { + return err + } + + return nil +} + func watchUntilReady(info *resource.Info) error { w, err := resource.NewHelper(info.Client, info.Mapping).WatchSingle(info.Namespace, info.Name, info.ResourceVersion) if err != nil { @@ -213,3 +336,37 @@ func (c *Client) ensureNamespace(namespace string) error { } return nil } + +func deleteUnwantedResources(currentInfos, modifiedInfos []*resource.Info) { + for _, cInfo := range currentInfos { + found := false + for _, m := range modifiedInfos { + if m.Name == cInfo.Name { + found = true + } + } + if !found { + log.Printf("Deleting %s...", cInfo.Name) + if err := deleteResource(cInfo); err != nil { + log.Printf("Failed to delete %s, err: %s", cInfo.Name, err) + } + } + } +} + +func getCurrentObject(targetName string, infos []*resource.Info) (runtime.Object, error) { + var curr *resource.Info + for _, currInfo := range infos { + if currInfo.Name == targetName { + curr = currInfo + } + } + + if curr == nil { + return nil, fmt.Errorf("No resource with the name %s found.", targetName) + } + + encoder := api.Codecs.LegacyCodec(registered.EnabledVersions()...) + defaultVersion := unversioned.GroupVersion{} + return resource.AsVersionedObject([]*resource.Info{curr}, false, defaultVersion, encoder) +} diff --git a/pkg/kube/client_test.go b/pkg/kube/client_test.go index 5ef8b913e..c59cc37b0 100644 --- a/pkg/kube/client_test.go +++ b/pkg/kube/client_test.go @@ -17,15 +17,55 @@ limitations under the License. package kube import ( + "bytes" + "encoding/json" "io" + "io/ioutil" + "net/http" "strings" "testing" "k8s.io/kubernetes/pkg/api/meta" + "k8s.io/kubernetes/pkg/api/testapi" + "k8s.io/kubernetes/pkg/api/unversioned" + api "k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/client/unversioned/fake" "k8s.io/kubernetes/pkg/kubectl/resource" + "k8s.io/kubernetes/pkg/runtime" ) +func TestUpdateResource(t *testing.T) { + + tests := []struct { + name string + namespace string + modified *resource.Info + currentObj runtime.Object + err bool + errMessage string + }{ + { + name: "no changes when updating resources", + modified: createFakeInfo("nginx", nil), + currentObj: createFakePod("nginx", nil), + err: true, + errMessage: "Looks like there are no changes for nginx", + }, + //{ + //name: "valid update input", + //modified: createFakeInfo("nginx", map[string]string{"app": "nginx"}), + //currentObj: createFakePod("nginx", nil), + //}, + } + + for _, tt := range tests { + err := updateResource(tt.modified, tt.currentObj) + if err != nil && err.Error() != tt.errMessage { + t.Errorf("%q. expected error message: %v, got %v", tt.name, tt.errMessage, err) + } + } +} + func TestPerform(t *testing.T) { tests := []struct { name string @@ -214,3 +254,53 @@ spec: ports: - containerPort: 80 ` + +func createFakePod(name string, labels map[string]string) runtime.Object { + objectMeta := createObjectMeta(name, labels) + + object := &api.Pod{ + ObjectMeta: objectMeta, + } + + return object +} + +func createFakeInfo(name string, labels map[string]string) *resource.Info { + pod := createFakePod(name, labels) + marshaledObj, _ := json.Marshal(pod) + + mapping := &meta.RESTMapping{ + Resource: name, + Scope: meta.RESTScopeNamespace, + GroupVersionKind: unversioned.GroupVersionKind{ + Kind: "Pod", + Version: "v1", + }} + + client := &fake.RESTClient{ + Codec: testapi.Default.Codec(), + Client: fake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) { + header := http.Header{} + header.Set("Content-Type", runtime.ContentTypeJSON) + return &http.Response{ + StatusCode: 200, + Header: header, + Body: ioutil.NopCloser(bytes.NewReader(marshaledObj)), + }, nil + })} + info := resource.NewInfo(client, mapping, "default", "nginx", false) + + info.Object = pod + + return info +} + +func createObjectMeta(name string, labels map[string]string) api.ObjectMeta { + objectMeta := api.ObjectMeta{Name: name, Namespace: "default"} + + if labels != nil { + objectMeta.Labels = labels + } + + return objectMeta +}