Merge pull request #6085 from thomastaylor312/ref/better_kube_client

ref(*): Refactors kube client to be a bit more friendly
pull/6124/head
Taylor Thomas 6 years ago committed by GitHub
commit 0d8b30f46d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

438
Gopkg.lock generated

File diff suppressed because it is too large Load Diff

@ -58,6 +58,18 @@
name = "github.com/stretchr/testify" name = "github.com/stretchr/testify"
version = "^1.3.0" version = "^1.3.0"
[[constraint]]
name = "github.com/xeipuuv/gojsonschema"
version = "1.1.0"
[[constraint]]
name = "github.com/spf13/cobra"
version = "0.0.4"
[[constraint]]
name = "sigs.k8s.io/yaml"
version = "1.1.0"
[[override]] [[override]]
name = "sigs.k8s.io/kustomize" name = "sigs.k8s.io/kustomize"
version = "2.0.3" version = "2.0.3"
@ -104,15 +116,3 @@
[prune] [prune]
go-tests = true go-tests = true
[[constraint]]
name = "github.com/xeipuuv/gojsonschema"
version = "1.1.0"
[[constraint]]
name = "github.com/spf13/cobra"
version = "0.0.4"
[[constraint]]
name = "sigs.k8s.io/yaml"
version = "1.1.0"

