ref(kube): add Result to abstract a collection of resources

pull/1823/head
Adam Reese 8 years ago
parent 6f77869d41
commit b22e6fcfaa

@ -28,7 +28,6 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/apimachinery/registered"
apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1"
"k8s.io/kubernetes/pkg/apis/batch"
"k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
@ -65,15 +64,6 @@ func New(config clientcmd.ClientConfig) *Client {
// ResourceActorFunc performs an action on a single resource.
type ResourceActorFunc func(*resource.Info) error
// ErrAlreadyExists can be returned where there are no changes
type ErrAlreadyExists struct {
errorMsg string
}
func (e ErrAlreadyExists) Error() string {
return fmt.Sprintf("Looks like there are no changes for %s", e.errorMsg)
}
// Create creates kubernetes resources from an io.reader
//
// Namespace will set the namespace
@ -104,8 +94,10 @@ func (c *Client) newBuilder(namespace string, reader io.Reader) *resource.Result
}
// Build validates for Kubernetes objects and returns resource Infos from a io.Reader.
func (c *Client) Build(namespace string, reader io.Reader) ([]*resource.Info, error) {
return c.newBuilder(namespace, reader).Infos()
func (c *Client) Build(namespace string, reader io.Reader) (Result, error) {
var result Result
result, err := c.newBuilder(namespace, reader).Infos()
return result, err
}
// Get gets kubernetes resources as pretty printed string
@ -167,22 +159,20 @@ func (c *Client) Get(namespace string, reader io.Reader) (string, error) {
// not present in the target configuration
//
// Namespace will set the namespaces
func (c *Client) Update(namespace string, currentReader, targetReader io.Reader, recreate bool) error {
currentInfos, err := c.Build(namespace, currentReader)
func (c *Client) Update(namespace string, originalReader, targetReader io.Reader, recreate bool) error {
original, err := c.Build(namespace, originalReader)
if err != nil {
return fmt.Errorf("failed decoding reader into objects: %s", err)
}
target := c.newBuilder(namespace, targetReader)
if target.Err() != nil {
return fmt.Errorf("failed decoding reader into objects: %s", target.Err())
target, err := c.Build(namespace, targetReader)
if err != nil {
return fmt.Errorf("failed decoding reader into objects: %s", err)
}
targetInfos := []*resource.Info{}
updateErrors := []string{}
err = target.Visit(func(info *resource.Info, err error) error {
targetInfos = append(targetInfos, info)
if err != nil {
return err
}
@ -203,18 +193,19 @@ func (c *Client) Update(namespace string, currentReader, targetReader io.Reader,
return nil
}
currentObj, err := getCurrentObject(info, currentInfos)
originalInfo := original.Get(info)
if originalInfo == nil {
return fmt.Errorf("no resource with the name %s found", info.Name)
}
versionedObject, err := originalInfo.Mapping.ConvertToVersion(originalInfo.Object, originalInfo.Mapping.GroupVersionKind.GroupVersion())
if err != nil {
return err
}
if err := updateResource(c, info, currentObj, recreate); err != nil {
if alreadyExistErr, ok := err.(ErrAlreadyExists); ok {
log.Printf(alreadyExistErr.errorMsg)
} else {
log.Printf("error updating the resource %s:\n\t %v", info.Name, err)
updateErrors = append(updateErrors, err.Error())
}
if err := updateResource(c, info, versionedObject, recreate); err != nil {
log.Printf("error updating the resource %s:\n\t %v", info.Name, err)
updateErrors = append(updateErrors, err.Error())
}
return nil
@ -225,7 +216,13 @@ func (c *Client) Update(namespace string, currentReader, targetReader io.Reader,
} else if len(updateErrors) != 0 {
return fmt.Errorf(strings.Join(updateErrors, " && "))
}
deleteUnwantedResources(currentInfos, targetInfos)
for _, info := range original.Difference(target) {
log.Printf("Deleting %s in %s...", info.Name, info.Namespace)
if err := deleteResource(info); err != nil {
log.Printf("Failed to delete %s, err: %s", info.Name, err)
}
}
return nil
}
@ -311,7 +308,7 @@ func deleteResource(info *resource.Info) error {
}
func updateResource(c *Client, target *resource.Info, currentObj runtime.Object, recreate bool) error {
encoder := api.Codecs.LegacyCodec(registered.EnabledVersions()...)
encoder := c.JSONEncoder()
original, err := runtime.Encode(encoder, currentObj)
if err != nil {
return err
@ -323,7 +320,8 @@ func updateResource(c *Client, target *resource.Info, currentObj runtime.Object,
}
if api.Semantic.DeepEqual(original, modified) {
return ErrAlreadyExists{target.Name}
log.Printf("Looks like there are no changes for %s", target.Name)
return nil
}
patch, err := strategicpatch.CreateTwoWayMergePatch(original, modified, currentObj)
@ -453,39 +451,6 @@ func waitForJob(e watch.Event, name string) (bool, error) {
return false, nil
}
func deleteUnwantedResources(currentInfos, targetInfos []*resource.Info) {
for _, cInfo := range currentInfos {
if _, ok := findMatchingInfo(cInfo, targetInfos); !ok {
log.Printf("Deleting %s...", cInfo.Name)
if err := deleteResource(cInfo); err != nil {
log.Printf("Failed to delete %s, err: %s", cInfo.Name, err)
}
}
}
}
func getCurrentObject(target *resource.Info, infos []*resource.Info) (runtime.Object, error) {
if found, ok := findMatchingInfo(target, infos); ok {
return found.Mapping.ConvertToVersion(found.Object, found.Mapping.GroupVersionKind.GroupVersion())
}
return nil, fmt.Errorf("no resource with the name %s found", target.Name)
}
// isMatchingInfo returns true if infos match on Name and Kind.
func isMatchingInfo(a, b *resource.Info) bool {
return a.Name == b.Name && a.Mapping.GroupVersionKind.Kind == b.Mapping.GroupVersionKind.Kind
}
// findMatchingInfo returns the first object that matches target.
func findMatchingInfo(target *resource.Info, infos []*resource.Info) (*resource.Info, bool) {
for _, info := range infos {
if isMatchingInfo(target, info) {
return info, true
}
}
return nil, false
}
// scrubValidationError removes kubectl info from the message
func scrubValidationError(err error) error {
const stopValidateMessage = "if you choose to ignore these errors, turn validation off with --validate=false"

@ -40,7 +40,10 @@ func objBody(codec runtime.Codec, obj runtime.Object) io.ReadCloser {
func newPod(name string) api.Pod {
return api.Pod{
ObjectMeta: api.ObjectMeta{Name: name},
ObjectMeta: api.ObjectMeta{
Name: name,
Namespace: api.NamespaceDefault,
},
Spec: api.PodSpec{
Containers: []api.Container{{
Name: "app:v4",
@ -90,13 +93,13 @@ func TestUpdate(t *testing.T) {
p, m := req.URL.Path, req.Method
actions[p] = m
switch {
case p == "/namespaces/test/pods/starfish" && m == "GET":
case p == "/namespaces/default/pods/starfish" && m == "GET":
return newResponse(200, &listA.Items[0])
case p == "/namespaces/test/pods/otter" && m == "GET":
case p == "/namespaces/default/pods/otter" && m == "GET":
return newResponse(200, &listA.Items[1])
case p == "/namespaces/test/pods/dolphin" && m == "GET":
case p == "/namespaces/default/pods/dolphin" && m == "GET":
return newResponse(404, notFoundBody())
case p == "/namespaces/test/pods/starfish" && m == "PATCH":
case p == "/namespaces/default/pods/starfish" && m == "PATCH":
data, err := ioutil.ReadAll(req.Body)
if err != nil {
t.Fatalf("could not dump request: %s", err)
@ -107,28 +110,28 @@ func TestUpdate(t *testing.T) {
t.Errorf("expected patch %s, got %s", expected, string(data))
}
return newResponse(200, &listB.Items[0])
case p == "/namespaces/test/pods" && m == "POST":
case p == "/namespaces/default/pods" && m == "POST":
return newResponse(200, &listB.Items[1])
case p == "/namespaces/test/pods/squid" && m == "DELETE":
case p == "/namespaces/default/pods/squid" && m == "DELETE":
return newResponse(200, &listB.Items[1])
default:
t.Fatalf("unexpected request: %#v\n%#v", req.URL, req)
t.Fatalf("unexpected request: %s %s", req.Method, req.URL.Path)
return nil, nil
}
}),
}
c := &Client{Factory: f}
if err := c.Update("test", objBody(codec, &listA), objBody(codec, &listB), false); err != nil {
if err := c.Update(api.NamespaceDefault, objBody(codec, &listA), objBody(codec, &listB), false); err != nil {
t.Fatal(err)
}
expectedActions := map[string]string{
"/namespaces/test/pods/dolphin": "GET",
"/namespaces/test/pods/otter": "GET",
"/namespaces/test/pods/starfish": "PATCH",
"/namespaces/test/pods": "POST",
"/namespaces/test/pods/squid": "DELETE",
"/namespaces/default/pods/dolphin": "GET",
"/namespaces/default/pods/otter": "GET",
"/namespaces/default/pods/starfish": "PATCH",
"/namespaces/default/pods": "POST",
"/namespaces/default/pods/squid": "DELETE",
}
for k, v := range expectedActions {

@ -0,0 +1,87 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kube // import "k8s.io/helm/pkg/kube"
import "k8s.io/kubernetes/pkg/kubectl/resource"
// Result provides convenience methods for comparing collections of Infos.
type Result []*resource.Info
// Append adds an Info to the Result.
func (r *Result) Append(val *resource.Info) {
*r = append(*r, val)
}
// Visit implements resource.Visitor.
func (r Result) Visit(fn resource.VisitorFunc) error {
for _, i := range r {
if err := fn(i, nil); err != nil {
return err
}
}
return nil
}
// Filter returns a new Result with Infos that satisfy the predicate fn.
func (r Result) Filter(fn func(*resource.Info) bool) Result {
var result Result
for _, i := range r {
if fn(i) {
result.Append(i)
}
}
return result
}
// Get returns the Info from the result that matches the name and kind.
func (r Result) Get(info *resource.Info) *resource.Info {
for _, i := range r {
if isMatchingInfo(i, info) {
return i
}
}
return nil
}
// Contains checks to see if an object exists.
func (r Result) Contains(info *resource.Info) bool {
for _, i := range r {
if isMatchingInfo(i, info) {
return true
}
}
return false
}
// Difference will return a new Result with objects not contained in rs.
func (r Result) Difference(rs Result) Result {
return r.Filter(func(info *resource.Info) bool {
return !rs.Contains(info)
})
}
// Intersect will return a new Result with objects contained in both Results.
func (r Result) Intersect(rs Result) Result {
return r.Filter(func(info *resource.Info) bool {
return rs.Contains(info)
})
}
// isMatchingInfo returns true if infos match on Name and GroupVersionKind.
func isMatchingInfo(a, b *resource.Info) bool {
return a.Name == b.Name && a.Mapping.GroupVersionKind == b.Mapping.GroupVersionKind
}

@ -0,0 +1,58 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kube // import "k8s.io/helm/pkg/kube"
import (
"testing"
"k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/kubectl/resource"
)
func TestResult(t *testing.T) {
mapping, err := testapi.Default.RESTMapper().RESTMapping(unversioned.GroupKind{Kind: "Pod"})
if err != nil {
t.Fatal(err)
}
info := func(name string) *resource.Info {
return &resource.Info{Name: name, Mapping: mapping}
}
var r1, r2 Result
r1 = []*resource.Info{info("foo"), info("bar")}
r2 = []*resource.Info{info("bar")}
diff := r1.Difference(r2)
if len(diff) != 1 {
t.Error("expected 1 result")
}
if !diff.Contains(info("foo")) {
t.Error("expected diff to return foo")
}
inter := r1.Intersect(r2)
if len(inter) != 1 {
t.Error("expected 1 result")
}
if !inter.Contains(info("bar")) {
t.Error("expected intersect to return bar")
}
}

@ -134,7 +134,7 @@ type KubeClient interface {
// by "\n---\n").
Update(namespace string, originalReader, modifiedReader io.Reader, recreate bool) error
Build(namespace string, reader io.Reader) ([]*resource.Info, error)
Build(namespace string, reader io.Reader) (kube.Result, error)
}
// PrintingKubeClient implements KubeClient, but simply prints the reader to
@ -176,7 +176,7 @@ func (p *PrintingKubeClient) Update(ns string, currentReader, modifiedReader io.
}
// Build implements KubeClient Build.
func (p *PrintingKubeClient) Build(ns string, reader io.Reader) ([]*resource.Info, error) {
func (p *PrintingKubeClient) Build(ns string, reader io.Reader) (kube.Result, error) {
return []*resource.Info{}, nil
}

@ -22,6 +22,7 @@ import (
"testing"
"k8s.io/helm/pkg/chartutil"
"k8s.io/helm/pkg/kube"
"k8s.io/helm/pkg/proto/hapi/chart"
"k8s.io/kubernetes/pkg/kubectl/resource"
)
@ -51,7 +52,7 @@ func (k *mockKubeClient) Update(ns string, currentReader, modifiedReader io.Read
func (k *mockKubeClient) WatchUntilReady(ns string, r io.Reader, t int64) error {
return nil
}
func (k *mockKubeClient) Build(ns string, reader io.Reader) ([]*resource.Info, error) {
func (k *mockKubeClient) Build(ns string, reader io.Reader) (kube.Result, error) {
return []*resource.Info{}, nil
}

Loading…
Cancel
Save