diff --git a/cmd/tiller/environment/environment.go b/cmd/tiller/environment/environment.go index 9a966ca8a..2a9d85370 100644 --- a/cmd/tiller/environment/environment.go +++ b/cmd/tiller/environment/environment.go @@ -161,6 +161,15 @@ type KubeClient interface { // For all other kinds, it means the kind was created or modified without // error. WatchUntilReady(namespace string, reader io.Reader) error + + // Update updates one or more resources or creates the resource + // if it doesn't exist + // + // namespace must contain a valid existing namespace + // + // reader must contain a YAML stream (one or more YAML documents separated + // by "\n---\n"). + Update(namespace string, originalReader, modifiedReader io.Reader) error } // PrintingKubeClient implements KubeClient, but simply prints the reader to @@ -189,6 +198,12 @@ func (p *PrintingKubeClient) WatchUntilReady(ns string, r io.Reader) error { return err } +// Update implements KubeClient Update. +func (p *PrintingKubeClient) Update(ns string, currentReader, modifiedReader io.Reader) error { + _, err := io.Copy(p.Out, modifiedReader) + return err +} + // Environment provides the context for executing a client request. // // All services in a context are concurrency safe. diff --git a/cmd/tiller/environment/environment_test.go b/cmd/tiller/environment/environment_test.go index cfcbc8ca4..ff82e2cc1 100644 --- a/cmd/tiller/environment/environment_test.go +++ b/cmd/tiller/environment/environment_test.go @@ -83,6 +83,9 @@ func (k *mockKubeClient) Create(ns string, r io.Reader) error { func (k *mockKubeClient) Delete(ns string, r io.Reader) error { return nil } +func (k *mockKubeClient) Update(ns string, currentReader, modifiedReader io.Reader) error { + return nil +} func (k *mockKubeClient) WatchUntilReady(ns string, r io.Reader) error { return nil } diff --git a/pkg/kube/client.go b/pkg/kube/client.go index 4e0c51908..5fde74344 100644 --- a/pkg/kube/client.go +++ b/pkg/kube/client.go @@ -20,15 +20,21 @@ import ( "fmt" "io" "log" + "reflect" "time" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/errors" + "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/apimachinery/registered" "k8s.io/kubernetes/pkg/apis/batch" "k8s.io/kubernetes/pkg/client/unversioned/clientcmd" "k8s.io/kubernetes/pkg/kubectl" cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util" "k8s.io/kubernetes/pkg/kubectl/resource" + "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/util/strategicpatch" + "k8s.io/kubernetes/pkg/util/yaml" "k8s.io/kubernetes/pkg/watch" ) @@ -57,6 +63,77 @@ func (c *Client) Create(namespace string, reader io.Reader) error { return perform(c, namespace, reader, createResource) } +// Update reads in the current configuration and a modified configuration from io.reader +// and creates resources that don't already exists, updates resources that have been modified +// and deletes resources from the current configuration that are not present in the +// modified configuration +// +// Namespace will set the namespaces +func (c *Client) Update(namespace string, currentReader, modifiedReader io.Reader) error { + current := c.NewBuilder(includeThirdPartyAPIs). + ContinueOnError(). + NamespaceParam(namespace). + DefaultNamespace(). + Stream(currentReader, ""). + Flatten(). + Do() + + modified := c.NewBuilder(includeThirdPartyAPIs). + ContinueOnError(). + NamespaceParam(namespace). + DefaultNamespace(). + Stream(modifiedReader, ""). + Flatten(). + Do() + + currentInfos, err := current.Infos() + if err != nil { + return err + } + + modifiedInfos := []*resource.Info{} + + modified.Visit(func(info *resource.Info, err error) error { + modifiedInfos = append(modifiedInfos, info) + if err != nil { + return err + } + resourceName := info.Name + + helper := resource.NewHelper(info.Client, info.Mapping) + if _, err := helper.Get(info.Namespace, resourceName, info.Export); err != nil { + if !errors.IsNotFound(err) { + return fmt.Errorf("Could not get information about the resource: err: %s", err) + } + + // Since the resource does not exist, create it. + if err := createResource(info); err != nil { + return err + } + + kind := info.Mapping.GroupVersionKind.Kind + log.Printf("Created a new %s called %s\n", kind, resourceName) + return nil + } + + currentObj, err := getCurrentObject(resourceName, currentInfos) + if err != nil { + return err + } + + if err := updateResource(info, currentObj); err != nil { + log.Printf("error updating the resource %s:\n\t %v", resourceName, err) + return err + } + + return err + }) + + deleteUnwantedResources(currentInfos, modifiedInfos) + + return nil +} + // Delete deletes kubernetes resources from an io.reader // // Namespace will set the namespace @@ -136,6 +213,52 @@ func createResource(info *resource.Info) error { return err } +func deleteResource(info *resource.Info) error { + return resource.NewHelper(info.Client, info.Mapping).Delete(info.Namespace, info.Name) +} + +func updateResource(modified *resource.Info, currentObj runtime.Object) error { + + encoder := api.Codecs.LegacyCodec(registered.EnabledVersions()...) + originalSerialization, err := runtime.Encode(encoder, currentObj) + if err != nil { + return err + } + + editedSerialization, err := runtime.Encode(encoder, modified.Object) + if err != nil { + return err + } + + originalJS, err := yaml.ToJSON(originalSerialization) + if err != nil { + return err + } + + editedJS, err := yaml.ToJSON(editedSerialization) + if err != nil { + return err + } + + if reflect.DeepEqual(originalJS, editedJS) { + return fmt.Errorf("Looks like there are no changes for %s", modified.Name) + } + + patch, err := strategicpatch.CreateStrategicMergePatch(originalJS, editedJS, currentObj) + if err != nil { + return err + } + + // send patch to server + helper := resource.NewHelper(modified.Client, modified.Mapping) + _, err = helper.Patch(modified.Namespace, modified.Name, api.StrategicMergePatchType, patch) + if err != nil { + return err + } + + return nil +} + func watchUntilReady(info *resource.Info) error { w, err := resource.NewHelper(info.Client, info.Mapping).WatchSingle(info.Namespace, info.Name, info.ResourceVersion) if err != nil { @@ -213,3 +336,37 @@ func (c *Client) ensureNamespace(namespace string) error { } return nil } + +func deleteUnwantedResources(currentInfos, modifiedInfos []*resource.Info) { + for _, cInfo := range currentInfos { + found := false + for _, m := range modifiedInfos { + if m.Name == cInfo.Name { + found = true + } + } + if !found { + log.Printf("Deleting %s...", cInfo.Name) + if err := deleteResource(cInfo); err != nil { + log.Printf("Failed to delete %s, err: %s", cInfo.Name, err) + } + } + } +} + +func getCurrentObject(targetName string, infos []*resource.Info) (runtime.Object, error) { + var curr *resource.Info + for _, currInfo := range infos { + if currInfo.Name == targetName { + curr = currInfo + } + } + + if curr == nil { + return nil, fmt.Errorf("No resource with the name %s found.", targetName) + } + + encoder := api.Codecs.LegacyCodec(registered.EnabledVersions()...) + defaultVersion := unversioned.GroupVersion{} + return resource.AsVersionedObject([]*resource.Info{curr}, false, defaultVersion, encoder) +} diff --git a/pkg/kube/client_test.go b/pkg/kube/client_test.go index 5ef8b913e..c59cc37b0 100644 --- a/pkg/kube/client_test.go +++ b/pkg/kube/client_test.go @@ -17,15 +17,55 @@ limitations under the License. package kube import ( + "bytes" + "encoding/json" "io" + "io/ioutil" + "net/http" "strings" "testing" "k8s.io/kubernetes/pkg/api/meta" + "k8s.io/kubernetes/pkg/api/testapi" + "k8s.io/kubernetes/pkg/api/unversioned" + api "k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/client/unversioned/fake" "k8s.io/kubernetes/pkg/kubectl/resource" + "k8s.io/kubernetes/pkg/runtime" ) +func TestUpdateResource(t *testing.T) { + + tests := []struct { + name string + namespace string + modified *resource.Info + currentObj runtime.Object + err bool + errMessage string + }{ + { + name: "no changes when updating resources", + modified: createFakeInfo("nginx", nil), + currentObj: createFakePod("nginx", nil), + err: true, + errMessage: "Looks like there are no changes for nginx", + }, + //{ + //name: "valid update input", + //modified: createFakeInfo("nginx", map[string]string{"app": "nginx"}), + //currentObj: createFakePod("nginx", nil), + //}, + } + + for _, tt := range tests { + err := updateResource(tt.modified, tt.currentObj) + if err != nil && err.Error() != tt.errMessage { + t.Errorf("%q. expected error message: %v, got %v", tt.name, tt.errMessage, err) + } + } +} + func TestPerform(t *testing.T) { tests := []struct { name string @@ -214,3 +254,53 @@ spec: ports: - containerPort: 80 ` + +func createFakePod(name string, labels map[string]string) runtime.Object { + objectMeta := createObjectMeta(name, labels) + + object := &api.Pod{ + ObjectMeta: objectMeta, + } + + return object +} + +func createFakeInfo(name string, labels map[string]string) *resource.Info { + pod := createFakePod(name, labels) + marshaledObj, _ := json.Marshal(pod) + + mapping := &meta.RESTMapping{ + Resource: name, + Scope: meta.RESTScopeNamespace, + GroupVersionKind: unversioned.GroupVersionKind{ + Kind: "Pod", + Version: "v1", + }} + + client := &fake.RESTClient{ + Codec: testapi.Default.Codec(), + Client: fake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) { + header := http.Header{} + header.Set("Content-Type", runtime.ContentTypeJSON) + return &http.Response{ + StatusCode: 200, + Header: header, + Body: ioutil.NopCloser(bytes.NewReader(marshaledObj)), + }, nil + })} + info := resource.NewInfo(client, mapping, "default", "nginx", false) + + info.Object = pod + + return info +} + +func createObjectMeta(name string, labels map[string]string) api.ObjectMeta { + objectMeta := api.ObjectMeta{Name: name, Namespace: "default"} + + if labels != nil { + objectMeta.Labels = labels + } + + return objectMeta +}