From b22e6fcfaa7f1ff0ed73cac34b667d253a5d3a6d Mon Sep 17 00:00:00 2001 From: Adam Reese Date: Fri, 13 Jan 2017 12:41:21 -0800 Subject: [PATCH] ref(kube): add Result to abstract a collection of resources --- pkg/kube/client.go | 91 +++++++--------------- pkg/kube/client_test.go | 31 ++++---- pkg/kube/result.go | 87 +++++++++++++++++++++ pkg/kube/result_test.go | 58 ++++++++++++++ pkg/tiller/environment/environment.go | 4 +- pkg/tiller/environment/environment_test.go | 3 +- 6 files changed, 194 insertions(+), 80 deletions(-) create mode 100644 pkg/kube/result.go create mode 100644 pkg/kube/result_test.go diff --git a/pkg/kube/client.go b/pkg/kube/client.go index 53d087ffc..9498d4a41 100644 --- a/pkg/kube/client.go +++ b/pkg/kube/client.go @@ -28,7 +28,6 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/v1" - "k8s.io/kubernetes/pkg/apimachinery/registered" apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1" "k8s.io/kubernetes/pkg/apis/batch" "k8s.io/kubernetes/pkg/apis/extensions/v1beta1" @@ -65,15 +64,6 @@ func New(config clientcmd.ClientConfig) *Client { // ResourceActorFunc performs an action on a single resource. type ResourceActorFunc func(*resource.Info) error -// ErrAlreadyExists can be returned where there are no changes -type ErrAlreadyExists struct { - errorMsg string -} - -func (e ErrAlreadyExists) Error() string { - return fmt.Sprintf("Looks like there are no changes for %s", e.errorMsg) -} - // Create creates kubernetes resources from an io.reader // // Namespace will set the namespace @@ -104,8 +94,10 @@ func (c *Client) newBuilder(namespace string, reader io.Reader) *resource.Result } // Build validates for Kubernetes objects and returns resource Infos from a io.Reader. -func (c *Client) Build(namespace string, reader io.Reader) ([]*resource.Info, error) { - return c.newBuilder(namespace, reader).Infos() +func (c *Client) Build(namespace string, reader io.Reader) (Result, error) { + var result Result + result, err := c.newBuilder(namespace, reader).Infos() + return result, err } // Get gets kubernetes resources as pretty printed string @@ -167,22 +159,20 @@ func (c *Client) Get(namespace string, reader io.Reader) (string, error) { // not present in the target configuration // // Namespace will set the namespaces -func (c *Client) Update(namespace string, currentReader, targetReader io.Reader, recreate bool) error { - currentInfos, err := c.Build(namespace, currentReader) +func (c *Client) Update(namespace string, originalReader, targetReader io.Reader, recreate bool) error { + original, err := c.Build(namespace, originalReader) if err != nil { return fmt.Errorf("failed decoding reader into objects: %s", err) } - target := c.newBuilder(namespace, targetReader) - if target.Err() != nil { - return fmt.Errorf("failed decoding reader into objects: %s", target.Err()) + target, err := c.Build(namespace, targetReader) + if err != nil { + return fmt.Errorf("failed decoding reader into objects: %s", err) } - targetInfos := []*resource.Info{} updateErrors := []string{} err = target.Visit(func(info *resource.Info, err error) error { - targetInfos = append(targetInfos, info) if err != nil { return err } @@ -203,18 +193,19 @@ func (c *Client) Update(namespace string, currentReader, targetReader io.Reader, return nil } - currentObj, err := getCurrentObject(info, currentInfos) + originalInfo := original.Get(info) + if originalInfo == nil { + return fmt.Errorf("no resource with the name %s found", info.Name) + } + + versionedObject, err := originalInfo.Mapping.ConvertToVersion(originalInfo.Object, originalInfo.Mapping.GroupVersionKind.GroupVersion()) if err != nil { return err } - if err := updateResource(c, info, currentObj, recreate); err != nil { - if alreadyExistErr, ok := err.(ErrAlreadyExists); ok { - log.Printf(alreadyExistErr.errorMsg) - } else { - log.Printf("error updating the resource %s:\n\t %v", info.Name, err) - updateErrors = append(updateErrors, err.Error()) - } + if err := updateResource(c, info, versionedObject, recreate); err != nil { + log.Printf("error updating the resource %s:\n\t %v", info.Name, err) + updateErrors = append(updateErrors, err.Error()) } return nil @@ -225,7 +216,13 @@ func (c *Client) Update(namespace string, currentReader, targetReader io.Reader, } else if len(updateErrors) != 0 { return fmt.Errorf(strings.Join(updateErrors, " && ")) } - deleteUnwantedResources(currentInfos, targetInfos) + + for _, info := range original.Difference(target) { + log.Printf("Deleting %s in %s...", info.Name, info.Namespace) + if err := deleteResource(info); err != nil { + log.Printf("Failed to delete %s, err: %s", info.Name, err) + } + } return nil } @@ -311,7 +308,7 @@ func deleteResource(info *resource.Info) error { } func updateResource(c *Client, target *resource.Info, currentObj runtime.Object, recreate bool) error { - encoder := api.Codecs.LegacyCodec(registered.EnabledVersions()...) + encoder := c.JSONEncoder() original, err := runtime.Encode(encoder, currentObj) if err != nil { return err @@ -323,7 +320,8 @@ func updateResource(c *Client, target *resource.Info, currentObj runtime.Object, } if api.Semantic.DeepEqual(original, modified) { - return ErrAlreadyExists{target.Name} + log.Printf("Looks like there are no changes for %s", target.Name) + return nil } patch, err := strategicpatch.CreateTwoWayMergePatch(original, modified, currentObj) @@ -453,39 +451,6 @@ func waitForJob(e watch.Event, name string) (bool, error) { return false, nil } -func deleteUnwantedResources(currentInfos, targetInfos []*resource.Info) { - for _, cInfo := range currentInfos { - if _, ok := findMatchingInfo(cInfo, targetInfos); !ok { - log.Printf("Deleting %s...", cInfo.Name) - if err := deleteResource(cInfo); err != nil { - log.Printf("Failed to delete %s, err: %s", cInfo.Name, err) - } - } - } -} - -func getCurrentObject(target *resource.Info, infos []*resource.Info) (runtime.Object, error) { - if found, ok := findMatchingInfo(target, infos); ok { - return found.Mapping.ConvertToVersion(found.Object, found.Mapping.GroupVersionKind.GroupVersion()) - } - return nil, fmt.Errorf("no resource with the name %s found", target.Name) -} - -// isMatchingInfo returns true if infos match on Name and Kind. -func isMatchingInfo(a, b *resource.Info) bool { - return a.Name == b.Name && a.Mapping.GroupVersionKind.Kind == b.Mapping.GroupVersionKind.Kind -} - -// findMatchingInfo returns the first object that matches target. -func findMatchingInfo(target *resource.Info, infos []*resource.Info) (*resource.Info, bool) { - for _, info := range infos { - if isMatchingInfo(target, info) { - return info, true - } - } - return nil, false -} - // scrubValidationError removes kubectl info from the message func scrubValidationError(err error) error { const stopValidateMessage = "if you choose to ignore these errors, turn validation off with --validate=false" diff --git a/pkg/kube/client_test.go b/pkg/kube/client_test.go index 1f480e81d..08eb9a0cb 100644 --- a/pkg/kube/client_test.go +++ b/pkg/kube/client_test.go @@ -40,7 +40,10 @@ func objBody(codec runtime.Codec, obj runtime.Object) io.ReadCloser { func newPod(name string) api.Pod { return api.Pod{ - ObjectMeta: api.ObjectMeta{Name: name}, + ObjectMeta: api.ObjectMeta{ + Name: name, + Namespace: api.NamespaceDefault, + }, Spec: api.PodSpec{ Containers: []api.Container{{ Name: "app:v4", @@ -90,13 +93,13 @@ func TestUpdate(t *testing.T) { p, m := req.URL.Path, req.Method actions[p] = m switch { - case p == "/namespaces/test/pods/starfish" && m == "GET": + case p == "/namespaces/default/pods/starfish" && m == "GET": return newResponse(200, &listA.Items[0]) - case p == "/namespaces/test/pods/otter" && m == "GET": + case p == "/namespaces/default/pods/otter" && m == "GET": return newResponse(200, &listA.Items[1]) - case p == "/namespaces/test/pods/dolphin" && m == "GET": + case p == "/namespaces/default/pods/dolphin" && m == "GET": return newResponse(404, notFoundBody()) - case p == "/namespaces/test/pods/starfish" && m == "PATCH": + case p == "/namespaces/default/pods/starfish" && m == "PATCH": data, err := ioutil.ReadAll(req.Body) if err != nil { t.Fatalf("could not dump request: %s", err) @@ -107,28 +110,28 @@ func TestUpdate(t *testing.T) { t.Errorf("expected patch %s, got %s", expected, string(data)) } return newResponse(200, &listB.Items[0]) - case p == "/namespaces/test/pods" && m == "POST": + case p == "/namespaces/default/pods" && m == "POST": return newResponse(200, &listB.Items[1]) - case p == "/namespaces/test/pods/squid" && m == "DELETE": + case p == "/namespaces/default/pods/squid" && m == "DELETE": return newResponse(200, &listB.Items[1]) default: - t.Fatalf("unexpected request: %#v\n%#v", req.URL, req) + t.Fatalf("unexpected request: %s %s", req.Method, req.URL.Path) return nil, nil } }), } c := &Client{Factory: f} - if err := c.Update("test", objBody(codec, &listA), objBody(codec, &listB), false); err != nil { + if err := c.Update(api.NamespaceDefault, objBody(codec, &listA), objBody(codec, &listB), false); err != nil { t.Fatal(err) } expectedActions := map[string]string{ - "/namespaces/test/pods/dolphin": "GET", - "/namespaces/test/pods/otter": "GET", - "/namespaces/test/pods/starfish": "PATCH", - "/namespaces/test/pods": "POST", - "/namespaces/test/pods/squid": "DELETE", + "/namespaces/default/pods/dolphin": "GET", + "/namespaces/default/pods/otter": "GET", + "/namespaces/default/pods/starfish": "PATCH", + "/namespaces/default/pods": "POST", + "/namespaces/default/pods/squid": "DELETE", } for k, v := range expectedActions { diff --git a/pkg/kube/result.go b/pkg/kube/result.go new file mode 100644 index 000000000..9f143feb5 --- /dev/null +++ b/pkg/kube/result.go @@ -0,0 +1,87 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kube // import "k8s.io/helm/pkg/kube" + +import "k8s.io/kubernetes/pkg/kubectl/resource" + +// Result provides convenience methods for comparing collections of Infos. +type Result []*resource.Info + +// Append adds an Info to the Result. +func (r *Result) Append(val *resource.Info) { + *r = append(*r, val) +} + +// Visit implements resource.Visitor. +func (r Result) Visit(fn resource.VisitorFunc) error { + for _, i := range r { + if err := fn(i, nil); err != nil { + return err + } + } + return nil +} + +// Filter returns a new Result with Infos that satisfy the predicate fn. +func (r Result) Filter(fn func(*resource.Info) bool) Result { + var result Result + for _, i := range r { + if fn(i) { + result.Append(i) + } + } + return result +} + +// Get returns the Info from the result that matches the name and kind. +func (r Result) Get(info *resource.Info) *resource.Info { + for _, i := range r { + if isMatchingInfo(i, info) { + return i + } + } + return nil +} + +// Contains checks to see if an object exists. +func (r Result) Contains(info *resource.Info) bool { + for _, i := range r { + if isMatchingInfo(i, info) { + return true + } + } + return false +} + +// Difference will return a new Result with objects not contained in rs. +func (r Result) Difference(rs Result) Result { + return r.Filter(func(info *resource.Info) bool { + return !rs.Contains(info) + }) +} + +// Intersect will return a new Result with objects contained in both Results. +func (r Result) Intersect(rs Result) Result { + return r.Filter(func(info *resource.Info) bool { + return rs.Contains(info) + }) +} + +// isMatchingInfo returns true if infos match on Name and GroupVersionKind. +func isMatchingInfo(a, b *resource.Info) bool { + return a.Name == b.Name && a.Mapping.GroupVersionKind == b.Mapping.GroupVersionKind +} diff --git a/pkg/kube/result_test.go b/pkg/kube/result_test.go new file mode 100644 index 000000000..8bc218c67 --- /dev/null +++ b/pkg/kube/result_test.go @@ -0,0 +1,58 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kube // import "k8s.io/helm/pkg/kube" + +import ( + "testing" + + "k8s.io/kubernetes/pkg/api/testapi" + "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/kubectl/resource" +) + +func TestResult(t *testing.T) { + mapping, err := testapi.Default.RESTMapper().RESTMapping(unversioned.GroupKind{Kind: "Pod"}) + if err != nil { + t.Fatal(err) + } + + info := func(name string) *resource.Info { + return &resource.Info{Name: name, Mapping: mapping} + } + + var r1, r2 Result + r1 = []*resource.Info{info("foo"), info("bar")} + r2 = []*resource.Info{info("bar")} + + diff := r1.Difference(r2) + if len(diff) != 1 { + t.Error("expected 1 result") + } + + if !diff.Contains(info("foo")) { + t.Error("expected diff to return foo") + } + + inter := r1.Intersect(r2) + if len(inter) != 1 { + t.Error("expected 1 result") + } + + if !inter.Contains(info("bar")) { + t.Error("expected intersect to return bar") + } +} diff --git a/pkg/tiller/environment/environment.go b/pkg/tiller/environment/environment.go index 349d82a85..094ff940d 100644 --- a/pkg/tiller/environment/environment.go +++ b/pkg/tiller/environment/environment.go @@ -134,7 +134,7 @@ type KubeClient interface { // by "\n---\n"). Update(namespace string, originalReader, modifiedReader io.Reader, recreate bool) error - Build(namespace string, reader io.Reader) ([]*resource.Info, error) + Build(namespace string, reader io.Reader) (kube.Result, error) } // PrintingKubeClient implements KubeClient, but simply prints the reader to @@ -176,7 +176,7 @@ func (p *PrintingKubeClient) Update(ns string, currentReader, modifiedReader io. } // Build implements KubeClient Build. -func (p *PrintingKubeClient) Build(ns string, reader io.Reader) ([]*resource.Info, error) { +func (p *PrintingKubeClient) Build(ns string, reader io.Reader) (kube.Result, error) { return []*resource.Info{}, nil } diff --git a/pkg/tiller/environment/environment_test.go b/pkg/tiller/environment/environment_test.go index c3b85b778..cae6c5c9e 100644 --- a/pkg/tiller/environment/environment_test.go +++ b/pkg/tiller/environment/environment_test.go @@ -22,6 +22,7 @@ import ( "testing" "k8s.io/helm/pkg/chartutil" + "k8s.io/helm/pkg/kube" "k8s.io/helm/pkg/proto/hapi/chart" "k8s.io/kubernetes/pkg/kubectl/resource" ) @@ -51,7 +52,7 @@ func (k *mockKubeClient) Update(ns string, currentReader, modifiedReader io.Read func (k *mockKubeClient) WatchUntilReady(ns string, r io.Reader, t int64) error { return nil } -func (k *mockKubeClient) Build(ns string, reader io.Reader) ([]*resource.Info, error) { +func (k *mockKubeClient) Build(ns string, reader io.Reader) (kube.Result, error) { return []*resource.Info{}, nil }