feat(kube): use jsonpatch to update ThirdPartyResources

pull/1941/head
Adam Reese 9 years ago
parent 4cdb2ac538
commit 0f461ba8b5

@ -82,7 +82,11 @@ func main() {
p.StringVarP(&grpcAddr, "listen", "l", ":44134", "address:port to listen on") p.StringVarP(&grpcAddr, "listen", "l", ":44134", "address:port to listen on")
p.StringVar(&store, "storage", storageConfigMap, "storage driver to use. One of 'configmap' or 'memory'") p.StringVar(&store, "storage", storageConfigMap, "storage driver to use. One of 'configmap' or 'memory'")
p.BoolVar(&enableTracing, "trace", false, "enable rpc tracing") p.BoolVar(&enableTracing, "trace", false, "enable rpc tracing")
rootCommand.Execute()
if err := rootCommand.Execute(); err != nil {
fmt.Fprint(os.Stderr, err)
os.Exit(1)
}
} }
func start(c *cobra.Command, args []string) { func start(c *cobra.Command, args []string) {

4
glide.lock generated

@ -1,5 +1,5 @@
hash: f59cd1f34ecf299aeb8ca9e16cfc181d8aeaf5a5762b89d76f18d08be35e4d67 hash: d9b023509801b816bc80b3abd67eb80532af1625e71ad4e0ff8ef98664f96ded
updated: 2017-02-09T11:27:34.6825226-08:00 updated: 2017-02-10T11:42:12.50337033-08:00
imports: imports:
- name: cloud.google.com/go - name: cloud.google.com/go
version: 3b1ae45394a234c385be014e9a488f2bb6eef821 version: 3b1ae45394a234c385be014e9a488f2bb6eef821

@ -55,6 +55,7 @@ import:
- openpgp - openpgp
- package: github.com/gobwas/glob - package: github.com/gobwas/glob
version: ^0.2.1 version: ^0.2.1
- package: github.com/evanphx/json-patch
testImports: testImports:
- package: github.com/stretchr/testify - package: github.com/stretchr/testify
version: ^1.1.4 version: ^1.1.4

@ -18,6 +18,7 @@ package kube // import "k8s.io/helm/pkg/kube"
import ( import (
"bytes" "bytes"
"encoding/json"
goerrors "errors" goerrors "errors"
"fmt" "fmt"
"io" "io"
@ -25,6 +26,7 @@ import (
"strings" "strings"
"time" "time"
jsonpatch "github.com/evanphx/json-patch"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/api/v1"
@ -116,7 +118,8 @@ func (c *Client) BuildUnstructured(namespace string, reader io.Reader) (Result,
mapper, typer, err := c.UnstructuredObject() mapper, typer, err := c.UnstructuredObject()
if err != nil { if err != nil {
log.Printf("warning: failed to load mapper: %s", err) log.Printf("failed to load mapper: %s", err)
return nil, err
} }
var result Result var result Result
result, err = resource.NewBuilder(mapper, typer, resource.ClientMapperFunc(c.UnstructuredClientForMapping), runtime.UnstructuredJSONScheme). result, err = resource.NewBuilder(mapper, typer, resource.ClientMapperFunc(c.UnstructuredClientForMapping), runtime.UnstructuredJSONScheme).
@ -239,12 +242,7 @@ func (c *Client) Update(namespace string, originalReader, targetReader io.Reader
return fmt.Errorf("no resource with the name %s found", info.Name) return fmt.Errorf("no resource with the name %s found", info.Name)
} }
versionedObject, err := originalInfo.Mapping.ConvertToVersion(originalInfo.Object, originalInfo.Mapping.GroupVersionKind.GroupVersion()) if err := updateResource(c, info, originalInfo.Object, recreate); err != nil {
if err != nil {
return err
}
if err := updateResource(c, info, versionedObject, recreate); err != nil {
log.Printf("error updating the resource %s:\n\t %v", info.Name, err) log.Printf("error updating the resource %s:\n\t %v", info.Name, err)
updateErrors = append(updateErrors, err.Error()) updateErrors = append(updateErrors, err.Error())
} }
@ -268,10 +266,7 @@ func (c *Client) Update(namespace string, originalReader, targetReader io.Reader
if shouldWait { if shouldWait {
err = c.waitForResources(time.Duration(timeout)*time.Second, target) err = c.waitForResources(time.Duration(timeout)*time.Second, target)
} }
if err != nil { return err
return err
}
return nil
} }
// Delete deletes kubernetes resources from an io.reader // Delete deletes kubernetes resources from an io.reader
@ -360,32 +355,51 @@ func deleteResource(c *Client, info *resource.Info) error {
return reaper.Stop(info.Namespace, info.Name, 0, nil) return reaper.Stop(info.Namespace, info.Name, 0, nil)
} }
func updateResource(c *Client, target *resource.Info, currentObj runtime.Object, recreate bool) error { func createPatch(target *resource.Info, currentObj runtime.Object) ([]byte, api.PatchType, error) {
encoder := c.JSONEncoder() // Get a versioned object
original, err := runtime.Encode(encoder, currentObj) versionedObject, err := api.Scheme.New(target.Mapping.GroupVersionKind)
if err != nil { if err != nil {
return err return nil, api.StrategicMergePatchType, fmt.Errorf("failed to get versionedObject: %s", err)
} }
modified, err := runtime.Encode(encoder, target.Object) oldData, err := json.Marshal(currentObj)
if err != nil { if err != nil {
return err return nil, api.StrategicMergePatchType, fmt.Errorf("serializing current configuration: %s", err)
}
newData, err := json.Marshal(target.Object)
if err != nil {
return nil, api.StrategicMergePatchType, fmt.Errorf("serializing target configuration: %s", err)
} }
if api.Semantic.DeepEqual(original, modified) { if api.Semantic.DeepEqual(oldData, newData) {
log.Printf("Looks like there are no changes for %s", target.Name) return nil, api.StrategicMergePatchType, nil
return nil
} }
patch, err := strategicpatch.CreateTwoWayMergePatch(original, modified, currentObj) switch target.Object.(type) {
case *runtime.Unstructured:
patch, err := jsonpatch.CreateMergePatch(oldData, newData)
return patch, api.MergePatchType, err
default:
log.Printf("generating strategic merge patch for %T", target.Object)
patch, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, versionedObject)
return patch, api.StrategicMergePatchType, err
}
}
func updateResource(c *Client, target *resource.Info, currentObj runtime.Object, recreate bool) error {
patch, patchType, err := createPatch(target, currentObj)
if err != nil { if err != nil {
return err return fmt.Errorf("failed to create patch: %s", err)
}
if patch == nil {
log.Printf("Looks like there are no changes for %s", target.Name)
return nil
} }
// send patch to server // send patch to server
helper := resource.NewHelper(target.Client, target.Mapping) helper := resource.NewHelper(target.Client, target.Mapping)
var obj runtime.Object var obj runtime.Object
if obj, err = helper.Patch(target.Namespace, target.Name, api.StrategicMergePatchType, patch); err != nil { if obj, err = helper.Patch(target.Namespace, target.Name, patchType, patch); err != nil {
return err return err
} }

@ -169,9 +169,9 @@ func TestUpdate(t *testing.T) {
t.Fatalf("could not dump request: %s", err) t.Fatalf("could not dump request: %s", err)
} }
req.Body.Close() req.Body.Close()
expected := `{"spec":{"containers":[{"name":"app:v4","ports":[{"containerPort":443,"name":"https","protocol":"TCP"},{"$patch":"delete","containerPort":80}]}]}}` expected := `{"spec":{"containers":[{"image":"abc/app:v4","name":"app:v4","ports":[{"containerPort":443,"name":"https"}],"resources":{}}]}}`
if string(data) != expected { if string(data) != expected {
t.Errorf("expected patch %s, got %s", expected, string(data)) t.Errorf("expected patch\n%s\ngot\n%s", expected, string(data))
} }
return newResponse(200, &listB.Items[0]) return newResponse(200, &listB.Items[0])
case p == "/namespaces/default/pods" && m == "POST": case p == "/namespaces/default/pods" && m == "POST":

Loading…
Cancel
Save