feat(kube): add update logic to kube client

This commit adds an Update function to the client.
The Update function takes in the modified manifests and
the original manifests. It then iterates through the modified
objects, creates objects not found in kubernetes, and updates
objects that exists but have been modified. Finally, it
iterates through the original resources and checks to see if
they have been deleted in the modified configuration and then
proceeds to delete them. #690
pull/1026/head
Michelle Noorali 9 years ago
parent 584245eadf
commit f600b30c7a

@ -161,6 +161,15 @@ type KubeClient interface {
// For all other kinds, it means the kind was created or modified without
// error.
WatchUntilReady(namespace string, reader io.Reader) error
// Update updates one or more resources or creates the resource
// if it doesn't exist
//
// namespace must contain a valid existing namespace
//
// reader must contain a YAML stream (one or more YAML documents separated
// by "\n---\n").
Update(namespace string, originalReader, modifiedReader io.Reader) error
}
// PrintingKubeClient implements KubeClient, but simply prints the reader to
@ -189,6 +198,12 @@ func (p *PrintingKubeClient) WatchUntilReady(ns string, r io.Reader) error {
return err
}
// Update implements KubeClient Update.
func (p *PrintingKubeClient) Update(ns string, currentReader, modifiedReader io.Reader) error {
_, err := io.Copy(p.Out, modifiedReader)
return err
}
// Environment provides the context for executing a client request.
//
// All services in a context are concurrency safe.

