ref(pkg/kube): cleanup kube client interface

* move the main interface to its own file
* removed summarizeKeptManifests() which was the last place kube.Get()
  was called
* when polling for hooks, use external types
* refactor out legacyschema
* refactor detecting selectors from object
* refactor creating test client

Signed-off-by: Adam Reese <adam@reese.io>
pull/5735/head
Adam Reese 6 years ago
parent c62a3a4896
commit 0338576fc5
No known key found for this signature in database
GPG Key ID: 06F35E60A7A18DD6

103
Gopkg.lock generated

@ -187,19 +187,6 @@
revision = "7f2434bc10da710debe5c4315ed6d4df454b4024"
version = "v0.1.0"
[[projects]]
branch = "master"
digest = "1:95e08278c876d185ba67533f045e9e63b3c9d02cbd60beb0f4dbaa2344a13ac2"
name = "github.com/chai2010/gettext-go"
packages = [
"gettext",
"gettext/mo",
"gettext/plural",
"gettext/po",
]
pruneopts = "UT"
revision = "bf70f2a70fb1b1f36d90d671a72795984eab0fcb"
[[projects]]
digest = "1:37f8940c4d3c41536ea882b1ca3498e403c04892dfc34bd0d670ed9eafccda9a"
name = "github.com/containerd/containerd"
@ -704,14 +691,6 @@
revision = "5c8c8bd35d3832f5d134ae1e1e375b69a4d25242"
version = "v1.0.1"
[[projects]]
branch = "master"
digest = "1:10b85f58562d487a3bd7da6ba5b895bc221d5ecbd89df9c7c5a36004e827ade1"
name = "github.com/liggitt/tabwriter"
packages = ["."]
pruneopts = "UT"
revision = "89fcab3d43de07060e4fd4c1547430ed57e87f24"
[[projects]]
branch = "master"
digest = "1:84a5a2b67486d5d67060ac393aa255d05d24ed5ee41daecd5635ec22657b6492"
@ -1499,8 +1478,8 @@
"util/retry",
]
pruneopts = "UT"
revision = "6ee68ca5fd8355d024d02f9db0b3b667e8357a0f"
version = "kubernetes-1.14.0"
revision = "1a26190bd76a9017e289958b9fba936430aa3704"
version = "kubernetes-1.14.1"
[[projects]]
branch = "master"
@ -1533,42 +1512,14 @@
[[projects]]
branch = "release-1.14"
digest = "1:3e8a09f07ca1d0163720064d0bcb567fdc85338e02bd63c6d84786be8b24ebdb"
digest = "1:8e018df756b49f5fdc20e15041486520285829528df043457bb7a91886fa775b"
name = "k8s.io/kubernetes"
packages = [
"pkg/api/legacyscheme",
"pkg/api/service",
"pkg/api/v1/pod",
"pkg/apis/apps",
"pkg/apis/apps/install",
"pkg/apis/apps/v1",
"pkg/apis/apps/v1beta1",
"pkg/apis/apps/v1beta2",
"pkg/apis/authentication",
"pkg/apis/authentication/install",
"pkg/apis/authentication/v1",
"pkg/apis/authentication/v1beta1",
"pkg/apis/authorization",
"pkg/apis/authorization/install",
"pkg/apis/authorization/v1",
"pkg/apis/authorization/v1beta1",
"pkg/apis/autoscaling",
"pkg/apis/autoscaling/install",
"pkg/apis/autoscaling/v1",
"pkg/apis/autoscaling/v2beta1",
"pkg/apis/autoscaling/v2beta2",
"pkg/apis/batch",
"pkg/apis/batch/install",
"pkg/apis/batch/v1",
"pkg/apis/batch/v1beta1",
"pkg/apis/batch/v2alpha1",
"pkg/apis/certificates",
"pkg/apis/certificates/install",
"pkg/apis/certificates/v1beta1",
"pkg/apis/coordination",
"pkg/apis/coordination/install",
"pkg/apis/coordination/v1",
"pkg/apis/coordination/v1beta1",
"pkg/apis/core",
"pkg/apis/core/helper",
"pkg/apis/core/install",
@ -1576,36 +1527,7 @@
"pkg/apis/core/v1",
"pkg/apis/core/v1/helper",
"pkg/apis/core/validation",
"pkg/apis/events",
"pkg/apis/events/install",
"pkg/apis/events/v1beta1",
"pkg/apis/extensions",
"pkg/apis/extensions/install",
"pkg/apis/extensions/v1beta1",
"pkg/apis/networking",
"pkg/apis/node",
"pkg/apis/policy",
"pkg/apis/policy/install",
"pkg/apis/policy/v1beta1",
"pkg/apis/rbac",
"pkg/apis/rbac/install",
"pkg/apis/rbac/v1",
"pkg/apis/rbac/v1alpha1",
"pkg/apis/rbac/v1beta1",
"pkg/apis/scheduling",
"pkg/apis/scheduling/install",
"pkg/apis/scheduling/v1",
"pkg/apis/scheduling/v1alpha1",
"pkg/apis/scheduling/v1beta1",
"pkg/apis/settings",
"pkg/apis/settings/install",
"pkg/apis/settings/v1alpha1",
"pkg/apis/storage",
"pkg/apis/storage/install",
"pkg/apis/storage/util",
"pkg/apis/storage/v1",
"pkg/apis/storage/v1alpha1",
"pkg/apis/storage/v1beta1",
"pkg/capabilities",
"pkg/controller",
"pkg/controller/deployment/util",
@ -1613,7 +1535,6 @@
"pkg/fieldpath",
"pkg/kubectl",
"pkg/kubectl/apps",
"pkg/kubectl/cmd/get",
"pkg/kubectl/cmd/testing",
"pkg/kubectl/cmd/util",
"pkg/kubectl/cmd/util/openapi",
@ -1621,16 +1542,13 @@
"pkg/kubectl/cmd/util/openapi/validation",
"pkg/kubectl/describe",
"pkg/kubectl/describe/versioned",
"pkg/kubectl/generated",
"pkg/kubectl/scheme",
"pkg/kubectl/util",
"pkg/kubectl/util/certificate",
"pkg/kubectl/util/deployment",
"pkg/kubectl/util/event",
"pkg/kubectl/util/fieldpath",
"pkg/kubectl/util/i18n",
"pkg/kubectl/util/podutils",
"pkg/kubectl/util/printers",
"pkg/kubectl/util/qos",
"pkg/kubectl/util/rbac",
"pkg/kubectl/util/resource",
@ -1641,14 +1559,11 @@
"pkg/kubectl/validation",
"pkg/kubelet/types",
"pkg/master/ports",
"pkg/printers",
"pkg/printers/internalversion",
"pkg/security/apparmor",
"pkg/serviceaccount",
"pkg/util/hash",
"pkg/util/interrupt",
"pkg/util/labels",
"pkg/util/node",
"pkg/util/parsers",
"pkg/util/taints",
"pkg/version",
@ -1720,14 +1635,6 @@
revision = "fd68e9863619f6ec2fdd8625fe1f02e7c877e480"
version = "v1.1.0"
[[projects]]
branch = "master"
digest = "1:9132eacc44d9bd1e03145ea2e9d4888800da7773d6edebb401f8cd34c9fb8380"
name = "vbom.ml/util"
packages = ["sortorder"]
pruneopts = "UT"
revision = "efcd4e0f97874370259c7d93e12aad57911dea81"
[solve-meta]
analyzer-name = "dep"
analyzer-version = 1
@ -1783,7 +1690,6 @@
"k8s.io/apimachinery/pkg/api/errors",
"k8s.io/apimachinery/pkg/api/meta",
"k8s.io/apimachinery/pkg/apis/meta/v1",
"k8s.io/apimachinery/pkg/fields",
"k8s.io/apimachinery/pkg/labels",
"k8s.io/apimachinery/pkg/runtime",
"k8s.io/apimachinery/pkg/runtime/schema",
@ -1805,12 +1711,9 @@
"k8s.io/client-go/tools/watch",
"k8s.io/client-go/util/homedir",
"k8s.io/kubernetes/pkg/api/legacyscheme",
"k8s.io/kubernetes/pkg/apis/batch",
"k8s.io/kubernetes/pkg/controller/deployment/util",
"k8s.io/kubernetes/pkg/kubectl/cmd/get",
"k8s.io/kubernetes/pkg/kubectl/cmd/testing",
"k8s.io/kubernetes/pkg/kubectl/cmd/util",
"k8s.io/kubernetes/pkg/kubectl/scheme",
"k8s.io/kubernetes/pkg/kubectl/validation",
]
solver-name = "gps-cdcl"

