Merge pull request #1026 from michelleN/feat/690-helm-upgrade

feat(kube): add update logic to kube client
pull/1031/head
Michelle Noorali 9 years ago committed by GitHub
commit dd8b0ffba0

@ -161,6 +161,15 @@ type KubeClient interface {
// For all other kinds, it means the kind was created or modified without
// error.
WatchUntilReady(namespace string, reader io.Reader) error
// Update updates one or more resources or creates the resource
// if it doesn't exist
//
// namespace must contain a valid existing namespace
//
// reader must contain a YAML stream (one or more YAML documents separated
// by "\n---\n").
Update(namespace string, originalReader, modifiedReader io.Reader) error
}
// PrintingKubeClient implements KubeClient, but simply prints the reader to
@ -189,6 +198,12 @@ func (p *PrintingKubeClient) WatchUntilReady(ns string, r io.Reader) error {
return err
}
// Update implements KubeClient Update.
func (p *PrintingKubeClient) Update(ns string, currentReader, modifiedReader io.Reader) error {
_, err := io.Copy(p.Out, modifiedReader)
return err
}
// Environment provides the context for executing a client request.
//
// All services in a context are concurrency safe.

@ -83,6 +83,9 @@ func (k *mockKubeClient) Create(ns string, r io.Reader) error {
func (k *mockKubeClient) Delete(ns string, r io.Reader) error {
return nil
}
func (k *mockKubeClient) Update(ns string, currentReader, modifiedReader io.Reader) error {
return nil
}
func (k *mockKubeClient) WatchUntilReady(ns string, r io.Reader) error {
return nil
}

@ -27,6 +27,8 @@ import:
- pkg/api
- pkg/api/meta
- pkg/api/error
- pkg/api/unversioned
- pkg/apimachinery/registered
- pkg/client/restclient
- pkg/client/unversioned
- pkg/apis/batch
@ -40,6 +42,8 @@ import:
- pkg/labels
- pkg/runtime
- pkg/watch
- pkg/util/strategicpatch
- pkg/util/yaml
- package: github.com/gosuri/uitable
- package: speter.net/go/exp/math/dec/inf
version: ^0.9.0

@ -20,15 +20,21 @@ import (
"fmt"
"io"
"log"
"reflect"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apimachinery/registered"
"k8s.io/kubernetes/pkg/apis/batch"
"k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
"k8s.io/kubernetes/pkg/kubectl"
cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
"k8s.io/kubernetes/pkg/kubectl/resource"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/strategicpatch"
"k8s.io/kubernetes/pkg/util/yaml"
"k8s.io/kubernetes/pkg/watch"
)
@ -57,6 +63,77 @@ func (c *Client) Create(namespace string, reader io.Reader) error {
return perform(c, namespace, reader, createResource)
}
// Update reads in the current configuration and a modified configuration from io.reader
// and creates resources that don't already exists, updates resources that have been modified
// and deletes resources from the current configuration that are not present in the
// modified configuration
//
// Namespace will set the namespaces
func (c *Client) Update(namespace string, currentReader, modifiedReader io.Reader) error {
current := c.NewBuilder(includeThirdPartyAPIs).
ContinueOnError().
NamespaceParam(namespace).
DefaultNamespace().
Stream(currentReader, "").
Flatten().
Do()
modified := c.NewBuilder(includeThirdPartyAPIs).
ContinueOnError().
NamespaceParam(namespace).
DefaultNamespace().
Stream(modifiedReader, "").
Flatten().
Do()
currentInfos, err := current.Infos()
if err != nil {
return err
}
modifiedInfos := []*resource.Info{}
modified.Visit(func(info *resource.Info, err error) error {
modifiedInfos = append(modifiedInfos, info)
if err != nil {
return err
}
resourceName := info.Name
helper := resource.NewHelper(info.Client, info.Mapping)
if _, err := helper.Get(info.Namespace, resourceName, info.Export); err != nil {
if !errors.IsNotFound(err) {
return fmt.Errorf("Could not get information about the resource: err: %s", err)
}
// Since the resource does not exist, create it.
if err := createResource(info); err != nil {
return err
}
kind := info.Mapping.GroupVersionKind.Kind
log.Printf("Created a new %s called %s\n", kind, resourceName)
return nil
}
currentObj, err := getCurrentObject(resourceName, currentInfos)
if err != nil {
return err
}
if err := updateResource(info, currentObj); err != nil {
log.Printf("error updating the resource %s:\n\t %v", resourceName, err)
return err
}
return err
})
deleteUnwantedResources(currentInfos, modifiedInfos)
return nil
}
// Delete deletes kubernetes resources from an io.reader
//
// Namespace will set the namespace
@ -136,6 +213,51 @@ func createResource(info *resource.Info) error {
return err
}
func deleteResource(info *resource.Info) error {
return resource.NewHelper(info.Client, info.Mapping).Delete(info.Namespace, info.Name)
}
func updateResource(modified *resource.Info, currentObj runtime.Object) error {
encoder := api.Codecs.LegacyCodec(registered.EnabledVersions()...)
originalSerialization, err := runtime.Encode(encoder, currentObj)
if err != nil {
return err
}
editedSerialization, err := runtime.Encode(encoder, modified.Object)
if err != nil {
return err
}
originalJS, err := yaml.ToJSON(originalSerialization)
if err != nil {
return err
}
editedJS, err := yaml.ToJSON(editedSerialization)
if err != nil {
return err
}
if reflect.DeepEqual(originalJS, editedJS) {
return fmt.Errorf("Looks like there are no changes for %s", modified.Name)
}
patch, err := strategicpatch.CreateStrategicMergePatch(originalJS, editedJS, currentObj)
if err != nil {
return err
}
// send patch to server
helper := resource.NewHelper(modified.Client, modified.Mapping)
if _, err = helper.Patch(modified.Namespace, modified.Name, api.StrategicMergePatchType, patch); err != nil {
return err
}
return nil
}
func watchUntilReady(info *resource.Info) error {
w, err := resource.NewHelper(info.Client, info.Mapping).WatchSingle(info.Namespace, info.Name, info.ResourceVersion)
if err != nil {
@ -213,3 +335,37 @@ func (c *Client) ensureNamespace(namespace string) error {
}
return nil
}
func deleteUnwantedResources(currentInfos, modifiedInfos []*resource.Info) {
for _, cInfo := range currentInfos {
found := false
for _, m := range modifiedInfos {
if m.Name == cInfo.Name {
found = true
}
}
if !found {
log.Printf("Deleting %s...", cInfo.Name)
if err := deleteResource(cInfo); err != nil {
log.Printf("Failed to delete %s, err: %s", cInfo.Name, err)
}
}
}
}
func getCurrentObject(targetName string, infos []*resource.Info) (runtime.Object, error) {
var curr *resource.Info
for _, currInfo := range infos {
if currInfo.Name == targetName {
curr = currInfo
}
}
if curr == nil {
return nil, fmt.Errorf("No resource with the name %s found.", targetName)
}
encoder := api.Codecs.LegacyCodec(registered.EnabledVersions()...)
defaultVersion := unversioned.GroupVersion{}
return resource.AsVersionedObject([]*resource.Info{curr}, false, defaultVersion, encoder)
}

@ -17,15 +17,55 @@ limitations under the License.
package kube
import (
"bytes"
"encoding/json"
"io"
"io/ioutil"
"net/http"
"strings"
"testing"
"k8s.io/kubernetes/pkg/api/meta"
"k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/api/unversioned"
api "k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/client/unversioned/fake"
"k8s.io/kubernetes/pkg/kubectl/resource"
"k8s.io/kubernetes/pkg/runtime"
)
func TestUpdateResource(t *testing.T) {
tests := []struct {
name string
namespace string
modified *resource.Info
currentObj runtime.Object
err bool
errMessage string
}{
{
name: "no changes when updating resources",
modified: createFakeInfo("nginx", nil),
currentObj: createFakePod("nginx", nil),
err: true,
errMessage: "Looks like there are no changes for nginx",
},
//{
//name: "valid update input",
//modified: createFakeInfo("nginx", map[string]string{"app": "nginx"}),
//currentObj: createFakePod("nginx", nil),
//},
}
for _, tt := range tests {
err := updateResource(tt.modified, tt.currentObj)
if err != nil && err.Error() != tt.errMessage {
t.Errorf("%q. expected error message: %v, got %v", tt.name, tt.errMessage, err)
}
}
}
func TestPerform(t *testing.T) {
tests := []struct {
name string
@ -214,3 +254,53 @@ spec:
ports:
- containerPort: 80
`
func createFakePod(name string, labels map[string]string) runtime.Object {
objectMeta := createObjectMeta(name, labels)
object := &api.Pod{
ObjectMeta: objectMeta,
}
return object
}
func createFakeInfo(name string, labels map[string]string) *resource.Info {
pod := createFakePod(name, labels)
marshaledObj, _ := json.Marshal(pod)
mapping := &meta.RESTMapping{
Resource: name,
Scope: meta.RESTScopeNamespace,
GroupVersionKind: unversioned.GroupVersionKind{
Kind: "Pod",
Version: "v1",
}}
client := &fake.RESTClient{
Codec: testapi.Default.Codec(),
Client: fake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) {
header := http.Header{}
header.Set("Content-Type", runtime.ContentTypeJSON)
return &http.Response{
StatusCode: 200,
Header: header,
Body: ioutil.NopCloser(bytes.NewReader(marshaledObj)),
}, nil
})}
info := resource.NewInfo(client, mapping, "default", "nginx", false)
info.Object = pod
return info
}
func createObjectMeta(name string, labels map[string]string) api.ObjectMeta {
objectMeta := api.ObjectMeta{Name: name, Namespace: "default"}
if labels != nil {
objectMeta.Labels = labels
}
return objectMeta
}

Loading…
Cancel
Save