Merge branch 'dev-v3' of https://github.com/helm/helm into test-as-hook

pull/6054/head
Jacob LeGrone 5 years ago
commit 5ab42504f1
No known key found for this signature in database
GPG Key ID: 5FD0852F235368C1

438
Gopkg.lock generated

File diff suppressed because it is too large Load Diff

@ -58,6 +58,18 @@
name = "github.com/stretchr/testify" name = "github.com/stretchr/testify"
version = "^1.3.0" version = "^1.3.0"
[[constraint]]
name = "github.com/xeipuuv/gojsonschema"
version = "1.1.0"
[[constraint]]
name = "github.com/spf13/cobra"
version = "0.0.4"
[[constraint]]
name = "sigs.k8s.io/yaml"
version = "1.1.0"
[[override]] [[override]]
name = "sigs.k8s.io/kustomize" name = "sigs.k8s.io/kustomize"
version = "2.0.3" version = "2.0.3"
@ -104,15 +116,3 @@
[prune] [prune]
go-tests = true go-tests = true
[[constraint]]
name = "github.com/xeipuuv/gojsonschema"
version = "1.1.0"
[[constraint]]
name = "github.com/spf13/cobra"
version = "0.0.4"
[[constraint]]
name = "sigs.k8s.io/yaml"
version = "1.1.0"