@ -83,6 +83,9 @@ func (k *mockKubeClient) Create(ns string, r io.Reader) error {
func (k *mockKubeClient) Delete(ns string, r io.Reader) error {
return nil
}
func (k *mockKubeClient) Update(ns string, currentReader, modifiedReader io.Reader) error {
return nil
}
func (k *mockKubeClient) WatchUntilReady(ns string, r io.Reader) error {
return nil
}

@ -20,15 +20,21 @@ import (
"fmt"
"io"
"log"
"reflect"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apimachinery/registered"
"k8s.io/kubernetes/pkg/apis/batch"
"k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
"k8s.io/kubernetes/pkg/kubectl"
cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
"k8s.io/kubernetes/pkg/kubectl/resource"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/strategicpatch"
"k8s.io/kubernetes/pkg/util/yaml"
"k8s.io/kubernetes/pkg/watch"
)
@ -57,6 +63,77 @@ func (c *Client) Create(namespace string, reader io.Reader) error {
return perform(c, namespace, reader, createResource)
}
// Update reads in the current configuration and a modified configuration from io.reader
// and creates resources that don't already exists, updates resources that have been modified
// and deletes resources from the current configuration that are not present in the
// modified configuration
//
// Namespace will set the namespaces
func (c *Client) Update(namespace string, currentReader, modifiedReader io.Reader) error {
current := c.NewBuilder(includeThirdPartyAPIs).
ContinueOnError().
NamespaceParam(namespace).
DefaultNamespace().
Stream(currentReader, "").
Flatten().
Do()
modified := c.NewBuilder(includeThirdPartyAPIs).
ContinueOnError().
NamespaceParam(namespace).
DefaultNamespace().
Stream(modifiedReader, "").
Flatten().
Do()
currentInfos, err := current.Infos()
if err != nil {
return err
}
modifiedInfos := []*resource.Info{}
modified.Visit(func(info *resource.Info, err error) error {
modifiedInfos = append(modifiedInfos, info)
if err != nil {
return err
}
resourceName := info.Name
helper := resource.NewHelper(info.Client, info.Mapping)
if _, err := helper.Get(info.Namespace, resourceName, info.Export); err != nil {
if !errors.IsNotFound(err) {
return fmt.Errorf("Could not get information about the resource: err: %s", err)
}
// Since the resource does not exist, create it.
if err := createResource(info); err != nil {
return err
}
kind := info.Mapping.GroupVersionKind.Kind
log.Printf("Created a new %s called %s\n", kind, resourceName)
return nil
}
currentObj, err := getCurrentObject(resourceName, currentInfos)
if err != nil {
return err
}
if err := updateResource(info, currentObj); err != nil {
log.Printf("error updating the resource %s:\n\t %v", resourceName, err)
return err
}
return err
})
deleteUnwantedResources(currentInfos, modifiedInfos)
return nil
}
// Delete deletes kubernetes resources from an io.reader
//
// Namespace will set the namespace
@ -136,6 +213,52 @@ func createResource(info *resource.Info) error {
return err
}
func deleteResource(info *resource.Info) error {
return resource.NewHelper(info.Client, info.Mapping).Delete(info.Namespace, info.Name)
}
func updateResource(modified *resource.Info, currentObj runtime.Object) error {
encoder := api.Codecs.LegacyCodec(registered.EnabledVersions()...)
originalSerialization, err := runtime.Encode(encoder, currentObj)
if err != nil {
return err
}
editedSerialization, err := runtime.Encode(encoder, modified.Object)
if err != nil {
return err
}
originalJS, err := yaml.ToJSON(originalSerialization)
if err != nil {
return err
}
editedJS, err := yaml.ToJSON(editedSerialization)
if err != nil {
return err
}
if reflect.DeepEqual(originalJS, editedJS) {
return fmt.Errorf("Looks like there are no changes for %s", modified.Name)
}
patch, err := strategicpatch.CreateStrategicMergePatch(originalJS, editedJS, currentObj)
if err != nil {
return err
}
// send patch to server
helper := resource.NewHelper(modified.Client, modified.Mapping)
_, err = helper.Patch(modified.Namespace, modified.Name, api.StrategicMergePatchType, patch)
if err != nil {
return err
}
return nil
}
func watchUntilReady(info *resource.Info) error {
w, err := resource.NewHelper(info.Client, info.Mapping).WatchSingle(info.Namespace, info.Name, info.ResourceVersion)
if err != nil {
@ -213,3 +336,37 @@ func (c *Client) ensureNamespace(namespace string) error {
}
return nil
}
func deleteUnwantedResources(currentInfos, modifiedInfos []*resource.Info) {
for _, cInfo := range currentInfos {
found := false
for _, m := range modifiedInfos {
if m.Name == cInfo.Name {
found = true
}
}
if !found {
log.Printf("Deleting %s...", cInfo.Name)
if err := deleteResource(cInfo); err != nil {
log.Printf("Failed to delete %s, err: %s", cInfo.Name, err)
}
}
}
}
func getCurrentObject(targetName string, infos []*resource.Info) (runtime.Object, error) {
var curr *resource.Info
for _, currInfo := range infos {
if currInfo.Name == targetName {
curr = currInfo
}
}
if curr == nil {
return nil, fmt.Errorf("No resource with the name %s found.", targetName)
}
encoder := api.Codecs.LegacyCodec(registered.EnabledVersions()...)
defaultVersion := unversioned.GroupVersion{}
return resource.AsVersionedObject([]*resource.Info{curr}, false, defaultVersion, encoder)
}

@ -17,15 +17,55 @@ limitations under the License.
package kube
import (
"bytes"
"encoding/json"
"io"
"io/ioutil"
"net/http"
"strings"
"testing"
"k8s.io/kubernetes/pkg/api/meta"
"k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/api/unversioned"
api "k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/client/unversioned/fake"
"k8s.io/kubernetes/pkg/kubectl/resource"
"k8s.io/kubernetes/pkg/runtime"
)
func TestUpdateResource(t *testing.T) {
tests := []struct {
name string
namespace string
modified *resource.Info
currentObj runtime.Object
err bool
errMessage string
}{
{
name: "no changes when updating resources",
modified: createFakeInfo("nginx", nil),
currentObj: createFakePod("nginx", nil),
err: true,
errMessage: "Looks like there are no changes for nginx",
},
//{
//name: "valid update input",
//modified: createFakeInfo("nginx", map[string]string{"app": "nginx"}),
//currentObj: createFakePod("nginx", nil),
//},
}
for _, tt := range tests {
err := updateResource(tt.modified, tt.currentObj)
if err != nil && err.Error() != tt.errMessage {
t.Errorf("%q. expected error message: %v, got %v", tt.name, tt.errMessage, err)
}
}
}
func TestPerform(t *testing.T) {
tests := []struct {
name string
@ -214,3 +254,53 @@ spec:
ports:
- containerPort: 80
`
func createFakePod(name string, labels map[string]string) runtime.Object {
objectMeta := createObjectMeta(name, labels)
object := &api.Pod{
ObjectMeta: objectMeta,
}
return object
}
func createFakeInfo(name string, labels map[string]string) *resource.Info {
pod := createFakePod(name, labels)
marshaledObj, _ := json.Marshal(pod)
mapping := &meta.RESTMapping{
Resource: name,
Scope: meta.RESTScopeNamespace,
GroupVersionKind: unversioned.GroupVersionKind{
Kind: "Pod",
Version: "v1",
}}
client := &fake.RESTClient{
Codec: testapi.Default.Codec(),
Client: fake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) {
header := http.Header{}
header.Set("Content-Type", runtime.ContentTypeJSON)
return &http.Response{
StatusCode: 200,
Header: header,
Body: ioutil.NopCloser(bytes.NewReader(marshaledObj)),
}, nil
})}
info := resource.NewInfo(client, mapping, "default", "nginx", false)
info.Object = pod
return info
}
func createObjectMeta(name string, labels map[string]string) api.ObjectMeta {
objectMeta := api.ObjectMeta{Name: name, Namespace: "default"}
if labels != nil {
objectMeta.Labels = labels
}
return objectMeta
}

Loading…
Cancel
Save