From 4cdb2ac538b6bf474c3d16a0fd9b4ae5f1922d00 Mon Sep 17 00:00:00 2001 From: Adam Reese Date: Thu, 9 Feb 2017 15:45:54 -0800 Subject: [PATCH 1/2] feat(kube): support thirdpartyresources --- glide.lock | 5 ++-- pkg/kube/client.go | 35 ++++++++++++++++++---- pkg/tiller/environment/environment.go | 6 ++++ pkg/tiller/environment/environment_test.go | 3 ++ pkg/tiller/release_server.go | 2 +- 5 files changed, 42 insertions(+), 9 deletions(-) diff --git a/glide.lock b/glide.lock index e76cb318c..dfc5e995a 100644 --- a/glide.lock +++ b/glide.lock @@ -1,5 +1,5 @@ hash: f59cd1f34ecf299aeb8ca9e16cfc181d8aeaf5a5762b89d76f18d08be35e4d67 -updated: 2016-12-21T11:19:14.731981344-07:00 +updated: 2017-02-09T11:27:34.6825226-08:00 imports: - name: cloud.google.com/go version: 3b1ae45394a234c385be014e9a488f2bb6eef821 @@ -290,7 +290,7 @@ imports: - name: gopkg.in/yaml.v2 version: a83829b6f1293c91addabc89d0571c246397bbf4 - name: k8s.io/kubernetes - version: 5f332aab13e58173f85fd204a2c77731f7a2573f + version: 08e099554f3c31f6e6f07b448ab3ed78d0520507 subpackages: - cmd/kubeadm/app/apis/kubeadm - cmd/kubeadm/app/apis/kubeadm/install @@ -482,6 +482,7 @@ imports: - pkg/util/yaml - pkg/version - pkg/watch + - pkg/watch/json - pkg/watch/versioned - plugin/pkg/client/auth - plugin/pkg/client/auth/gcp diff --git a/pkg/kube/client.go b/pkg/kube/client.go index 850dfe1a9..b4b965f1f 100644 --- a/pkg/kube/client.go +++ b/pkg/kube/client.go @@ -78,7 +78,7 @@ func (c *Client) Create(namespace string, reader io.Reader, timeout int64, shoul if err := ensureNamespace(client, namespace); err != nil { return err } - infos, buildErr := c.Build(namespace, reader) + infos, buildErr := c.BuildUnstructured(namespace, reader) if buildErr != nil { return buildErr } @@ -107,6 +107,29 @@ func (c *Client) newBuilder(namespace string, reader io.Reader) *resource.Result Do() } +// BuildUnstructured validates for Kubernetes objects and returns unstructured infos. +func (c *Client) BuildUnstructured(namespace string, reader io.Reader) (Result, error) { + schema, err := c.Validator(true, c.SchemaCacheDir) + if err != nil { + log.Printf("warning: failed to load schema: %s", err) + } + + mapper, typer, err := c.UnstructuredObject() + if err != nil { + log.Printf("warning: failed to load mapper: %s", err) + } + var result Result + result, err = resource.NewBuilder(mapper, typer, resource.ClientMapperFunc(c.UnstructuredClientForMapping), runtime.UnstructuredJSONScheme). + ContinueOnError(). + Schema(schema). + NamespaceParam(namespace). + DefaultNamespace(). + Stream(reader, ""). + Flatten(). + Do().Infos() + return result, scrubValidationError(err) +} + // Build validates for Kubernetes objects and returns resource Infos from a io.Reader. func (c *Client) Build(namespace string, reader io.Reader) (Result, error) { var result Result @@ -121,7 +144,7 @@ func (c *Client) Get(namespace string, reader io.Reader) (string, error) { // Since we don't know what order the objects come in, let's group them by the types, so // that when we print them, they come looking good (headers apply to subgroups, etc.) objs := make(map[string][]runtime.Object) - infos, err := c.Build(namespace, reader) + infos, err := c.BuildUnstructured(namespace, reader) if err != nil { return "", err } @@ -178,12 +201,12 @@ func (c *Client) Get(namespace string, reader io.Reader) (string, error) { // // Namespace will set the namespaces func (c *Client) Update(namespace string, originalReader, targetReader io.Reader, recreate bool, timeout int64, shouldWait bool) error { - original, err := c.Build(namespace, originalReader) + original, err := c.BuildUnstructured(namespace, originalReader) if err != nil { return fmt.Errorf("failed decoding reader into objects: %s", err) } - target, err := c.Build(namespace, targetReader) + target, err := c.BuildUnstructured(namespace, targetReader) if err != nil { return fmt.Errorf("failed decoding reader into objects: %s", err) } @@ -255,7 +278,7 @@ func (c *Client) Update(namespace string, originalReader, targetReader io.Reader // // Namespace will set the namespace func (c *Client) Delete(namespace string, reader io.Reader) error { - infos, err := c.Build(namespace, reader) + infos, err := c.BuildUnstructured(namespace, reader) if err != nil { return err } @@ -585,7 +608,7 @@ func (c *Client) waitForResources(timeout time.Duration, created Result) error { func waitForJob(e watch.Event, name string) (bool, error) { o, ok := e.Object.(*batch.Job) if !ok { - return true, fmt.Errorf("Expected %s to be a *batch.Job, got %T", name, o) + return true, fmt.Errorf("Expected %s to be a *batch.Job, got %T", name, e.Object) } for _, c := range o.Status.Conditions { diff --git a/pkg/tiller/environment/environment.go b/pkg/tiller/environment/environment.go index fa5d1ecab..d218818e8 100644 --- a/pkg/tiller/environment/environment.go +++ b/pkg/tiller/environment/environment.go @@ -137,6 +137,7 @@ type KubeClient interface { Update(namespace string, originalReader, modifiedReader io.Reader, recreate bool, timeout int64, shouldWait bool) error Build(namespace string, reader io.Reader) (kube.Result, error) + BuildUnstructured(namespace string, reader io.Reader) (kube.Result, error) // WaitAndGetCompletedPodPhase waits up to a timeout until a pod enters a completed phase // and returns said phase (PodSucceeded or PodFailed qualify) @@ -186,6 +187,11 @@ func (p *PrintingKubeClient) Build(ns string, reader io.Reader) (kube.Result, er return []*resource.Info{}, nil } +// BuildUnstructured implements KubeClient BuildUnstructured. +func (p *PrintingKubeClient) BuildUnstructured(ns string, reader io.Reader) (kube.Result, error) { + return []*resource.Info{}, nil +} + // WaitAndGetCompletedPodPhase implements KubeClient WaitAndGetCompletedPodPhase func (p *PrintingKubeClient) WaitAndGetCompletedPodPhase(namespace string, reader io.Reader, timeout time.Duration) (api.PodPhase, error) { _, err := io.Copy(p.Out, reader) diff --git a/pkg/tiller/environment/environment_test.go b/pkg/tiller/environment/environment_test.go index cb36de356..716836438 100644 --- a/pkg/tiller/environment/environment_test.go +++ b/pkg/tiller/environment/environment_test.go @@ -57,6 +57,9 @@ func (k *mockKubeClient) WatchUntilReady(ns string, r io.Reader, timeout int64, func (k *mockKubeClient) Build(ns string, reader io.Reader) (kube.Result, error) { return []*resource.Info{}, nil } +func (k *mockKubeClient) BuildUnstructured(ns string, reader io.Reader) (kube.Result, error) { + return []*resource.Info{}, nil +} func (k *mockKubeClient) WaitAndGetCompletedPodPhase(namespace string, reader io.Reader, timeout time.Duration) (api.PodPhase, error) { return api.PodUnknown, nil } diff --git a/pkg/tiller/release_server.go b/pkg/tiller/release_server.go index 29cb335b0..25150bcda 100644 --- a/pkg/tiller/release_server.go +++ b/pkg/tiller/release_server.go @@ -1047,7 +1047,7 @@ func (s *ReleaseServer) UninstallRelease(c ctx.Context, req *services.UninstallR func validateManifest(c environment.KubeClient, ns string, manifest []byte) error { r := bytes.NewReader(manifest) - _, err := c.Build(ns, r) + _, err := c.BuildUnstructured(ns, r) return err } From 0f461ba8b55cdfad6d10c8fa9caa658ecddefbd6 Mon Sep 17 00:00:00 2001 From: Adam Reese Date: Fri, 10 Feb 2017 11:46:04 -0800 Subject: [PATCH 2/2] feat(kube): use jsonpatch to update ThirdPartyResources --- cmd/tiller/tiller.go | 6 ++++- glide.lock | 4 +-- glide.yaml | 1 + pkg/kube/client.go | 60 +++++++++++++++++++++++++---------------- pkg/kube/client_test.go | 4 +-- 5 files changed, 47 insertions(+), 28 deletions(-) diff --git a/cmd/tiller/tiller.go b/cmd/tiller/tiller.go index 11baa5045..72388d307 100644 --- a/cmd/tiller/tiller.go +++ b/cmd/tiller/tiller.go @@ -82,7 +82,11 @@ func main() { p.StringVarP(&grpcAddr, "listen", "l", ":44134", "address:port to listen on") p.StringVar(&store, "storage", storageConfigMap, "storage driver to use. One of 'configmap' or 'memory'") p.BoolVar(&enableTracing, "trace", false, "enable rpc tracing") - rootCommand.Execute() + + if err := rootCommand.Execute(); err != nil { + fmt.Fprint(os.Stderr, err) + os.Exit(1) + } } func start(c *cobra.Command, args []string) { diff --git a/glide.lock b/glide.lock index dfc5e995a..35cd2846e 100644 --- a/glide.lock +++ b/glide.lock @@ -1,5 +1,5 @@ -hash: f59cd1f34ecf299aeb8ca9e16cfc181d8aeaf5a5762b89d76f18d08be35e4d67 -updated: 2017-02-09T11:27:34.6825226-08:00 +hash: d9b023509801b816bc80b3abd67eb80532af1625e71ad4e0ff8ef98664f96ded +updated: 2017-02-10T11:42:12.50337033-08:00 imports: - name: cloud.google.com/go version: 3b1ae45394a234c385be014e9a488f2bb6eef821 diff --git a/glide.yaml b/glide.yaml index 1c9ed6ece..60966b966 100644 --- a/glide.yaml +++ b/glide.yaml @@ -55,6 +55,7 @@ import: - openpgp - package: github.com/gobwas/glob version: ^0.2.1 +- package: github.com/evanphx/json-patch testImports: - package: github.com/stretchr/testify version: ^1.1.4 diff --git a/pkg/kube/client.go b/pkg/kube/client.go index b4b965f1f..eb0f63019 100644 --- a/pkg/kube/client.go +++ b/pkg/kube/client.go @@ -18,6 +18,7 @@ package kube // import "k8s.io/helm/pkg/kube" import ( "bytes" + "encoding/json" goerrors "errors" "fmt" "io" @@ -25,6 +26,7 @@ import ( "strings" "time" + jsonpatch "github.com/evanphx/json-patch" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/v1" @@ -116,7 +118,8 @@ func (c *Client) BuildUnstructured(namespace string, reader io.Reader) (Result, mapper, typer, err := c.UnstructuredObject() if err != nil { - log.Printf("warning: failed to load mapper: %s", err) + log.Printf("failed to load mapper: %s", err) + return nil, err } var result Result result, err = resource.NewBuilder(mapper, typer, resource.ClientMapperFunc(c.UnstructuredClientForMapping), runtime.UnstructuredJSONScheme). @@ -239,12 +242,7 @@ func (c *Client) Update(namespace string, originalReader, targetReader io.Reader return fmt.Errorf("no resource with the name %s found", info.Name) } - versionedObject, err := originalInfo.Mapping.ConvertToVersion(originalInfo.Object, originalInfo.Mapping.GroupVersionKind.GroupVersion()) - if err != nil { - return err - } - - if err := updateResource(c, info, versionedObject, recreate); err != nil { + if err := updateResource(c, info, originalInfo.Object, recreate); err != nil { log.Printf("error updating the resource %s:\n\t %v", info.Name, err) updateErrors = append(updateErrors, err.Error()) } @@ -268,10 +266,7 @@ func (c *Client) Update(namespace string, originalReader, targetReader io.Reader if shouldWait { err = c.waitForResources(time.Duration(timeout)*time.Second, target) } - if err != nil { - return err - } - return nil + return err } // Delete deletes kubernetes resources from an io.reader @@ -360,32 +355,51 @@ func deleteResource(c *Client, info *resource.Info) error { return reaper.Stop(info.Namespace, info.Name, 0, nil) } -func updateResource(c *Client, target *resource.Info, currentObj runtime.Object, recreate bool) error { - encoder := c.JSONEncoder() - original, err := runtime.Encode(encoder, currentObj) +func createPatch(target *resource.Info, currentObj runtime.Object) ([]byte, api.PatchType, error) { + // Get a versioned object + versionedObject, err := api.Scheme.New(target.Mapping.GroupVersionKind) if err != nil { - return err + return nil, api.StrategicMergePatchType, fmt.Errorf("failed to get versionedObject: %s", err) } - modified, err := runtime.Encode(encoder, target.Object) + oldData, err := json.Marshal(currentObj) if err != nil { - return err + return nil, api.StrategicMergePatchType, fmt.Errorf("serializing current configuration: %s", err) + } + newData, err := json.Marshal(target.Object) + if err != nil { + return nil, api.StrategicMergePatchType, fmt.Errorf("serializing target configuration: %s", err) } - if api.Semantic.DeepEqual(original, modified) { - log.Printf("Looks like there are no changes for %s", target.Name) - return nil + if api.Semantic.DeepEqual(oldData, newData) { + return nil, api.StrategicMergePatchType, nil } - patch, err := strategicpatch.CreateTwoWayMergePatch(original, modified, currentObj) + switch target.Object.(type) { + case *runtime.Unstructured: + patch, err := jsonpatch.CreateMergePatch(oldData, newData) + return patch, api.MergePatchType, err + default: + log.Printf("generating strategic merge patch for %T", target.Object) + patch, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, versionedObject) + return patch, api.StrategicMergePatchType, err + } +} + +func updateResource(c *Client, target *resource.Info, currentObj runtime.Object, recreate bool) error { + patch, patchType, err := createPatch(target, currentObj) if err != nil { - return err + return fmt.Errorf("failed to create patch: %s", err) + } + if patch == nil { + log.Printf("Looks like there are no changes for %s", target.Name) + return nil } // send patch to server helper := resource.NewHelper(target.Client, target.Mapping) var obj runtime.Object - if obj, err = helper.Patch(target.Namespace, target.Name, api.StrategicMergePatchType, patch); err != nil { + if obj, err = helper.Patch(target.Namespace, target.Name, patchType, patch); err != nil { return err } diff --git a/pkg/kube/client_test.go b/pkg/kube/client_test.go index 0c9cf788b..38946a06b 100644 --- a/pkg/kube/client_test.go +++ b/pkg/kube/client_test.go @@ -169,9 +169,9 @@ func TestUpdate(t *testing.T) { t.Fatalf("could not dump request: %s", err) } req.Body.Close() - expected := `{"spec":{"containers":[{"name":"app:v4","ports":[{"containerPort":443,"name":"https","protocol":"TCP"},{"$patch":"delete","containerPort":80}]}]}}` + expected := `{"spec":{"containers":[{"image":"abc/app:v4","name":"app:v4","ports":[{"containerPort":443,"name":"https"}],"resources":{}}]}}` if string(data) != expected { - t.Errorf("expected patch %s, got %s", expected, string(data)) + t.Errorf("expected patch\n%s\ngot\n%s", expected, string(data)) } return newResponse(200, &listB.Items[0]) case p == "/namespaces/default/pods" && m == "POST":