@ -82,7 +82,7 @@ func initActionConfig(actionConfig *action.Configuration, allNamespaces bool) {
kc := kube.New(kubeConfig()) kc := kube.New(kubeConfig())
kc.Log = logf kc.Log = logf
clientset, err := kc.KubernetesClientSet() clientset, err := kc.Factory.KubernetesClientSet()
if err != nil { if err != nil {
// TODO return error // TODO return error
log.Fatal(err) log.Fatal(err)

@ -24,6 +24,7 @@ import (
"github.com/pkg/errors" "github.com/pkg/errors"
"k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/api/meta"
"k8s.io/client-go/discovery" "k8s.io/client-go/discovery"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest" "k8s.io/client-go/rest"
"helm.sh/helm/pkg/chartutil" "helm.sh/helm/pkg/chartutil"
@ -110,6 +111,15 @@ func (c *Configuration) getCapabilities() (*chartutil.Capabilities, error) {
return c.Capabilities, nil return c.Capabilities, nil
} }
func (c *Configuration) KubernetesClientSet() (kubernetes.Interface, error) {
conf, err := c.RESTClientGetter.ToRESTConfig()
if err != nil {
return nil, errors.Wrap(err, "unable to generate config for kubernetes client")
}
return kubernetes.NewForConfig(conf)
}
// Now generates a timestamp // Now generates a timestamp
// //
// If the configuration has a Timestamper on it, that will be used. // If the configuration has a Timestamper on it, that will be used.

@ -40,20 +40,21 @@ func (cfg *Configuration) execHook(rl *release.Release, hook release.HookEvent,
sort.Sort(hookByWeight(executingHooks)) sort.Sort(hookByWeight(executingHooks))
for _, h := range executingHooks { for _, h := range executingHooks {
if err := deleteHookByPolicy(cfg, h, release.HookBeforeHookCreation); err != nil { if err := cfg.deleteHookByPolicy(h, release.HookBeforeHookCreation); err != nil {
return err return err
} }
b := bytes.NewBufferString(h.Manifest) resources, err := cfg.KubeClient.Build(bytes.NewBufferString(h.Manifest))
if err := cfg.KubeClient.Create(b); err != nil { if err != nil {
return errors.Wrapf(err, "unable to build kubernetes object for %s hook %s", hook, h.Path)
}
if _, err := cfg.KubeClient.Create(resources); err != nil {
return errors.Wrapf(err, "warning: Hook %s %s failed", hook, h.Path) return errors.Wrapf(err, "warning: Hook %s %s failed", hook, h.Path)
} }
b.Reset()
b.WriteString(h.Manifest)
// Get the time at which the hook was applied to the cluster // Get the time at which the hook was applied to the cluster
start := time.Now() start := time.Now()
err := cfg.KubeClient.WatchUntilReady(b, timeout) err = cfg.KubeClient.WatchUntilReady(resources, timeout)
h.LastRun = release.HookExecution{ h.LastRun = release.HookExecution{
StartedAt: start, StartedAt: start,
CompletedAt: time.Now(), CompletedAt: time.Now(),
@ -62,7 +63,7 @@ func (cfg *Configuration) execHook(rl *release.Release, hook release.HookEvent,
if err != nil { if err != nil {
// If a hook is failed, checkout the annotation of the hook to determine whether the hook should be deleted // If a hook is failed, checkout the annotation of the hook to determine whether the hook should be deleted
// under failed condition. If so, then clear the corresponding resource object in the hook // under failed condition. If so, then clear the corresponding resource object in the hook
if err := deleteHookByPolicy(cfg, h, release.HookFailed); err != nil { if err := cfg.deleteHookByPolicy(h, release.HookFailed); err != nil {
return err return err
} }
return err return err
@ -72,7 +73,7 @@ func (cfg *Configuration) execHook(rl *release.Release, hook release.HookEvent,
// If all hooks are succeeded, checkout the annotation of each hook to determine whether the hook should be deleted // If all hooks are succeeded, checkout the annotation of each hook to determine whether the hook should be deleted
// under succeeded condition. If so, then clear the corresponding resource object in each hook // under succeeded condition. If so, then clear the corresponding resource object in each hook
for _, h := range executingHooks { for _, h := range executingHooks {
if err := deleteHookByPolicy(cfg, h, release.HookSucceeded); err != nil { if err := cfg.deleteHookByPolicy(h, release.HookSucceeded); err != nil {
return err return err
} }
} }
@ -93,10 +94,14 @@ func (x hookByWeight) Less(i, j int) bool {
} }
// deleteHookByPolicy deletes a hook if the hook policy instructs it to // deleteHookByPolicy deletes a hook if the hook policy instructs it to
func deleteHookByPolicy(cfg *Configuration, h *release.Hook, policy release.HookDeletePolicy) error { func (cfg *Configuration) deleteHookByPolicy(h *release.Hook, policy release.HookDeletePolicy) error {
if hookHasDeletePolicy(h, policy) { if hookHasDeletePolicy(h, policy) {
b := bytes.NewBufferString(h.Manifest) resources, err := cfg.KubeClient.Build(bytes.NewBufferString(h.Manifest))
return cfg.KubeClient.Delete(b) if err != nil {
return errors.Wrapf(err, "unable to build kubernetes object for deleting hook %s", h.Path)
}
_, errs := cfg.KubeClient.Delete(resources)
return errors.New(joinErrors(errs))
} }
return nil return nil
} }

@ -19,7 +19,6 @@ package action
import ( import (
"bytes" "bytes"
"fmt" "fmt"
"io"
"io/ioutil" "io/ioutil"
"net/url" "net/url"
"os" "os"
@ -168,8 +167,10 @@ func (i *Install) Run(chrt *chart.Chart) (*release.Release, error) {
// Mark this release as in-progress // Mark this release as in-progress
rel.SetStatus(release.StatusPendingInstall, "Initial install underway") rel.SetStatus(release.StatusPendingInstall, "Initial install underway")
if err := i.validateManifest(manifestDoc); err != nil {
return rel, err resources, err := i.cfg.KubeClient.Build(bytes.NewBufferString(rel.Manifest))
if err != nil {
return nil, errors.Wrap(err, "unable to build kubernetes objects from release manifest")
} }
// Bail out here if it is a dry run // Bail out here if it is a dry run
@ -204,14 +205,12 @@ func (i *Install) Run(chrt *chart.Chart) (*release.Release, error) {
// At this point, we can do the install. Note that before we were detecting whether to // At this point, we can do the install. Note that before we were detecting whether to
// do an update, but it's not clear whether we WANT to do an update if the re-use is set // do an update, but it's not clear whether we WANT to do an update if the re-use is set
// to true, since that is basically an upgrade operation. // to true, since that is basically an upgrade operation.
buf := bytes.NewBufferString(rel.Manifest) if _, err := i.cfg.KubeClient.Create(resources); err != nil {
if err := i.cfg.KubeClient.Create(buf); err != nil {
return i.failRelease(rel, err) return i.failRelease(rel, err)
} }
if i.Wait { if i.Wait {
buf := bytes.NewBufferString(rel.Manifest) if err := i.cfg.KubeClient.Wait(resources, i.Timeout); err != nil {
if err := i.cfg.KubeClient.Wait(buf, i.Timeout); err != nil {
return i.failRelease(rel, err) return i.failRelease(rel, err)
} }
@ -457,12 +456,6 @@ func ensureDirectoryForFile(file string) error {
return os.MkdirAll(baseDir, defaultDirectoryPermission) return os.MkdirAll(baseDir, defaultDirectoryPermission)
} }
// validateManifest checks to see whether the given manifest is valid for the current Kubernetes
func (i *Install) validateManifest(manifest io.Reader) error {
_, err := i.cfg.KubeClient.BuildUnstructured(manifest)
return err
}
// NameAndChart returns the name and chart that should be used. // NameAndChart returns the name and chart that should be used.
// //
// This will read the flags and handle name generation if necessary. // This will read the flags and handle name generation if necessary.

@ -130,12 +130,20 @@ func (r *Rollback) prepareRollback(name string) (*release.Release, *release.Rele
} }
func (r *Rollback) performRollback(currentRelease, targetRelease *release.Release) (*release.Release, error) { func (r *Rollback) performRollback(currentRelease, targetRelease *release.Release) (*release.Release, error) {
if r.DryRun { if r.DryRun {
r.cfg.Log("dry run for %s", targetRelease.Name) r.cfg.Log("dry run for %s", targetRelease.Name)
return targetRelease, nil return targetRelease, nil
} }
current, err := r.cfg.KubeClient.Build(bytes.NewBufferString(currentRelease.Manifest))
if err != nil {
return targetRelease, errors.Wrap(err, "unable to build kubernetes objects from current release manifest")
}
target, err := r.cfg.KubeClient.Build(bytes.NewBufferString(targetRelease.Manifest))
if err != nil {
return targetRelease, errors.Wrap(err, "unable to build kubernetes objects from new release manifest")
}
// pre-rollback hooks // pre-rollback hooks
if !r.DisableHooks { if !r.DisableHooks {
if err := r.cfg.execHook(targetRelease, release.HookPreRollback, r.Timeout); err != nil { if err := r.cfg.execHook(targetRelease, release.HookPreRollback, r.Timeout); err != nil {
@ -145,10 +153,9 @@ func (r *Rollback) performRollback(currentRelease, targetRelease *release.Releas
r.cfg.Log("rollback hooks disabled for %s", targetRelease.Name) r.cfg.Log("rollback hooks disabled for %s", targetRelease.Name)
} }
cr := bytes.NewBufferString(currentRelease.Manifest) results, err := r.cfg.KubeClient.Update(current, target, r.Force)
tr := bytes.NewBufferString(targetRelease.Manifest)
if err := r.cfg.KubeClient.Update(cr, tr, r.Force, r.Recreate); err != nil { if err != nil {
msg := fmt.Sprintf("Rollback %q failed: %s", targetRelease.Name, err) msg := fmt.Sprintf("Rollback %q failed: %s", targetRelease.Name, err)
r.cfg.Log("warning: %s", msg) r.cfg.Log("warning: %s", msg)
currentRelease.Info.Status = release.StatusSuperseded currentRelease.Info.Status = release.StatusSuperseded
@ -159,9 +166,18 @@ func (r *Rollback) performRollback(currentRelease, targetRelease *release.Releas
return targetRelease, err return targetRelease, err
} }
if r.Recreate {
// NOTE: Because this is not critical for a release to succeed, we just
// log if an error occurs and continue onward. If we ever introduce log
// levels, we should make these error level logs so users are notified
// that they'll need to go do the cleanup on their own
if err := recreate(r.cfg, results.Updated); err != nil {
r.cfg.Log(err.Error())
}
}
if r.Wait { if r.Wait {
buf := bytes.NewBufferString(targetRelease.Manifest) if err := r.cfg.KubeClient.Wait(target, r.Timeout); err != nil {
if err := r.cfg.KubeClient.Wait(buf, r.Timeout); err != nil {
targetRelease.SetStatus(release.StatusFailed, fmt.Sprintf("Release %q failed: %s", targetRelease.Name, err.Error())) targetRelease.SetStatus(release.StatusFailed, fmt.Sprintf("Release %q failed: %s", targetRelease.Name, err.Error()))
r.cfg.recordRelease(currentRelease) r.cfg.recordRelease(currentRelease)
r.cfg.recordRelease(targetRelease) r.cfg.recordRelease(targetRelease)

@ -17,13 +17,11 @@ limitations under the License.
package action package action
import ( import (
"bytes"
"strings" "strings"
"time" "time"
"github.com/pkg/errors" "github.com/pkg/errors"
"helm.sh/helm/pkg/kube"
"helm.sh/helm/pkg/release" "helm.sh/helm/pkg/release"
"helm.sh/helm/pkg/releaseutil" "helm.sh/helm/pkg/releaseutil"
) )
@ -160,7 +158,7 @@ func joinErrors(errs []error) string {
} }
// deleteRelease deletes the release and returns manifests that were kept in the deletion process // deleteRelease deletes the release and returns manifests that were kept in the deletion process
func (u *Uninstall) deleteRelease(rel *release.Release) (kept string, errs []error) { func (u *Uninstall) deleteRelease(rel *release.Release) (string, []error) {
caps, err := u.cfg.getCapabilities() caps, err := u.cfg.getCapabilities()
if err != nil { if err != nil {
return rel.Manifest, []error{errors.Wrap(err, "could not get apiVersions from Kubernetes")} return rel.Manifest, []error{errors.Wrap(err, "could not get apiVersions from Kubernetes")}
@ -177,23 +175,20 @@ func (u *Uninstall) deleteRelease(rel *release.Release) (kept string, errs []err
} }
filesToKeep, filesToDelete := filterManifestsToKeep(files) filesToKeep, filesToDelete := filterManifestsToKeep(files)
var kept string
for _, f := range filesToKeep { for _, f := range filesToKeep {
kept += f.Name + "\n" kept += f.Name + "\n"
} }
var builder strings.Builder
for _, file := range filesToDelete { for _, file := range filesToDelete {
b := bytes.NewBufferString(strings.TrimSpace(file.Content)) builder.WriteString("\n---\n" + file.Content)
if b.Len() == 0 {
continue
}
if err := u.cfg.KubeClient.Delete(b); err != nil {
u.cfg.Log("uninstall: Failed deletion of %q: %s", rel.Name, err)
if err == kube.ErrNoObjectsVisited {
// Rewrite the message from "no objects visited"
err = errors.New("object not found, skipping delete")
}
errs = append(errs, err)
}
} }
resources, err := u.cfg.KubeClient.Build(strings.NewReader(builder.String()))
if err != nil {
return "", []error{errors.Wrap(err, "unable to build kubernetes objects for delete")}
}
_, errs := u.cfg.KubeClient.Delete(resources)
return kept, errs return kept, errs
} }

@ -23,6 +23,8 @@ import (
"github.com/pkg/errors" "github.com/pkg/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"helm.sh/helm/pkg/chart" "helm.sh/helm/pkg/chart"
"helm.sh/helm/pkg/chartutil" "helm.sh/helm/pkg/chartutil"
"helm.sh/helm/pkg/kube" "helm.sh/helm/pkg/kube"
@ -74,7 +76,7 @@ func (u *Upgrade) Run(name string, chart *chart.Chart) (*release.Release, error)
u.Wait = u.Wait || u.Atomic u.Wait = u.Wait || u.Atomic
if err := validateReleaseName(name); err != nil { if err := validateReleaseName(name); err != nil {
return nil, errors.Errorf("upgradeRelease: Release name is invalid: %s", name) return nil, errors.Errorf("release name is invalid: %s", name)
} }
u.cfg.Log("preparing upgrade for %s", name) u.cfg.Log("preparing upgrade for %s", name)
currentRelease, upgradedRelease, err := u.prepareUpgrade(name, chart) currentRelease, upgradedRelease, err := u.prepareUpgrade(name, chart)
@ -197,6 +199,15 @@ func (u *Upgrade) performUpgrade(originalRelease, upgradedRelease *release.Relea
return upgradedRelease, nil return upgradedRelease, nil
} }
current, err := u.cfg.KubeClient.Build(bytes.NewBufferString(originalRelease.Manifest))
if err != nil {
return upgradedRelease, errors.Wrap(err, "unable to build kubernetes objects from current release manifest")
}
target, err := u.cfg.KubeClient.Build(bytes.NewBufferString(upgradedRelease.Manifest))
if err != nil {
return upgradedRelease, errors.Wrap(err, "unable to build kubernetes objects from new release manifest")
}
// pre-upgrade hooks // pre-upgrade hooks
if !u.DisableHooks { if !u.DisableHooks {
if err := u.cfg.execHook(upgradedRelease, release.HookPreUpgrade, u.Timeout); err != nil { if err := u.cfg.execHook(upgradedRelease, release.HookPreUpgrade, u.Timeout); err != nil {
@ -205,14 +216,25 @@ func (u *Upgrade) performUpgrade(originalRelease, upgradedRelease *release.Relea
} else { } else {
u.cfg.Log("upgrade hooks disabled for %s", upgradedRelease.Name) u.cfg.Log("upgrade hooks disabled for %s", upgradedRelease.Name)
} }
if err := u.upgradeRelease(originalRelease, upgradedRelease); err != nil {
results, err := u.cfg.KubeClient.Update(current, target, u.Force)
if err != nil {
u.cfg.recordRelease(originalRelease) u.cfg.recordRelease(originalRelease)
return u.failRelease(upgradedRelease, err) return u.failRelease(upgradedRelease, err)
} }
if u.Recreate {
// NOTE: Because this is not critical for a release to succeed, we just
// log if an error occurs and continue onward. If we ever introduce log
// levels, we should make these error level logs so users are notified
// that they'll need to go do the cleanup on their own
if err := recreate(u.cfg, results.Updated); err != nil {
u.cfg.Log(err.Error())
}
}
if u.Wait { if u.Wait {
buf := bytes.NewBufferString(upgradedRelease.Manifest) if err := u.cfg.KubeClient.Wait(target, u.Timeout); err != nil {
if err := u.cfg.KubeClient.Wait(buf, u.Timeout); err != nil {
u.cfg.recordRelease(originalRelease) u.cfg.recordRelease(originalRelease)
return u.failRelease(upgradedRelease, err) return u.failRelease(upgradedRelease, err)
} }
@ -280,14 +302,6 @@ func (u *Upgrade) failRelease(rel *release.Release, err error) (*release.Release
return rel, err return rel, err
} }
// upgradeRelease performs an upgrade from current to target release
func (u *Upgrade) upgradeRelease(current, target *release.Release) error {
cm := bytes.NewBufferString(current.Manifest)
tm := bytes.NewBufferString(target.Manifest)
// TODO add wait
return u.cfg.KubeClient.Update(cm, tm, u.Force, u.Recreate)
}
// reuseValues copies values from the current release to a new release if the // reuseValues copies values from the current release to a new release if the
// new release does not have any values. // new release does not have any values.
// //
@ -331,3 +345,39 @@ func validateManifest(c kube.Interface, manifest []byte) error {
_, err := c.Build(bytes.NewReader(manifest)) _, err := c.Build(bytes.NewReader(manifest))
return err return err
} }
// recreate captures all the logic for recreating pods for both upgrade and
// rollback. If we end up refactoring rollback to use upgrade, this can just be
// made an unexported method on the upgrade action.
func recreate(cfg *Configuration, resources kube.ResourceList) error {
for _, res := range resources {
versioned := kube.AsVersioned(res)
selector, err := kube.SelectorsForObject(versioned)
if err != nil {
// If no selector is returned, it means this object is
// definitely not a pod, so continue onward
continue
}
client, err := cfg.KubernetesClientSet()
if err != nil {
return errors.Wrapf(err, "unable to recreate pods for object %s/%s because an error occurred", res.Namespace, res.Name)
}
pods, err := client.CoreV1().Pods(res.Namespace).List(metav1.ListOptions{
LabelSelector: selector.String(),
})
if err != nil {
return errors.Wrapf(err, "unable to recreate pods for object %s/%s because an error occurred", res.Namespace, res.Name)
}
// Restart pods
for _, pod := range pods.Items {
// Delete each pod for get them restarted with changed spec.
if err := client.CoreV1().Pods(pod.Namespace).Delete(pod.Name, metav1.NewPreconditionDeleteOptions(string(pod.UID))); err != nil {
return errors.Wrapf(err, "unable to recreate pods for object %s/%s because an error occurred", res.Namespace, res.Name)
}
}
}
return nil
}

@ -38,8 +38,6 @@ import (
"k8s.io/apimachinery/pkg/watch" "k8s.io/apimachinery/pkg/watch"
"k8s.io/cli-runtime/pkg/genericclioptions" "k8s.io/cli-runtime/pkg/genericclioptions"
"k8s.io/cli-runtime/pkg/resource" "k8s.io/cli-runtime/pkg/resource"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
watchtools "k8s.io/client-go/tools/watch" watchtools "k8s.io/client-go/tools/watch"
cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util" cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
) )
@ -64,32 +62,20 @@ func New(getter genericclioptions.RESTClientGetter) *Client {
} }
} }
// KubernetesClientSet returns a client set from the client factory.
func (c *Client) KubernetesClientSet() (*kubernetes.Clientset, error) {
return c.Factory.KubernetesClientSet()
}
var nopLogger = func(_ string, _ ...interface{}) {} var nopLogger = func(_ string, _ ...interface{}) {}
// Create creates Kubernetes resources from an io.reader. // Create creates Kubernetes resources specified in the resource list.
// func (c *Client) Create(resources ResourceList) (*Result, error) {
// Namespace will set the namespace. c.Log("creating %d resource(s)", len(resources))
func (c *Client) Create(reader io.Reader) error { if err := perform(resources, createResource); err != nil {
c.Log("building resources from manifest") return nil, err
infos, err := c.BuildUnstructured(reader)
if err != nil {
return err
} }
c.Log("creating %d resource(s)", len(infos)) return &Result{Created: resources}, nil
return perform(infos, createResource)
} }
func (c *Client) Wait(reader io.Reader, timeout time.Duration) error { // Wait up to the given timeout for the specified resources to be ready
infos, err := c.BuildUnstructured(reader) func (c *Client) Wait(resources ResourceList, timeout time.Duration) error {
if err != nil { cs, err := c.Factory.KubernetesClientSet()
return err
}
cs, err := c.KubernetesClientSet()
if err != nil { if err != nil {
return err return err
} }
@ -98,7 +84,7 @@ func (c *Client) Wait(reader io.Reader, timeout time.Duration) error {
log: c.Log, log: c.Log,
timeout: timeout, timeout: timeout,
} }
return w.waitForResources(infos) return w.waitForResources(resources)
} }
func (c *Client) namespace() string { func (c *Client) namespace() string {
@ -117,16 +103,8 @@ func (c *Client) newBuilder() *resource.Builder {
Flatten() Flatten()
} }
func (c *Client) validator() resource.ContentValidator { // Build validates for Kubernetes objects and returns unstructured infos.
schema, err := c.Factory.Validator(true) func (c *Client) Build(reader io.Reader) (ResourceList, error) {
if err != nil {
c.Log("warning: failed to load schema: %s", err)
}
return schema
}
// BuildUnstructured validates for Kubernetes objects and returns unstructured infos.
func (c *Client) BuildUnstructured(reader io.Reader) (Result, error) {
result, err := c.newBuilder(). result, err := c.newBuilder().
Unstructured(). Unstructured().
Stream(reader, ""). Stream(reader, "").
@ -134,39 +112,16 @@ func (c *Client) BuildUnstructured(reader io.Reader) (Result, error) {
return result, scrubValidationError(err) return result, scrubValidationError(err)
} }
// Build validates for Kubernetes objects and returns resource Infos from a io.Reader.
func (c *Client) Build(reader io.Reader) (Result, error) {
result, err := c.newBuilder().
WithScheme(scheme.Scheme, scheme.Scheme.PrioritizedVersionsAllGroups()...).
Schema(c.validator()).
Stream(reader, "").
Do().
Infos()
return result, scrubValidationError(err)
}
// Update reads in the current configuration and a target configuration from io.reader // Update reads in the current configuration and a target configuration from io.reader
// and creates resources that don't already exists, updates resources that have been modified // and creates resources that don't already exists, updates resources that have been modified
// in the target configuration and deletes resources from the current configuration that are // in the target configuration and deletes resources from the current configuration that are
// not present in the target configuration. // not present in the target configuration.
// func (c *Client) Update(original, target ResourceList, force bool) (*Result, error) {
// Namespace will set the namespaces.
func (c *Client) Update(originalReader, targetReader io.Reader, force, recreate bool) error {
original, err := c.BuildUnstructured(originalReader)
if err != nil {
return errors.Wrap(err, "failed decoding reader into objects")
}
c.Log("building resources from updated manifest")
target, err := c.BuildUnstructured(targetReader)
if err != nil {
return errors.Wrap(err, "failed decoding reader into objects")
}
updateErrors := []string{} updateErrors := []string{}
res := &Result{}
c.Log("checking %d resources for changes", len(target)) c.Log("checking %d resources for changes", len(target))
err = target.Visit(func(info *resource.Info, err error) error { err := target.Visit(func(info *resource.Info, err error) error {
if err != nil { if err != nil {
return err return err
} }
@ -182,6 +137,9 @@ func (c *Client) Update(originalReader, targetReader io.Reader, force, recreate
return errors.Wrap(err, "failed to create resource") return errors.Wrap(err, "failed to create resource")
} }
// Append the created resource to the results
res.Created = append(res.Created, info)
kind := info.Mapping.GroupVersionKind.Kind kind := info.Mapping.GroupVersionKind.Kind
c.Log("Created a new %s called %q\n", kind, info.Name) c.Log("Created a new %s called %q\n", kind, info.Name)
return nil return nil
@ -193,43 +151,64 @@ func (c *Client) Update(originalReader, targetReader io.Reader, force, recreate
return errors.Errorf("no %s with the name %q found", kind, info.Name) return errors.Errorf("no %s with the name %q found", kind, info.Name)
} }
if err := updateResource(c, info, originalInfo.Object, force, recreate); err != nil { if err := updateResource(c, info, originalInfo.Object, force); err != nil {
c.Log("error updating the resource %q:\n\t %v", info.Name, err) c.Log("error updating the resource %q:\n\t %v", info.Name, err)
updateErrors = append(updateErrors, err.Error()) updateErrors = append(updateErrors, err.Error())
} }
// Because we check for errors later, append the info regardless
res.Updated = append(res.Updated, info)
return nil return nil
}) })
switch { switch {
case err != nil: case err != nil:
return err return nil, err
case len(updateErrors) != 0: case len(updateErrors) != 0:
return errors.Errorf(strings.Join(updateErrors, " && ")) return nil, errors.Errorf(strings.Join(updateErrors, " && "))
} }
for _, info := range original.Difference(target) { for _, info := range original.Difference(target) {
c.Log("Deleting %q in %s...", info.Name, info.Namespace) c.Log("Deleting %q in %s...", info.Name, info.Namespace)
if err := deleteResource(info); err != nil { if err := deleteResource(info); err != nil {
c.Log("Failed to delete %q, err: %s", info.Name, err) c.Log("Failed to delete %q, err: %s", info.Name, err)
} else {
// Only append ones we succeeded in deleting
res.Deleted = append(res.Deleted, info)
} }
} }
return nil return res, nil
} }
// Delete deletes Kubernetes resources from an io.reader. // Delete deletes Kubernetes resources specified in the resources list. It will
// // attempt to delete all resources even if one or more fail and collect any
// Namespace will set the namespace. // errors. All successfully deleted items will be returned in the `Deleted`
func (c *Client) Delete(reader io.Reader) error { // ResourceList that is part of the result.
infos, err := c.BuildUnstructured(reader) func (c *Client) Delete(resources ResourceList) (*Result, []error) {
if err != nil { var errs []error
return err res := &Result{}
} err := perform(resources, func(info *resource.Info) error {
return perform(infos, func(info *resource.Info) error {
c.Log("Starting delete for %q %s", info.Name, info.Mapping.GroupVersionKind.Kind) c.Log("Starting delete for %q %s", info.Name, info.Mapping.GroupVersionKind.Kind)
err := deleteResource(info) if err := c.skipIfNotFound(deleteResource(info)); err != nil {
return c.skipIfNotFound(err) // Collect the error and continue on
errs = append(errs, err)
} else {
res.Deleted = append(res.Deleted, info)
}
return nil
}) })
if err != nil {
// Rewrite the message from "no objects visited" if that is what we got
// back
if err == ErrNoObjectsVisited {
err = errors.New("object not found, skipping delete")
}
errs = append(errs, err)
}
if errs != nil {
return nil, errs
}
return res, nil
} }
func (c *Client) skipIfNotFound(err error) error { func (c *Client) skipIfNotFound(err error) error {
@ -246,7 +225,7 @@ func (c *Client) watchTimeout(t time.Duration) func(*resource.Info) error {
} }
} }
// WatchUntilReady watches the resource given in the reader, and waits until it is ready. // WatchUntilReady watches the resources given and waits until it is ready.
// //
// This function is mainly for hook implementations. It watches for a resource to // This function is mainly for hook implementations. It watches for a resource to
// hit a particular milestone. The milestone depends on the Kind. // hit a particular milestone. The milestone depends on the Kind.
@ -256,21 +235,15 @@ func (c *Client) watchTimeout(t time.Duration) func(*resource.Info) error {
// //
// - Jobs: A job is marked "Ready" when it has successfully completed. This is // - Jobs: A job is marked "Ready" when it has successfully completed. This is
// ascertained by watching the Status fields in a job's output. // ascertained by watching the Status fields in a job's output.
// - Pods: A pod is marked "Ready" when it has successfully completed. This is
// ascertained by watching the status.phase field in a pod's output.
// //
// Handling for other kinds will be added as necessary. // Handling for other kinds will be added as necessary.
func (c *Client) WatchUntilReady(reader io.Reader, timeout time.Duration) error { func (c *Client) WatchUntilReady(resources ResourceList, timeout time.Duration) error {
infos, err := c.Build(reader)
if err != nil {
return err
}
// For jobs, there's also the option to do poll c.Jobs(namespace).Get(): // For jobs, there's also the option to do poll c.Jobs(namespace).Get():
// https://github.com/adamreese/kubernetes/blob/master/test/e2e/job.go#L291-L300 // https://github.com/adamreese/kubernetes/blob/master/test/e2e/job.go#L291-L300
return perform(infos, c.watchTimeout(timeout)) return perform(resources, c.watchTimeout(timeout))
} }
func perform(infos Result, fn func(*resource.Info) error) error { func perform(infos ResourceList, fn func(*resource.Info) error) error {
if len(infos) == 0 { if len(infos) == 0 {
return ErrNoObjectsVisited return ErrNoObjectsVisited
} }
@ -317,7 +290,7 @@ func createPatch(target *resource.Info, current runtime.Object) ([]byte, types.P
} }
// Get a versioned object // Get a versioned object
versionedObject := asVersioned(target) versionedObject := AsVersioned(target)
// Unstructured objects, such as CRDs, may not have an not registered error // Unstructured objects, such as CRDs, may not have an not registered error
// returned from ConvertToVersion. Anything that's unstructured should // returned from ConvertToVersion. Anything that's unstructured should
@ -332,7 +305,7 @@ func createPatch(target *resource.Info, current runtime.Object) ([]byte, types.P
return patch, types.StrategicMergePatchType, err return patch, types.StrategicMergePatchType, err
} }
func updateResource(c *Client, target *resource.Info, currentObj runtime.Object, force, recreate bool) error { func updateResource(c *Client, target *resource.Info, currentObj runtime.Object, force bool) error {
patch, patchType, err := createPatch(target, currentObj) patch, patchType, err := createPatch(target, currentObj)
if err != nil { if err != nil {
return errors.Wrap(err, "failed to create patch") return errors.Wrap(err, "failed to create patch")
@ -379,37 +352,6 @@ func updateResource(c *Client, target *resource.Info, currentObj runtime.Object,
} }
} }
if !recreate {
return nil
}
versioned := asVersioned(target)
selector, err := selectorsForObject(versioned)
if err != nil {
return nil
}
client, err := c.KubernetesClientSet()
if err != nil {
return err
}
pods, err := client.CoreV1().Pods(target.Namespace).List(metav1.ListOptions{
LabelSelector: selector.String(),
})
if err != nil {
return err
}
// Restart pods
for _, pod := range pods.Items {
c.Log("Restarting pod: %v/%v", pod.Namespace, pod.Name)
// Delete each pod for get them restarted with changed spec.
if err := client.CoreV1().Pods(pod.Namespace).Delete(pod.Name, metav1.NewPreconditionDeleteOptions(string(pod.UID))); err != nil {
return err
}
}
return nil return nil
} }
@ -431,6 +373,9 @@ func (c *Client) watchUntilReady(timeout time.Duration, info *resource.Info) err
ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), timeout) ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), timeout)
defer cancel() defer cancel()
_, err = watchtools.UntilWithoutRetry(ctx, w, func(e watch.Event) (bool, error) { _, err = watchtools.UntilWithoutRetry(ctx, w, func(e watch.Event) (bool, error) {
// Make sure the incoming object is versioned as we use unstructured
// objects when we build manifests
obj := convertWithMapper(e.Object, info.Mapping)
switch e.Type { switch e.Type {
case watch.Added, watch.Modified: case watch.Added, watch.Modified:
// For things like a secret or a config map, this is the best indicator // For things like a secret or a config map, this is the best indicator
@ -438,11 +383,8 @@ func (c *Client) watchUntilReady(timeout time.Duration, info *resource.Info) err
// the status go into a good state. For other types, like ReplicaSet // the status go into a good state. For other types, like ReplicaSet
// we don't really do anything to support these as hooks. // we don't really do anything to support these as hooks.
c.Log("Add/Modify event for %s: %v", info.Name, e.Type) c.Log("Add/Modify event for %s: %v", info.Name, e.Type)
switch kind { if kind == "Job" {
case "Job": return c.waitForJob(obj, info.Name)
return c.waitForJob(e, info.Name)
case "Pod":
return c.waitForPodSuccess(e, info.Name)
} }
return true, nil return true, nil
case watch.Deleted: case watch.Deleted:
@ -462,10 +404,10 @@ func (c *Client) watchUntilReady(timeout time.Duration, info *resource.Info) err
// waitForJob is a helper that waits for a job to complete. // waitForJob is a helper that waits for a job to complete.
// //
// This operates on an event returned from a watcher. // This operates on an event returned from a watcher.
func (c *Client) waitForJob(e watch.Event, name string) (bool, error) { func (c *Client) waitForJob(obj runtime.Object, name string) (bool, error) {
o, ok := e.Object.(*batch.Job) o, ok := obj.(*batch.Job)
if !ok { if !ok {
return true, errors.Errorf("expected %s to be a *batch.Job, got %T", name, e.Object) return true, errors.Errorf("expected %s to be a *batch.Job, got %T", name, obj)
} }
for _, c := range o.Status.Conditions { for _, c := range o.Status.Conditions {
@ -480,30 +422,6 @@ func (c *Client) waitForJob(e watch.Event, name string) (bool, error) {
return false, nil return false, nil
} }
// waitForPodSuccess is a helper that waits for a pod to complete.
//
// This operates on an event returned from a watcher.
func (c *Client) waitForPodSuccess(e watch.Event, name string) (bool, error) {
o, ok := e.Object.(*v1.Pod)
if !ok {
return true, errors.Errorf("expected %s to be a *v1.Pod, got %T", name, e.Object)
}
switch o.Status.Phase {
case v1.PodSucceeded:
fmt.Printf("Pod %s succeeded\n", o.Name)
return true, nil
case v1.PodFailed:
return true, errors.Errorf("pod %s failed", o.Name)
case v1.PodPending:
fmt.Printf("Pod %s pending\n", o.Name)
case v1.PodRunning:
fmt.Printf("Pod %s running\n", o.Name)
}
return false, nil
}
// scrubValidationError removes kubectl info from the message. // scrubValidationError removes kubectl info from the message.
func scrubValidationError(err error) error { func scrubValidationError(err error) error {
if err == nil { if err == nil {
@ -516,3 +434,29 @@ func scrubValidationError(err error) error {
} }
return err return err
} }
// WaitAndGetCompletedPodPhase waits up to a timeout until a pod enters a completed phase
// and returns said phase (PodSucceeded or PodFailed qualify).
func (c *Client) WaitAndGetCompletedPodPhase(name string, timeout time.Duration) (v1.PodPhase, error) {
client, _ := c.Factory.KubernetesClientSet()
to := int64(timeout)
watcher, err := client.CoreV1().Pods(c.namespace()).Watch(metav1.ListOptions{
FieldSelector: fmt.Sprintf("metadata.name=%s", name),
TimeoutSeconds: &to,
})
for event := range watcher.ResultChan() {
p, ok := event.Object.(*v1.Pod)
if !ok {
return v1.PodUnknown, fmt.Errorf("%s not a pod", name)
}
switch p.Status.Phase {
case v1.PodFailed:
return v1.PodFailed, nil
case v1.PodSucceeded:
return v1.PodSucceeded, nil
}
}
return v1.PodUnknown, err
}

@ -142,7 +142,16 @@ func TestUpdate(t *testing.T) {
} }
}), }),
} }
if err := c.Update(objBody(&listA), objBody(&listB), false, false); err != nil { first, err := c.Build(objBody(&listA))
if err != nil {
t.Fatal(err)
}
second, err := c.Build(objBody(&listB))
if err != nil {
t.Fatal(err)
}
if _, err := c.Update(first, second, false); err != nil {
t.Fatal(err) t.Fatal(err)
} }
// TODO: Find a way to test methods that use Client Set // TODO: Find a way to test methods that use Client Set
@ -188,11 +197,6 @@ func TestBuild(t *testing.T) {
namespace: "test", namespace: "test",
reader: strings.NewReader(guestbookManifest), reader: strings.NewReader(guestbookManifest),
count: 6, count: 6,
}, {
name: "Invalid schema",
namespace: "test",
reader: strings.NewReader(testInvalidServiceManifest),
err: true,
}, { }, {
name: "Valid input, deploying resources into different namespaces", name: "Valid input, deploying resources into different namespaces",
namespace: "test", namespace: "test",
@ -272,24 +276,41 @@ func TestPerform(t *testing.T) {
func TestReal(t *testing.T) { func TestReal(t *testing.T) {
t.Skip("This is a live test, comment this line to run") t.Skip("This is a live test, comment this line to run")
c := New(nil) c := New(nil)
if err := c.Create(strings.NewReader(guestbookManifest)); err != nil { resources, err := c.Build(strings.NewReader(guestbookManifest))
if err != nil {
t.Fatal(err)
}
if _, err := c.Create(resources); err != nil {
t.Fatal(err) t.Fatal(err)
} }
testSvcEndpointManifest := testServiceManifest + "\n---\n" + testEndpointManifest testSvcEndpointManifest := testServiceManifest + "\n---\n" + testEndpointManifest
c = New(nil) c = New(nil)
if err := c.Create(strings.NewReader(testSvcEndpointManifest)); err != nil { resources, err = c.Build(strings.NewReader(testSvcEndpointManifest))
if err != nil {
t.Fatal(err)
}
if _, err := c.Create(resources); err != nil {
t.Fatal(err) t.Fatal(err)
} }
if err := c.Delete(strings.NewReader(testEndpointManifest)); err != nil { resources, err = c.Build(strings.NewReader(testEndpointManifest))
if err != nil {
t.Fatal(err) t.Fatal(err)
} }
// ensures that delete does not fail if a resource is not found if _, errs := c.Delete(resources); errs != nil {
if err := c.Delete(strings.NewReader(testSvcEndpointManifest)); err != nil { t.Fatal(errs)
}
resources, err = c.Build(strings.NewReader(testSvcEndpointManifest))
if err != nil {
t.Fatal(err) t.Fatal(err)
} }
// ensures that delete does not fail if a resource is not found
if _, errs := c.Delete(resources); errs != nil {
t.Fatal(errs)
}
} }
const testServiceManifest = ` const testServiceManifest = `
@ -306,14 +327,6 @@ spec:
targetPort: 9376 targetPort: 9376
` `
const testInvalidServiceManifest = `
kind: Service
apiVersion: v1
spec:
ports:
- port: "80"
`
const testEndpointManifest = ` const testEndpointManifest = `
kind: Endpoints kind: Endpoints
apiVersion: v1 apiVersion: v1

@ -17,19 +17,28 @@ limitations under the License.
package kube // import "helm.sh/helm/pkg/kube" package kube // import "helm.sh/helm/pkg/kube"
import ( import (
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/cli-runtime/pkg/resource" "k8s.io/cli-runtime/pkg/resource"
"k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/kubernetes/scheme"
) )
func asVersioned(info *resource.Info) runtime.Object { // AsVersioned converts the given info into a runtime.Object with the correct
gv := runtime.GroupVersioner(schema.GroupVersions(scheme.Scheme.PrioritizedVersionsAllGroups())) // group and version set
if info.Mapping != nil { func AsVersioned(info *resource.Info) runtime.Object {
gv = info.Mapping.GroupVersionKind.GroupVersion() return convertWithMapper(info.Object, info.Mapping)
}
// convertWithMapper converts the given object with the optional provided
// RESTMapping. If no mapping is provided, the default schema versioner is used
func convertWithMapper(obj runtime.Object, mapping *meta.RESTMapping) runtime.Object {
var gv = runtime.GroupVersioner(schema.GroupVersions(scheme.Scheme.PrioritizedVersionsAllGroups()))
if mapping != nil {
gv = mapping.GroupVersionKind.GroupVersion()
} }
if obj, err := runtime.ObjectConvertor(scheme.Scheme).ConvertToVersion(info.Object, gv); err == nil { if obj, err := runtime.ObjectConvertor(scheme.Scheme).ConvertToVersion(obj, gv); err == nil {
return obj return obj
} }
return info.Object return obj
} }

@ -21,6 +21,7 @@ import (
"io" "io"
"time" "time"
v1 "k8s.io/api/core/v1"
"k8s.io/cli-runtime/pkg/resource" "k8s.io/cli-runtime/pkg/resource"
"helm.sh/helm/pkg/kube" "helm.sh/helm/pkg/kube"
@ -31,76 +32,68 @@ import (
// delegates all its calls to `PrintingKubeClient` // delegates all its calls to `PrintingKubeClient`
type FailingKubeClient struct { type FailingKubeClient struct {
PrintingKubeClient PrintingKubeClient
DeleteError error CreateError error
WatchUntilReadyError error WaitError error
UpdateError error DeleteError error
BuildError error WatchUntilReadyError error
BuildUnstructuredError error UpdateError error
CreateError error BuildError error
WaitError error BuildUnstructuredError error
GetError error WaitAndGetCompletedPodPhaseError error
} }
// Create returns the configured error if set or prints // Create returns the configured error if set or prints
func (f *FailingKubeClient) Create(r io.Reader) error { func (f *FailingKubeClient) Create(resources kube.ResourceList) (*kube.Result, error) {
if f.CreateError != nil { if f.CreateError != nil {
return f.CreateError return nil, f.CreateError
} }
return f.PrintingKubeClient.Create(r) return f.PrintingKubeClient.Create(resources)
} }
// Wait returns the configured error if set or prints // Wait returns the configured error if set or prints
func (f *FailingKubeClient) Wait(r io.Reader, d time.Duration) error { func (f *FailingKubeClient) Wait(resources kube.ResourceList, d time.Duration) error {
if f.WaitError != nil { if f.WaitError != nil {
return f.WaitError return f.WaitError
} }
return f.PrintingKubeClient.Wait(r, d) return f.PrintingKubeClient.Wait(resources, d)
}
// Create returns the configured error if set or prints
func (f *FailingKubeClient) Get(r io.Reader) (string, error) {
if f.GetError != nil {
return "", f.GetError
}
return f.PrintingKubeClient.Get(r)
} }
// Delete returns the configured error if set or prints // Delete returns the configured error if set or prints
func (f *FailingKubeClient) Delete(r io.Reader) error { func (f *FailingKubeClient) Delete(resources kube.ResourceList) (*kube.Result, []error) {
if f.DeleteError != nil { if f.DeleteError != nil {
return f.DeleteError return nil, []error{f.DeleteError}
} }
return f.PrintingKubeClient.Delete(r) return f.PrintingKubeClient.Delete(resources)
} }
// WatchUntilReady returns the configured error if set or prints // WatchUntilReady returns the configured error if set or prints
func (f *FailingKubeClient) WatchUntilReady(r io.Reader, d time.Duration) error { func (f *FailingKubeClient) WatchUntilReady(resources kube.ResourceList, d time.Duration) error {
if f.WatchUntilReadyError != nil { if f.WatchUntilReadyError != nil {
return f.WatchUntilReadyError return f.WatchUntilReadyError
} }
return f.PrintingKubeClient.WatchUntilReady(r, d) return f.PrintingKubeClient.WatchUntilReady(resources, d)
} }
// Update returns the configured error if set or prints // Update returns the configured error if set or prints
func (f *FailingKubeClient) Update(r, modifiedReader io.Reader, not, needed bool) error { func (f *FailingKubeClient) Update(r, modified kube.ResourceList, ignoreMe bool) (*kube.Result, error) {
if f.UpdateError != nil { if f.UpdateError != nil {
return f.UpdateError return nil, f.UpdateError
} }
return f.PrintingKubeClient.Update(r, modifiedReader, not, needed) return f.PrintingKubeClient.Update(r, modified, ignoreMe)
} }
// Build returns the configured error if set or prints // Build returns the configured error if set or prints
func (f *FailingKubeClient) Build(r io.Reader) (kube.Result, error) { func (f *FailingKubeClient) Build(r io.Reader) (kube.ResourceList, error) {
if f.BuildError != nil { if f.BuildError != nil {
return []*resource.Info{}, f.BuildError return []*resource.Info{}, f.BuildError
} }
return f.PrintingKubeClient.Build(r) return f.PrintingKubeClient.Build(r)
} }
// BuildUnstructured returns the configured error if set or prints // WaitAndGetCompletedPodPhase returns the configured error if set or prints
func (f *FailingKubeClient) BuildUnstructured(r io.Reader) (kube.Result, error) { func (f *FailingKubeClient) WaitAndGetCompletedPodPhase(s string, d time.Duration) (v1.PodPhase, error) {
if f.BuildUnstructuredError != nil { if f.WaitAndGetCompletedPodPhaseError != nil {
return []*resource.Info{}, f.BuildUnstructuredError return v1.PodSucceeded, f.WaitAndGetCompletedPodPhaseError
} }
return f.PrintingKubeClient.Build(r) return f.PrintingKubeClient.WaitAndGetCompletedPodPhase(s, d)
} }

@ -18,8 +18,10 @@ package fake
import ( import (
"io" "io"
"strings"
"time" "time"
v1 "k8s.io/api/core/v1"
"k8s.io/cli-runtime/pkg/resource" "k8s.io/cli-runtime/pkg/resource"
"helm.sh/helm/pkg/kube" "helm.sh/helm/pkg/kube"
@ -32,47 +34,62 @@ type PrintingKubeClient struct {
} }
// Create prints the values of what would be created with a real KubeClient. // Create prints the values of what would be created with a real KubeClient.
func (p *PrintingKubeClient) Create(r io.Reader) error { func (p *PrintingKubeClient) Create(resources kube.ResourceList) (*kube.Result, error) {
_, err := io.Copy(p.Out, r) _, err := io.Copy(p.Out, bufferize(resources))
return err if err != nil {
return nil, err
}
return &kube.Result{Created: resources}, nil
} }
func (p *PrintingKubeClient) Wait(r io.Reader, _ time.Duration) error { func (p *PrintingKubeClient) Wait(resources kube.ResourceList, _ time.Duration) error {
_, err := io.Copy(p.Out, r) _, err := io.Copy(p.Out, bufferize(resources))
return err return err
} }
// Get prints the values of what would be created with a real KubeClient.
func (p *PrintingKubeClient) Get(r io.Reader) (string, error) {
_, err := io.Copy(p.Out, r)
return "", err
}
// Delete implements KubeClient delete. // Delete implements KubeClient delete.
// //
// It only prints out the content to be deleted. // It only prints out the content to be deleted.
func (p *PrintingKubeClient) Delete(r io.Reader) error { func (p *PrintingKubeClient) Delete(resources kube.ResourceList) (*kube.Result, []error) {
_, err := io.Copy(p.Out, r) _, err := io.Copy(p.Out, bufferize(resources))
return err if err != nil {
return nil, []error{err}
}
return &kube.Result{Deleted: resources}, nil
} }
// WatchUntilReady implements KubeClient WatchUntilReady. // WatchUntilReady implements KubeClient WatchUntilReady.
func (p *PrintingKubeClient) WatchUntilReady(r io.Reader, _ time.Duration) error { func (p *PrintingKubeClient) WatchUntilReady(resources kube.ResourceList, _ time.Duration) error {
_, err := io.Copy(p.Out, r) _, err := io.Copy(p.Out, bufferize(resources))
return err return err
} }
// Update implements KubeClient Update. // Update implements KubeClient Update.
func (p *PrintingKubeClient) Update(_, modifiedReader io.Reader, _, _ bool) error { func (p *PrintingKubeClient) Update(_, modified kube.ResourceList, _ bool) (*kube.Result, error) {
_, err := io.Copy(p.Out, modifiedReader) _, err := io.Copy(p.Out, bufferize(modified))
return err if err != nil {
return nil, err
}
// TODO: This doesn't completely mock out have some that get created,
// updated, and deleted. I don't think these are used in any unit tests, but
// we may want to refactor a way to handle future tests
return &kube.Result{Updated: modified}, nil
} }
// Build implements KubeClient Build. // Build implements KubeClient Build.
func (p *PrintingKubeClient) Build(_ io.Reader) (kube.Result, error) { func (p *PrintingKubeClient) Build(_ io.Reader) (kube.ResourceList, error) {
return []*resource.Info{}, nil return []*resource.Info{}, nil
} }
func (p *PrintingKubeClient) BuildUnstructured(_ io.Reader) (kube.Result, error) { // WaitAndGetCompletedPodPhase implements KubeClient WaitAndGetCompletedPodPhase.
return p.Build(nil) func (p *PrintingKubeClient) WaitAndGetCompletedPodPhase(_ string, _ time.Duration) (v1.PodPhase, error) {
return v1.PodSucceeded, nil
}
func bufferize(resources kube.ResourceList) io.Reader {
var builder strings.Builder
for _, info := range resources {
builder.WriteString(info.String() + "\n")
}
return strings.NewReader(builder.String())
} }

@ -19,43 +19,42 @@ package kube
import ( import (
"io" "io"
"time" "time"
v1 "k8s.io/api/core/v1"
) )
// Interface represents a client capable of communicating with the Kubernetes API. // KubernetesClient represents a client capable of communicating with the Kubernetes API.
// //
// A KubernetesClient must be concurrency safe. // A KubernetesClient must be concurrency safe.
type Interface interface { type Interface interface {
// Create creates one or more resources. // Create creates one or more resources.
// Create(resources ResourceList) (*Result, error)
// reader must contain a YAML stream (one or more YAML documents separated
// by "\n---\n").
Create(reader io.Reader) error
Wait(r io.Reader, timeout time.Duration) error Wait(resources ResourceList, timeout time.Duration) error
// Delete destroys one or more resources. // Delete destroys one or more resources.
// Delete(resources ResourceList) (*Result, []error)
// reader must contain a YAML stream (one or more YAML documents separated
// by "\n---\n").
Delete(io.Reader) error
// Watch the resource in reader until it is "ready". // Watch the resource in reader until it is "ready". This method
// //
// For Jobs, "ready" means the Job ran to completion (exited without error). // For Jobs, "ready" means the job ran to completion (excited without error).
// For Pods, "ready" means the Pod phase is marked "succeeded".
// For all other kinds, it means the kind was created or modified without // For all other kinds, it means the kind was created or modified without
// error. // error.
WatchUntilReady(reader io.Reader, timeout time.Duration) error WatchUntilReady(resources ResourceList, timeout time.Duration) error
// Update updates one or more resources or creates the resource // Update updates one or more resources or creates the resource
// if it doesn't exist. // if it doesn't exist.
Update(original, target ResourceList, force bool) (*Result, error)
// Build creates a resource list from a Reader
// //
// reader must contain a YAML stream (one or more YAML documents separated // reader must contain a YAML stream (one or more YAML documents separated
// by "\n---\n"). // by "\n---\n")
Update(originalReader, modifiedReader io.Reader, force bool, recreate bool) error Build(reader io.Reader) (ResourceList, error)
Build(reader io.Reader) (Result, error) // WaitAndGetCompletedPodPhase waits up to a timeout until a pod enters a completed phase
BuildUnstructured(reader io.Reader) (Result, error) // and returns said phase (PodSucceeded or PodFailed qualify).
WaitAndGetCompletedPodPhase(name string, timeout time.Duration) (v1.PodPhase, error)
} }
var _ Interface = (*Client)(nil) var _ Interface = (*Client)(nil)

@ -0,0 +1,85 @@
/*
Copyright The Helm Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kube // import "helm.sh/helm/pkg/kube"
import "k8s.io/cli-runtime/pkg/resource"
// ResourceList provides convenience methods for comparing collections of Infos.
type ResourceList []*resource.Info
// Append adds an Info to the Result.
func (r *ResourceList) Append(val *resource.Info) {
*r = append(*r, val)
}
// Visit implements resource.Visitor.
func (r ResourceList) Visit(fn resource.VisitorFunc) error {
for _, i := range r {
if err := fn(i, nil); err != nil {
return err
}
}
return nil
}
// Filter returns a new Result with Infos that satisfy the predicate fn.
func (r ResourceList) Filter(fn func(*resource.Info) bool) ResourceList {
var result ResourceList
for _, i := range r {
if fn(i) {
result.Append(i)
}
}
return result
}
// Get returns the Info from the result that matches the name and kind.
func (r ResourceList) Get(info *resource.Info) *resource.Info {
for _, i := range r {
if isMatchingInfo(i, info) {
return i
}
}
return nil
}
// Contains checks to see if an object exists.
func (r ResourceList) Contains(info *resource.Info) bool {
for _, i := range r {
if isMatchingInfo(i, info) {
return true
}
}
return false
}
// Difference will return a new Result with objects not contained in rs.
func (r ResourceList) Difference(rs ResourceList) ResourceList {
return r.Filter(func(info *resource.Info) bool {
return !rs.Contains(info)
})
}
// Intersect will return a new Result with objects contained in both Results.
func (r ResourceList) Intersect(rs ResourceList) ResourceList {
return r.Filter(rs.Contains)
}
// isMatchingInfo returns true if infos match on Name and GroupVersionKind.
func isMatchingInfo(a, b *resource.Info) bool {
return a.Name == b.Name && a.Mapping.GroupVersionKind.Kind == b.Mapping.GroupVersionKind.Kind
}

@ -24,7 +24,7 @@ import (
"k8s.io/cli-runtime/pkg/resource" "k8s.io/cli-runtime/pkg/resource"
) )
func TestResult(t *testing.T) { func TestResourceList(t *testing.T) {
mapping := &meta.RESTMapping{ mapping := &meta.RESTMapping{
Resource: schema.GroupVersionResource{Group: "group", Version: "version", Resource: "pod"}, Resource: schema.GroupVersionResource{Group: "group", Version: "version", Resource: "pod"},
} }
@ -33,7 +33,7 @@ func TestResult(t *testing.T) {
return &resource.Info{Name: name, Mapping: mapping} return &resource.Info{Name: name, Mapping: mapping}
} }
var r1, r2 Result var r1, r2 ResourceList
r1 = []*resource.Info{info("foo"), info("bar")} r1 = []*resource.Info{info("foo"), info("bar")}
r2 = []*resource.Info{info("bar")} r2 = []*resource.Info{info("bar")}

@ -14,72 +14,15 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
package kube // import "helm.sh/helm/pkg/kube" package kube
import "k8s.io/cli-runtime/pkg/resource" // Result contains the information of created, updated, and deleted resources
// for various kube API calls along with helper methods for using those
// Result provides convenience methods for comparing collections of Infos. // resources
type Result []*resource.Info type Result struct {
Created ResourceList
// Append adds an Info to the Result. Updated ResourceList
func (r *Result) Append(val *resource.Info) { Deleted ResourceList
*r = append(*r, val)
}
// Visit implements resource.Visitor.
func (r Result) Visit(fn resource.VisitorFunc) error {
for _, i := range r {
if err := fn(i, nil); err != nil {
return err
}
}
return nil
}
// Filter returns a new Result with Infos that satisfy the predicate fn.
func (r Result) Filter(fn func(*resource.Info) bool) Result {
var result Result
for _, i := range r {
if fn(i) {
result.Append(i)
}
}
return result
}
// Get returns the Info from the result that matches the name and kind.
func (r Result) Get(info *resource.Info) *resource.Info {
for _, i := range r {
if isMatchingInfo(i, info) {
return i
}
}
return nil
} }
// Contains checks to see if an object exists. // If needed, we can add methods to the Result type for things like diffing
func (r Result) Contains(info *resource.Info) bool {
for _, i := range r {
if isMatchingInfo(i, info) {
return true
}
}
return false
}
// Difference will return a new Result with objects not contained in rs.
func (r Result) Difference(rs Result) Result {
return r.Filter(func(info *resource.Info) bool {
return !rs.Contains(info)
})
}
// Intersect will return a new Result with objects contained in both Results.
func (r Result) Intersect(rs Result) Result {
return r.Filter(rs.Contains)
}
// isMatchingInfo returns true if infos match on Name and GroupVersionKind.
func isMatchingInfo(a, b *resource.Info) bool {
return a.Name == b.Name && a.Mapping.GroupVersionKind.Kind == b.Mapping.GroupVersionKind.Kind
}

@ -44,7 +44,7 @@ type waiter struct {
// waitForResources polls to get the current status of all pods, PVCs, and Services // waitForResources polls to get the current status of all pods, PVCs, and Services
// until all are ready or a timeout is reached // until all are ready or a timeout is reached
func (w *waiter) waitForResources(created Result) error { func (w *waiter) waitForResources(created ResourceList) error {
w.log("beginning wait for %d resources with timeout of %v", len(created), w.timeout) w.log("beginning wait for %d resources with timeout of %v", len(created), w.timeout)
return wait.Poll(2*time.Second, w.timeout, func() (bool, error) { return wait.Poll(2*time.Second, w.timeout, func() (bool, error) {
@ -56,7 +56,7 @@ func (w *waiter) waitForResources(created Result) error {
ok = true ok = true
err error err error
) )
switch value := asVersioned(v).(type) { switch value := AsVersioned(v).(type) {
case *corev1.Pod: case *corev1.Pod:
pod, err := w.c.CoreV1().Pods(value.Namespace).Get(value.Name, metav1.GetOptions{}) pod, err := w.c.CoreV1().Pods(value.Namespace).Get(value.Name, metav1.GetOptions{})
if err != nil || !w.isPodReady(pod) { if err != nil || !w.isPodReady(pod) {
@ -178,7 +178,7 @@ func (w *waiter) podsReadyForObject(namespace string, obj runtime.Object) (bool,
} }
func (w *waiter) podsforObject(namespace string, obj runtime.Object) ([]corev1.Pod, error) { func (w *waiter) podsforObject(namespace string, obj runtime.Object) ([]corev1.Pod, error) {
selector, err := selectorsForObject(obj) selector, err := SelectorsForObject(obj)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -300,10 +300,10 @@ func getPods(client kubernetes.Interface, namespace, selector string) ([]corev1.
return list.Items, err return list.Items, err
} }
// selectorsForObject returns the pod label selector for a given object // SelectorsForObject returns the pod label selector for a given object
// //
// Modified version of https://github.com/kubernetes/kubernetes/blob/v1.14.1/pkg/kubectl/polymorphichelpers/helpers.go#L84 // Modified version of https://github.com/kubernetes/kubernetes/blob/v1.14.1/pkg/kubectl/polymorphichelpers/helpers.go#L84
func selectorsForObject(object runtime.Object) (selector labels.Selector, err error) { func SelectorsForObject(object runtime.Object) (selector labels.Selector, err error) {
switch t := object.(type) { switch t := object.(type) {
case *extensionsv1beta1.ReplicaSet: case *extensionsv1beta1.ReplicaSet:
selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector) selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector)

Loading…
Cancel
Save