@ -82,7 +82,7 @@ func initActionConfig(actionConfig *action.Configuration, allNamespaces bool) {
kc := kube.New(kubeConfig()) kc := kube.New(kubeConfig())
kc.Log = logf kc.Log = logf
clientset, err := kc.KubernetesClientSet() clientset, err := kc.Factory.KubernetesClientSet()
if err != nil { if err != nil {
// TODO return error // TODO return error
log.Fatal(err) log.Fatal(err)

@ -17,16 +17,21 @@ limitations under the License.
package action package action
import ( import (
"bytes"
"path" "path"
"regexp" "regexp"
"sort"
"strings"
"time" "time"
"github.com/pkg/errors" "github.com/pkg/errors"
"k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/api/meta"
"k8s.io/client-go/discovery" "k8s.io/client-go/discovery"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest" "k8s.io/client-go/rest"
"helm.sh/helm/pkg/chartutil" "helm.sh/helm/pkg/chartutil"
"helm.sh/helm/pkg/hooks"
"helm.sh/helm/pkg/kube" "helm.sh/helm/pkg/kube"
"helm.sh/helm/pkg/registry" "helm.sh/helm/pkg/registry"
"helm.sh/helm/pkg/release" "helm.sh/helm/pkg/release"
@ -110,6 +115,15 @@ func (c *Configuration) getCapabilities() (*chartutil.Capabilities, error) {
return c.Capabilities, nil return c.Capabilities, nil
} }
func (c *Configuration) KubernetesClientSet() (kubernetes.Interface, error) {
conf, err := c.RESTClientGetter.ToRESTConfig()
if err != nil {
return nil, errors.Wrap(err, "unable to generate config for kubernetes client")
}
return kubernetes.NewForConfig(conf)
}
// Now generates a timestamp // Now generates a timestamp
// //
// If the configuration has a Timestamper on it, that will be used. // If the configuration has a Timestamper on it, that will be used.
@ -190,3 +204,73 @@ type RESTClientGetter interface {
ToDiscoveryClient() (discovery.CachedDiscoveryInterface, error) ToDiscoveryClient() (discovery.CachedDiscoveryInterface, error)
ToRESTMapper() (meta.RESTMapper, error) ToRESTMapper() (meta.RESTMapper, error)
} }
// execHooks is a method for exec-ing all hooks of the given type. This is to
// avoid duplicate code in various actions
func execHooks(client kube.Interface, hs []*release.Hook, hook string, timeout time.Duration) error {
executingHooks := []*release.Hook{}
for _, h := range hs {
for _, e := range h.Events {
if string(e) == hook {
executingHooks = append(executingHooks, h)
}
}
}
sort.Sort(hookByWeight(executingHooks))
for _, h := range executingHooks {
if err := deleteHookByPolicy(client, h, hooks.BeforeHookCreation); err != nil {
return err
}
resources, err := client.Build(bytes.NewBufferString(h.Manifest))
if err != nil {
return errors.Wrapf(err, "unable to build kubernetes object for %s hook %s", hook, h.Path)
}
if _, err := client.Create(resources); err != nil {
return errors.Wrapf(err, "warning: Hook %s %s failed", hook, h.Path)
}
if err := client.WatchUntilReady(resources, timeout); err != nil {
// If a hook is failed, checkout the annotation of the hook to determine whether the hook should be deleted
// under failed condition. If so, then clear the corresponding resource object in the hook
if err := deleteHookByPolicy(client, h, hooks.HookFailed); err != nil {
return err
}
return err
}
}
// If all hooks are succeeded, checkout the annotation of each hook to determine whether the hook should be deleted
// under succeeded condition. If so, then clear the corresponding resource object in each hook
for _, h := range executingHooks {
if err := deleteHookByPolicy(client, h, hooks.HookSucceeded); err != nil {
return err
}
h.LastRun = time.Now()
}
return nil
}
// deleteHookByPolicy deletes a hook if the hook policy instructs it to
func deleteHookByPolicy(client kube.Interface, h *release.Hook, policy string) error {
if hookHasDeletePolicy(h, policy) {
resources, err := client.Build(bytes.NewBufferString(h.Manifest))
if err != nil {
return errors.Wrapf(err, "unable to build kubernetes object for deleting hook %s", h.Path)
}
_, errs := client.Delete(resources)
return errors.New(joinErrors(errs))
}
return nil
}
func joinErrors(errs []error) string {
es := make([]string, 0, len(errs))
for _, e := range errs {
es = append(es, e.Error())
}
return strings.Join(es, "; ")
}

@ -19,13 +19,11 @@ package action
import ( import (
"bytes" "bytes"
"fmt" "fmt"
"io"
"io/ioutil" "io/ioutil"
"net/url" "net/url"
"os" "os"
"path" "path"
"path/filepath" "path/filepath"
"sort"
"strings" "strings"
"text/template" "text/template"
"time" "time"
@ -170,8 +168,10 @@ func (i *Install) Run(chrt *chart.Chart) (*release.Release, error) {
// Mark this release as in-progress // Mark this release as in-progress
rel.SetStatus(release.StatusPendingInstall, "Initial install underway") rel.SetStatus(release.StatusPendingInstall, "Initial install underway")
if err := i.validateManifest(manifestDoc); err != nil {
return rel, err resources, err := i.cfg.KubeClient.Build(bytes.NewBufferString(rel.Manifest))
if err != nil {
return nil, errors.Wrap(err, "unable to build kubernetes objects from release manifest")
} }
// Bail out here if it is a dry run // Bail out here if it is a dry run
@ -198,7 +198,7 @@ func (i *Install) Run(chrt *chart.Chart) (*release.Release, error) {
// pre-install hooks // pre-install hooks
if !i.DisableHooks { if !i.DisableHooks {
if err := i.execHook(rel.Hooks, hooks.PreInstall); err != nil { if err := execHooks(i.cfg.KubeClient, rel.Hooks, hooks.PreInstall, i.Timeout); err != nil {
return i.failRelease(rel, fmt.Errorf("failed pre-install: %s", err)) return i.failRelease(rel, fmt.Errorf("failed pre-install: %s", err))
} }
} }
@ -206,21 +206,19 @@ func (i *Install) Run(chrt *chart.Chart) (*release.Release, error) {
// At this point, we can do the install. Note that before we were detecting whether to // At this point, we can do the install. Note that before we were detecting whether to
// do an update, but it's not clear whether we WANT to do an update if the re-use is set // do an update, but it's not clear whether we WANT to do an update if the re-use is set
// to true, since that is basically an upgrade operation. // to true, since that is basically an upgrade operation.
buf := bytes.NewBufferString(rel.Manifest) if _, err := i.cfg.KubeClient.Create(resources); err != nil {
if err := i.cfg.KubeClient.Create(buf); err != nil {
return i.failRelease(rel, err) return i.failRelease(rel, err)
} }
if i.Wait { if i.Wait {
buf := bytes.NewBufferString(rel.Manifest) if err := i.cfg.KubeClient.Wait(resources, i.Timeout); err != nil {
if err := i.cfg.KubeClient.Wait(buf, i.Timeout); err != nil {
return i.failRelease(rel, err) return i.failRelease(rel, err)
} }
} }
if !i.DisableHooks { if !i.DisableHooks {
if err := i.execHook(rel.Hooks, hooks.PostInstall); err != nil { if err := execHooks(i.cfg.KubeClient, rel.Hooks, hooks.PostInstall, i.Timeout); err != nil {
return i.failRelease(rel, fmt.Errorf("failed post-install: %s", err)) return i.failRelease(rel, fmt.Errorf("failed post-install: %s", err))
} }
} }
@ -459,60 +457,6 @@ func ensureDirectoryForFile(file string) error {
return os.MkdirAll(baseDir, defaultDirectoryPermission) return os.MkdirAll(baseDir, defaultDirectoryPermission)
} }
// validateManifest checks to see whether the given manifest is valid for the current Kubernetes
func (i *Install) validateManifest(manifest io.Reader) error {
_, err := i.cfg.KubeClient.BuildUnstructured(manifest)
return err
}
// execHook executes all of the hooks for the given hook event.
func (i *Install) execHook(hs []*release.Hook, hook string) error {
executingHooks := []*release.Hook{}
for _, h := range hs {
for _, e := range h.Events {
if string(e) == hook {
executingHooks = append(executingHooks, h)
}
}
}
sort.Sort(hookByWeight(executingHooks))
for _, h := range executingHooks {
if err := deleteHookByPolicy(i.cfg, h, hooks.BeforeHookCreation); err != nil {
return err
}
b := bytes.NewBufferString(h.Manifest)
if err := i.cfg.KubeClient.Create(b); err != nil {
return errors.Wrapf(err, "warning: Release %s %s %s failed", i.ReleaseName, hook, h.Path)
}
b.Reset()
b.WriteString(h.Manifest)
if err := i.cfg.KubeClient.WatchUntilReady(b, i.Timeout); err != nil {
// If a hook is failed, checkout the annotation of the hook to determine whether the hook should be deleted
// under failed condition. If so, then clear the corresponding resource object in the hook
if err := deleteHookByPolicy(i.cfg, h, hooks.HookFailed); err != nil {
return err
}
return err
}
}
// If all hooks are succeeded, checkout the annotation of each hook to determine whether the hook should be deleted
// under succeeded condition. If so, then clear the corresponding resource object in each hook
for _, h := range executingHooks {
if err := deleteHookByPolicy(i.cfg, h, hooks.HookSucceeded); err != nil {
return err
}
h.LastRun = time.Now()
}
return nil
}
// deletePolices represents a mapping between the key in the annotation for label deleting policy and its real meaning // deletePolices represents a mapping between the key in the annotation for label deleting policy and its real meaning
// FIXME: Can we refactor this out? // FIXME: Can we refactor this out?
var deletePolices = map[string]release.HookDeletePolicy{ var deletePolices = map[string]release.HookDeletePolicy{

@ -19,7 +19,6 @@ package action
import ( import (
"bytes" "bytes"
"fmt" "fmt"
"sort"
"time" "time"
"github.com/pkg/errors" "github.com/pkg/errors"
@ -132,25 +131,32 @@ func (r *Rollback) prepareRollback(name string) (*release.Release, *release.Rele
} }
func (r *Rollback) performRollback(currentRelease, targetRelease *release.Release) (*release.Release, error) { func (r *Rollback) performRollback(currentRelease, targetRelease *release.Release) (*release.Release, error) {
if r.DryRun { if r.DryRun {
r.cfg.Log("dry run for %s", targetRelease.Name) r.cfg.Log("dry run for %s", targetRelease.Name)
return targetRelease, nil return targetRelease, nil
} }
current, err := r.cfg.KubeClient.Build(bytes.NewBufferString(currentRelease.Manifest))
if err != nil {
return targetRelease, errors.Wrap(err, "unable to build kubernetes objects from current release manifest")
}
target, err := r.cfg.KubeClient.Build(bytes.NewBufferString(targetRelease.Manifest))
if err != nil {
return targetRelease, errors.Wrap(err, "unable to build kubernetes objects from new release manifest")
}
// pre-rollback hooks // pre-rollback hooks
if !r.DisableHooks { if !r.DisableHooks {
if err := r.execHook(targetRelease.Hooks, hooks.PreRollback); err != nil { if err := execHooks(r.cfg.KubeClient, targetRelease.Hooks, hooks.PreRollback, r.Timeout); err != nil {
return targetRelease, err return targetRelease, err
} }
} else { } else {
r.cfg.Log("rollback hooks disabled for %s", targetRelease.Name) r.cfg.Log("rollback hooks disabled for %s", targetRelease.Name)
} }
cr := bytes.NewBufferString(currentRelease.Manifest) results, err := r.cfg.KubeClient.Update(current, target, r.Force)
tr := bytes.NewBufferString(targetRelease.Manifest)
if err := r.cfg.KubeClient.Update(cr, tr, r.Force, r.Recreate); err != nil { if err != nil {
msg := fmt.Sprintf("Rollback %q failed: %s", targetRelease.Name, err) msg := fmt.Sprintf("Rollback %q failed: %s", targetRelease.Name, err)
r.cfg.Log("warning: %s", msg) r.cfg.Log("warning: %s", msg)
currentRelease.Info.Status = release.StatusSuperseded currentRelease.Info.Status = release.StatusSuperseded
@ -161,9 +167,18 @@ func (r *Rollback) performRollback(currentRelease, targetRelease *release.Releas
return targetRelease, err return targetRelease, err
} }
if r.Recreate {
// NOTE: Because this is not critical for a release to succeed, we just
// log if an error occurs and continue onward. If we ever introduce log
// levels, we should make these error level logs so users are notified
// that they'll need to go do the cleanup on their own
if err := recreate(r.cfg, results.Updated); err != nil {
r.cfg.Log(err.Error())
}
}
if r.Wait { if r.Wait {
buf := bytes.NewBufferString(targetRelease.Manifest) if err := r.cfg.KubeClient.Wait(target, r.Timeout); err != nil {
if err := r.cfg.KubeClient.Wait(buf, r.Timeout); err != nil {
targetRelease.SetStatus(release.StatusFailed, fmt.Sprintf("Release %q failed: %s", targetRelease.Name, err.Error())) targetRelease.SetStatus(release.StatusFailed, fmt.Sprintf("Release %q failed: %s", targetRelease.Name, err.Error()))
r.cfg.recordRelease(currentRelease) r.cfg.recordRelease(currentRelease)
r.cfg.recordRelease(targetRelease) r.cfg.recordRelease(targetRelease)
@ -173,7 +188,7 @@ func (r *Rollback) performRollback(currentRelease, targetRelease *release.Releas
// post-rollback hooks // post-rollback hooks
if !r.DisableHooks { if !r.DisableHooks {
if err := r.execHook(targetRelease.Hooks, hooks.PostRollback); err != nil { if err := execHooks(r.cfg.KubeClient, targetRelease.Hooks, hooks.PostRollback, r.Timeout); err != nil {
return targetRelease, err return targetRelease, err
} }
} }
@ -193,61 +208,3 @@ func (r *Rollback) performRollback(currentRelease, targetRelease *release.Releas
return targetRelease, nil return targetRelease, nil
} }
// execHook executes all of the hooks for the given hook event.
func (r *Rollback) execHook(hs []*release.Hook, hook string) error {
timeout := r.Timeout
executingHooks := []*release.Hook{}
for _, h := range hs {
for _, e := range h.Events {
if string(e) == hook {
executingHooks = append(executingHooks, h)
}
}
}
sort.Sort(hookByWeight(executingHooks))
for _, h := range executingHooks {
if err := deleteHookByPolicy(r.cfg, h, hooks.BeforeHookCreation); err != nil {
return err
}
b := bytes.NewBufferString(h.Manifest)
if err := r.cfg.KubeClient.Create(b); err != nil {
return errors.Wrapf(err, "warning: Hook %s %s failed", hook, h.Path)
}
b.Reset()
b.WriteString(h.Manifest)
if err := r.cfg.KubeClient.WatchUntilReady(b, timeout); err != nil {
// If a hook is failed, checkout the annotation of the hook to determine whether the hook should be deleted
// under failed condition. If so, then clear the corresponding resource object in the hook
if err := deleteHookByPolicy(r.cfg, h, hooks.HookFailed); err != nil {
return err
}
return err
}
}
// If all hooks are succeeded, checkout the annotation of each hook to determine whether the hook should be deleted
// under succeeded condition. If so, then clear the corresponding resource object in each hook
for _, h := range executingHooks {
if err := deleteHookByPolicy(r.cfg, h, hooks.HookSucceeded); err != nil {
return err
}
h.LastRun = time.Now()
}
return nil
}
// deleteHookByPolicy deletes a hook if the hook policy instructs it to
func deleteHookByPolicy(cfg *Configuration, h *release.Hook, policy string) error {
if hookHasDeletePolicy(h, policy) {
b := bytes.NewBufferString(h.Manifest)
return cfg.KubeClient.Delete(b)
}
return nil
}

@ -17,15 +17,12 @@ limitations under the License.
package action package action
import ( import (
"bytes"
"sort"
"strings" "strings"
"time" "time"
"github.com/pkg/errors" "github.com/pkg/errors"
"helm.sh/helm/pkg/hooks" "helm.sh/helm/pkg/hooks"
"helm.sh/helm/pkg/kube"
"helm.sh/helm/pkg/release" "helm.sh/helm/pkg/release"
"helm.sh/helm/pkg/releaseutil" "helm.sh/helm/pkg/releaseutil"
) )
@ -94,7 +91,7 @@ func (u *Uninstall) Run(name string) (*release.UninstallReleaseResponse, error)
res := &release.UninstallReleaseResponse{Release: rel} res := &release.UninstallReleaseResponse{Release: rel}
if !u.DisableHooks { if !u.DisableHooks {
if err := u.execHook(rel.Hooks, hooks.PreDelete); err != nil { if err := execHooks(u.cfg.KubeClient, rel.Hooks, hooks.PreDelete, u.Timeout); err != nil {
return res, err return res, err
} }
} else { } else {
@ -111,7 +108,7 @@ func (u *Uninstall) Run(name string) (*release.UninstallReleaseResponse, error)
res.Info = kept res.Info = kept
if !u.DisableHooks { if !u.DisableHooks {
if err := u.execHook(rel.Hooks, hooks.PostDelete); err != nil { if err := execHooks(u.cfg.KubeClient, rel.Hooks, hooks.PostDelete, u.Timeout); err != nil {
errs = append(errs, err) errs = append(errs, err)
} }
} }
@ -153,64 +150,8 @@ func (u *Uninstall) purgeReleases(rels ...*release.Release) error {
return nil return nil
} }
func joinErrors(errs []error) string {
es := make([]string, 0, len(errs))
for _, e := range errs {
es = append(es, e.Error())
}
return strings.Join(es, "; ")
}
// execHook executes all of the hooks for the given hook event.
func (u *Uninstall) execHook(hs []*release.Hook, hook string) error {
executingHooks := []*release.Hook{}
for _, h := range hs {
for _, e := range h.Events {
if string(e) == hook {
executingHooks = append(executingHooks, h)
}
}
}
sort.Sort(hookByWeight(executingHooks))
for _, h := range executingHooks {
if err := deleteHookByPolicy(u.cfg, h, hooks.BeforeHookCreation); err != nil {
return err
}
b := bytes.NewBufferString(h.Manifest)
if err := u.cfg.KubeClient.Create(b); err != nil {
return errors.Wrapf(err, "warning: Hook %s %s failed", hook, h.Path)
}
b.Reset()
b.WriteString(h.Manifest)
if err := u.cfg.KubeClient.WatchUntilReady(b, u.Timeout); err != nil {
// If a hook is failed, checkout the annotation of the hook to determine whether the hook should be deleted
// under failed condition. If so, then clear the corresponding resource object in the hook
if err := deleteHookByPolicy(u.cfg, h, hooks.HookFailed); err != nil {
return err
}
return err
}
}
// If all hooks are succeeded, checkout the annotation of each hook to determine whether the hook should be deleted
// under succeeded condition. If so, then clear the corresponding resource object in each hook
for _, h := range executingHooks {
if err := deleteHookByPolicy(u.cfg, h, hooks.HookSucceeded); err != nil {
return err
}
h.LastRun = time.Now()
}
return nil
}
// deleteRelease deletes the release and returns manifests that were kept in the deletion process // deleteRelease deletes the release and returns manifests that were kept in the deletion process
func (u *Uninstall) deleteRelease(rel *release.Release) (kept string, errs []error) { func (u *Uninstall) deleteRelease(rel *release.Release) (string, []error) {
caps, err := u.cfg.getCapabilities() caps, err := u.cfg.getCapabilities()
if err != nil { if err != nil {
return rel.Manifest, []error{errors.Wrap(err, "could not get apiVersions from Kubernetes")} return rel.Manifest, []error{errors.Wrap(err, "could not get apiVersions from Kubernetes")}
@ -227,23 +168,20 @@ func (u *Uninstall) deleteRelease(rel *release.Release) (kept string, errs []err
} }
filesToKeep, filesToDelete := filterManifestsToKeep(files) filesToKeep, filesToDelete := filterManifestsToKeep(files)
var kept string
for _, f := range filesToKeep { for _, f := range filesToKeep {
kept += f.Name + "\n" kept += f.Name + "\n"
} }
var builder strings.Builder
for _, file := range filesToDelete { for _, file := range filesToDelete {
b := bytes.NewBufferString(strings.TrimSpace(file.Content)) builder.WriteString("\n---\n" + file.Content)
if b.Len() == 0 {
continue
}
if err := u.cfg.KubeClient.Delete(b); err != nil {
u.cfg.Log("uninstall: Failed deletion of %q: %s", rel.Name, err)
if err == kube.ErrNoObjectsVisited {
// Rewrite the message from "no objects visited"
err = errors.New("object not found, skipping delete")
}
errs = append(errs, err)
} }
resources, err := u.cfg.KubeClient.Build(strings.NewReader(builder.String()))
if err != nil {
return "", []error{errors.Wrap(err, "unable to build kubernetes objects for delete")}
} }
_, errs := u.cfg.KubeClient.Delete(resources)
return kept, errs return kept, errs
} }

@ -19,10 +19,10 @@ package action
import ( import (
"bytes" "bytes"
"fmt" "fmt"
"sort"
"time" "time"
"github.com/pkg/errors" "github.com/pkg/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"helm.sh/helm/pkg/chart" "helm.sh/helm/pkg/chart"
"helm.sh/helm/pkg/chartutil" "helm.sh/helm/pkg/chartutil"
@ -76,7 +76,7 @@ func (u *Upgrade) Run(name string, chart *chart.Chart) (*release.Release, error)
u.Wait = u.Wait || u.Atomic u.Wait = u.Wait || u.Atomic
if err := validateReleaseName(name); err != nil { if err := validateReleaseName(name); err != nil {
return nil, errors.Errorf("upgradeRelease: Release name is invalid: %s", name) return nil, errors.Errorf("release name is invalid: %s", name)
} }
u.cfg.Log("preparing upgrade for %s", name) u.cfg.Log("preparing upgrade for %s", name)
currentRelease, upgradedRelease, err := u.prepareUpgrade(name, chart) currentRelease, upgradedRelease, err := u.prepareUpgrade(name, chart)
@ -199,22 +199,42 @@ func (u *Upgrade) performUpgrade(originalRelease, upgradedRelease *release.Relea
return upgradedRelease, nil return upgradedRelease, nil
} }
current, err := u.cfg.KubeClient.Build(bytes.NewBufferString(originalRelease.Manifest))
if err != nil {
return upgradedRelease, errors.Wrap(err, "unable to build kubernetes objects from current release manifest")
}
target, err := u.cfg.KubeClient.Build(bytes.NewBufferString(upgradedRelease.Manifest))
if err != nil {
return upgradedRelease, errors.Wrap(err, "unable to build kubernetes objects from new release manifest")
}
// pre-upgrade hooks // pre-upgrade hooks
if !u.DisableHooks { if !u.DisableHooks {
if err := u.execHook(upgradedRelease.Hooks, hooks.PreUpgrade); err != nil { if err := execHooks(u.cfg.KubeClient, upgradedRelease.Hooks, hooks.PreUpgrade, u.Timeout); err != nil {
return u.failRelease(upgradedRelease, fmt.Errorf("pre-upgrade hooks failed: %s", err)) return u.failRelease(upgradedRelease, fmt.Errorf("pre-upgrade hooks failed: %s", err))
} }
} else { } else {
u.cfg.Log("upgrade hooks disabled for %s", upgradedRelease.Name) u.cfg.Log("upgrade hooks disabled for %s", upgradedRelease.Name)
} }
if err := u.upgradeRelease(originalRelease, upgradedRelease); err != nil {
results, err := u.cfg.KubeClient.Update(current, target, u.Force)
if err != nil {
u.cfg.recordRelease(originalRelease) u.cfg.recordRelease(originalRelease)
return u.failRelease(upgradedRelease, err) return u.failRelease(upgradedRelease, err)
} }
if u.Recreate {
// NOTE: Because this is not critical for a release to succeed, we just
// log if an error occurs and continue onward. If we ever introduce log
// levels, we should make these error level logs so users are notified
// that they'll need to go do the cleanup on their own
if err := recreate(u.cfg, results.Updated); err != nil {
u.cfg.Log(err.Error())
}
}
if u.Wait { if u.Wait {
buf := bytes.NewBufferString(upgradedRelease.Manifest) if err := u.cfg.KubeClient.Wait(target, u.Timeout); err != nil {
if err := u.cfg.KubeClient.Wait(buf, u.Timeout); err != nil {
u.cfg.recordRelease(originalRelease) u.cfg.recordRelease(originalRelease)
return u.failRelease(upgradedRelease, err) return u.failRelease(upgradedRelease, err)
} }
@ -222,7 +242,7 @@ func (u *Upgrade) performUpgrade(originalRelease, upgradedRelease *release.Relea
// post-upgrade hooks // post-upgrade hooks
if !u.DisableHooks { if !u.DisableHooks {
if err := u.execHook(upgradedRelease.Hooks, hooks.PostUpgrade); err != nil { if err := execHooks(u.cfg.KubeClient, upgradedRelease.Hooks, hooks.PostUpgrade, u.Timeout); err != nil {
return u.failRelease(upgradedRelease, fmt.Errorf("post-upgrade hooks failed: %s", err)) return u.failRelease(upgradedRelease, fmt.Errorf("post-upgrade hooks failed: %s", err))
} }
} }
@ -282,14 +302,6 @@ func (u *Upgrade) failRelease(rel *release.Release, err error) (*release.Release
return rel, err return rel, err
} }
// upgradeRelease performs an upgrade from current to target release
func (u *Upgrade) upgradeRelease(current, target *release.Release) error {
cm := bytes.NewBufferString(current.Manifest)
tm := bytes.NewBufferString(target.Manifest)
// TODO add wait
return u.cfg.KubeClient.Update(cm, tm, u.Force, u.Recreate)
}
// reuseValues copies values from the current release to a new release if the // reuseValues copies values from the current release to a new release if the
// new release does not have any values. // new release does not have any values.
// //
@ -334,50 +346,38 @@ func validateManifest(c kube.Interface, manifest []byte) error {
return err return err
} }
// execHook executes all of the hooks for the given hook event. // recreate captures all the logic for recreating pods for both upgrade and
func (u *Upgrade) execHook(hs []*release.Hook, hook string) error { // rollback. If we end up refactoring rollback to use upgrade, this can just be
timeout := u.Timeout // made an unexported method on the upgrade action.
executingHooks := []*release.Hook{} func recreate(cfg *Configuration, resources kube.ResourceList) error {
for _, res := range resources {
for _, h := range hs { versioned := kube.AsVersioned(res)
for _, e := range h.Events { selector, err := kube.SelectorsForObject(versioned)
if string(e) == hook { if err != nil {
executingHooks = append(executingHooks, h) // If no selector is returned, it means this object is
} // definitely not a pod, so continue onward
} continue
} }
sort.Sort(hookByWeight(executingHooks)) client, err := cfg.KubernetesClientSet()
for _, h := range executingHooks { if err != nil {
if err := deleteHookByPolicy(u.cfg, h, hooks.BeforeHookCreation); err != nil { return errors.Wrapf(err, "unable to recreate pods for object %s/%s because an error occurred", res.Namespace, res.Name)
return err
} }
b := bytes.NewBufferString(h.Manifest) pods, err := client.CoreV1().Pods(res.Namespace).List(metav1.ListOptions{
if err := u.cfg.KubeClient.Create(b); err != nil { LabelSelector: selector.String(),
return errors.Wrapf(err, "warning: Hook %s %s failed", hook, h.Path) })
if err != nil {
return errors.Wrapf(err, "unable to recreate pods for object %s/%s because an error occurred", res.Namespace, res.Name)
} }
b.Reset()
b.WriteString(h.Manifest)
if err := u.cfg.KubeClient.WatchUntilReady(b, timeout); err != nil { // Restart pods
// If a hook is failed, checkout the annotation of the hook to determine whether the hook should be deleted for _, pod := range pods.Items {
// under failed condition. If so, then clear the corresponding resource object in the hook // Delete each pod for get them restarted with changed spec.
if err := deleteHookByPolicy(u.cfg, h, hooks.HookFailed); err != nil { if err := client.CoreV1().Pods(pod.Namespace).Delete(pod.Name, metav1.NewPreconditionDeleteOptions(string(pod.UID))); err != nil {
return err return errors.Wrapf(err, "unable to recreate pods for object %s/%s because an error occurred", res.Namespace, res.Name)
} }
return err
}
}
// If all hooks are succeeded, checkout the annotation of each hook to determine whether the hook should be deleted
// under succeeded condition. If so, then clear the corresponding resource object in each hook
for _, h := range executingHooks {
if err := deleteHookByPolicy(u.cfg, h, hooks.HookSucceeded); err != nil {
return err
} }
h.LastRun = time.Now()
} }
return nil return nil
} }

@ -38,8 +38,6 @@ import (
"k8s.io/apimachinery/pkg/watch" "k8s.io/apimachinery/pkg/watch"
"k8s.io/cli-runtime/pkg/genericclioptions" "k8s.io/cli-runtime/pkg/genericclioptions"
"k8s.io/cli-runtime/pkg/resource" "k8s.io/cli-runtime/pkg/resource"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
watchtools "k8s.io/client-go/tools/watch" watchtools "k8s.io/client-go/tools/watch"
cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util" cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
) )
@ -64,32 +62,20 @@ func New(getter genericclioptions.RESTClientGetter) *Client {
} }
} }
// KubernetesClientSet returns a client set from the client factory.
func (c *Client) KubernetesClientSet() (*kubernetes.Clientset, error) {
return c.Factory.KubernetesClientSet()
}
var nopLogger = func(_ string, _ ...interface{}) {} var nopLogger = func(_ string, _ ...interface{}) {}
// Create creates Kubernetes resources from an io.reader. // Create creates Kubernetes resources specified in the resource list.
// func (c *Client) Create(resources ResourceList) (*Result, error) {
// Namespace will set the namespace. c.Log("creating %d resource(s)", len(resources))
func (c *Client) Create(reader io.Reader) error { if err := perform(resources, createResource); err != nil {
c.Log("building resources from manifest") return nil, err
infos, err := c.BuildUnstructured(reader)
if err != nil {
return err
} }
c.Log("creating %d resource(s)", len(infos)) return &Result{Created: resources}, nil
return perform(infos, createResource)
} }
func (c *Client) Wait(reader io.Reader, timeout time.Duration) error { // Wait up to the given timeout for the specified resources to be ready
infos, err := c.BuildUnstructured(reader) func (c *Client) Wait(resources ResourceList, timeout time.Duration) error {
if err != nil { cs, err := c.Factory.KubernetesClientSet()
return err
}
cs, err := c.KubernetesClientSet()
if err != nil { if err != nil {
return err return err
} }
@ -98,7 +84,7 @@ func (c *Client) Wait(reader io.Reader, timeout time.Duration) error {
log: c.Log, log: c.Log,
timeout: timeout, timeout: timeout,
} }
return w.waitForResources(infos) return w.waitForResources(resources)
} }
func (c *Client) namespace() string { func (c *Client) namespace() string {
@ -117,16 +103,8 @@ func (c *Client) newBuilder() *resource.Builder {
Flatten() Flatten()
} }
func (c *Client) validator() resource.ContentValidator { // Build validates for Kubernetes objects and returns unstructured infos.
schema, err := c.Factory.Validator(true) func (c *Client) Build(reader io.Reader) (ResourceList, error) {
if err != nil {
c.Log("warning: failed to load schema: %s", err)
}
return schema
}
// BuildUnstructured validates for Kubernetes objects and returns unstructured infos.
func (c *Client) BuildUnstructured(reader io.Reader) (Result, error) {
result, err := c.newBuilder(). result, err := c.newBuilder().
Unstructured(). Unstructured().
Stream(reader, ""). Stream(reader, "").
@ -134,39 +112,16 @@ func (c *Client) BuildUnstructured(reader io.Reader) (Result, error) {
return result, scrubValidationError(err) return result, scrubValidationError(err)
} }
// Build validates for Kubernetes objects and returns resource Infos from a io.Reader.
func (c *Client) Build(reader io.Reader) (Result, error) {
result, err := c.newBuilder().
WithScheme(scheme.Scheme, scheme.Scheme.PrioritizedVersionsAllGroups()...).
Schema(c.validator()).
Stream(reader, "").
Do().
Infos()
return result, scrubValidationError(err)
}
// Update reads in the current configuration and a target configuration from io.reader // Update reads in the current configuration and a target configuration from io.reader
// and creates resources that don't already exists, updates resources that have been modified // and creates resources that don't already exists, updates resources that have been modified
// in the target configuration and deletes resources from the current configuration that are // in the target configuration and deletes resources from the current configuration that are
// not present in the target configuration. // not present in the target configuration.
// func (c *Client) Update(original, target ResourceList, force bool) (*Result, error) {
// Namespace will set the namespaces.
func (c *Client) Update(originalReader, targetReader io.Reader, force, recreate bool) error {
original, err := c.BuildUnstructured(originalReader)
if err != nil {
return errors.Wrap(err, "failed decoding reader into objects")
}
c.Log("building resources from updated manifest")
target, err := c.BuildUnstructured(targetReader)
if err != nil {
return errors.Wrap(err, "failed decoding reader into objects")
}
updateErrors := []string{} updateErrors := []string{}
res := &Result{}
c.Log("checking %d resources for changes", len(target)) c.Log("checking %d resources for changes", len(target))
err = target.Visit(func(info *resource.Info, err error) error { err := target.Visit(func(info *resource.Info, err error) error {
if err != nil { if err != nil {
return err return err
} }
@ -182,6 +137,9 @@ func (c *Client) Update(originalReader, targetReader io.Reader, force, recreate
return errors.Wrap(err, "failed to create resource") return errors.Wrap(err, "failed to create resource")
} }
// Append the created resource to the results
res.Created = append(res.Created, info)
kind := info.Mapping.GroupVersionKind.Kind kind := info.Mapping.GroupVersionKind.Kind
c.Log("Created a new %s called %q\n", kind, info.Name) c.Log("Created a new %s called %q\n", kind, info.Name)
return nil return nil
@ -193,43 +151,64 @@ func (c *Client) Update(originalReader, targetReader io.Reader, force, recreate
return errors.Errorf("no %s with the name %q found", kind, info.Name) return errors.Errorf("no %s with the name %q found", kind, info.Name)
} }
if err := updateResource(c, info, originalInfo.Object, force, recreate); err != nil { if err := updateResource(c, info, originalInfo.Object, force); err != nil {
c.Log("error updating the resource %q:\n\t %v", info.Name, err) c.Log("error updating the resource %q:\n\t %v", info.Name, err)
updateErrors = append(updateErrors, err.Error()) updateErrors = append(updateErrors, err.Error())
} }
// Because we check for errors later, append the info regardless
res.Updated = append(res.Updated, info)
return nil return nil
}) })
switch { switch {
case err != nil: case err != nil:
return err return nil, err
case len(updateErrors) != 0: case len(updateErrors) != 0:
return errors.Errorf(strings.Join(updateErrors, " && ")) return nil, errors.Errorf(strings.Join(updateErrors, " && "))
} }
for _, info := range original.Difference(target) { for _, info := range original.Difference(target) {
c.Log("Deleting %q in %s...", info.Name, info.Namespace) c.Log("Deleting %q in %s...", info.Name, info.Namespace)
if err := deleteResource(info); err != nil { if err := deleteResource(info); err != nil {
c.Log("Failed to delete %q, err: %s", info.Name, err) c.Log("Failed to delete %q, err: %s", info.Name, err)
} else {
// Only append ones we succeeded in deleting
res.Deleted = append(res.Deleted, info)
} }
} }
return nil return res, nil
} }
// Delete deletes Kubernetes resources from an io.reader. // Delete deletes Kubernetes resources specified in the resources list. It will
// // attempt to delete all resources even if one or more fail and collect any
// Namespace will set the namespace. // errors. All successfully deleted items will be returned in the `Deleted`
func (c *Client) Delete(reader io.Reader) error { // ResourceList that is part of the result.
infos, err := c.BuildUnstructured(reader) func (c *Client) Delete(resources ResourceList) (*Result, []error) {
if err != nil { var errs []error
return err res := &Result{}
} err := perform(resources, func(info *resource.Info) error {
return perform(infos, func(info *resource.Info) error {
c.Log("Starting delete for %q %s", info.Name, info.Mapping.GroupVersionKind.Kind) c.Log("Starting delete for %q %s", info.Name, info.Mapping.GroupVersionKind.Kind)
err := deleteResource(info) if err := c.skipIfNotFound(deleteResource(info)); err != nil {
return c.skipIfNotFound(err) // Collect the error and continue on
errs = append(errs, err)
} else {
res.Deleted = append(res.Deleted, info)
}
return nil
}) })
if err != nil {
// Rewrite the message from "no objects visited" if that is what we got
// back
if err == ErrNoObjectsVisited {
err = errors.New("object not found, skipping delete")
}
errs = append(errs, err)
}
if errs != nil {
return nil, errs
}
return res, nil
} }
func (c *Client) skipIfNotFound(err error) error { func (c *Client) skipIfNotFound(err error) error {
@ -246,7 +225,7 @@ func (c *Client) watchTimeout(t time.Duration) func(*resource.Info) error {
} }
} }
// WatchUntilReady watches the resource given in the reader, and waits until it is ready. // WatchUntilReady watches the resources given and waits until it is ready.
// //
// This function is mainly for hook implementations. It watches for a resource to // This function is mainly for hook implementations. It watches for a resource to
// hit a particular milestone. The milestone depends on the Kind. // hit a particular milestone. The milestone depends on the Kind.
@ -258,17 +237,13 @@ func (c *Client) watchTimeout(t time.Duration) func(*resource.Info) error {
// ascertained by watching the Status fields in a job's output. // ascertained by watching the Status fields in a job's output.
// //
// Handling for other kinds will be added as necessary. // Handling for other kinds will be added as necessary.
func (c *Client) WatchUntilReady(reader io.Reader, timeout time.Duration) error { func (c *Client) WatchUntilReady(resources ResourceList, timeout time.Duration) error {
infos, err := c.Build(reader)
if err != nil {
return err
}
// For jobs, there's also the option to do poll c.Jobs(namespace).Get(): // For jobs, there's also the option to do poll c.Jobs(namespace).Get():
// https://github.com/adamreese/kubernetes/blob/master/test/e2e/job.go#L291-L300 // https://github.com/adamreese/kubernetes/blob/master/test/e2e/job.go#L291-L300
return perform(infos, c.watchTimeout(timeout)) return perform(resources, c.watchTimeout(timeout))
} }
func perform(infos Result, fn func(*resource.Info) error) error { func perform(infos ResourceList, fn func(*resource.Info) error) error {
if len(infos) == 0 { if len(infos) == 0 {
return ErrNoObjectsVisited return ErrNoObjectsVisited
} }
@ -315,7 +290,7 @@ func createPatch(target *resource.Info, current runtime.Object) ([]byte, types.P
} }
// Get a versioned object // Get a versioned object
versionedObject := asVersioned(target) versionedObject := AsVersioned(target)
// Unstructured objects, such as CRDs, may not have an not registered error // Unstructured objects, such as CRDs, may not have an not registered error
// returned from ConvertToVersion. Anything that's unstructured should // returned from ConvertToVersion. Anything that's unstructured should
@ -330,7 +305,7 @@ func createPatch(target *resource.Info, current runtime.Object) ([]byte, types.P
return patch, types.StrategicMergePatchType, err return patch, types.StrategicMergePatchType, err
} }
func updateResource(c *Client, target *resource.Info, currentObj runtime.Object, force, recreate bool) error { func updateResource(c *Client, target *resource.Info, currentObj runtime.Object, force bool) error {
patch, patchType, err := createPatch(target, currentObj) patch, patchType, err := createPatch(target, currentObj)
if err != nil { if err != nil {
return errors.Wrap(err, "failed to create patch") return errors.Wrap(err, "failed to create patch")
@ -377,37 +352,6 @@ func updateResource(c *Client, target *resource.Info, currentObj runtime.Object,
} }
} }
if !recreate {
return nil
}
versioned := asVersioned(target)
selector, err := selectorsForObject(versioned)
if err != nil {
return nil
}
client, err := c.KubernetesClientSet()
if err != nil {
return err
}
pods, err := client.CoreV1().Pods(target.Namespace).List(metav1.ListOptions{
LabelSelector: selector.String(),
})
if err != nil {
return err
}
// Restart pods
for _, pod := range pods.Items {
c.Log("Restarting pod: %v/%v", pod.Namespace, pod.Name)
// Delete each pod for get them restarted with changed spec.
if err := client.CoreV1().Pods(pod.Namespace).Delete(pod.Name, metav1.NewPreconditionDeleteOptions(string(pod.UID))); err != nil {
return err
}
}
return nil return nil
} }
@ -429,6 +373,9 @@ func (c *Client) watchUntilReady(timeout time.Duration, info *resource.Info) err
ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), timeout) ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), timeout)
defer cancel() defer cancel()
_, err = watchtools.UntilWithoutRetry(ctx, w, func(e watch.Event) (bool, error) { _, err = watchtools.UntilWithoutRetry(ctx, w, func(e watch.Event) (bool, error) {
// Make sure the incoming object is versioned as we use unstructured
// objects when we build manifests
obj := convertWithMapper(e.Object, info.Mapping)
switch e.Type { switch e.Type {
case watch.Added, watch.Modified: case watch.Added, watch.Modified:
// For things like a secret or a config map, this is the best indicator // For things like a secret or a config map, this is the best indicator
@ -437,7 +384,7 @@ func (c *Client) watchUntilReady(timeout time.Duration, info *resource.Info) err
// we don't really do anything to support these as hooks. // we don't really do anything to support these as hooks.
c.Log("Add/Modify event for %s: %v", info.Name, e.Type) c.Log("Add/Modify event for %s: %v", info.Name, e.Type)
if kind == "Job" { if kind == "Job" {
return c.waitForJob(e, info.Name) return c.waitForJob(obj, info.Name)
} }
return true, nil return true, nil
case watch.Deleted: case watch.Deleted:
@ -457,10 +404,10 @@ func (c *Client) watchUntilReady(timeout time.Duration, info *resource.Info) err
// waitForJob is a helper that waits for a job to complete. // waitForJob is a helper that waits for a job to complete.
// //
// This operates on an event returned from a watcher. // This operates on an event returned from a watcher.
func (c *Client) waitForJob(e watch.Event, name string) (bool, error) { func (c *Client) waitForJob(obj runtime.Object, name string) (bool, error) {
o, ok := e.Object.(*batch.Job) o, ok := obj.(*batch.Job)
if !ok { if !ok {
return true, errors.Errorf("expected %s to be a *batch.Job, got %T", name, e.Object) return true, errors.Errorf("expected %s to be a *batch.Job, got %T", name, obj)
} }
for _, c := range o.Status.Conditions { for _, c := range o.Status.Conditions {
@ -491,7 +438,7 @@ func scrubValidationError(err error) error {
// WaitAndGetCompletedPodPhase waits up to a timeout until a pod enters a completed phase // WaitAndGetCompletedPodPhase waits up to a timeout until a pod enters a completed phase
// and returns said phase (PodSucceeded or PodFailed qualify). // and returns said phase (PodSucceeded or PodFailed qualify).
func (c *Client) WaitAndGetCompletedPodPhase(name string, timeout time.Duration) (v1.PodPhase, error) { func (c *Client) WaitAndGetCompletedPodPhase(name string, timeout time.Duration) (v1.PodPhase, error) {
client, _ := c.KubernetesClientSet() client, _ := c.Factory.KubernetesClientSet()
to := int64(timeout) to := int64(timeout)
watcher, err := client.CoreV1().Pods(c.namespace()).Watch(metav1.ListOptions{ watcher, err := client.CoreV1().Pods(c.namespace()).Watch(metav1.ListOptions{
FieldSelector: fmt.Sprintf("metadata.name=%s", name), FieldSelector: fmt.Sprintf("metadata.name=%s", name),

@ -142,7 +142,16 @@ func TestUpdate(t *testing.T) {
} }
}), }),
} }
if err := c.Update(objBody(&listA), objBody(&listB), false, false); err != nil { first, err := c.Build(objBody(&listA))
if err != nil {
t.Fatal(err)
}
second, err := c.Build(objBody(&listB))
if err != nil {
t.Fatal(err)
}
if _, err := c.Update(first, second, false); err != nil {
t.Fatal(err) t.Fatal(err)
} }
// TODO: Find a way to test methods that use Client Set // TODO: Find a way to test methods that use Client Set
@ -188,11 +197,6 @@ func TestBuild(t *testing.T) {
namespace: "test", namespace: "test",
reader: strings.NewReader(guestbookManifest), reader: strings.NewReader(guestbookManifest),
count: 6, count: 6,
}, {
name: "Invalid schema",
namespace: "test",
reader: strings.NewReader(testInvalidServiceManifest),
err: true,
}, { }, {
name: "Valid input, deploying resources into different namespaces", name: "Valid input, deploying resources into different namespaces",
namespace: "test", namespace: "test",
@ -272,24 +276,41 @@ func TestPerform(t *testing.T) {
func TestReal(t *testing.T) { func TestReal(t *testing.T) {
t.Skip("This is a live test, comment this line to run") t.Skip("This is a live test, comment this line to run")
c := New(nil) c := New(nil)
if err := c.Create(strings.NewReader(guestbookManifest)); err != nil { resources, err := c.Build(strings.NewReader(guestbookManifest))
if err != nil {
t.Fatal(err)
}
if _, err := c.Create(resources); err != nil {
t.Fatal(err) t.Fatal(err)
} }
testSvcEndpointManifest := testServiceManifest + "\n---\n" + testEndpointManifest testSvcEndpointManifest := testServiceManifest + "\n---\n" + testEndpointManifest
c = New(nil) c = New(nil)
if err := c.Create(strings.NewReader(testSvcEndpointManifest)); err != nil { resources, err = c.Build(strings.NewReader(testSvcEndpointManifest))
if err != nil {
t.Fatal(err)
}
if _, err := c.Create(resources); err != nil {
t.Fatal(err) t.Fatal(err)
} }
if err := c.Delete(strings.NewReader(testEndpointManifest)); err != nil { resources, err = c.Build(strings.NewReader(testEndpointManifest))
if err != nil {
t.Fatal(err) t.Fatal(err)
} }
// ensures that delete does not fail if a resource is not found if _, errs := c.Delete(resources); errs != nil {
if err := c.Delete(strings.NewReader(testSvcEndpointManifest)); err != nil { t.Fatal(errs)
}
resources, err = c.Build(strings.NewReader(testSvcEndpointManifest))
if err != nil {
t.Fatal(err) t.Fatal(err)
} }
// ensures that delete does not fail if a resource is not found
if _, errs := c.Delete(resources); errs != nil {
t.Fatal(errs)
}
} }
const testServiceManifest = ` const testServiceManifest = `
@ -306,14 +327,6 @@ spec:
targetPort: 9376 targetPort: 9376
` `
const testInvalidServiceManifest = `
kind: Service
apiVersion: v1
spec:
ports:
- port: "80"
`
const testEndpointManifest = ` const testEndpointManifest = `
kind: Endpoints kind: Endpoints
apiVersion: v1 apiVersion: v1

@ -17,19 +17,28 @@ limitations under the License.
package kube // import "helm.sh/helm/pkg/kube" package kube // import "helm.sh/helm/pkg/kube"
import ( import (
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/cli-runtime/pkg/resource" "k8s.io/cli-runtime/pkg/resource"
"k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/kubernetes/scheme"
) )
func asVersioned(info *resource.Info) runtime.Object { // AsVersioned converts the given info into a runtime.Object with the correct
gv := runtime.GroupVersioner(schema.GroupVersions(scheme.Scheme.PrioritizedVersionsAllGroups())) // group and version set
if info.Mapping != nil { func AsVersioned(info *resource.Info) runtime.Object {
gv = info.Mapping.GroupVersionKind.GroupVersion() return convertWithMapper(info.Object, info.Mapping)
} }
if obj, err := runtime.ObjectConvertor(scheme.Scheme).ConvertToVersion(info.Object, gv); err == nil {
// convertWithMapper converts the given object with the optional provided
// RESTMapping. If no mapping is provided, the default schema versioner is used
func convertWithMapper(obj runtime.Object, mapping *meta.RESTMapping) runtime.Object {
var gv = runtime.GroupVersioner(schema.GroupVersions(scheme.Scheme.PrioritizedVersionsAllGroups()))
if mapping != nil {
gv = mapping.GroupVersionKind.GroupVersion()
}
if obj, err := runtime.ObjectConvertor(scheme.Scheme).ConvertToVersion(obj, gv); err == nil {
return obj return obj
} }
return info.Object return obj
} }

@ -34,7 +34,6 @@ type FailingKubeClient struct {
PrintingKubeClient PrintingKubeClient
CreateError error CreateError error
WaitError error WaitError error
GetError error
DeleteError error DeleteError error
WatchUntilReadyError error WatchUntilReadyError error
UpdateError error UpdateError error
@ -44,69 +43,53 @@ type FailingKubeClient struct {
} }
// Create returns the configured error if set or prints // Create returns the configured error if set or prints
func (f *FailingKubeClient) Create(r io.Reader) error { func (f *FailingKubeClient) Create(resources kube.ResourceList) (*kube.Result, error) {
if f.CreateError != nil { if f.CreateError != nil {
return f.CreateError return nil, f.CreateError
} }
return f.PrintingKubeClient.Create(r) return f.PrintingKubeClient.Create(resources)
} }
// Wait returns the configured error if set or prints // Wait returns the configured error if set or prints
func (f *FailingKubeClient) Wait(r io.Reader, d time.Duration) error { func (f *FailingKubeClient) Wait(resources kube.ResourceList, d time.Duration) error {
if f.WaitError != nil { if f.WaitError != nil {
return f.WaitError return f.WaitError
} }
return f.PrintingKubeClient.Wait(r, d) return f.PrintingKubeClient.Wait(resources, d)
}
// Create returns the configured error if set or prints
func (f *FailingKubeClient) Get(r io.Reader) (string, error) {
if f.GetError != nil {
return "", f.GetError
}
return f.PrintingKubeClient.Get(r)
} }
// Delete returns the configured error if set or prints // Delete returns the configured error if set or prints
func (f *FailingKubeClient) Delete(r io.Reader) error { func (f *FailingKubeClient) Delete(resources kube.ResourceList) (*kube.Result, []error) {
if f.DeleteError != nil { if f.DeleteError != nil {
return f.DeleteError return nil, []error{f.DeleteError}
} }
return f.PrintingKubeClient.Delete(r) return f.PrintingKubeClient.Delete(resources)
} }
// WatchUntilReady returns the configured error if set or prints // WatchUntilReady returns the configured error if set or prints
func (f *FailingKubeClient) WatchUntilReady(r io.Reader, d time.Duration) error { func (f *FailingKubeClient) WatchUntilReady(resources kube.ResourceList, d time.Duration) error {
if f.WatchUntilReadyError != nil { if f.WatchUntilReadyError != nil {
return f.WatchUntilReadyError return f.WatchUntilReadyError
} }
return f.PrintingKubeClient.WatchUntilReady(r, d) return f.PrintingKubeClient.WatchUntilReady(resources, d)
} }
// Update returns the configured error if set or prints // Update returns the configured error if set or prints
func (f *FailingKubeClient) Update(r, modifiedReader io.Reader, not, needed bool) error { func (f *FailingKubeClient) Update(r, modified kube.ResourceList, ignoreMe bool) (*kube.Result, error) {
if f.UpdateError != nil { if f.UpdateError != nil {
return f.UpdateError return nil, f.UpdateError
} }
return f.PrintingKubeClient.Update(r, modifiedReader, not, needed) return f.PrintingKubeClient.Update(r, modified, ignoreMe)
} }
// Build returns the configured error if set or prints // Build returns the configured error if set or prints
func (f *FailingKubeClient) Build(r io.Reader) (kube.Result, error) { func (f *FailingKubeClient) Build(r io.Reader) (kube.ResourceList, error) {
if f.BuildError != nil { if f.BuildError != nil {
return []*resource.Info{}, f.BuildError return []*resource.Info{}, f.BuildError
} }
return f.PrintingKubeClient.Build(r) return f.PrintingKubeClient.Build(r)
} }
// BuildUnstructured returns the configured error if set or prints
func (f *FailingKubeClient) BuildUnstructured(r io.Reader) (kube.Result, error) {
if f.BuildUnstructuredError != nil {
return []*resource.Info{}, f.BuildUnstructuredError
}
return f.PrintingKubeClient.Build(r)
}
// WaitAndGetCompletedPodPhase returns the configured error if set or prints // WaitAndGetCompletedPodPhase returns the configured error if set or prints
func (f *FailingKubeClient) WaitAndGetCompletedPodPhase(s string, d time.Duration) (v1.PodPhase, error) { func (f *FailingKubeClient) WaitAndGetCompletedPodPhase(s string, d time.Duration) (v1.PodPhase, error) {
if f.WaitAndGetCompletedPodPhaseError != nil { if f.WaitAndGetCompletedPodPhaseError != nil {

@ -18,6 +18,7 @@ package fake
import ( import (
"io" "io"
"strings"
"time" "time"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
@ -33,52 +34,62 @@ type PrintingKubeClient struct {
} }
// Create prints the values of what would be created with a real KubeClient. // Create prints the values of what would be created with a real KubeClient.
func (p *PrintingKubeClient) Create(r io.Reader) error { func (p *PrintingKubeClient) Create(resources kube.ResourceList) (*kube.Result, error) {
_, err := io.Copy(p.Out, r) _, err := io.Copy(p.Out, bufferize(resources))
return err if err != nil {
return nil, err
} }
return &kube.Result{Created: resources}, nil
func (p *PrintingKubeClient) Wait(r io.Reader, _ time.Duration) error {
_, err := io.Copy(p.Out, r)
return err
} }
// Get prints the values of what would be created with a real KubeClient. func (p *PrintingKubeClient) Wait(resources kube.ResourceList, _ time.Duration) error {
func (p *PrintingKubeClient) Get(r io.Reader) (string, error) { _, err := io.Copy(p.Out, bufferize(resources))
_, err := io.Copy(p.Out, r) return err
return "", err
} }
// Delete implements KubeClient delete. // Delete implements KubeClient delete.
// //
// It only prints out the content to be deleted. // It only prints out the content to be deleted.
func (p *PrintingKubeClient) Delete(r io.Reader) error { func (p *PrintingKubeClient) Delete(resources kube.ResourceList) (*kube.Result, []error) {
_, err := io.Copy(p.Out, r) _, err := io.Copy(p.Out, bufferize(resources))
return err if err != nil {
return nil, []error{err}
}
return &kube.Result{Deleted: resources}, nil
} }
// WatchUntilReady implements KubeClient WatchUntilReady. // WatchUntilReady implements KubeClient WatchUntilReady.
func (p *PrintingKubeClient) WatchUntilReady(r io.Reader, _ time.Duration) error { func (p *PrintingKubeClient) WatchUntilReady(resources kube.ResourceList, _ time.Duration) error {
_, err := io.Copy(p.Out, r) _, err := io.Copy(p.Out, bufferize(resources))
return err return err
} }
// Update implements KubeClient Update. // Update implements KubeClient Update.
func (p *PrintingKubeClient) Update(_, modifiedReader io.Reader, _, _ bool) error { func (p *PrintingKubeClient) Update(_, modified kube.ResourceList, _ bool) (*kube.Result, error) {
_, err := io.Copy(p.Out, modifiedReader) _, err := io.Copy(p.Out, bufferize(modified))
return err if err != nil {
return nil, err
}
// TODO: This doesn't completely mock out have some that get created,
// updated, and deleted. I don't think these are used in any unit tests, but
// we may want to refactor a way to handle future tests
return &kube.Result{Updated: modified}, nil
} }
// Build implements KubeClient Build. // Build implements KubeClient Build.
func (p *PrintingKubeClient) Build(_ io.Reader) (kube.Result, error) { func (p *PrintingKubeClient) Build(_ io.Reader) (kube.ResourceList, error) {
return []*resource.Info{}, nil return []*resource.Info{}, nil
} }
func (p *PrintingKubeClient) BuildUnstructured(_ io.Reader) (kube.Result, error) {
return p.Build(nil)
}
// WaitAndGetCompletedPodPhase implements KubeClient WaitAndGetCompletedPodPhase. // WaitAndGetCompletedPodPhase implements KubeClient WaitAndGetCompletedPodPhase.
func (p *PrintingKubeClient) WaitAndGetCompletedPodPhase(_ string, _ time.Duration) (v1.PodPhase, error) { func (p *PrintingKubeClient) WaitAndGetCompletedPodPhase(_ string, _ time.Duration) (v1.PodPhase, error) {
return v1.PodSucceeded, nil return v1.PodSucceeded, nil
} }
func bufferize(resources kube.ResourceList) io.Reader {
var builder strings.Builder
for _, info := range resources {
builder.WriteString(info.String() + "\n")
}
return strings.NewReader(builder.String())
}

@ -28,35 +28,29 @@ import (
// A KubernetesClient must be concurrency safe. // A KubernetesClient must be concurrency safe.
type Interface interface { type Interface interface {
// Create creates one or more resources. // Create creates one or more resources.
// Create(resources ResourceList) (*Result, error)
// reader must contain a YAML stream (one or more YAML documents separated
// by "\n---\n").
Create(reader io.Reader) error
Wait(r io.Reader, timeout time.Duration) error Wait(resources ResourceList, timeout time.Duration) error
// Delete destroys one or more resources. // Delete destroys one or more resources.
// Delete(resources ResourceList) (*Result, []error)
// reader must contain a YAML stream (one or more YAML documents separated
// by "\n---\n").
Delete(io.Reader) error
// Watch the resource in reader until it is "ready". // Watch the resource in reader until it is "ready". This method
// //
// For Jobs, "ready" means the job ran to completion (excited without error). // For Jobs, "ready" means the job ran to completion (excited without error).
// For all other kinds, it means the kind was created or modified without // For all other kinds, it means the kind was created or modified without
// error. // error.
WatchUntilReady(reader io.Reader, timeout time.Duration) error WatchUntilReady(resources ResourceList, timeout time.Duration) error
// Update updates one or more resources or creates the resource // Update updates one or more resources or creates the resource
// if it doesn't exist. // if it doesn't exist.
Update(original, target ResourceList, force bool) (*Result, error)
// Build creates a resource list from a Reader
// //
// reader must contain a YAML stream (one or more YAML documents separated // reader must contain a YAML stream (one or more YAML documents separated
// by "\n---\n"). // by "\n---\n")
Update(originalReader, modifiedReader io.Reader, force bool, recreate bool) error Build(reader io.Reader) (ResourceList, error)
Build(reader io.Reader) (Result, error)
BuildUnstructured(reader io.Reader) (Result, error)
// WaitAndGetCompletedPodPhase waits up to a timeout until a pod enters a completed phase // WaitAndGetCompletedPodPhase waits up to a timeout until a pod enters a completed phase
// and returns said phase (PodSucceeded or PodFailed qualify). // and returns said phase (PodSucceeded or PodFailed qualify).

@ -0,0 +1,85 @@
/*
Copyright The Helm Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kube // import "helm.sh/helm/pkg/kube"
import "k8s.io/cli-runtime/pkg/resource"
// ResourceList provides convenience methods for comparing collections of Infos.
type ResourceList []*resource.Info
// Append adds an Info to the Result.
func (r *ResourceList) Append(val *resource.Info) {
*r = append(*r, val)
}
// Visit implements resource.Visitor.
func (r ResourceList) Visit(fn resource.VisitorFunc) error {
for _, i := range r {
if err := fn(i, nil); err != nil {
return err
}
}
return nil
}
// Filter returns a new Result with Infos that satisfy the predicate fn.
func (r ResourceList) Filter(fn func(*resource.Info) bool) ResourceList {
var result ResourceList
for _, i := range r {
if fn(i) {
result.Append(i)
}
}
return result
}
// Get returns the Info from the result that matches the name and kind.
func (r ResourceList) Get(info *resource.Info) *resource.Info {
for _, i := range r {
if isMatchingInfo(i, info) {
return i
}
}
return nil
}
// Contains checks to see if an object exists.
func (r ResourceList) Contains(info *resource.Info) bool {
for _, i := range r {
if isMatchingInfo(i, info) {
return true
}
}
return false
}
// Difference will return a new Result with objects not contained in rs.
func (r ResourceList) Difference(rs ResourceList) ResourceList {
return r.Filter(func(info *resource.Info) bool {
return !rs.Contains(info)
})
}
// Intersect will return a new Result with objects contained in both Results.
func (r ResourceList) Intersect(rs ResourceList) ResourceList {
return r.Filter(rs.Contains)
}
// isMatchingInfo returns true if infos match on Name and GroupVersionKind.
func isMatchingInfo(a, b *resource.Info) bool {
return a.Name == b.Name && a.Mapping.GroupVersionKind.Kind == b.Mapping.GroupVersionKind.Kind
}

@ -24,7 +24,7 @@ import (
"k8s.io/cli-runtime/pkg/resource" "k8s.io/cli-runtime/pkg/resource"
) )
func TestResult(t *testing.T) { func TestResourceList(t *testing.T) {
mapping := &meta.RESTMapping{ mapping := &meta.RESTMapping{
Resource: schema.GroupVersionResource{Group: "group", Version: "version", Resource: "pod"}, Resource: schema.GroupVersionResource{Group: "group", Version: "version", Resource: "pod"},
} }
@ -33,7 +33,7 @@ func TestResult(t *testing.T) {
return &resource.Info{Name: name, Mapping: mapping} return &resource.Info{Name: name, Mapping: mapping}
} }
var r1, r2 Result var r1, r2 ResourceList
r1 = []*resource.Info{info("foo"), info("bar")} r1 = []*resource.Info{info("foo"), info("bar")}
r2 = []*resource.Info{info("bar")} r2 = []*resource.Info{info("bar")}

@ -14,72 +14,15 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
package kube // import "helm.sh/helm/pkg/kube" package kube
import "k8s.io/cli-runtime/pkg/resource" // Result contains the information of created, updated, and deleted resources
// for various kube API calls along with helper methods for using those
// Result provides convenience methods for comparing collections of Infos. // resources
type Result []*resource.Info type Result struct {
Created ResourceList
// Append adds an Info to the Result. Updated ResourceList
func (r *Result) Append(val *resource.Info) { Deleted ResourceList
*r = append(*r, val)
}
// Visit implements resource.Visitor.
func (r Result) Visit(fn resource.VisitorFunc) error {
for _, i := range r {
if err := fn(i, nil); err != nil {
return err
}
}
return nil
} }
// Filter returns a new Result with Infos that satisfy the predicate fn. // If needed, we can add methods to the Result type for things like diffing
func (r Result) Filter(fn func(*resource.Info) bool) Result {
var result Result
for _, i := range r {
if fn(i) {
result.Append(i)
}
}
return result
}
// Get returns the Info from the result that matches the name and kind.
func (r Result) Get(info *resource.Info) *resource.Info {
for _, i := range r {
if isMatchingInfo(i, info) {
return i
}
}
return nil
}
// Contains checks to see if an object exists.
func (r Result) Contains(info *resource.Info) bool {
for _, i := range r {
if isMatchingInfo(i, info) {
return true
}
}
return false
}
// Difference will return a new Result with objects not contained in rs.
func (r Result) Difference(rs Result) Result {
return r.Filter(func(info *resource.Info) bool {
return !rs.Contains(info)
})
}
// Intersect will return a new Result with objects contained in both Results.
func (r Result) Intersect(rs Result) Result {
return r.Filter(rs.Contains)
}
// isMatchingInfo returns true if infos match on Name and GroupVersionKind.
func isMatchingInfo(a, b *resource.Info) bool {
return a.Name == b.Name && a.Mapping.GroupVersionKind.Kind == b.Mapping.GroupVersionKind.Kind
}

@ -44,7 +44,7 @@ type waiter struct {
// waitForResources polls to get the current status of all pods, PVCs, and Services // waitForResources polls to get the current status of all pods, PVCs, and Services
// until all are ready or a timeout is reached // until all are ready or a timeout is reached
func (w *waiter) waitForResources(created Result) error { func (w *waiter) waitForResources(created ResourceList) error {
w.log("beginning wait for %d resources with timeout of %v", len(created), w.timeout) w.log("beginning wait for %d resources with timeout of %v", len(created), w.timeout)
return wait.Poll(2*time.Second, w.timeout, func() (bool, error) { return wait.Poll(2*time.Second, w.timeout, func() (bool, error) {
@ -56,7 +56,7 @@ func (w *waiter) waitForResources(created Result) error {
ok = true ok = true
err error err error
) )
switch value := asVersioned(v).(type) { switch value := AsVersioned(v).(type) {
case *corev1.Pod: case *corev1.Pod:
pod, err := w.c.CoreV1().Pods(value.Namespace).Get(value.Name, metav1.GetOptions{}) pod, err := w.c.CoreV1().Pods(value.Namespace).Get(value.Name, metav1.GetOptions{})
if err != nil || !w.isPodReady(pod) { if err != nil || !w.isPodReady(pod) {
@ -178,7 +178,7 @@ func (w *waiter) podsReadyForObject(namespace string, obj runtime.Object) (bool,
} }
func (w *waiter) podsforObject(namespace string, obj runtime.Object) ([]corev1.Pod, error) { func (w *waiter) podsforObject(namespace string, obj runtime.Object) ([]corev1.Pod, error) {
selector, err := selectorsForObject(obj) selector, err := SelectorsForObject(obj)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -300,10 +300,10 @@ func getPods(client kubernetes.Interface, namespace, selector string) ([]corev1.
return list.Items, err return list.Items, err
} }
// selectorsForObject returns the pod label selector for a given object // SelectorsForObject returns the pod label selector for a given object
// //
// Modified version of https://github.com/kubernetes/kubernetes/blob/v1.14.1/pkg/kubectl/polymorphichelpers/helpers.go#L84 // Modified version of https://github.com/kubernetes/kubernetes/blob/v1.14.1/pkg/kubectl/polymorphichelpers/helpers.go#L84
func selectorsForObject(object runtime.Object) (selector labels.Selector, err error) { func SelectorsForObject(object runtime.Object) (selector labels.Selector, err error) {
switch t := object.(type) { switch t := object.(type) {
case *extensionsv1beta1.ReplicaSet: case *extensionsv1beta1.ReplicaSet:
selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector) selector, err = metav1.LabelSelectorAsSelector(t.Spec.Selector)

@ -37,8 +37,11 @@ type Environment struct {
} }
func (env *Environment) createTestPod(test *test) error { func (env *Environment) createTestPod(test *test) error {
b := bytes.NewBufferString(test.manifest) resources, err := env.KubeClient.Build(bytes.NewBufferString(test.manifest))
if err := env.KubeClient.Create(b); err != nil { if err != nil {
return err
}
if _, err := env.KubeClient.Create(resources); err != nil {
test.result.Info = err.Error() test.result.Info = err.Error()
test.result.Status = release.TestRunFailure test.result.Status = release.TestRunFailure
return err return err
@ -112,9 +115,15 @@ func (env *Environment) streamMessage(msg string, status release.TestRunStatus)
// DeleteTestPods deletes resources given in testManifests // DeleteTestPods deletes resources given in testManifests
func (env *Environment) DeleteTestPods(testManifests []string) { func (env *Environment) DeleteTestPods(testManifests []string) {
for _, testManifest := range testManifests { for _, testManifest := range testManifests {
err := env.KubeClient.Delete(bytes.NewBufferString(testManifest)) resources, err := env.KubeClient.Build(bytes.NewBufferString(testManifest))
if err != nil { if err != nil {
env.streamError(err.Error()) env.streamError(err.Error())
} }
_, errs := env.KubeClient.Delete(resources)
if err != nil {
for _, e := range errs {
env.streamError(e.Error())
}
}
} }
} }

@ -17,13 +17,14 @@ limitations under the License.
package releasetesting package releasetesting
import ( import (
"io" "io/ioutil"
"testing" "testing"
"time" "time"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
"helm.sh/helm/pkg/kube" "helm.sh/helm/pkg/kube"
"helm.sh/helm/pkg/kube/fake"
"helm.sh/helm/pkg/release" "helm.sh/helm/pkg/release"
) )
@ -237,14 +238,14 @@ func testSuiteFixture(testManifests []string) *TestSuite {
func testEnvFixture() *Environment { func testEnvFixture() *Environment {
return &Environment{ return &Environment{
Namespace: "default", Namespace: "default",
KubeClient: &mockKubeClient{}, KubeClient: &mockKubeClient{PrintingKubeClient: fake.PrintingKubeClient{Out: ioutil.Discard}},
Timeout: 1, Timeout: 1,
Messages: make(chan *release.TestReleaseResponse, 1), Messages: make(chan *release.TestReleaseResponse, 1),
} }
} }
type mockKubeClient struct { type mockKubeClient struct {
kube.Interface fake.PrintingKubeClient
podFail bool podFail bool
err error err error
} }
@ -255,5 +256,4 @@ func (c *mockKubeClient) WaitAndGetCompletedPodPhase(_ string, _ time.Duration)
} }
return v1.PodSucceeded, nil return v1.PodSucceeded, nil
} }
func (c *mockKubeClient) Create(_ io.Reader) error { return c.err } func (c *mockKubeClient) Create(_ kube.ResourceList) (*kube.Result, error) { return nil, c.err }
func (c *mockKubeClient) Delete(_ io.Reader) error { return nil }

Loading…
Cancel
Save