From 0338576fc5202d003e346f41069d50e4f006a285 Mon Sep 17 00:00:00 2001 From: Adam Reese Date: Wed, 15 May 2019 12:31:47 -0700 Subject: [PATCH] ref(pkg/kube): cleanup kube client interface * move the main interface to it's own file * removed summarizeKeptManifests() which was the last place kube.Get() was called * when polling for hooks, use external types * refactor out legacyschema * refactor detecting selectors from object * refactor creating test client Signed-off-by: Adam Reese --- Gopkg.lock | 103 +--------- Gopkg.toml | 2 +- cmd/helm/root.go | 2 +- pkg/action/action.go | 2 +- pkg/action/install.go | 59 ++++-- pkg/action/install_test.go | 8 +- pkg/action/printer.go | 2 +- pkg/action/resource_policy.go | 25 +-- pkg/action/rollback.go | 6 +- pkg/action/uninstall.go | 4 +- pkg/action/upgrade.go | 4 +- pkg/kube/client.go | 256 +++-------------------- pkg/kube/client_test.go | 73 +------ pkg/kube/converter.go | 10 +- pkg/kube/interface.go | 66 ++++++ pkg/kube/printer.go | 62 +----- pkg/kube/printer_test.go | 79 -------- pkg/kube/result.go | 4 +- pkg/kube/wait.go | 280 +++++++++++++------------- pkg/releasetesting/environment.go | 2 +- pkg/releasetesting/test_suite_test.go | 13 +- 21 files changed, 317 insertions(+), 745 deletions(-) create mode 100644 pkg/kube/interface.go delete mode 100644 pkg/kube/printer_test.go diff --git a/Gopkg.lock b/Gopkg.lock index de97fdbdf..f93bf58d3 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -187,19 +187,6 @@ revision = "7f2434bc10da710debe5c4315ed6d4df454b4024" version = "v0.1.0" -[[projects]] - branch = "master" - digest = "1:95e08278c876d185ba67533f045e9e63b3c9d02cbd60beb0f4dbaa2344a13ac2" - name = "github.com/chai2010/gettext-go" - packages = [ - "gettext", - "gettext/mo", - "gettext/plural", - "gettext/po", - ] - pruneopts = "UT" - revision = "bf70f2a70fb1b1f36d90d671a72795984eab0fcb" - [[projects]] digest = "1:37f8940c4d3c41536ea882b1ca3498e403c04892dfc34bd0d670ed9eafccda9a" name = "github.com/containerd/containerd" @@ -704,14 +691,6 @@ revision = "5c8c8bd35d3832f5d134ae1e1e375b69a4d25242" version = "v1.0.1" -[[projects]] - branch = "master" - digest = "1:10b85f58562d487a3bd7da6ba5b895bc221d5ecbd89df9c7c5a36004e827ade1" - name = "github.com/liggitt/tabwriter" - packages = ["."] - pruneopts = "UT" - revision = "89fcab3d43de07060e4fd4c1547430ed57e87f24" - [[projects]] branch = "master" digest = "1:84a5a2b67486d5d67060ac393aa255d05d24ed5ee41daecd5635ec22657b6492" @@ -1499,8 +1478,8 @@ "util/retry", ] pruneopts = "UT" - revision = "6ee68ca5fd8355d024d02f9db0b3b667e8357a0f" - version = "kubernetes-1.14.0" + revision = "1a26190bd76a9017e289958b9fba936430aa3704" + version = "kubernetes-1.14.1" [[projects]] branch = "master" @@ -1533,42 +1512,14 @@ [[projects]] branch = "release-1.14" - digest = "1:3e8a09f07ca1d0163720064d0bcb567fdc85338e02bd63c6d84786be8b24ebdb" + digest = "1:8e018df756b49f5fdc20e15041486520285829528df043457bb7a91886fa775b" name = "k8s.io/kubernetes" packages = [ "pkg/api/legacyscheme", "pkg/api/service", "pkg/api/v1/pod", "pkg/apis/apps", - "pkg/apis/apps/install", - "pkg/apis/apps/v1", - "pkg/apis/apps/v1beta1", - "pkg/apis/apps/v1beta2", - "pkg/apis/authentication", - "pkg/apis/authentication/install", - "pkg/apis/authentication/v1", - "pkg/apis/authentication/v1beta1", - "pkg/apis/authorization", - "pkg/apis/authorization/install", - "pkg/apis/authorization/v1", - "pkg/apis/authorization/v1beta1", "pkg/apis/autoscaling", - "pkg/apis/autoscaling/install", - "pkg/apis/autoscaling/v1", - "pkg/apis/autoscaling/v2beta1", - "pkg/apis/autoscaling/v2beta2", - "pkg/apis/batch", - "pkg/apis/batch/install", - "pkg/apis/batch/v1", - "pkg/apis/batch/v1beta1", - "pkg/apis/batch/v2alpha1", - "pkg/apis/certificates", - "pkg/apis/certificates/install", - "pkg/apis/certificates/v1beta1", - "pkg/apis/coordination", - "pkg/apis/coordination/install", - "pkg/apis/coordination/v1", - "pkg/apis/coordination/v1beta1", "pkg/apis/core", "pkg/apis/core/helper", "pkg/apis/core/install", @@ -1576,36 +1527,7 @@ "pkg/apis/core/v1", "pkg/apis/core/v1/helper", "pkg/apis/core/validation", - "pkg/apis/events", - "pkg/apis/events/install", - "pkg/apis/events/v1beta1", - "pkg/apis/extensions", - "pkg/apis/extensions/install", - "pkg/apis/extensions/v1beta1", - "pkg/apis/networking", - "pkg/apis/node", - "pkg/apis/policy", - "pkg/apis/policy/install", - "pkg/apis/policy/v1beta1", - "pkg/apis/rbac", - "pkg/apis/rbac/install", - "pkg/apis/rbac/v1", - "pkg/apis/rbac/v1alpha1", - "pkg/apis/rbac/v1beta1", "pkg/apis/scheduling", - "pkg/apis/scheduling/install", - "pkg/apis/scheduling/v1", - "pkg/apis/scheduling/v1alpha1", - "pkg/apis/scheduling/v1beta1", - "pkg/apis/settings", - "pkg/apis/settings/install", - "pkg/apis/settings/v1alpha1", - "pkg/apis/storage", - "pkg/apis/storage/install", - "pkg/apis/storage/util", - "pkg/apis/storage/v1", - "pkg/apis/storage/v1alpha1", - "pkg/apis/storage/v1beta1", "pkg/capabilities", "pkg/controller", "pkg/controller/deployment/util", @@ -1613,7 +1535,6 @@ "pkg/fieldpath", "pkg/kubectl", "pkg/kubectl/apps", - "pkg/kubectl/cmd/get", "pkg/kubectl/cmd/testing", "pkg/kubectl/cmd/util", "pkg/kubectl/cmd/util/openapi", @@ -1621,16 +1542,13 @@ "pkg/kubectl/cmd/util/openapi/validation", "pkg/kubectl/describe", "pkg/kubectl/describe/versioned", - "pkg/kubectl/generated", "pkg/kubectl/scheme", "pkg/kubectl/util", "pkg/kubectl/util/certificate", "pkg/kubectl/util/deployment", "pkg/kubectl/util/event", "pkg/kubectl/util/fieldpath", - "pkg/kubectl/util/i18n", "pkg/kubectl/util/podutils", - "pkg/kubectl/util/printers", "pkg/kubectl/util/qos", "pkg/kubectl/util/rbac", "pkg/kubectl/util/resource", @@ -1641,14 +1559,11 @@ "pkg/kubectl/validation", "pkg/kubelet/types", "pkg/master/ports", - "pkg/printers", - "pkg/printers/internalversion", "pkg/security/apparmor", "pkg/serviceaccount", "pkg/util/hash", "pkg/util/interrupt", "pkg/util/labels", - "pkg/util/node", "pkg/util/parsers", "pkg/util/taints", "pkg/version", @@ -1720,14 +1635,6 @@ revision = "fd68e9863619f6ec2fdd8625fe1f02e7c877e480" version = "v1.1.0" -[[projects]] - branch = "master" - digest = "1:9132eacc44d9bd1e03145ea2e9d4888800da7773d6edebb401f8cd34c9fb8380" - name = "vbom.ml/util" - packages = ["sortorder"] - pruneopts = "UT" - revision = "efcd4e0f97874370259c7d93e12aad57911dea81" - [solve-meta] analyzer-name = "dep" analyzer-version = 1 @@ -1783,7 +1690,6 @@ "k8s.io/apimachinery/pkg/api/errors", "k8s.io/apimachinery/pkg/api/meta", "k8s.io/apimachinery/pkg/apis/meta/v1", - "k8s.io/apimachinery/pkg/fields", "k8s.io/apimachinery/pkg/labels", "k8s.io/apimachinery/pkg/runtime", "k8s.io/apimachinery/pkg/runtime/schema", @@ -1805,12 +1711,9 @@ "k8s.io/client-go/tools/watch", "k8s.io/client-go/util/homedir", "k8s.io/kubernetes/pkg/api/legacyscheme", - "k8s.io/kubernetes/pkg/apis/batch", "k8s.io/kubernetes/pkg/controller/deployment/util", - "k8s.io/kubernetes/pkg/kubectl/cmd/get", "k8s.io/kubernetes/pkg/kubectl/cmd/testing", "k8s.io/kubernetes/pkg/kubectl/cmd/util", - "k8s.io/kubernetes/pkg/kubectl/scheme", "k8s.io/kubernetes/pkg/kubectl/validation", ] solver-name = "gps-cdcl" diff --git a/Gopkg.toml b/Gopkg.toml index c2e0ae425..26daeee8a 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -36,7 +36,7 @@ [[constraint]] name = "k8s.io/client-go" - version = "kubernetes-1.14.0" + version = "kubernetes-1.14.1" [[constraint]] name = "k8s.io/kubernetes" diff --git a/cmd/helm/root.go b/cmd/helm/root.go index 0fa1af8c4..69e9a1f87 100644 --- a/cmd/helm/root.go +++ b/cmd/helm/root.go @@ -82,7 +82,7 @@ func newRootCmd(actionConfig *action.Configuration, out io.Writer, args []string } actionConfig.RegistryClient = registry.NewClient(®istry.ClientOptions{ Debug: settings.Debug, - Out: out, + Out: out, Authorizer: registry.Authorizer{ Client: client, }, diff --git a/pkg/action/action.go b/pkg/action/action.go index 5fa941964..f69c16869 100644 --- a/pkg/action/action.go +++ b/pkg/action/action.go @@ -70,7 +70,7 @@ type Configuration struct { Releases *storage.Storage // KubeClient is a Kubernetes API client. - KubeClient kube.KubernetesClient + KubeClient kube.Interface // RegistryClient is a client for working with registries RegistryClient *registry.Client diff --git a/pkg/action/install.go b/pkg/action/install.go index cb7d48834..26fbc082d 100644 --- a/pkg/action/install.go +++ b/pkg/action/install.go @@ -191,6 +191,7 @@ func (i *Install) Run(chrt *chart.Chart) (*release.Release, error) { } if i.Wait { + buf := bytes.NewBufferString(rel.Manifest) if err := i.cfg.KubeClient.Wait(buf, i.Timeout); err != nil { rel.SetStatus(release.StatusFailed, fmt.Sprintf("Release %q failed: %s", i.ReleaseName, err.Error())) i.recordRelease(rel) // Ignore the error, since we have another error to deal with. @@ -623,7 +624,7 @@ func (v *ValueOptions) MergeValues(settings cli.EnvSettings) error { return errors.Wrapf(err, "failed to parse %s", filePath) } // Merge with the previous map - base = MergeValues(base, currentMap) + base = mergeMaps(base, currentMap) } // User specified a value via --set @@ -644,31 +645,47 @@ func (v *ValueOptions) MergeValues(settings cli.EnvSettings) error { return nil } -// MergeValues merges source and destination map, preferring values from the source map -func MergeValues(dest, src map[string]interface{}) map[string]interface{} { +// mergeValues merges source and destination map, preferring values from the source map +func mergeValues(dest, src map[string]interface{}) map[string]interface{} { + out := make(map[string]interface{}) + for k, v := range dest { + out[k] = v + } for k, v := range src { - // If the key doesn't exist already, then just set the key to that value - if _, exists := dest[k]; !exists { - dest[k] = v - continue - } - nextMap, ok := v.(map[string]interface{}) - // If it isn't another map, overwrite the value - if !ok { - dest[k] = v + if _, ok := out[k]; !ok { + // If the key doesn't exist already, then just set the key to that value + } else if nextMap, ok := v.(map[string]interface{}); !ok { + // If it isn't another map, overwrite the value + } else if destMap, isMap := out[k].(map[string]interface{}); !isMap { + // Edge case: If the key exists in the destination, but isn't a map + // If the source map has a map for this key, prefer it + } else { + // If we got to this point, it is a map in both, so merge them + out[k] = mergeValues(destMap, nextMap) continue } - // Edge case: If the key exists in the destination, but isn't a map - destMap, isMap := dest[k].(map[string]interface{}) - // If the source map has a map for this key, prefer it - if !isMap { - dest[k] = v - continue + out[k] = v + } + return out +} + +func mergeMaps(a, b map[string]interface{}) map[string]interface{} { + out := make(map[string]interface{}, len(a)) + for k, v := range a { + out[k] = v + } + for k, v := range b { + if v, ok := v.(map[string]interface{}); ok { + if bv, ok := out[k]; ok { + if bv, ok := bv.(map[string]interface{}); ok { + out[k] = mergeMaps(bv, v) + continue + } + } } - // If we got to this point, it is a map in both, so merge them - dest[k] = MergeValues(destMap, nextMap) + out[k] = v } - return dest + return out } // readFile load a file from stdin, the local directory, or a remote file with a url. diff --git a/pkg/action/install_test.go b/pkg/action/install_test.go index 82a1420fb..063be75a0 100644 --- a/pkg/action/install_test.go +++ b/pkg/action/install_test.go @@ -322,25 +322,25 @@ func TestMergeValues(t *testing.T) { "testing": "fun", } - testMap := MergeValues(flatMap, nestedMap) + testMap := mergeValues(flatMap, nestedMap) equal := reflect.DeepEqual(testMap, nestedMap) if !equal { t.Errorf("Expected a nested map to overwrite a flat value. Expected: %v, got %v", nestedMap, testMap) } - testMap = MergeValues(nestedMap, flatMap) + testMap = mergeValues(nestedMap, flatMap) equal = reflect.DeepEqual(testMap, flatMap) if !equal { t.Errorf("Expected a flat value to overwrite a map. Expected: %v, got %v", flatMap, testMap) } - testMap = MergeValues(nestedMap, anotherNestedMap) + testMap = mergeValues(nestedMap, anotherNestedMap) equal = reflect.DeepEqual(testMap, anotherNestedMap) if !equal { t.Errorf("Expected a nested map to overwrite another nested map. Expected: %v, got %v", anotherNestedMap, testMap) } - testMap = MergeValues(anotherFlatMap, anotherNestedMap) + testMap = mergeValues(anotherFlatMap, anotherNestedMap) expectedMap := map[string]interface{}{ "testing": "fun", "foo": "bar", diff --git a/pkg/action/printer.go b/pkg/action/printer.go index abc0ce243..3e23448a9 100644 --- a/pkg/action/printer.go +++ b/pkg/action/printer.go @@ -57,7 +57,7 @@ func PrintRelease(out io.Writer, rel *release.Release) { } if strings.EqualFold(rel.Info.Description, "Dry run complete") { - fmt.Fprintf(out, "MANIFEST:\n%s\n", rel.Manifest) + fmt.Fprintf(out, "MANIFEST:\n%s\n", rel.Manifest) } if len(rel.Info.Notes) > 0 { diff --git a/pkg/action/resource_policy.go b/pkg/action/resource_policy.go index 74a547e8f..5a0244bcd 100644 --- a/pkg/action/resource_policy.go +++ b/pkg/action/resource_policy.go @@ -17,10 +17,8 @@ limitations under the License. package action import ( - "bytes" "strings" - "helm.sh/helm/pkg/kube" "helm.sh/helm/pkg/releaseutil" ) @@ -33,10 +31,7 @@ const resourcePolicyAnno = "helm.sh/resource-policy" // during an uninstallRelease action. const keepPolicy = "keep" -func filterManifestsToKeep(manifests []releaseutil.Manifest) ([]releaseutil.Manifest, []releaseutil.Manifest) { - remaining := []releaseutil.Manifest{} - keep := []releaseutil.Manifest{} - +func filterManifestsToKeep(manifests []releaseutil.Manifest) (keep, remaining []releaseutil.Manifest) { for _, m := range manifests { if m.Head.Metadata == nil || m.Head.Metadata.Annotations == nil || len(m.Head.Metadata.Annotations) == 0 { remaining = append(remaining, m) @@ -57,21 +52,3 @@ func filterManifestsToKeep(manifests []releaseutil.Manifest) ([]releaseutil.Mani } return keep, remaining } - -func summarizeKeptManifests(manifests []releaseutil.Manifest, kubeClient kube.KubernetesClient) string { - var message string - for _, m := range manifests { - // check if m is in fact present from k8s client's POV. - output, err := kubeClient.Get(bytes.NewBufferString(m.Content)) - if err != nil || strings.Contains(output, kube.MissingGetHeader) { - continue - } - - details := "[" + m.Head.Kind + "] " + m.Head.Metadata.Name + "\n" - if message == "" { - message = "These resources were kept due to the resource policy:\n" - } - message += details - } - return message -} diff --git a/pkg/action/rollback.go b/pkg/action/rollback.go index e3fcfee04..68f508ceb 100644 --- a/pkg/action/rollback.go +++ b/pkg/action/rollback.go @@ -235,11 +235,9 @@ func (r *Rollback) execHook(hs []*release.Hook, hook string) error { // deleteHookByPolicy deletes a hook if the hook policy instructs it to func deleteHookByPolicy(cfg *Configuration, h *release.Hook, policy string) error { - b := bytes.NewBufferString(h.Manifest) if hookHasDeletePolicy(h, policy) { - if errHookDelete := cfg.KubeClient.Delete(b); errHookDelete != nil { - return errHookDelete - } + b := bytes.NewBufferString(h.Manifest) + return cfg.KubeClient.Delete(b) } return nil } diff --git a/pkg/action/uninstall.go b/pkg/action/uninstall.go index f07b23ec1..e3a1c1309 100644 --- a/pkg/action/uninstall.go +++ b/pkg/action/uninstall.go @@ -218,8 +218,8 @@ func (u *Uninstall) deleteRelease(rel *release.Release) (kept string, errs []err } filesToKeep, filesToDelete := filterManifestsToKeep(files) - if len(filesToKeep) > 0 { - kept = summarizeKeptManifests(filesToKeep, u.cfg.KubeClient) + for _, f := range filesToKeep { + kept += f.Name + "\n" } for _, file := range filesToDelete { diff --git a/pkg/action/upgrade.go b/pkg/action/upgrade.go index 07ad5606a..f535c0d6c 100644 --- a/pkg/action/upgrade.go +++ b/pkg/action/upgrade.go @@ -275,8 +275,8 @@ func (u *Upgrade) reuseValues(chart *chart.Chart, current *release.Release) erro return nil } -func validateManifest(c kube.KubernetesClient, manifest []byte) error { - _, err := c.BuildUnstructured(bytes.NewReader(manifest)) +func validateManifest(c kube.Interface, manifest []byte) error { + _, err := c.Build(bytes.NewReader(manifest)) return err } diff --git a/pkg/kube/client.go b/pkg/kube/client.go index 52bff7e3f..8df24bef5 100644 --- a/pkg/kube/client.go +++ b/pkg/kube/client.go @@ -17,7 +17,6 @@ limitations under the License. package kube // import "helm.sh/helm/pkg/kube" import ( - "bytes" "context" "encoding/json" "fmt" @@ -27,18 +26,12 @@ import ( "time" jsonpatch "github.com/evanphx/json-patch" - goerrors "github.com/pkg/errors" - appsv1 "k8s.io/api/apps/v1" - appsv1beta1 "k8s.io/api/apps/v1beta1" - appsv1beta2 "k8s.io/api/apps/v1beta2" + "github.com/pkg/errors" batch "k8s.io/api/batch/v1" v1 "k8s.io/api/core/v1" - extv1beta1 "k8s.io/api/extensions/v1beta1" apiequality "k8s.io/apimachinery/pkg/api/equality" - "k8s.io/apimachinery/pkg/api/errors" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/fields" - "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/strategicpatch" @@ -46,18 +39,13 @@ import ( "k8s.io/cli-runtime/pkg/genericclioptions" "k8s.io/cli-runtime/pkg/resource" "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/scheme" watchtools "k8s.io/client-go/tools/watch" - "k8s.io/kubernetes/pkg/api/legacyscheme" - batchinternal "k8s.io/kubernetes/pkg/apis/batch" - "k8s.io/kubernetes/pkg/kubectl/cmd/get" cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util" ) -// MissingGetHeader is added to Get's output when a resource is not found. -const MissingGetHeader = "==> MISSING\nKIND\t\tNAME\n" - // ErrNoObjectsVisited indicates that during a visit operation, no matching objects were found. -var ErrNoObjectsVisited = goerrors.New("no objects visited") +var ErrNoObjectsVisited = errors.New("no objects visited") // Client represents a client capable of communicating with the Kubernetes API. type Client struct { @@ -83,9 +71,6 @@ func (c *Client) KubernetesClientSet() (*kubernetes.Clientset, error) { var nopLogger = func(_ string, _ ...interface{}) {} -// resourceActorFunc performs an action on a single resource. -type resourceActorFunc func(*resource.Info) error - // Create creates Kubernetes resources from an io.reader. // // Namespace will set the namespace. @@ -96,8 +81,7 @@ func (c *Client) Create(reader io.Reader) error { return err } c.Log("creating %d resource(s)", len(infos)) - err = perform(infos, createResource) - return err + return perform(infos, createResource) } func (c *Client) Wait(reader io.Reader, timeout time.Duration) error { @@ -144,8 +128,6 @@ func (c *Client) validator() resource.ContentValidator { // BuildUnstructured validates for Kubernetes objects and returns unstructured infos. func (c *Client) BuildUnstructured(reader io.Reader) (Result, error) { - var result Result - result, err := c.newBuilder(). Unstructured(). Stream(reader, ""). @@ -155,9 +137,8 @@ func (c *Client) BuildUnstructured(reader io.Reader) (Result, error) { // Build validates for Kubernetes objects and returns resource Infos from a io.Reader. func (c *Client) Build(reader io.Reader) (Result, error) { - var result Result result, err := c.newBuilder(). - WithScheme(legacyscheme.Scheme). + WithScheme(scheme.Scheme, scheme.Scheme.PrioritizedVersionsAllGroups()...). Schema(c.validator()). Stream(reader, ""). Do(). @@ -165,82 +146,6 @@ func (c *Client) Build(reader io.Reader) (Result, error) { return result, scrubValidationError(err) } -// Get gets Kubernetes resources as pretty-printed string. -// -// Namespace will set the namespace. -func (c *Client) Get(reader io.Reader) (string, error) { - // Since we don't know what order the objects come in, let's group them by the types, so - // that when we print them, they come out looking good (headers apply to subgroups, etc.). - objs := make(map[string][]runtime.Object) - infos, err := c.BuildUnstructured(reader) - if err != nil { - return "", err - } - - var objPods = make(map[string][]v1.Pod) - - missing := []string{} - err = perform(infos, func(info *resource.Info) error { - c.Log("Doing get for %s: %q", info.Mapping.GroupVersionKind.Kind, info.Name) - if err := info.Get(); err != nil { - c.Log("WARNING: Failed Get for resource %q: %s", info.Name, err) - missing = append(missing, fmt.Sprintf("%v\t\t%s", info.Mapping.Resource, info.Name)) - return nil - } - - // Use APIVersion/Kind as grouping mechanism. I'm not sure if you can have multiple - // versions per cluster, but this certainly won't hurt anything, so let's be safe. - gvk := info.ResourceMapping().GroupVersionKind - vk := gvk.Version + "/" + gvk.Kind - objs[vk] = append(objs[vk], asVersioned(info)) - - // Get the relation pods - objPods, err = c.getSelectRelationPod(info, objPods) - if err != nil { - c.Log("Warning: get the relation pod is failed, err:%s", err) - } - - return nil - }) - if err != nil { - return "", err - } - - // here, we will add the objPods to the objs - for key, podItems := range objPods { - for i := range podItems { - objs[key+"(related)"] = append(objs[key+"(related)"], &podItems[i]) - } - } - - // Ok, now we have all the objects grouped by types (say, by v1/Pod, v1/Service, etc.), so - // spin through them and print them. Printer is cool since it prints the header only when - // an object type changes, so we can just rely on that. Problem is it doesn't seem to keep - // track of tab widths. - buf := new(bytes.Buffer) - p, _ := get.NewHumanPrintFlags().ToPrinter("") - for t, ot := range objs { - if _, err = buf.WriteString("==> " + t + "\n"); err != nil { - return "", err - } - for _, o := range ot { - if err := p.PrintObj(o, buf); err != nil { - return "", goerrors.Wrapf(err, "failed to print object type %s, object: %q", t, o) - } - } - if _, err := buf.WriteString("\n"); err != nil { - return "", err - } - } - if len(missing) > 0 { - buf.WriteString(MissingGetHeader) - for _, s := range missing { - fmt.Fprintln(buf, s) - } - } - return buf.String(), nil -} - // Update reads in the current configuration and a target configuration from io.reader // and creates resources that don't already exists, updates resources that have been modified // in the target configuration and deletes resources from the current configuration that are @@ -250,13 +155,13 @@ func (c *Client) Get(reader io.Reader) (string, error) { func (c *Client) Update(originalReader, targetReader io.Reader, force, recreate bool) error { original, err := c.BuildUnstructured(originalReader) if err != nil { - return goerrors.Wrap(err, "failed decoding reader into objects") + return errors.Wrap(err, "failed decoding reader into objects") } c.Log("building resources from updated manifest") target, err := c.BuildUnstructured(targetReader) if err != nil { - return goerrors.Wrap(err, "failed decoding reader into objects") + return errors.Wrap(err, "failed decoding reader into objects") } updateErrors := []string{} @@ -269,13 +174,13 @@ func (c *Client) Update(originalReader, targetReader io.Reader, force, recreate helper := resource.NewHelper(info.Client, info.Mapping) if _, err := helper.Get(info.Namespace, info.Name, info.Export); err != nil { - if !errors.IsNotFound(err) { - return goerrors.Wrap(err, "could not get information about the resource") + if !apierrors.IsNotFound(err) { + return errors.Wrap(err, "could not get information about the resource") } // Since the resource does not exist, create it. if err := createResource(info); err != nil { - return goerrors.Wrap(err, "failed to create resource") + return errors.Wrap(err, "failed to create resource") } kind := info.Mapping.GroupVersionKind.Kind @@ -286,7 +191,7 @@ func (c *Client) Update(originalReader, targetReader io.Reader, force, recreate originalInfo := original.Get(info) if originalInfo == nil { kind := info.Mapping.GroupVersionKind.Kind - return goerrors.Errorf("no %s with the name %q found", kind, info.Name) + return errors.Errorf("no %s with the name %q found", kind, info.Name) } if err := updateResource(c, info, originalInfo.Object, force, recreate); err != nil { @@ -301,7 +206,7 @@ func (c *Client) Update(originalReader, targetReader io.Reader, force, recreate case err != nil: return err case len(updateErrors) != 0: - return goerrors.Errorf(strings.Join(updateErrors, " && ")) + return errors.Errorf(strings.Join(updateErrors, " && ")) } for _, info := range original.Difference(target) { @@ -329,14 +234,14 @@ func (c *Client) Delete(reader io.Reader) error { } func (c *Client) skipIfNotFound(err error) error { - if errors.IsNotFound(err) { + if apierrors.IsNotFound(err) { c.Log("%v", err) return nil } return err } -func (c *Client) watchTimeout(t time.Duration) resourceActorFunc { +func (c *Client) watchTimeout(t time.Duration) func(*resource.Info) error { return func(info *resource.Info) error { return c.watchUntilReady(t, info) } @@ -364,7 +269,7 @@ func (c *Client) WatchUntilReady(reader io.Reader, timeout time.Duration) error return perform(infos, c.watchTimeout(timeout)) } -func perform(infos Result, fn resourceActorFunc) error { +func perform(infos Result, fn func(*resource.Info) error) error { if len(infos) == 0 { return ErrNoObjectsVisited } @@ -395,11 +300,11 @@ func deleteResource(info *resource.Info) error { func createPatch(target *resource.Info, current runtime.Object) ([]byte, types.PatchType, error) { oldData, err := json.Marshal(current) if err != nil { - return nil, types.StrategicMergePatchType, goerrors.Wrap(err, "serializing current configuration") + return nil, types.StrategicMergePatchType, errors.Wrap(err, "serializing current configuration") } newData, err := json.Marshal(target.Object) if err != nil { - return nil, types.StrategicMergePatchType, goerrors.Wrap(err, "serializing target configuration") + return nil, types.StrategicMergePatchType, errors.Wrap(err, "serializing target configuration") } // While different objects need different merge types, the parent function @@ -429,14 +334,14 @@ func createPatch(target *resource.Info, current runtime.Object) ([]byte, types.P func updateResource(c *Client, target *resource.Info, currentObj runtime.Object, force, recreate bool) error { patch, patchType, err := createPatch(target, currentObj) if err != nil { - return goerrors.Wrap(err, "failed to create patch") + return errors.Wrap(err, "failed to create patch") } if patch == nil { c.Log("Looks like there are no changes for %s %q", target.Mapping.GroupVersionKind.Kind, target.Name) // This needs to happen to make sure that tiller has the latest info from the API // Otherwise there will be no labels and other functions that use labels will panic if err := target.Get(); err != nil { - return goerrors.Wrap(err, "error trying to refresh resource information") + return errors.Wrap(err, "error trying to refresh resource information") } } else { // send patch to server @@ -456,7 +361,7 @@ func updateResource(c *Client, target *resource.Info, currentObj runtime.Object, // ... and recreate if err := createResource(target); err != nil { - return goerrors.Wrap(err, "failed to recreate resource") + return errors.Wrap(err, "failed to recreate resource") } log.Printf("Created a new %s called %q\n", kind, target.Name) @@ -478,7 +383,7 @@ func updateResource(c *Client, target *resource.Info, currentObj runtime.Object, } versioned := asVersioned(target) - selector, err := getSelectorFromObject(versioned) + selector, err := selectorsForObject(versioned) if err != nil { return nil } @@ -489,8 +394,7 @@ func updateResource(c *Client, target *resource.Info, currentObj runtime.Object, } pods, err := client.CoreV1().Pods(target.Namespace).List(metav1.ListOptions{ - FieldSelector: fields.Everything().String(), - LabelSelector: labels.Set(selector).AsSelector().String(), + LabelSelector: selector.String(), }) if err != nil { return err @@ -508,48 +412,6 @@ func updateResource(c *Client, target *resource.Info, currentObj runtime.Object, return nil } -func getSelectorFromObject(obj runtime.Object) (map[string]string, error) { - switch typed := obj.(type) { - - case *v1.ReplicationController: - return typed.Spec.Selector, nil - - case *extv1beta1.ReplicaSet: - return typed.Spec.Selector.MatchLabels, nil - case *appsv1.ReplicaSet: - return typed.Spec.Selector.MatchLabels, nil - - case *extv1beta1.Deployment: - return typed.Spec.Selector.MatchLabels, nil - case *appsv1beta1.Deployment: - return typed.Spec.Selector.MatchLabels, nil - case *appsv1beta2.Deployment: - return typed.Spec.Selector.MatchLabels, nil - case *appsv1.Deployment: - return typed.Spec.Selector.MatchLabels, nil - - case *extv1beta1.DaemonSet: - return typed.Spec.Selector.MatchLabels, nil - case *appsv1beta2.DaemonSet: - return typed.Spec.Selector.MatchLabels, nil - case *appsv1.DaemonSet: - return typed.Spec.Selector.MatchLabels, nil - - case *batch.Job: - return typed.Spec.Selector.MatchLabels, nil - - case *appsv1beta1.StatefulSet: - return typed.Spec.Selector.MatchLabels, nil - case *appsv1beta2.StatefulSet: - return typed.Spec.Selector.MatchLabels, nil - case *appsv1.StatefulSet: - return typed.Spec.Selector.MatchLabels, nil - - default: - return nil, goerrors.Errorf("unsupported kind when getting selector: %v", obj) - } -} - func (c *Client) watchUntilReady(timeout time.Duration, info *resource.Info) error { w, err := resource.NewHelper(info.Client, info.Mapping).WatchSingle(info.Namespace, info.Name, info.ResourceVersion) if err != nil { @@ -585,7 +447,7 @@ func (c *Client) watchUntilReady(timeout time.Duration, info *resource.Info) err case watch.Error: // Handle error and return with an error. c.Log("Error event for %s", info.Name) - return true, goerrors.Errorf("failed to deploy %s", info.Name) + return true, errors.Errorf("failed to deploy %s", info.Name) default: return false, nil } @@ -597,16 +459,16 @@ func (c *Client) watchUntilReady(timeout time.Duration, info *resource.Info) err // // This operates on an event returned from a watcher. func (c *Client) waitForJob(e watch.Event, name string) (bool, error) { - o, ok := e.Object.(*batchinternal.Job) + o, ok := e.Object.(*batch.Job) if !ok { - return true, goerrors.Errorf("expected %s to be a *batch.Job, got %T", name, e.Object) + return true, errors.Errorf("expected %s to be a *batch.Job, got %T", name, e.Object) } for _, c := range o.Status.Conditions { - if c.Type == batchinternal.JobComplete && c.Status == "True" { + if c.Type == batch.JobComplete && c.Status == "True" { return true, nil - } else if c.Type == batchinternal.JobFailed && c.Status == "True" { - return true, goerrors.Errorf("job failed: %s", c.Reason) + } else if c.Type == batch.JobFailed && c.Status == "True" { + return true, errors.Errorf("job failed: %s", c.Reason) } } @@ -622,7 +484,7 @@ func scrubValidationError(err error) error { const stopValidateMessage = "if you choose to ignore these errors, turn validation off with --validate=false" if strings.Contains(err.Error(), stopValidateMessage) { - return goerrors.New(strings.ReplaceAll(err.Error(), "; "+stopValidateMessage, "")) + return errors.New(strings.ReplaceAll(err.Error(), "; "+stopValidateMessage, "")) } return err } @@ -652,61 +514,3 @@ func (c *Client) WaitAndGetCompletedPodPhase(name string, timeout time.Duration) return v1.PodUnknown, err } - -// get a kubernetes resources' relation pods -// kubernetes resource used select labels to relate pods -func (c *Client) getSelectRelationPod(info *resource.Info, objPods map[string][]v1.Pod) (map[string][]v1.Pod, error) { - if info == nil { - return objPods, nil - } - - c.Log("get relation pod of object: %s/%s/%s", info.Namespace, info.Mapping.GroupVersionKind.Kind, info.Name) - - versioned := asVersioned(info) - - // We can ignore this error because it will only error if it isn't a type that doesn't - // have pods. In that case, we don't care - selector, _ := getSelectorFromObject(versioned) - - selectorString := labels.Set(selector).AsSelector().String() - - // If we have an empty selector, this likely is a service or config map, so bail out now - if selectorString == "" { - return objPods, nil - } - - client, _ := c.KubernetesClientSet() - - pods, err := client.CoreV1().Pods(info.Namespace).List(metav1.ListOptions{ - FieldSelector: fields.Everything().String(), - LabelSelector: labels.Set(selector).AsSelector().String(), - }) - if err != nil { - return objPods, err - } - - for _, pod := range pods.Items { - if pod.APIVersion == "" { - pod.APIVersion = "v1" - } - - if pod.Kind == "" { - pod.Kind = "Pod" - } - vk := pod.GroupVersionKind().Version + "/" + pod.GroupVersionKind().Kind - - if !isFoundPod(objPods[vk], pod) { - objPods[vk] = append(objPods[vk], pod) - } - } - return objPods, nil -} - -func isFoundPod(podItem []v1.Pod, pod v1.Pod) bool { - for _, value := range podItem { - if (value.Namespace == pod.Namespace) && (value.Name == pod.Name) { - return true - } - } - return false -} diff --git a/pkg/kube/client_test.go b/pkg/kube/client_test.go index 06bfb5e88..cd80d69d8 100644 --- a/pkg/kube/client_test.go +++ b/pkg/kube/client_test.go @@ -27,11 +27,10 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/cli-runtime/pkg/resource" + "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest/fake" cmdtesting "k8s.io/kubernetes/pkg/kubectl/cmd/testing" - "k8s.io/kubernetes/pkg/kubectl/scheme" ) var unstructuredSerializer = resource.UnstructuredPlusDefaultContentConfig().NegotiatedSerializer @@ -92,15 +91,11 @@ func newResponse(code int, obj runtime.Object) (*http.Response, error) { return &http.Response{StatusCode: code, Header: header, Body: body}, nil } -type testClient struct { - *Client - *cmdtesting.TestFactory -} - -func newTestClient() *testClient { - tf := cmdtesting.NewTestFactory() - c := &Client{Factory: tf, Log: nopLogger} - return &testClient{Client: c, TestFactory: tf} +func newTestClient() *Client { + return &Client{ + Factory: cmdtesting.NewTestFactory().WithNamespace("default"), + Log: nopLogger, + } } func TestUpdate(t *testing.T) { @@ -112,9 +107,8 @@ func TestUpdate(t *testing.T) { var actions []string - tf := cmdtesting.NewTestFactory().WithNamespace("default") - defer tf.Cleanup() - tf.UnstructuredClient = &fake.RESTClient{ + c := newTestClient() + c.Factory.(*cmdtesting.TestFactory).UnstructuredClient = &fake.RESTClient{ NegotiatedSerializer: unstructuredSerializer, Client: fake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) { p, m := req.URL.Path, req.Method @@ -148,11 +142,6 @@ func TestUpdate(t *testing.T) { } }), } - - c := &Client{ - Factory: tf, - Log: nopLogger, - } if err := c.Update(objBody(&listA), objBody(&listB), false, false); err != nil { t.Fatal(err) } @@ -210,8 +199,6 @@ func TestBuild(t *testing.T) { c := newTestClient() for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - c.Cleanup() - // Test for an invalid manifest infos, err := c.Build(tt.reader) if err != nil && !tt.err { @@ -227,49 +214,6 @@ func TestBuild(t *testing.T) { } } -func TestGet(t *testing.T) { - list := newPodList("starfish", "otter") - c := newTestClient() - defer c.Cleanup() - c.TestFactory.UnstructuredClient = &fake.RESTClient{ - GroupVersion: schema.GroupVersion{Version: "v1"}, - NegotiatedSerializer: unstructuredSerializer, - Client: fake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) { - p, m := req.URL.Path, req.Method - t.Logf("got request %s %s", p, m) - switch { - case p == "/namespaces/default/pods/starfish" && m == "GET": - return newResponse(404, notFoundBody()) - case p == "/namespaces/default/pods/otter" && m == "GET": - return newResponse(200, &list.Items[1]) - default: - t.Fatalf("unexpected request: %s %s", req.Method, req.URL.Path) - return nil, nil - } - }), - } - - // Test Success - data := strings.NewReader("kind: Pod\napiVersion: v1\nmetadata:\n name: otter") - o, err := c.Get(data) - if err != nil { - t.Errorf("Expected missing results, got %q", err) - } - if !strings.Contains(o, "==> v1/Pod") && !strings.Contains(o, "otter") { - t.Errorf("Expected v1/Pod otter, got %s", o) - } - - // Test failure - data = strings.NewReader("kind: Pod\napiVersion: v1\nmetadata:\n name: starfish") - o, err = c.Get(data) - if err != nil { - t.Errorf("Expected missing results, got %q", err) - } - if !strings.Contains(o, "MISSING") && !strings.Contains(o, "pods\t\tstarfish") { - t.Errorf("Expected missing starfish, got %s", o) - } -} - func TestPerform(t *testing.T) { tests := []struct { name string @@ -300,7 +244,6 @@ func TestPerform(t *testing.T) { } c := newTestClient() - defer c.Cleanup() infos, err := c.Build(tt.reader) if err != nil && err.Error() != tt.errMessage { t.Errorf("Error while building manifests: %v", err) diff --git a/pkg/kube/converter.go b/pkg/kube/converter.go index 9092094c3..eff61a530 100644 --- a/pkg/kube/converter.go +++ b/pkg/kube/converter.go @@ -20,17 +20,15 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/cli-runtime/pkg/resource" - "k8s.io/kubernetes/pkg/api/legacyscheme" + "k8s.io/client-go/kubernetes/scheme" ) func asVersioned(info *resource.Info) runtime.Object { - converter := runtime.ObjectConvertor(legacyscheme.Scheme) - groupVersioner := runtime.GroupVersioner(schema.GroupVersions(legacyscheme.Scheme.PrioritizedVersionsAllGroups())) + gv := runtime.GroupVersioner(schema.GroupVersions(scheme.Scheme.PrioritizedVersionsAllGroups())) if info.Mapping != nil { - groupVersioner = info.Mapping.GroupVersionKind.GroupVersion() + gv = info.Mapping.GroupVersionKind.GroupVersion() } - - if obj, err := converter.ConvertToVersion(info.Object, groupVersioner); err == nil { + if obj, err := runtime.ObjectConvertor(scheme.Scheme).ConvertToVersion(info.Object, gv); err == nil { return obj } return info.Object diff --git a/pkg/kube/interface.go b/pkg/kube/interface.go new file mode 100644 index 000000000..9256f5e1c --- /dev/null +++ b/pkg/kube/interface.go @@ -0,0 +1,66 @@ +/* +Copyright The Helm Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kube + +import ( + "io" + "time" + + v1 "k8s.io/api/core/v1" +) + +// KubernetesClient represents a client capable of communicating with the Kubernetes API. +// +// A KubernetesClient must be concurrency safe. +type Interface interface { + // Create creates one or more resources. + // + // reader must contain a YAML stream (one or more YAML documents separated + // by "\n---\n"). + Create(reader io.Reader) error + + Wait(r io.Reader, timeout time.Duration) error + + // Delete destroys one or more resources. + // + // reader must contain a YAML stream (one or more YAML documents separated + // by "\n---\n"). + Delete(io.Reader) error + + // Watch the resource in reader until it is "ready". + // + // For Jobs, "ready" means the job ran to completion (excited without error). + // For all other kinds, it means the kind was created or modified without + // error. + WatchUntilReady(reader io.Reader, timeout time.Duration) error + + // Update updates one or more resources or creates the resource + // if it doesn't exist. + // + // reader must contain a YAML stream (one or more YAML documents separated + // by "\n---\n"). + Update(originalReader, modifiedReader io.Reader, force bool, recreate bool) error + + Build(reader io.Reader) (Result, error) + BuildUnstructured(reader io.Reader) (Result, error) + + // WaitAndGetCompletedPodPhase waits up to a timeout until a pod enters a completed phase + // and returns said phase (PodSucceeded or PodFailed qualify). + WaitAndGetCompletedPodPhase(name string, timeout time.Duration) (v1.PodPhase, error) +} + +var _ Interface = (*Client)(nil) diff --git a/pkg/kube/printer.go b/pkg/kube/printer.go index 1a9a8daee..7ad4f44ea 100644 --- a/pkg/kube/printer.go +++ b/pkg/kube/printer.go @@ -24,53 +24,6 @@ import ( "k8s.io/cli-runtime/pkg/resource" ) -// KubernetesClient represents a client capable of communicating with the Kubernetes API. -// -// A KubernetesClient must be concurrency safe. -type KubernetesClient interface { - // Create creates one or more resources. - // - // reader must contain a YAML stream (one or more YAML documents separated - // by "\n---\n"). - Create(reader io.Reader) error - - Wait(r io.Reader, timeout time.Duration) error - - // Get gets one or more resources. Returned string hsa the format like kubectl - // provides with the column headers separating the resource types. - // - // reader must contain a YAML stream (one or more YAML documents separated - // by "\n---\n"). - Get(reader io.Reader) (string, error) - - // Delete destroys one or more resources. - // - // reader must contain a YAML stream (one or more YAML documents separated - // by "\n---\n"). - Delete(reader io.Reader) error - - // Watch the resource in reader until it is "ready". - // - // For Jobs, "ready" means the job ran to completion (excited without error). - // For all other kinds, it means the kind was created or modified without - // error. - WatchUntilReady(reader io.Reader, timeout time.Duration) error - - // Update updates one or more resources or creates the resource - // if it doesn't exist. - // - // reader must contain a YAML stream (one or more YAML documents separated - // by "\n---\n"). - Update(originalReader, modifiedReader io.Reader, force bool, recreate bool) error - - Build(reader io.Reader) (Result, error) - BuildUnstructured(reader io.Reader) (Result, error) - - // WaitAndGetCompletedPodPhase waits up to a timeout until a pod enters a completed phase - // and returns said phase (PodSucceeded or PodFailed qualify). - WaitAndGetCompletedPodPhase(name string, timeout time.Duration) (v1.PodPhase, error) -} - // PrintingKubeClient implements KubeClient, but simply prints the reader to // the given output. type PrintingKubeClient struct { @@ -83,7 +36,7 @@ func (p *PrintingKubeClient) Create(r io.Reader) error { return err } -func (p *PrintingKubeClient) Wait(r io.Reader, timeout time.Duration) error { +func (p *PrintingKubeClient) Wait(r io.Reader, _ time.Duration) error { _, err := io.Copy(p.Out, r) return err } @@ -103,28 +56,27 @@ func (p *PrintingKubeClient) Delete(r io.Reader) error { } // WatchUntilReady implements KubeClient WatchUntilReady. -func (p *PrintingKubeClient) WatchUntilReady(r io.Reader, timeout time.Duration) error { +func (p *PrintingKubeClient) WatchUntilReady(r io.Reader, _ time.Duration) error { _, err := io.Copy(p.Out, r) return err } // Update implements KubeClient Update. -func (p *PrintingKubeClient) Update(currentReader, modifiedReader io.Reader, force, recreate bool) error { +func (p *PrintingKubeClient) Update(_, modifiedReader io.Reader, _, _ bool) error { _, err := io.Copy(p.Out, modifiedReader) return err } // Build implements KubeClient Build. -func (p *PrintingKubeClient) Build(reader io.Reader) (Result, error) { +func (p *PrintingKubeClient) Build(_ io.Reader) (Result, error) { return []*resource.Info{}, nil } -// BuildUnstructured implements KubeClient BuildUnstructured. -func (p *PrintingKubeClient) BuildUnstructured(reader io.Reader) (Result, error) { - return []*resource.Info{}, nil +func (p *PrintingKubeClient) BuildUnstructured(_ io.Reader) (Result, error) { + return p.Build(nil) } // WaitAndGetCompletedPodPhase implements KubeClient WaitAndGetCompletedPodPhase. -func (p *PrintingKubeClient) WaitAndGetCompletedPodPhase(name string, timeout time.Duration) (v1.PodPhase, error) { +func (p *PrintingKubeClient) WaitAndGetCompletedPodPhase(_ string, _ time.Duration) (v1.PodPhase, error) { return v1.PodSucceeded, nil } diff --git a/pkg/kube/printer_test.go b/pkg/kube/printer_test.go deleted file mode 100644 index 876b280bb..000000000 --- a/pkg/kube/printer_test.go +++ /dev/null @@ -1,79 +0,0 @@ -/* -Copyright The Helm Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package kube - -import ( - "bytes" - "io" - "testing" - "time" - - v1 "k8s.io/api/core/v1" - "k8s.io/cli-runtime/pkg/resource" -) - -type mockKubeClient struct{} - -func (k *mockKubeClient) Wait(r io.Reader, _ time.Duration) error { - return nil -} -func (k *mockKubeClient) Create(r io.Reader) error { - return nil -} -func (k *mockKubeClient) Get(r io.Reader) (string, error) { - return "", nil -} -func (k *mockKubeClient) Delete(r io.Reader) error { - return nil -} -func (k *mockKubeClient) Update(currentReader, modifiedReader io.Reader, force, recreate bool) error { - return nil -} -func (k *mockKubeClient) WatchUntilReady(r io.Reader, timeout time.Duration) error { - return nil -} -func (k *mockKubeClient) Build(reader io.Reader) (Result, error) { - return []*resource.Info{}, nil -} -func (k *mockKubeClient) BuildUnstructured(reader io.Reader) (Result, error) { - return []*resource.Info{}, nil -} -func (k *mockKubeClient) WaitAndGetCompletedPodPhase(name string, timeout time.Duration) (v1.PodPhase, error) { - return v1.PodUnknown, nil -} - -var _ KubernetesClient = &mockKubeClient{} -var _ KubernetesClient = &PrintingKubeClient{} - -func TestKubeClient(t *testing.T) { - kc := &mockKubeClient{} - - manifests := map[string]string{ - "foo": "name: value\n", - "bar": "name: value\n", - } - - b := bytes.NewBuffer(nil) - for _, content := range manifests { - b.WriteString("\n---\n") - b.WriteString(content) - } - - if err := kc.Create(b); err != nil { - t.Errorf("Kubeclient failed: %s", err) - } -} diff --git a/pkg/kube/result.go b/pkg/kube/result.go index ef6dd1e6f..a527727ca 100644 --- a/pkg/kube/result.go +++ b/pkg/kube/result.go @@ -76,9 +76,7 @@ func (r Result) Difference(rs Result) Result { // Intersect will return a new Result with objects contained in both Results. func (r Result) Intersect(rs Result) Result { - return r.Filter(func(info *resource.Info) bool { - return rs.Contains(info) - }) + return r.Filter(rs.Contains) } // isMatchingInfo returns true if infos match on Name and GroupVersionKind. diff --git a/pkg/kube/wait.go b/pkg/kube/wait.go index e839fe0a8..c51512bcf 100644 --- a/pkg/kube/wait.go +++ b/pkg/kube/wait.go @@ -17,27 +17,24 @@ limitations under the License. package kube // import "helm.sh/helm/pkg/kube" import ( + "fmt" "time" + "github.com/pkg/errors" appsv1 "k8s.io/api/apps/v1" appsv1beta1 "k8s.io/api/apps/v1beta1" appsv1beta2 "k8s.io/api/apps/v1beta2" - v1 "k8s.io/api/core/v1" - extensions "k8s.io/api/extensions/v1beta1" + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + extensionsv1beta1 "k8s.io/api/extensions/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util" ) -// deployment holds associated replicaSets for a deployment -type deployment struct { - replicaSets *appsv1.ReplicaSet - deployment *appsv1.Deployment -} - type waiter struct { c kubernetes.Interface timeout time.Duration @@ -50,26 +47,17 @@ func (w *waiter) waitForResources(created Result) error { w.log("beginning wait for %d resources with timeout of %v", len(created), w.timeout) return wait.Poll(2*time.Second, w.timeout, func() (bool, error) { - var ( - pods []v1.Pod - services []v1.Service - pvc []v1.PersistentVolumeClaim - deployments []deployment - ) for _, v := range created[:0] { + var ( + ok bool + err error + ) switch value := asVersioned(v).(type) { - case *v1.ReplicationController: - list, err := getPods(w.c, value.Namespace, value.Spec.Selector) - if err != nil { - return false, err - } - pods = append(pods, list...) - case *v1.Pod: + case *corev1.Pod: pod, err := w.c.CoreV1().Pods(value.Namespace).Get(value.Name, metav1.GetOptions{}) - if err != nil { + if err != nil || !w.isPodReady(pod) { return false, err } - pods = append(pods, *pod) case *appsv1.Deployment: currentDeployment, err := w.c.AppsV1().Deployments(value.Namespace).Get(value.Name, metav1.GetOptions{}) if err != nil { @@ -80,11 +68,9 @@ func (w *waiter) waitForResources(created Result) error { if err != nil || newReplicaSet == nil { return false, err } - newDeployment := deployment{ - newReplicaSet, - currentDeployment, + if !w.deploymentReady(newReplicaSet, currentDeployment) { + return false, nil } - deployments = append(deployments, newDeployment) case *appsv1beta1.Deployment: currentDeployment, err := w.c.AppsV1().Deployments(value.Namespace).Get(value.Name, metav1.GetOptions{}) if err != nil { @@ -95,11 +81,9 @@ func (w *waiter) waitForResources(created Result) error { if err != nil || newReplicaSet == nil { return false, err } - newDeployment := deployment{ - newReplicaSet, - currentDeployment, + if !w.deploymentReady(newReplicaSet, currentDeployment) { + return false, nil } - deployments = append(deployments, newDeployment) case *appsv1beta2.Deployment: currentDeployment, err := w.c.AppsV1().Deployments(value.Namespace).Get(value.Name, metav1.GetOptions{}) if err != nil { @@ -110,12 +94,10 @@ func (w *waiter) waitForResources(created Result) error { if err != nil || newReplicaSet == nil { return false, err } - newDeployment := deployment{ - newReplicaSet, - currentDeployment, + if !w.deploymentReady(newReplicaSet, currentDeployment) { + return false, nil } - deployments = append(deployments, newDeployment) - case *extensions.Deployment: + case *extensionsv1beta1.Deployment: currentDeployment, err := w.c.AppsV1().Deployments(value.Namespace).Get(value.Name, metav1.GetOptions{}) if err != nil { return false, err @@ -125,155 +107,175 @@ func (w *waiter) waitForResources(created Result) error { if err != nil || newReplicaSet == nil { return false, err } - newDeployment := deployment{ - newReplicaSet, - currentDeployment, + if !w.deploymentReady(newReplicaSet, currentDeployment) { + return false, nil } - deployments = append(deployments, newDeployment) - case *extensions.DaemonSet: - list, err := getPods(w.c, value.Namespace, value.Spec.Selector.MatchLabels) + case *corev1.PersistentVolumeClaim: + claim, err := w.c.CoreV1().PersistentVolumeClaims(value.Namespace).Get(value.Name, metav1.GetOptions{}) if err != nil { return false, err } - pods = append(pods, list...) - case *appsv1.DaemonSet: - list, err := getPods(w.c, value.Namespace, value.Spec.Selector.MatchLabels) - if err != nil { - return false, err + if !w.volumeReady(claim) { + return false, nil } - pods = append(pods, list...) - case *appsv1beta2.DaemonSet: - list, err := getPods(w.c, value.Namespace, value.Spec.Selector.MatchLabels) + case *corev1.Service: + svc, err := w.c.CoreV1().Services(value.Namespace).Get(value.Name, metav1.GetOptions{}) if err != nil { return false, err } - pods = append(pods, list...) - case *appsv1.StatefulSet: - list, err := getPods(w.c, value.Namespace, value.Spec.Selector.MatchLabels) - if err != nil { - return false, err + if !w.serviceReady(svc) { + return false, nil } - pods = append(pods, list...) + case *corev1.ReplicationController: + ok, err = w.podsReadyForObject(value.Namespace, value) + case *extensionsv1beta1.DaemonSet: + ok, err = w.podsReadyForObject(value.Namespace, value) + case *appsv1.DaemonSet: + ok, err = w.podsReadyForObject(value.Namespace, value) + case *appsv1beta2.DaemonSet: + ok, err = w.podsReadyForObject(value.Namespace, value) + case *appsv1.StatefulSet: + ok, err = w.podsReadyForObject(value.Namespace, value) case *appsv1beta1.StatefulSet: - list, err := getPods(w.c, value.Namespace, value.Spec.Selector.MatchLabels) - if err != nil { - return false, err - } - pods = append(pods, list...) + ok, err = w.podsReadyForObject(value.Namespace, value) case *appsv1beta2.StatefulSet: - list, err := getPods(w.c, value.Namespace, value.Spec.Selector.MatchLabels) - if err != nil { - return false, err - } - pods = append(pods, list...) - case *extensions.ReplicaSet: - list, err := getPods(w.c, value.Namespace, value.Spec.Selector.MatchLabels) - if err != nil { - return false, err - } - pods = append(pods, list...) + ok, err = w.podsReadyForObject(value.Namespace, value) + case *extensionsv1beta1.ReplicaSet: + ok, err = w.podsReadyForObject(value.Namespace, value) case *appsv1beta2.ReplicaSet: - list, err := getPods(w.c, value.Namespace, value.Spec.Selector.MatchLabels) - if err != nil { - return false, err - } - pods = append(pods, list...) + ok, err = w.podsReadyForObject(value.Namespace, value) case *appsv1.ReplicaSet: - list, err := getPods(w.c, value.Namespace, value.Spec.Selector.MatchLabels) - if err != nil { - return false, err - } - pods = append(pods, list...) - case *v1.PersistentVolumeClaim: - claim, err := w.c.CoreV1().PersistentVolumeClaims(value.Namespace).Get(value.Name, metav1.GetOptions{}) - if err != nil { - return false, err - } - pvc = append(pvc, *claim) - case *v1.Service: - svc, err := w.c.CoreV1().Services(value.Namespace).Get(value.Name, metav1.GetOptions{}) - if err != nil { - return false, err - } - services = append(services, *svc) + ok, err = w.podsReadyForObject(value.Namespace, value) + } + if !ok || err != nil { + return false, err } } - isReady := w.podsReady(pods) && w.servicesReady(services) && w.volumesReady(pvc) && w.deploymentsReady(deployments) - return isReady, nil + return true, nil }) } -func (w *waiter) podsReady(pods []v1.Pod) bool { +func (w *waiter) podsReadyForObject(namespace string, obj runtime.Object) (bool, error) { + pods, err := w.podsforObject(namespace, obj) + if err != nil { + return false, err + } for _, pod := range pods { - if !IsPodReady(&pod) { - w.log("Pod is not ready: %s/%s", pod.GetNamespace(), pod.GetName()) - return false + if !w.isPodReady(&pod) { + return false, nil } } - return true + return true, nil +} + +func (w *waiter) podsforObject(namespace string, obj runtime.Object) ([]corev1.Pod, error) { + selector, err := selectorsForObject(obj) + if err != nil { + return nil, err + } + list, err := getPods(w.c, namespace, selector.String()) + return list, err } -// IsPodReady returns true if a pod is ready; false otherwise. -func IsPodReady(pod *v1.Pod) bool { +// isPodReady returns true if a pod is ready; false otherwise. +func (w *waiter) isPodReady(pod *corev1.Pod) bool { for _, c := range pod.Status.Conditions { - if c.Type == v1.PodReady && c.Status == v1.ConditionTrue { + if c.Type == corev1.PodReady && c.Status == corev1.ConditionTrue { return true } } + w.log("Pod is not ready: %s/%s", pod.GetNamespace(), pod.GetName()) return false } -func (w *waiter) servicesReady(svc []v1.Service) bool { - for _, s := range svc { - // ExternalName Services are external to cluster so helm shouldn't be checking to see if they're 'ready' (i.e. have an IP Set) - if s.Spec.Type == v1.ServiceTypeExternalName { - continue - } - - // Make sure the service is not explicitly set to "None" before checking the IP - if s.Spec.ClusterIP != v1.ClusterIPNone && !IsServiceIPSet(&s) { - w.log("Service is not ready: %s/%s", s.GetNamespace(), s.GetName()) - return false - } +func (w *waiter) serviceReady(s *corev1.Service) bool { + // ExternalName Services are external to cluster so helm shouldn't be checking to see if they're 'ready' (i.e. have an IP Set) + if s.Spec.Type == corev1.ServiceTypeExternalName { + return true + } + // Make sure the service is not explicitly set to "None" before checking the IP + if s.Spec.ClusterIP != corev1.ClusterIPNone && !isServiceIPSet(s) || // This checks if the service has a LoadBalancer and that balancer has an Ingress defined - if s.Spec.Type == v1.ServiceTypeLoadBalancer && s.Status.LoadBalancer.Ingress == nil { - w.log("Service is not ready: %s/%s", s.GetNamespace(), s.GetName()) - return false - } + s.Spec.Type == corev1.ServiceTypeLoadBalancer && s.Status.LoadBalancer.Ingress == nil { + w.log("Service is not ready: %s/%s", s.GetNamespace(), s.GetName()) + return false } return true } -// IsServiceIPSet aims to check if the service's ClusterIP is set or not +// isServiceIPSet aims to check if the service's ClusterIP is set or not // the objective is not to perform validation here -func IsServiceIPSet(service *v1.Service) bool { - return service.Spec.ClusterIP != v1.ClusterIPNone && service.Spec.ClusterIP != "" +func isServiceIPSet(service *corev1.Service) bool { + return service.Spec.ClusterIP != corev1.ClusterIPNone && service.Spec.ClusterIP != "" } -func (w *waiter) volumesReady(vols []v1.PersistentVolumeClaim) bool { - for _, v := range vols { - if v.Status.Phase != v1.ClaimBound { - w.log("PersistentVolumeClaim is not ready: %s/%s", v.GetNamespace(), v.GetName()) - return false - } +func (w *waiter) volumeReady(v *corev1.PersistentVolumeClaim) bool { + if v.Status.Phase != corev1.ClaimBound { + w.log("PersistentVolumeClaim is not ready: %s/%s", v.GetNamespace(), v.GetName()) + return false } return true } -func (w *waiter) deploymentsReady(deployments []deployment) bool { - for _, v := range deployments { - if !(v.replicaSets.Status.ReadyReplicas >= *v.deployment.Spec.Replicas-deploymentutil.MaxUnavailable(*v.deployment)) { - w.log("Deployment is not ready: %s/%s", v.deployment.GetNamespace(), v.deployment.GetName()) - return false - } +func (w *waiter) deploymentReady(replicaSet *appsv1.ReplicaSet, deployment *appsv1.Deployment) bool { + if !(replicaSet.Status.ReadyReplicas >= *deployment.Spec.Replicas-deploymentutil.MaxUnavailable(*deployment)) { + w.log("Deployment is not ready: %s/%s", deployment.GetNamespace(), deployment.GetName()) + return false } return true } -func getPods(client kubernetes.Interface, namespace string, selector map[string]string) ([]v1.Pod, error) { +func getPods(client kubernetes.Interface, namespace, selector string) ([]corev1.Pod, error) { list, err := client.CoreV1().Pods(namespace).List(metav1.ListOptions{ - FieldSelector: fields.Everything().String(), - LabelSelector: labels.Set(selector).AsSelector().String(), + LabelSelector: selector, }) return list.Items, err } + +// selectorsForObject returns the pod label selector for a given object +// +// Modified version of https://github.com/kubernetes/kubernetes/blob/v1.14.1/pkg/kubectl/polymorphichelpers/helpers.go#L84 +func selectorsForObject(object runtime.Object) (selector labels.Selector, err error) { + switch t := object.(type) { + case *extensionsv1beta1.ReplicaSet: + selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector) + case *appsv1.ReplicaSet: + selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector) + case *appsv1beta2.ReplicaSet: + selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector) + case *corev1.ReplicationController: + selector = labels.SelectorFromSet(t.Spec.Selector) + case *appsv1.StatefulSet: + selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector) + case *appsv1beta1.StatefulSet: + selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector) + case *appsv1beta2.StatefulSet: + selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector) + case *extensionsv1beta1.DaemonSet: + selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector) + case *appsv1.DaemonSet: + selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector) + case *appsv1beta2.DaemonSet: + selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector) + case *extensionsv1beta1.Deployment: + selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector) + case *appsv1.Deployment: + selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector) + case *appsv1beta1.Deployment: + selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector) + case *appsv1beta2.Deployment: + selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector) + case *batchv1.Job: + selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector) + case *corev1.Service: + if t.Spec.Selector == nil || len(t.Spec.Selector) == 0 { + return nil, fmt.Errorf("invalid service '%s': Service is defined without a selector", t.Name) + } + selector = labels.SelectorFromSet(t.Spec.Selector) + + default: + return nil, fmt.Errorf("selector for %T not implemented", object) + } + + return selector, errors.Wrap(err, "invalid label selector") +} diff --git a/pkg/releasetesting/environment.go b/pkg/releasetesting/environment.go index ec3677c7e..7bff936b8 100644 --- a/pkg/releasetesting/environment.go +++ b/pkg/releasetesting/environment.go @@ -31,7 +31,7 @@ import ( // Environment encapsulates information about where test suite executes and returns results type Environment struct { Namespace string - KubeClient kube.KubernetesClient + KubeClient kube.Interface Messages chan *release.TestReleaseResponse Timeout time.Duration } diff --git a/pkg/releasetesting/test_suite_test.go b/pkg/releasetesting/test_suite_test.go index 37908fdae..9256df467 100644 --- a/pkg/releasetesting/test_suite_test.go +++ b/pkg/releasetesting/test_suite_test.go @@ -244,7 +244,7 @@ func testEnvFixture() *Environment { } type mockKubeClient struct { - kube.KubernetesClient + kube.Interface podFail bool err error } @@ -255,12 +255,5 @@ func (c *mockKubeClient) WaitAndGetCompletedPodPhase(_ string, _ time.Duration) } return v1.PodSucceeded, nil } -func (c *mockKubeClient) Get(_ io.Reader) (string, error) { - return "", nil -} -func (c *mockKubeClient) Create(_ io.Reader) error { - return c.err -} -func (c *mockKubeClient) Delete(_ io.Reader) error { - return nil -} +func (c *mockKubeClient) Create(_ io.Reader) error { return c.err } +func (c *mockKubeClient) Delete(_ io.Reader) error { return nil }