@ -36,7 +36,7 @@
[[constraint]]
name = "k8s.io/client-go"
version = "kubernetes-1.14.0"
version = "kubernetes-1.14.1"
[[constraint]]
name = "k8s.io/kubernetes"

@ -70,7 +70,7 @@ type Configuration struct {
Releases *storage.Storage
// KubeClient is a Kubernetes API client.
KubeClient kube.KubernetesClient
KubeClient kube.Interface
// RegistryClient is a client for working with registries
RegistryClient *registry.Client

@ -191,6 +191,7 @@ func (i *Install) Run(chrt *chart.Chart) (*release.Release, error) {
}
if i.Wait {
buf := bytes.NewBufferString(rel.Manifest)
if err := i.cfg.KubeClient.Wait(buf, i.Timeout); err != nil {
rel.SetStatus(release.StatusFailed, fmt.Sprintf("Release %q failed: %s", i.ReleaseName, err.Error()))
i.recordRelease(rel) // Ignore the error, since we have another error to deal with.
@ -623,7 +624,7 @@ func (v *ValueOptions) MergeValues(settings cli.EnvSettings) error {
return errors.Wrapf(err, "failed to parse %s", filePath)
}
// Merge with the previous map
base = MergeValues(base, currentMap)
base = mergeMaps(base, currentMap)
}
// User specified a value via --set
@ -644,31 +645,47 @@ func (v *ValueOptions) MergeValues(settings cli.EnvSettings) error {
return nil
}
// MergeValues merges source and destination map, preferring values from the source map
func MergeValues(dest, src map[string]interface{}) map[string]interface{} {
// mergeValues merges source and destination map, preferring values from the source map
func mergeValues(dest, src map[string]interface{}) map[string]interface{} {
out := make(map[string]interface{})
for k, v := range dest {
out[k] = v
}
for k, v := range src {
if _, ok := out[k]; !ok {
// If the key doesn't exist already, then just set the key to that value
if _, exists := dest[k]; !exists {
dest[k] = v
continue
}
nextMap, ok := v.(map[string]interface{})
} else if nextMap, ok := v.(map[string]interface{}); !ok {
// If it isn't another map, overwrite the value
if !ok {
dest[k] = v
continue
}
} else if destMap, isMap := out[k].(map[string]interface{}); !isMap {
// Edge case: If the key exists in the destination, but isn't a map
destMap, isMap := dest[k].(map[string]interface{})
// If the source map has a map for this key, prefer it
if !isMap {
dest[k] = v
} else {
// If we got to this point, it is a map in both, so merge them
out[k] = mergeValues(destMap, nextMap)
continue
}
// If we got to this point, it is a map in both, so merge them
dest[k] = MergeValues(destMap, nextMap)
out[k] = v
}
return out
}
// mergeMaps returns a new map holding the union of a and b. Values from b
// take precedence on key conflicts, except that when both sides hold a
// nested map[string]interface{} for the same key, the two maps are merged
// recursively. Neither input map is modified.
func mergeMaps(a, b map[string]interface{}) map[string]interface{} {
	out := make(map[string]interface{}, len(a))
	for k, v := range a {
		out[k] = v
	}
	for k, v := range b {
		nested, isMap := v.(map[string]interface{})
		if isMap {
			if existing, found := out[k]; found {
				if existingMap, ok := existing.(map[string]interface{}); ok {
					// Map on both sides: merge recursively, b still wins on leaves.
					out[k] = mergeMaps(existingMap, nested)
					continue
				}
			}
		}
		// Either not a map, or nothing (or a non-map) to merge into: b wins.
		out[k] = v
	}
	return out
}
// readFile load a file from stdin, the local directory, or a remote file with a url.

@ -322,25 +322,25 @@ func TestMergeValues(t *testing.T) {
"testing": "fun",
}
testMap := MergeValues(flatMap, nestedMap)
testMap := mergeValues(flatMap, nestedMap)
equal := reflect.DeepEqual(testMap, nestedMap)
if !equal {
t.Errorf("Expected a nested map to overwrite a flat value. Expected: %v, got %v", nestedMap, testMap)
}
testMap = MergeValues(nestedMap, flatMap)
testMap = mergeValues(nestedMap, flatMap)
equal = reflect.DeepEqual(testMap, flatMap)
if !equal {
t.Errorf("Expected a flat value to overwrite a map. Expected: %v, got %v", flatMap, testMap)
}
testMap = MergeValues(nestedMap, anotherNestedMap)
testMap = mergeValues(nestedMap, anotherNestedMap)
equal = reflect.DeepEqual(testMap, anotherNestedMap)
if !equal {
t.Errorf("Expected a nested map to overwrite another nested map. Expected: %v, got %v", anotherNestedMap, testMap)
}
testMap = MergeValues(anotherFlatMap, anotherNestedMap)
testMap = mergeValues(anotherFlatMap, anotherNestedMap)
expectedMap := map[string]interface{}{
"testing": "fun",
"foo": "bar",

@ -17,10 +17,8 @@ limitations under the License.
package action
import (
"bytes"
"strings"
"helm.sh/helm/pkg/kube"
"helm.sh/helm/pkg/releaseutil"
)
@ -33,10 +31,7 @@ const resourcePolicyAnno = "helm.sh/resource-policy"
// during an uninstallRelease action.
const keepPolicy = "keep"
func filterManifestsToKeep(manifests []releaseutil.Manifest) ([]releaseutil.Manifest, []releaseutil.Manifest) {
remaining := []releaseutil.Manifest{}
keep := []releaseutil.Manifest{}
func filterManifestsToKeep(manifests []releaseutil.Manifest) (keep, remaining []releaseutil.Manifest) {
for _, m := range manifests {
if m.Head.Metadata == nil || m.Head.Metadata.Annotations == nil || len(m.Head.Metadata.Annotations) == 0 {
remaining = append(remaining, m)
@ -57,21 +52,3 @@ func filterManifestsToKeep(manifests []releaseutil.Manifest) ([]releaseutil.Mani
}
return keep, remaining
}
// summarizeKeptManifests builds a human-readable summary of the manifests
// that were kept due to the "keep" resource policy. Only manifests the
// cluster still reports as present are listed; anything the client cannot
// Get (error, or output containing kube.MissingGetHeader) is skipped.
// Returns the empty string when nothing qualifies.
func summarizeKeptManifests(manifests []releaseutil.Manifest, kubeClient kube.KubernetesClient) string {
	var b strings.Builder
	for _, m := range manifests {
		// Check whether m is in fact present from the k8s client's POV.
		output, err := kubeClient.Get(bytes.NewBufferString(m.Content))
		if err != nil || strings.Contains(output, kube.MissingGetHeader) {
			continue
		}
		// Emit the header exactly once, before the first kept resource.
		if b.Len() == 0 {
			b.WriteString("These resources were kept due to the resource policy:\n")
		}
		b.WriteString("[" + m.Head.Kind + "] " + m.Head.Metadata.Name + "\n")
	}
	return b.String()
}

@ -235,11 +235,9 @@ func (r *Rollback) execHook(hs []*release.Hook, hook string) error {
// deleteHookByPolicy deletes a hook if the hook policy instructs it to
func deleteHookByPolicy(cfg *Configuration, h *release.Hook, policy string) error {
b := bytes.NewBufferString(h.Manifest)
if hookHasDeletePolicy(h, policy) {
if errHookDelete := cfg.KubeClient.Delete(b); errHookDelete != nil {
return errHookDelete
}
b := bytes.NewBufferString(h.Manifest)
return cfg.KubeClient.Delete(b)
}
return nil
}

@ -218,8 +218,8 @@ func (u *Uninstall) deleteRelease(rel *release.Release) (kept string, errs []err
}
filesToKeep, filesToDelete := filterManifestsToKeep(files)
if len(filesToKeep) > 0 {
kept = summarizeKeptManifests(filesToKeep, u.cfg.KubeClient)
for _, f := range filesToKeep {
kept += f.Name + "\n"
}
for _, file := range filesToDelete {

@ -275,8 +275,8 @@ func (u *Upgrade) reuseValues(chart *chart.Chart, current *release.Release) erro
return nil
}
func validateManifest(c kube.KubernetesClient, manifest []byte) error {
_, err := c.BuildUnstructured(bytes.NewReader(manifest))
// validateManifest checks that the given manifest bytes parse into valid
// Kubernetes resource objects, using the client's Build machinery. The
// built result is discarded; only the validation error is of interest.
func validateManifest(c kube.Interface, manifest []byte) error {
	r := bytes.NewReader(manifest)
	_, err := c.Build(r)
	return err
}

@ -17,7 +17,6 @@ limitations under the License.
package kube // import "helm.sh/helm/pkg/kube"
import (
"bytes"
"context"
"encoding/json"
"fmt"
@ -27,18 +26,12 @@ import (
"time"
jsonpatch "github.com/evanphx/json-patch"
goerrors "github.com/pkg/errors"
appsv1 "k8s.io/api/apps/v1"
appsv1beta1 "k8s.io/api/apps/v1beta1"
appsv1beta2 "k8s.io/api/apps/v1beta2"
"github.com/pkg/errors"
batch "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
extv1beta1 "k8s.io/api/extensions/v1beta1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/strategicpatch"
@ -46,18 +39,13 @@ import (
"k8s.io/cli-runtime/pkg/genericclioptions"
"k8s.io/cli-runtime/pkg/resource"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
watchtools "k8s.io/client-go/tools/watch"
"k8s.io/kubernetes/pkg/api/legacyscheme"
batchinternal "k8s.io/kubernetes/pkg/apis/batch"
"k8s.io/kubernetes/pkg/kubectl/cmd/get"
cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
)
// MissingGetHeader is added to Get's output when a resource is not found.
const MissingGetHeader = "==> MISSING\nKIND\t\tNAME\n"
// ErrNoObjectsVisited indicates that during a visit operation, no matching objects were found.
var ErrNoObjectsVisited = goerrors.New("no objects visited")
var ErrNoObjectsVisited = errors.New("no objects visited")
// Client represents a client capable of communicating with the Kubernetes API.
type Client struct {
@ -83,9 +71,6 @@ func (c *Client) KubernetesClientSet() (*kubernetes.Clientset, error) {
var nopLogger = func(_ string, _ ...interface{}) {}
// resourceActorFunc performs an action on a single resource.
type resourceActorFunc func(*resource.Info) error
// Create creates Kubernetes resources from an io.reader.
//
// Namespace will set the namespace.
@ -96,8 +81,7 @@ func (c *Client) Create(reader io.Reader) error {
return err
}
c.Log("creating %d resource(s)", len(infos))
err = perform(infos, createResource)
return err
return perform(infos, createResource)
}
func (c *Client) Wait(reader io.Reader, timeout time.Duration) error {
@ -144,8 +128,6 @@ func (c *Client) validator() resource.ContentValidator {
// BuildUnstructured validates for Kubernetes objects and returns unstructured infos.
func (c *Client) BuildUnstructured(reader io.Reader) (Result, error) {
var result Result
result, err := c.newBuilder().
Unstructured().
Stream(reader, "").
@ -155,9 +137,8 @@ func (c *Client) BuildUnstructured(reader io.Reader) (Result, error) {
// Build validates for Kubernetes objects and returns resource Infos from a io.Reader.
func (c *Client) Build(reader io.Reader) (Result, error) {
var result Result
result, err := c.newBuilder().
WithScheme(legacyscheme.Scheme).
WithScheme(scheme.Scheme, scheme.Scheme.PrioritizedVersionsAllGroups()...).
Schema(c.validator()).
Stream(reader, "").
Do().
@ -165,82 +146,6 @@ func (c *Client) Build(reader io.Reader) (Result, error) {
return result, scrubValidationError(err)
}
// Get gets Kubernetes resources as pretty-printed string.
//
// Namespace will set the namespace.
//
// The reader is expected to contain one or more YAML/JSON manifests. For
// each object named there, the live object is fetched from the cluster,
// grouped by "version/Kind", and rendered in tabular form. Objects the
// API server cannot find are listed under MissingGetHeader rather than
// causing an error.
func (c *Client) Get(reader io.Reader) (string, error) {
	// Since we don't know what order the objects come in, let's group them by the types, so
	// that when we print them, they come out looking good (headers apply to subgroups, etc.).
	objs := make(map[string][]runtime.Object)
	infos, err := c.BuildUnstructured(reader)
	if err != nil {
		return "", err
	}
	// objPods collects pods related (by label selector) to the requested
	// resources, keyed by "version/Kind" like objs.
	var objPods = make(map[string][]v1.Pod)
	// missing accumulates one tab-separated line per resource the server
	// did not return; these are reported, not treated as errors.
	missing := []string{}
	err = perform(infos, func(info *resource.Info) error {
		c.Log("Doing get for %s: %q", info.Mapping.GroupVersionKind.Kind, info.Name)
		if err := info.Get(); err != nil {
			// Failed lookups are non-fatal: record and continue with the rest.
			c.Log("WARNING: Failed Get for resource %q: %s", info.Name, err)
			missing = append(missing, fmt.Sprintf("%v\t\t%s", info.Mapping.Resource, info.Name))
			return nil
		}
		// Use APIVersion/Kind as grouping mechanism. I'm not sure if you can have multiple
		// versions per cluster, but this certainly won't hurt anything, so let's be safe.
		gvk := info.ResourceMapping().GroupVersionKind
		vk := gvk.Version + "/" + gvk.Kind
		objs[vk] = append(objs[vk], asVersioned(info))
		// Get the relation pods
		// NOTE: this assigns the enclosing err, but the error is only logged
		// here; err is overwritten by perform's return value below.
		objPods, err = c.getSelectRelationPod(info, objPods)
		if err != nil {
			c.Log("Warning: get the relation pod is failed, err:%s", err)
		}
		return nil
	})
	if err != nil {
		return "", err
	}
	// here, we will add the objPods to the objs
	// (indexing by i so we take the address of the slice element, not of a
	// loop-variable copy)
	for key, podItems := range objPods {
		for i := range podItems {
			objs[key+"(related)"] = append(objs[key+"(related)"], &podItems[i])
		}
	}
	// Ok, now we have all the objects grouped by types (say, by v1/Pod, v1/Service, etc.), so
	// spin through them and print them. Printer is cool since it prints the header only when
	// an object type changes, so we can just rely on that. Problem is it doesn't seem to keep
	// track of tab widths.
	buf := new(bytes.Buffer)
	// Printer construction error is deliberately ignored; the default
	// human-readable printer is used.
	p, _ := get.NewHumanPrintFlags().ToPrinter("")
	for t, ot := range objs {
		if _, err = buf.WriteString("==> " + t + "\n"); err != nil {
			return "", err
		}
		for _, o := range ot {
			if err := p.PrintObj(o, buf); err != nil {
				return "", goerrors.Wrapf(err, "failed to print object type %s, object: %q", t, o)
			}
		}
		if _, err := buf.WriteString("\n"); err != nil {
			return "", err
		}
	}
	// Append the MISSING section last so found resources print first.
	if len(missing) > 0 {
		buf.WriteString(MissingGetHeader)
		for _, s := range missing {
			fmt.Fprintln(buf, s)
		}
	}
	return buf.String(), nil
}
// Update reads in the current configuration and a target configuration from io.reader
// and creates resources that don't already exists, updates resources that have been modified
// in the target configuration and deletes resources from the current configuration that are
@ -250,13 +155,13 @@ func (c *Client) Get(reader io.Reader) (string, error) {
func (c *Client) Update(originalReader, targetReader io.Reader, force, recreate bool) error {
original, err := c.BuildUnstructured(originalReader)
if err != nil {
return goerrors.Wrap(err, "failed decoding reader into objects")
return errors.Wrap(err, "failed decoding reader into objects")
}
c.Log("building resources from updated manifest")
target, err := c.BuildUnstructured(targetReader)
if err != nil {
return goerrors.Wrap(err, "failed decoding reader into objects")
return errors.Wrap(err, "failed decoding reader into objects")
}
updateErrors := []string{}
@ -269,13 +174,13 @@ func (c *Client) Update(originalReader, targetReader io.Reader, force, recreate
helper := resource.NewHelper(info.Client, info.Mapping)
if _, err := helper.Get(info.Namespace, info.Name, info.Export); err != nil {
if !errors.IsNotFound(err) {
return goerrors.Wrap(err, "could not get information about the resource")
if !apierrors.IsNotFound(err) {
return errors.Wrap(err, "could not get information about the resource")
}
// Since the resource does not exist, create it.
if err := createResource(info); err != nil {
return goerrors.Wrap(err, "failed to create resource")
return errors.Wrap(err, "failed to create resource")
}
kind := info.Mapping.GroupVersionKind.Kind
@ -286,7 +191,7 @@ func (c *Client) Update(originalReader, targetReader io.Reader, force, recreate
originalInfo := original.Get(info)
if originalInfo == nil {
kind := info.Mapping.GroupVersionKind.Kind
return goerrors.Errorf("no %s with the name %q found", kind, info.Name)
return errors.Errorf("no %s with the name %q found", kind, info.Name)
}
if err := updateResource(c, info, originalInfo.Object, force, recreate); err != nil {
@ -301,7 +206,7 @@ func (c *Client) Update(originalReader, targetReader io.Reader, force, recreate
case err != nil:
return err
case len(updateErrors) != 0:
return goerrors.Errorf(strings.Join(updateErrors, " && "))
return errors.Errorf(strings.Join(updateErrors, " && "))
}
for _, info := range original.Difference(target) {
@ -329,14 +234,14 @@ func (c *Client) Delete(reader io.Reader) error {
}
func (c *Client) skipIfNotFound(err error) error {
if errors.IsNotFound(err) {
if apierrors.IsNotFound(err) {
c.Log("%v", err)
return nil
}
return err
}
func (c *Client) watchTimeout(t time.Duration) resourceActorFunc {
func (c *Client) watchTimeout(t time.Duration) func(*resource.Info) error {
return func(info *resource.Info) error {
return c.watchUntilReady(t, info)
}
@ -364,7 +269,7 @@ func (c *Client) WatchUntilReady(reader io.Reader, timeout time.Duration) error
return perform(infos, c.watchTimeout(timeout))
}
func perform(infos Result, fn resourceActorFunc) error {
func perform(infos Result, fn func(*resource.Info) error) error {
if len(infos) == 0 {
return ErrNoObjectsVisited
}
@ -395,11 +300,11 @@ func deleteResource(info *resource.Info) error {
func createPatch(target *resource.Info, current runtime.Object) ([]byte, types.PatchType, error) {
oldData, err := json.Marshal(current)
if err != nil {
return nil, types.StrategicMergePatchType, goerrors.Wrap(err, "serializing current configuration")
return nil, types.StrategicMergePatchType, errors.Wrap(err, "serializing current configuration")
}
newData, err := json.Marshal(target.Object)
if err != nil {
return nil, types.StrategicMergePatchType, goerrors.Wrap(err, "serializing target configuration")
return nil, types.StrategicMergePatchType, errors.Wrap(err, "serializing target configuration")
}
// While different objects need different merge types, the parent function
@ -429,14 +334,14 @@ func createPatch(target *resource.Info, current runtime.Object) ([]byte, types.P
func updateResource(c *Client, target *resource.Info, currentObj runtime.Object, force, recreate bool) error {
patch, patchType, err := createPatch(target, currentObj)
if err != nil {
return goerrors.Wrap(err, "failed to create patch")
return errors.Wrap(err, "failed to create patch")
}
if patch == nil {
c.Log("Looks like there are no changes for %s %q", target.Mapping.GroupVersionKind.Kind, target.Name)
// This needs to happen to make sure that tiller has the latest info from the API
// Otherwise there will be no labels and other functions that use labels will panic
if err := target.Get(); err != nil {
return goerrors.Wrap(err, "error trying to refresh resource information")
return errors.Wrap(err, "error trying to refresh resource information")
}
} else {
// send patch to server
@ -456,7 +361,7 @@ func updateResource(c *Client, target *resource.Info, currentObj runtime.Object,
// ... and recreate
if err := createResource(target); err != nil {
return goerrors.Wrap(err, "failed to recreate resource")
return errors.Wrap(err, "failed to recreate resource")
}
log.Printf("Created a new %s called %q\n", kind, target.Name)
@ -478,7 +383,7 @@ func updateResource(c *Client, target *resource.Info, currentObj runtime.Object,
}
versioned := asVersioned(target)
selector, err := getSelectorFromObject(versioned)
selector, err := selectorsForObject(versioned)
if err != nil {
return nil
}
@ -489,8 +394,7 @@ func updateResource(c *Client, target *resource.Info, currentObj runtime.Object,
}
pods, err := client.CoreV1().Pods(target.Namespace).List(metav1.ListOptions{
FieldSelector: fields.Everything().String(),
LabelSelector: labels.Set(selector).AsSelector().String(),
LabelSelector: selector.String(),
})
if err != nil {
return err
@ -508,48 +412,6 @@ func updateResource(c *Client, target *resource.Info, currentObj runtime.Object,
return nil
}
// getSelectorFromObject extracts the pod label selector from a versioned
// workload object. ReplicationControllers carry a plain map selector; all
// other supported kinds expose a *metav1.LabelSelector whose MatchLabels
// are returned. Unsupported kinds yield an error.
func getSelectorFromObject(obj runtime.Object) (map[string]string, error) {
	switch o := obj.(type) {
	// ReplicationController is the one kind with a bare map selector.
	case *v1.ReplicationController:
		return o.Spec.Selector, nil
	// Deployments, across all served API groups/versions.
	case *appsv1.Deployment:
		return o.Spec.Selector.MatchLabels, nil
	case *appsv1beta1.Deployment:
		return o.Spec.Selector.MatchLabels, nil
	case *appsv1beta2.Deployment:
		return o.Spec.Selector.MatchLabels, nil
	case *extv1beta1.Deployment:
		return o.Spec.Selector.MatchLabels, nil
	// ReplicaSets.
	case *appsv1.ReplicaSet:
		return o.Spec.Selector.MatchLabels, nil
	case *extv1beta1.ReplicaSet:
		return o.Spec.Selector.MatchLabels, nil
	// DaemonSets.
	case *appsv1.DaemonSet:
		return o.Spec.Selector.MatchLabels, nil
	case *appsv1beta2.DaemonSet:
		return o.Spec.Selector.MatchLabels, nil
	case *extv1beta1.DaemonSet:
		return o.Spec.Selector.MatchLabels, nil
	// StatefulSets.
	case *appsv1.StatefulSet:
		return o.Spec.Selector.MatchLabels, nil
	case *appsv1beta1.StatefulSet:
		return o.Spec.Selector.MatchLabels, nil
	case *appsv1beta2.StatefulSet:
		return o.Spec.Selector.MatchLabels, nil
	// Jobs.
	case *batch.Job:
		return o.Spec.Selector.MatchLabels, nil
	default:
		return nil, goerrors.Errorf("unsupported kind when getting selector: %v", obj)
	}
}
func (c *Client) watchUntilReady(timeout time.Duration, info *resource.Info) error {
w, err := resource.NewHelper(info.Client, info.Mapping).WatchSingle(info.Namespace, info.Name, info.ResourceVersion)
if err != nil {
@ -585,7 +447,7 @@ func (c *Client) watchUntilReady(timeout time.Duration, info *resource.Info) err
case watch.Error:
// Handle error and return with an error.
c.Log("Error event for %s", info.Name)
return true, goerrors.Errorf("failed to deploy %s", info.Name)
return true, errors.Errorf("failed to deploy %s", info.Name)
default:
return false, nil
}
@ -597,16 +459,16 @@ func (c *Client) watchUntilReady(timeout time.Duration, info *resource.Info) err
//
// This operates on an event returned from a watcher.
func (c *Client) waitForJob(e watch.Event, name string) (bool, error) {
o, ok := e.Object.(*batchinternal.Job)
o, ok := e.Object.(*batch.Job)
if !ok {
return true, goerrors.Errorf("expected %s to be a *batch.Job, got %T", name, e.Object)
return true, errors.Errorf("expected %s to be a *batch.Job, got %T", name, e.Object)
}
for _, c := range o.Status.Conditions {
if c.Type == batchinternal.JobComplete && c.Status == "True" {
if c.Type == batch.JobComplete && c.Status == "True" {
return true, nil
} else if c.Type == batchinternal.JobFailed && c.Status == "True" {
return true, goerrors.Errorf("job failed: %s", c.Reason)
} else if c.Type == batch.JobFailed && c.Status == "True" {
return true, errors.Errorf("job failed: %s", c.Reason)
}
}
@ -622,7 +484,7 @@ func scrubValidationError(err error) error {
const stopValidateMessage = "if you choose to ignore these errors, turn validation off with --validate=false"
if strings.Contains(err.Error(), stopValidateMessage) {
return goerrors.New(strings.ReplaceAll(err.Error(), "; "+stopValidateMessage, ""))
return errors.New(strings.ReplaceAll(err.Error(), "; "+stopValidateMessage, ""))
}
return err
}
@ -652,61 +514,3 @@ func (c *Client) WaitAndGetCompletedPodPhase(name string, timeout time.Duration)
return v1.PodUnknown, err
}
// getSelectRelationPod returns objPods augmented with any pods selected by
// the label selector of the given resource. Kubernetes workload resources
// use label selectors to relate to their pods; resources without a selector
// (services, config maps, ...) are returned unchanged. Pods are keyed by
// "version/Kind" and deduplicated by namespace/name.
func (c *Client) getSelectRelationPod(info *resource.Info, objPods map[string][]v1.Pod) (map[string][]v1.Pod, error) {
	if info == nil {
		return objPods, nil
	}
	c.Log("get relation pod of object: %s/%s/%s", info.Namespace, info.Mapping.GroupVersionKind.Kind, info.Name)
	versioned := asVersioned(info)
	// The error is deliberately ignored: it only fires for kinds that do not
	// have pods, in which case the selector stays empty and we bail below.
	selector, _ := getSelectorFromObject(versioned)
	selectorString := labels.Set(selector).AsSelector().String()
	// An empty selector means no related pods (e.g. a service or config map).
	if selectorString == "" {
		return objPods, nil
	}
	// Previously the clientset error was discarded, which could lead to a nil
	// dereference below; surface it to the caller instead.
	client, err := c.KubernetesClientSet()
	if err != nil {
		return objPods, err
	}
	pods, err := client.CoreV1().Pods(info.Namespace).List(metav1.ListOptions{
		FieldSelector: fields.Everything().String(),
		LabelSelector: selectorString,
	})
	if err != nil {
		return objPods, err
	}
	for _, pod := range pods.Items {
		// Backfill TypeMeta, which list responses commonly omit.
		if pod.APIVersion == "" {
			pod.APIVersion = "v1"
		}
		if pod.Kind == "" {
			pod.Kind = "Pod"
		}
		vk := pod.GroupVersionKind().Version + "/" + pod.GroupVersionKind().Kind
		if !isFoundPod(objPods[vk], pod) {
			objPods[vk] = append(objPods[vk], pod)
		}
	}
	return objPods, nil
}
// isFoundPod reports whether pod (matched by namespace and name) is already
// present in podItem.
func isFoundPod(podItem []v1.Pod, pod v1.Pod) bool {
	for i := range podItem {
		if podItem[i].Namespace == pod.Namespace && podItem[i].Name == pod.Name {
			return true
		}
	}
	return false
}

@ -27,11 +27,10 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/cli-runtime/pkg/resource"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest/fake"
cmdtesting "k8s.io/kubernetes/pkg/kubectl/cmd/testing"
"k8s.io/kubernetes/pkg/kubectl/scheme"
)
var unstructuredSerializer = resource.UnstructuredPlusDefaultContentConfig().NegotiatedSerializer
@ -92,15 +91,11 @@ func newResponse(code int, obj runtime.Object) (*http.Response, error) {
return &http.Response{StatusCode: code, Header: header, Body: body}, nil
}
type testClient struct {
*Client
*cmdtesting.TestFactory
}
func newTestClient() *testClient {
tf := cmdtesting.NewTestFactory()
c := &Client{Factory: tf, Log: nopLogger}
return &testClient{Client: c, TestFactory: tf}
func newTestClient() *Client {
return &Client{
Factory: cmdtesting.NewTestFactory().WithNamespace("default"),
Log: nopLogger,
}
}
func TestUpdate(t *testing.T) {
@ -112,9 +107,8 @@ func TestUpdate(t *testing.T) {
var actions []string
tf := cmdtesting.NewTestFactory().WithNamespace("default")
defer tf.Cleanup()
tf.UnstructuredClient = &fake.RESTClient{
c := newTestClient()
c.Factory.(*cmdtesting.TestFactory).UnstructuredClient = &fake.RESTClient{
NegotiatedSerializer: unstructuredSerializer,
Client: fake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) {
p, m := req.URL.Path, req.Method
@ -148,11 +142,6 @@ func TestUpdate(t *testing.T) {
}
}),
}
c := &Client{
Factory: tf,
Log: nopLogger,
}
if err := c.Update(objBody(&listA), objBody(&listB), false, false); err != nil {
t.Fatal(err)
}
@ -210,8 +199,6 @@ func TestBuild(t *testing.T) {
c := newTestClient()
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c.Cleanup()
// Test for an invalid manifest
infos, err := c.Build(tt.reader)
if err != nil && !tt.err {
@ -227,49 +214,6 @@ func TestBuild(t *testing.T) {
}
}
func TestGet(t *testing.T) {
list := newPodList("starfish", "otter")
c := newTestClient()
defer c.Cleanup()
c.TestFactory.UnstructuredClient = &fake.RESTClient{
GroupVersion: schema.GroupVersion{Version: "v1"},
NegotiatedSerializer: unstructuredSerializer,
Client: fake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) {
p, m := req.URL.Path, req.Method
t.Logf("got request %s %s", p, m)
switch {
case p == "/namespaces/default/pods/starfish" && m == "GET":
return newResponse(404, notFoundBody())
case p == "/namespaces/default/pods/otter" && m == "GET":
return newResponse(200, &list.Items[1])
default:
t.Fatalf("unexpected request: %s %s", req.Method, req.URL.Path)
return nil, nil
}
}),
}
// Test Success
data := strings.NewReader("kind: Pod\napiVersion: v1\nmetadata:\n name: otter")
o, err := c.Get(data)
if err != nil {
t.Errorf("Expected missing results, got %q", err)
}
if !strings.Contains(o, "==> v1/Pod") && !strings.Contains(o, "otter") {
t.Errorf("Expected v1/Pod otter, got %s", o)
}
// Test failure
data = strings.NewReader("kind: Pod\napiVersion: v1\nmetadata:\n name: starfish")
o, err = c.Get(data)
if err != nil {
t.Errorf("Expected missing results, got %q", err)
}
if !strings.Contains(o, "MISSING") && !strings.Contains(o, "pods\t\tstarfish") {
t.Errorf("Expected missing starfish, got %s", o)
}
}
func TestPerform(t *testing.T) {
tests := []struct {
name string
@ -300,7 +244,6 @@ func TestPerform(t *testing.T) {
}
c := newTestClient()
defer c.Cleanup()
infos, err := c.Build(tt.reader)
if err != nil && err.Error() != tt.errMessage {
t.Errorf("Error while building manifests: %v", err)

@ -20,17 +20,15 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/cli-runtime/pkg/resource"
"k8s.io/kubernetes/pkg/api/legacyscheme"
"k8s.io/client-go/kubernetes/scheme"
)
func asVersioned(info *resource.Info) runtime.Object {
converter := runtime.ObjectConvertor(legacyscheme.Scheme)
groupVersioner := runtime.GroupVersioner(schema.GroupVersions(legacyscheme.Scheme.PrioritizedVersionsAllGroups()))
gv := runtime.GroupVersioner(schema.GroupVersions(scheme.Scheme.PrioritizedVersionsAllGroups()))
if info.Mapping != nil {
groupVersioner = info.Mapping.GroupVersionKind.GroupVersion()
gv = info.Mapping.GroupVersionKind.GroupVersion()
}
if obj, err := converter.ConvertToVersion(info.Object, groupVersioner); err == nil {
if obj, err := runtime.ObjectConvertor(scheme.Scheme).ConvertToVersion(info.Object, gv); err == nil {
return obj
}
return info.Object

@ -0,0 +1,66 @@
/*
Copyright The Helm Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kube
import (
"io"
"time"
v1 "k8s.io/api/core/v1"
)
// KubernetesClient represents a client capable of communicating with the Kubernetes API.
//
// A KubernetesClient must be concurrency safe.
type Interface interface {
// Create creates one or more resources.
//
// reader must contain a YAML stream (one or more YAML documents separated
// by "\n---\n").
Create(reader io.Reader) error
Wait(r io.Reader, timeout time.Duration) error
// Delete destroys one or more resources.
//
// reader must contain a YAML stream (one or more YAML documents separated
// by "\n---\n").
Delete(io.Reader) error
// Watch the resource in reader until it is "ready".
//
// For Jobs, "ready" means the job ran to completion (excited without error).
// For all other kinds, it means the kind was created or modified without
// error.
WatchUntilReady(reader io.Reader, timeout time.Duration) error
// Update updates one or more resources or creates the resource
// if it doesn't exist.
//
// reader must contain a YAML stream (one or more YAML documents separated
// by "\n---\n").
Update(originalReader, modifiedReader io.Reader, force bool, recreate bool) error
Build(reader io.Reader) (Result, error)
BuildUnstructured(reader io.Reader) (Result, error)
// WaitAndGetCompletedPodPhase waits up to a timeout until a pod enters a completed phase
// and returns said phase (PodSucceeded or PodFailed qualify).
WaitAndGetCompletedPodPhase(name string, timeout time.Duration) (v1.PodPhase, error)
}
var _ Interface = (*Client)(nil)

@ -24,53 +24,6 @@ import (
"k8s.io/cli-runtime/pkg/resource"
)
// KubernetesClient represents a client capable of communicating with the Kubernetes API.
//
// A KubernetesClient must be concurrency safe.
type KubernetesClient interface {
// Create creates one or more resources.
//
// reader must contain a YAML stream (one or more YAML documents separated
// by "\n---\n").
Create(reader io.Reader) error
Wait(r io.Reader, timeout time.Duration) error
// Get gets one or more resources. Returned string hsa the format like kubectl
// provides with the column headers separating the resource types.
//
// reader must contain a YAML stream (one or more YAML documents separated
// by "\n---\n").
Get(reader io.Reader) (string, error)
// Delete destroys one or more resources.
//
// reader must contain a YAML stream (one or more YAML documents separated
// by "\n---\n").
Delete(reader io.Reader) error
// Watch the resource in reader until it is "ready".
//
// For Jobs, "ready" means the job ran to completion (excited without error).
// For all other kinds, it means the kind was created or modified without
// error.
WatchUntilReady(reader io.Reader, timeout time.Duration) error
// Update updates one or more resources or creates the resource
// if it doesn't exist.
//
// reader must contain a YAML stream (one or more YAML documents separated
// by "\n---\n").
Update(originalReader, modifiedReader io.Reader, force bool, recreate bool) error
Build(reader io.Reader) (Result, error)
BuildUnstructured(reader io.Reader) (Result, error)
// WaitAndGetCompletedPodPhase waits up to a timeout until a pod enters a completed phase
// and returns said phase (PodSucceeded or PodFailed qualify).
WaitAndGetCompletedPodPhase(name string, timeout time.Duration) (v1.PodPhase, error)
}
// PrintingKubeClient implements KubeClient, but simply prints the reader to
// the given output.
type PrintingKubeClient struct {
@ -83,7 +36,7 @@ func (p *PrintingKubeClient) Create(r io.Reader) error {
return err
}
func (p *PrintingKubeClient) Wait(r io.Reader, timeout time.Duration) error {
func (p *PrintingKubeClient) Wait(r io.Reader, _ time.Duration) error {
_, err := io.Copy(p.Out, r)
return err
}
@ -103,28 +56,27 @@ func (p *PrintingKubeClient) Delete(r io.Reader) error {
}
// WatchUntilReady implements KubeClient WatchUntilReady.
func (p *PrintingKubeClient) WatchUntilReady(r io.Reader, timeout time.Duration) error {
func (p *PrintingKubeClient) WatchUntilReady(r io.Reader, _ time.Duration) error {
_, err := io.Copy(p.Out, r)
return err
}
// Update implements KubeClient Update.
func (p *PrintingKubeClient) Update(currentReader, modifiedReader io.Reader, force, recreate bool) error {
func (p *PrintingKubeClient) Update(_, modifiedReader io.Reader, _, _ bool) error {
_, err := io.Copy(p.Out, modifiedReader)
return err
}
// Build implements KubeClient Build.
func (p *PrintingKubeClient) Build(reader io.Reader) (Result, error) {
func (p *PrintingKubeClient) Build(_ io.Reader) (Result, error) {
return []*resource.Info{}, nil
}
// BuildUnstructured implements KubeClient BuildUnstructured.
func (p *PrintingKubeClient) BuildUnstructured(reader io.Reader) (Result, error) {
return []*resource.Info{}, nil
func (p *PrintingKubeClient) BuildUnstructured(_ io.Reader) (Result, error) {
return p.Build(nil)
}
// WaitAndGetCompletedPodPhase implements KubeClient WaitAndGetCompletedPodPhase.
func (p *PrintingKubeClient) WaitAndGetCompletedPodPhase(name string, timeout time.Duration) (v1.PodPhase, error) {
func (p *PrintingKubeClient) WaitAndGetCompletedPodPhase(_ string, _ time.Duration) (v1.PodPhase, error) {
return v1.PodSucceeded, nil
}

@ -1,79 +0,0 @@
/*
Copyright The Helm Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kube
import (
"bytes"
"io"
"testing"
"time"
v1 "k8s.io/api/core/v1"
"k8s.io/cli-runtime/pkg/resource"
)
type mockKubeClient struct{}
func (k *mockKubeClient) Wait(r io.Reader, _ time.Duration) error {
return nil
}
func (k *mockKubeClient) Create(r io.Reader) error {
return nil
}
func (k *mockKubeClient) Get(r io.Reader) (string, error) {
return "", nil
}
func (k *mockKubeClient) Delete(r io.Reader) error {
return nil
}
func (k *mockKubeClient) Update(currentReader, modifiedReader io.Reader, force, recreate bool) error {
return nil
}
func (k *mockKubeClient) WatchUntilReady(r io.Reader, timeout time.Duration) error {
return nil
}
func (k *mockKubeClient) Build(reader io.Reader) (Result, error) {
return []*resource.Info{}, nil
}
func (k *mockKubeClient) BuildUnstructured(reader io.Reader) (Result, error) {
return []*resource.Info{}, nil
}
func (k *mockKubeClient) WaitAndGetCompletedPodPhase(name string, timeout time.Duration) (v1.PodPhase, error) {
return v1.PodUnknown, nil
}
var _ KubernetesClient = &mockKubeClient{}
var _ KubernetesClient = &PrintingKubeClient{}
func TestKubeClient(t *testing.T) {
kc := &mockKubeClient{}
manifests := map[string]string{
"foo": "name: value\n",
"bar": "name: value\n",
}
b := bytes.NewBuffer(nil)
for _, content := range manifests {
b.WriteString("\n---\n")
b.WriteString(content)
}
if err := kc.Create(b); err != nil {
t.Errorf("Kubeclient failed: %s", err)
}
}

@ -76,9 +76,7 @@ func (r Result) Difference(rs Result) Result {
// Intersect will return a new Result with objects contained in both Results.
func (r Result) Intersect(rs Result) Result {
return r.Filter(func(info *resource.Info) bool {
return rs.Contains(info)
})
return r.Filter(rs.Contains)
}
// isMatchingInfo returns true if infos match on Name and GroupVersionKind.

@ -17,27 +17,24 @@ limitations under the License.
package kube // import "helm.sh/helm/pkg/kube"
import (
"fmt"
"time"
"github.com/pkg/errors"
appsv1 "k8s.io/api/apps/v1"
appsv1beta1 "k8s.io/api/apps/v1beta1"
appsv1beta2 "k8s.io/api/apps/v1beta2"
v1 "k8s.io/api/core/v1"
extensions "k8s.io/api/extensions/v1beta1"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
extensionsv1beta1 "k8s.io/api/extensions/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util"
)
// deployment holds associated replicaSets for a deployment
type deployment struct {
replicaSets *appsv1.ReplicaSet
deployment *appsv1.Deployment
}
type waiter struct {
c kubernetes.Interface
timeout time.Duration
@ -50,26 +47,17 @@ func (w *waiter) waitForResources(created Result) error {
w.log("beginning wait for %d resources with timeout of %v", len(created), w.timeout)
return wait.Poll(2*time.Second, w.timeout, func() (bool, error) {
for _, v := range created[:0] {
var (
pods []v1.Pod
services []v1.Service
pvc []v1.PersistentVolumeClaim
deployments []deployment
ok bool
err error
)
for _, v := range created[:0] {
switch value := asVersioned(v).(type) {
case *v1.ReplicationController:
list, err := getPods(w.c, value.Namespace, value.Spec.Selector)
if err != nil {
return false, err
}
pods = append(pods, list...)
case *v1.Pod:
case *corev1.Pod:
pod, err := w.c.CoreV1().Pods(value.Namespace).Get(value.Name, metav1.GetOptions{})
if err != nil {
if err != nil || !w.isPodReady(pod) {
return false, err
}
pods = append(pods, *pod)
case *appsv1.Deployment:
currentDeployment, err := w.c.AppsV1().Deployments(value.Namespace).Get(value.Name, metav1.GetOptions{})
if err != nil {
@ -80,11 +68,9 @@ func (w *waiter) waitForResources(created Result) error {
if err != nil || newReplicaSet == nil {
return false, err
}
newDeployment := deployment{
newReplicaSet,
currentDeployment,
if !w.deploymentReady(newReplicaSet, currentDeployment) {
return false, nil
}
deployments = append(deployments, newDeployment)
case *appsv1beta1.Deployment:
currentDeployment, err := w.c.AppsV1().Deployments(value.Namespace).Get(value.Name, metav1.GetOptions{})
if err != nil {
@ -95,11 +81,9 @@ func (w *waiter) waitForResources(created Result) error {
if err != nil || newReplicaSet == nil {
return false, err
}
newDeployment := deployment{
newReplicaSet,
currentDeployment,
if !w.deploymentReady(newReplicaSet, currentDeployment) {
return false, nil
}
deployments = append(deployments, newDeployment)
case *appsv1beta2.Deployment:
currentDeployment, err := w.c.AppsV1().Deployments(value.Namespace).Get(value.Name, metav1.GetOptions{})
if err != nil {
@ -110,12 +94,10 @@ func (w *waiter) waitForResources(created Result) error {
if err != nil || newReplicaSet == nil {
return false, err
}
newDeployment := deployment{
newReplicaSet,
currentDeployment,
if !w.deploymentReady(newReplicaSet, currentDeployment) {
return false, nil
}
deployments = append(deployments, newDeployment)
case *extensions.Deployment:
case *extensionsv1beta1.Deployment:
currentDeployment, err := w.c.AppsV1().Deployments(value.Namespace).Get(value.Name, metav1.GetOptions{})
if err != nil {
return false, err
@ -125,155 +107,175 @@ func (w *waiter) waitForResources(created Result) error {
if err != nil || newReplicaSet == nil {
return false, err
}
newDeployment := deployment{
newReplicaSet,
currentDeployment,
if !w.deploymentReady(newReplicaSet, currentDeployment) {
return false, nil
}
deployments = append(deployments, newDeployment)
case *extensions.DaemonSet:
list, err := getPods(w.c, value.Namespace, value.Spec.Selector.MatchLabels)
case *corev1.PersistentVolumeClaim:
claim, err := w.c.CoreV1().PersistentVolumeClaims(value.Namespace).Get(value.Name, metav1.GetOptions{})
if err != nil {
return false, err
}
pods = append(pods, list...)
case *appsv1.DaemonSet:
list, err := getPods(w.c, value.Namespace, value.Spec.Selector.MatchLabels)
if err != nil {
return false, err
if !w.volumeReady(claim) {
return false, nil
}
pods = append(pods, list...)
case *appsv1beta2.DaemonSet:
list, err := getPods(w.c, value.Namespace, value.Spec.Selector.MatchLabels)
case *corev1.Service:
svc, err := w.c.CoreV1().Services(value.Namespace).Get(value.Name, metav1.GetOptions{})
if err != nil {
return false, err
}
pods = append(pods, list...)
case *appsv1.StatefulSet:
list, err := getPods(w.c, value.Namespace, value.Spec.Selector.MatchLabels)
if err != nil {
return false, err
if !w.serviceReady(svc) {
return false, nil
}
pods = append(pods, list...)
case *corev1.ReplicationController:
ok, err = w.podsReadyForObject(value.Namespace, value)
case *extensionsv1beta1.DaemonSet:
ok, err = w.podsReadyForObject(value.Namespace, value)
case *appsv1.DaemonSet:
ok, err = w.podsReadyForObject(value.Namespace, value)
case *appsv1beta2.DaemonSet:
ok, err = w.podsReadyForObject(value.Namespace, value)
case *appsv1.StatefulSet:
ok, err = w.podsReadyForObject(value.Namespace, value)
case *appsv1beta1.StatefulSet:
list, err := getPods(w.c, value.Namespace, value.Spec.Selector.MatchLabels)
if err != nil {
return false, err
}
pods = append(pods, list...)
ok, err = w.podsReadyForObject(value.Namespace, value)
case *appsv1beta2.StatefulSet:
list, err := getPods(w.c, value.Namespace, value.Spec.Selector.MatchLabels)
if err != nil {
return false, err
}
pods = append(pods, list...)
case *extensions.ReplicaSet:
list, err := getPods(w.c, value.Namespace, value.Spec.Selector.MatchLabels)
if err != nil {
return false, err
}
pods = append(pods, list...)
ok, err = w.podsReadyForObject(value.Namespace, value)
case *extensionsv1beta1.ReplicaSet:
ok, err = w.podsReadyForObject(value.Namespace, value)
case *appsv1beta2.ReplicaSet:
list, err := getPods(w.c, value.Namespace, value.Spec.Selector.MatchLabels)
if err != nil {
return false, err
}
pods = append(pods, list...)
ok, err = w.podsReadyForObject(value.Namespace, value)
case *appsv1.ReplicaSet:
list, err := getPods(w.c, value.Namespace, value.Spec.Selector.MatchLabels)
if err != nil {
return false, err
ok, err = w.podsReadyForObject(value.Namespace, value)
}
pods = append(pods, list...)
case *v1.PersistentVolumeClaim:
claim, err := w.c.CoreV1().PersistentVolumeClaims(value.Namespace).Get(value.Name, metav1.GetOptions{})
if err != nil {
if !ok || err != nil {
return false, err
}
pvc = append(pvc, *claim)
case *v1.Service:
svc, err := w.c.CoreV1().Services(value.Namespace).Get(value.Name, metav1.GetOptions{})
}
return true, nil
})
}
func (w *waiter) podsReadyForObject(namespace string, obj runtime.Object) (bool, error) {
pods, err := w.podsforObject(namespace, obj)
if err != nil {
return false, err
}
services = append(services, *svc)
for _, pod := range pods {
if !w.isPodReady(&pod) {
return false, nil
}
}
isReady := w.podsReady(pods) && w.servicesReady(services) && w.volumesReady(pvc) && w.deploymentsReady(deployments)
return isReady, nil
})
return true, nil
}
func (w *waiter) podsReady(pods []v1.Pod) bool {
for _, pod := range pods {
if !IsPodReady(&pod) {
w.log("Pod is not ready: %s/%s", pod.GetNamespace(), pod.GetName())
return false
}
func (w *waiter) podsforObject(namespace string, obj runtime.Object) ([]corev1.Pod, error) {
selector, err := selectorsForObject(obj)
if err != nil {
return nil, err
}
return true
list, err := getPods(w.c, namespace, selector.String())
return list, err
}
// IsPodReady returns true if a pod is ready; false otherwise.
func IsPodReady(pod *v1.Pod) bool {
// isPodReady returns true if a pod is ready; false otherwise.
func (w *waiter) isPodReady(pod *corev1.Pod) bool {
for _, c := range pod.Status.Conditions {
if c.Type == v1.PodReady && c.Status == v1.ConditionTrue {
if c.Type == corev1.PodReady && c.Status == corev1.ConditionTrue {
return true
}
}
w.log("Pod is not ready: %s/%s", pod.GetNamespace(), pod.GetName())
return false
}
func (w *waiter) servicesReady(svc []v1.Service) bool {
for _, s := range svc {
func (w *waiter) serviceReady(s *corev1.Service) bool {
// ExternalName Services are external to cluster so helm shouldn't be checking to see if they're 'ready' (i.e. have an IP Set)
if s.Spec.Type == v1.ServiceTypeExternalName {
continue
if s.Spec.Type == corev1.ServiceTypeExternalName {
return true
}
// Make sure the service is not explicitly set to "None" before checking the IP
if s.Spec.ClusterIP != v1.ClusterIPNone && !IsServiceIPSet(&s) {
w.log("Service is not ready: %s/%s", s.GetNamespace(), s.GetName())
return false
}
if s.Spec.ClusterIP != corev1.ClusterIPNone && !isServiceIPSet(s) ||
// This checks if the service has a LoadBalancer and that balancer has an Ingress defined
if s.Spec.Type == v1.ServiceTypeLoadBalancer && s.Status.LoadBalancer.Ingress == nil {
s.Spec.Type == corev1.ServiceTypeLoadBalancer && s.Status.LoadBalancer.Ingress == nil {
w.log("Service is not ready: %s/%s", s.GetNamespace(), s.GetName())
return false
}
}
return true
}
// IsServiceIPSet aims to check if the service's ClusterIP is set or not
// isServiceIPSet aims to check if the service's ClusterIP is set or not
// the objective is not to perform validation here
func IsServiceIPSet(service *v1.Service) bool {
return service.Spec.ClusterIP != v1.ClusterIPNone && service.Spec.ClusterIP != ""
func isServiceIPSet(service *corev1.Service) bool {
return service.Spec.ClusterIP != corev1.ClusterIPNone && service.Spec.ClusterIP != ""
}
func (w *waiter) volumesReady(vols []v1.PersistentVolumeClaim) bool {
for _, v := range vols {
if v.Status.Phase != v1.ClaimBound {
func (w *waiter) volumeReady(v *corev1.PersistentVolumeClaim) bool {
if v.Status.Phase != corev1.ClaimBound {
w.log("PersistentVolumeClaim is not ready: %s/%s", v.GetNamespace(), v.GetName())
return false
}
}
return true
}
func (w *waiter) deploymentsReady(deployments []deployment) bool {
for _, v := range deployments {
if !(v.replicaSets.Status.ReadyReplicas >= *v.deployment.Spec.Replicas-deploymentutil.MaxUnavailable(*v.deployment)) {
w.log("Deployment is not ready: %s/%s", v.deployment.GetNamespace(), v.deployment.GetName())
func (w *waiter) deploymentReady(replicaSet *appsv1.ReplicaSet, deployment *appsv1.Deployment) bool {
if !(replicaSet.Status.ReadyReplicas >= *deployment.Spec.Replicas-deploymentutil.MaxUnavailable(*deployment)) {
w.log("Deployment is not ready: %s/%s", deployment.GetNamespace(), deployment.GetName())
return false
}
}
return true
}
func getPods(client kubernetes.Interface, namespace string, selector map[string]string) ([]v1.Pod, error) {
func getPods(client kubernetes.Interface, namespace, selector string) ([]corev1.Pod, error) {
list, err := client.CoreV1().Pods(namespace).List(metav1.ListOptions{
FieldSelector: fields.Everything().String(),
LabelSelector: labels.Set(selector).AsSelector().String(),
LabelSelector: selector,
})
return list.Items, err
}
// selectorsForObject returns the pod label selector for a given object
//
// Modified version of https://github.com/kubernetes/kubernetes/blob/v1.14.1/pkg/kubectl/polymorphichelpers/helpers.go#L84
func selectorsForObject(object runtime.Object) (selector labels.Selector, err error) {
switch t := object.(type) {
case *extensionsv1beta1.ReplicaSet:
selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector)
case *appsv1.ReplicaSet:
selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector)
case *appsv1beta2.ReplicaSet:
selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector)
case *corev1.ReplicationController:
selector = labels.SelectorFromSet(t.Spec.Selector)
case *appsv1.StatefulSet:
selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector)
case *appsv1beta1.StatefulSet:
selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector)
case *appsv1beta2.StatefulSet:
selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector)
case *extensionsv1beta1.DaemonSet:
selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector)
case *appsv1.DaemonSet:
selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector)
case *appsv1beta2.DaemonSet:
selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector)
case *extensionsv1beta1.Deployment:
selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector)
case *appsv1.Deployment:
selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector)
case *appsv1beta1.Deployment:
selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector)
case *appsv1beta2.Deployment:
selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector)
case *batchv1.Job:
selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector)
case *corev1.Service:
if t.Spec.Selector == nil || len(t.Spec.Selector) == 0 {
return nil, fmt.Errorf("invalid service '%s': Service is defined without a selector", t.Name)
}
selector = labels.SelectorFromSet(t.Spec.Selector)
default:
return nil, fmt.Errorf("selector for %T not implemented", object)
}
return selector, errors.Wrap(err, "invalid label selector")
}

@ -31,7 +31,7 @@ import (
// Environment encapsulates information about where test suite executes and returns results
type Environment struct {
Namespace string
KubeClient kube.KubernetesClient
KubeClient kube.Interface
Messages chan *release.TestReleaseResponse
Timeout time.Duration
}

@ -244,7 +244,7 @@ func testEnvFixture() *Environment {
}
type mockKubeClient struct {
kube.KubernetesClient
kube.Interface
podFail bool
err error
}
@ -255,12 +255,5 @@ func (c *mockKubeClient) WaitAndGetCompletedPodPhase(_ string, _ time.Duration)
}
return v1.PodSucceeded, nil
}
func (c *mockKubeClient) Get(_ io.Reader) (string, error) {
return "", nil
}
func (c *mockKubeClient) Create(_ io.Reader) error {
return c.err
}
func (c *mockKubeClient) Delete(_ io.Reader) error {
return nil
}
func (c *mockKubeClient) Create(_ io.Reader) error { return c.err }
func (c *mockKubeClient) Delete(_ io.Reader) error { return nil }

Loading…
Cancel
Save