initial plumbing from time.Duration to context.Context

Signed-off-by: Josh Rowley <joshlrowley@gmail.com>
pull/12109/head
Josh Rowley 2 years ago
parent ad1fd058ad
commit 9e62789dde

@ -17,17 +17,18 @@ package action
import (
"bytes"
"context"
"sort"
"time"
"github.com/pkg/errors"
"helm.sh/helm/v3/pkg/kube"
"helm.sh/helm/v3/pkg/release"
helmtime "helm.sh/helm/v3/pkg/time"
)
// execHook executes all of the hooks for the given hook event.
func (cfg *Configuration) execHook(rl *release.Release, hook release.HookEvent, timeout time.Duration) error {
func (cfg *Configuration) execHook(ctx context.Context, rl *release.Release, hook release.HookEvent) error {
executingHooks := []*release.Hook{}
for _, h := range rl.Hooks {
@ -79,8 +80,12 @@ func (cfg *Configuration) execHook(rl *release.Release, hook release.HookEvent,
return errors.Wrapf(err, "warning: Hook %s %s failed", hook, h.Path)
}
ctxInterface, ok := cfg.KubeClient.(kube.ContextInterface)
if !ok {
panic("KubeClient does not satisfies kube.ContextInterface")
}
// Watch hook resources until they have completed
err = cfg.KubeClient.WatchUntilReady(resources, timeout)
err = ctxInterface.WatchUntilReadyWithContext(ctx, resources)
// Note the time of success/failure
h.LastRun.CompletedAt = helmtime.Now()
// Mark hook as succeeded or failed

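The execHook change above probes the configured KubeClient for the new context-aware extension at runtime and panics when it is absent. For reference, a minimal self-contained sketch of that optional-interface pattern (hypothetical names, not Helm's types):

package main

import (
	"context"
	"fmt"
)

// Base plays the role of kube.Interface; Extended plays the role of
// kube.ContextInterface, an optional extension a client may also satisfy.
type Base interface{ Wait() error }

type Extended interface{ WaitWithContext(ctx context.Context) error }

type client struct{}

func (client) Wait() error                               { return nil }
func (client) WaitWithContext(ctx context.Context) error { return ctx.Err() }

func main() {
	var kc Base = client{}
	// Probe for the extension; execHook panics on failure instead, since
	// Helm's own client is expected to implement it.
	if ext, ok := kc.(Extended); ok {
		fmt.Println(ext.WaitWithContext(context.Background()))
	}
}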
@ -206,7 +206,9 @@ func (i *Install) installCRDs(crds []chart.CRD) error {
// If DryRun is set to true, this will prepare the release, but not install it
func (i *Install) Run(chrt *chart.Chart, vals map[string]interface{}) (*release.Release, error) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(context.TODO(), i.Timeout)
defer cancel()
return i.RunWithContext(ctx, chrt, vals)
}
@ -360,6 +362,10 @@ func (i *Install) RunWithContext(ctx context.Context, chrt *chart.Chart, vals ma
}
}
// Store the release in history before continuing (new in Helm 3). We always know
// that this is a create operation.
if err := i.cfg.Releases.Create(rel); err != nil {
@ -368,27 +374,16 @@ func (i *Install) RunWithContext(ctx context.Context, chrt *chart.Chart, vals ma
// not working.
return rel, err
}
rChan := make(chan resultMessage)
ctxChan := make(chan resultMessage)
doneChan := make(chan struct{})
defer close(doneChan)
go i.performInstall(rChan, rel, toBeAdopted, resources)
go i.handleContext(ctx, ctxChan, doneChan, rel)
select {
case result := <-rChan:
return result.r, result.e
case result := <-ctxChan:
return result.r, result.e
}
return i.performInstall(ctx, rel, toBeAdopted, resources)
}
func (i *Install) performInstall(c chan<- resultMessage, rel *release.Release, toBeAdopted kube.ResourceList, resources kube.ResourceList) {
func (i *Install) performInstall(ctx context.Context, rel *release.Release, toBeAdopted, resources kube.ResourceList) (*release.Release, error) {
// pre-install hooks
if !i.DisableHooks {
if err := i.cfg.execHook(rel, release.HookPreInstall, i.Timeout); err != nil {
i.reportToRun(c, rel, fmt.Errorf("failed pre-install: %s", err))
return
if err := i.cfg.execHook(ctx, rel, release.HookPreInstall); err != nil {
return i.failRelease(rel, fmt.Errorf("failed pre-install: %w", err))
}
}
@ -397,34 +392,31 @@ func (i *Install) performInstall(c chan<- resultMessage, rel *release.Release, t
// to true, since that is basically an upgrade operation.
if len(toBeAdopted) == 0 && len(resources) > 0 {
if _, err := i.cfg.KubeClient.Create(resources); err != nil {
i.reportToRun(c, rel, err)
return
return i.failRelease(rel, err)
}
} else if len(resources) > 0 {
if _, err := i.cfg.KubeClient.Update(toBeAdopted, resources, i.Force); err != nil {
i.reportToRun(c, rel, err)
return
return i.failRelease(rel, err)
}
}
if i.Wait {
kubeClient := i.cfg.KubeClient.(kube.ContextInterface)
if i.WaitForJobs {
if err := i.cfg.KubeClient.WaitWithJobs(resources, i.Timeout); err != nil {
i.reportToRun(c, rel, err)
return
if err := kubeClient.WaitWithJobsContext(ctx, resources); err != nil {
return i.failRelease(rel, err)
}
} else {
if err := i.cfg.KubeClient.Wait(resources, i.Timeout); err != nil {
i.reportToRun(c, rel, err)
return
if err := kubeClient.WaitWithContext(ctx, resources); err != nil {
return i.failRelease(rel, err)
}
}
}
if !i.DisableHooks {
if err := i.cfg.execHook(rel, release.HookPostInstall, i.Timeout); err != nil {
i.reportToRun(c, rel, fmt.Errorf("failed post-install: %s", err))
return
if err := i.cfg.execHook(ctx, rel, release.HookPostInstall); err != nil {
return i.failRelease(rel, fmt.Errorf("failed post-install: %w", err))
}
}
@ -445,25 +437,9 @@ func (i *Install) performInstall(c chan<- resultMessage, rel *release.Release, t
i.cfg.Log("failed to record the release: %s", err)
}
i.reportToRun(c, rel, nil)
}
func (i *Install) handleContext(ctx context.Context, c chan<- resultMessage, done chan struct{}, rel *release.Release) {
select {
case <-ctx.Done():
err := ctx.Err()
i.reportToRun(c, rel, err)
case <-done:
return
}
}
func (i *Install) reportToRun(c chan<- resultMessage, rel *release.Release, err error) {
i.Lock.Lock()
if err != nil {
rel, err = i.failRelease(rel, err)
}
c <- resultMessage{r: rel, e: err}
i.Lock.Unlock()
return rel, nil
}
func (i *Install) failRelease(rel *release.Release, err error) (*release.Release, error) {
rel.SetStatus(release.StatusFailed, fmt.Sprintf("Release %q failed: %s", i.ReleaseName, err.Error()))
if i.Atomic {

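Run now delegates to RunWithContext through a context.WithTimeout decorator, so existing callers keep the timeout behaviour while new callers control cancellation themselves. A hypothetical caller-side sketch (the wiring below is not part of this commit) that aborts an install on Ctrl-C:

import (
	"context"
	"errors"
	"fmt"
	"os"
	"os/signal"

	"helm.sh/helm/v3/pkg/action"
	"helm.sh/helm/v3/pkg/chart"
	"helm.sh/helm/v3/pkg/release"
)

func installWithInterrupt(inst *action.Install, chrt *chart.Chart, vals map[string]interface{}) (*release.Release, error) {
	// Cancel on SIGINT; hooks and waits observe ctx and return early.
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
	defer stop()

	rel, err := inst.RunWithContext(ctx, chrt, vals)
	if errors.Is(err, context.Canceled) {
		// failRelease has already marked the release failed (and rolled it
		// back when Atomic is set).
		return rel, fmt.Errorf("install interrupted: %w", err)
	}
	return rel, err
}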
@ -387,6 +387,7 @@ func TestInstallRelease_Wait_Interrupted(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
time.AfterFunc(time.Second, cancel)
res, err := instAction.RunWithContext(ctx, buildChart(), vals)
is.Error(err)

@ -48,8 +48,7 @@ func NewReleaseTesting(cfg *Configuration) *ReleaseTesting {
}
}
// RunWithContext executes 'helm test' against the given release.
func (r *ReleaseTesting) Run(name string) (*release.Release, error) {
func (r *ReleaseTesting) RunWithContext(ctx context.Context, name string) (*release.Release, error) {
if err := r.cfg.KubeClient.IsReachable(); err != nil {
return nil, err
}
@ -88,7 +87,7 @@ func (r *ReleaseTesting) Run(name string) (*release.Release, error) {
rel.Hooks = executingHooks
}
if err := r.cfg.execHook(rel, release.HookTest, r.Timeout); err != nil {
if err := r.cfg.execHook(ctx, rel, release.HookTest); err != nil {
rel.Hooks = append(skippedHooks, rel.Hooks...)
r.cfg.Releases.Update(rel)
return rel, err
@ -98,6 +97,14 @@ func (r *ReleaseTesting) Run(name string) (*release.Release, error) {
return rel, r.cfg.Releases.Update(rel)
}
// Run executes 'helm test' against the given release.
func (r *ReleaseTesting) Run(name string) (*release.Release, error) {
ctx, cancel := context.WithTimeout(context.TODO(), r.Timeout)
defer cancel()
return r.RunWithContext(ctx, name)
}
// GetPodLogs will write the logs for all test pods in the given release into
// the given writer. These can be immediately output to the user or captured for
// other uses

@ -18,6 +18,7 @@ package action
import (
"bytes"
"context"
"fmt"
"strings"
"time"
@ -54,8 +55,7 @@ func NewRollback(cfg *Configuration) *Rollback {
}
}
// RunWithContext executes 'helm rollback' against the given release.
func (r *Rollback) Run(name string) error {
func (r *Rollback) RunWithContext(ctx context.Context, name string) error {
if err := r.cfg.KubeClient.IsReachable(); err != nil {
return err
}
@ -76,7 +76,7 @@ func (r *Rollback) Run(name string) error {
}
r.cfg.Log("performing rollback of %s", name)
if _, err := r.performRollback(currentRelease, targetRelease); err != nil {
if _, err := r.performRollback(ctx, currentRelease, targetRelease); err != nil {
return err
}
@ -89,6 +89,14 @@ func (r *Rollback) Run(name string) error {
return nil
}
// Run executes 'helm rollback' against the given release.
func (r *Rollback) Run(name string) error {
ctx, cancel := context.WithTimeout(context.TODO(), r.Timeout)
defer cancel()
return r.RunWithContext(ctx, name)
}
// prepareRollback finds the previous release and prepares a new release object with
// the previous release's configuration
func (r *Rollback) prepareRollback(name string) (*release.Release, *release.Release, error) {
@ -140,7 +148,7 @@ func (r *Rollback) prepareRollback(name string) (*release.Release, *release.Rele
return currentRelease, targetRelease, nil
}
func (r *Rollback) performRollback(currentRelease, targetRelease *release.Release) (*release.Release, error) {
func (r *Rollback) performRollback(ctx context.Context, currentRelease, targetRelease *release.Release) (*release.Release, error) {
if r.DryRun {
r.cfg.Log("dry run for %s", targetRelease.Name)
return targetRelease, nil
@ -157,7 +165,7 @@ func (r *Rollback) performRollback(currentRelease, targetRelease *release.Releas
// pre-rollback hooks
if !r.DisableHooks {
if err := r.cfg.execHook(targetRelease, release.HookPreRollback, r.Timeout); err != nil {
if err := r.cfg.execHook(ctx, targetRelease, release.HookPreRollback); err != nil {
return targetRelease, err
}
} else {
@ -224,7 +232,7 @@ func (r *Rollback) performRollback(currentRelease, targetRelease *release.Releas
// post-rollback hooks
if !r.DisableHooks {
if err := r.cfg.execHook(targetRelease, release.HookPostRollback, r.Timeout); err != nil {
if err := r.cfg.execHook(ctx, targetRelease, release.HookPostRollback); err != nil {
return targetRelease, err
}
}

@ -17,6 +17,7 @@ limitations under the License.
package action
import (
"context"
"strings"
"time"
@ -54,6 +55,13 @@ func NewUninstall(cfg *Configuration) *Uninstall {
// Run uninstalls the given release.
func (u *Uninstall) Run(name string) (*release.UninstallReleaseResponse, error) {
ctx, cancel := context.WithTimeout(context.TODO(), u.Timeout)
defer cancel()
return u.RunWithContext(ctx, name)
}
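// RunWithContext uninstalls the given release, honoring cancellation from ctx.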
func (u *Uninstall) RunWithContext(ctx context.Context, name string) (*release.UninstallReleaseResponse, error) {
if err := u.cfg.KubeClient.IsReachable(); err != nil {
return nil, err
}
@ -101,7 +109,7 @@ func (u *Uninstall) Run(name string) (*release.UninstallReleaseResponse, error)
res := &release.UninstallReleaseResponse{Release: rel}
if !u.DisableHooks {
if err := u.cfg.execHook(rel, release.HookPreDelete, u.Timeout); err != nil {
if err := u.cfg.execHook(ctx, rel, release.HookPreDelete); err != nil {
return res, err
}
} else {
@ -126,15 +134,15 @@ func (u *Uninstall) Run(name string) (*release.UninstallReleaseResponse, error)
res.Info = kept
if u.Wait {
if kubeClient, ok := u.cfg.KubeClient.(kube.InterfaceExt); ok {
if err := kubeClient.WaitForDelete(deletedResources, u.Timeout); err != nil {
if kubeClient, ok := u.cfg.KubeClient.(kube.ContextInterface); ok {
if err := kubeClient.WaitForDeleteWithContext(ctx, deletedResources); err != nil {
errs = append(errs, err)
}
}
}
if !u.DisableHooks {
if err := u.cfg.execHook(rel, release.HookPostDelete, u.Timeout); err != nil {
if err := u.cfg.execHook(ctx, rel, release.HookPostDelete); err != nil {
errs = append(errs, err)
}
}

@ -130,7 +130,9 @@ func (u *Upgrade) SetRegistryClient(client *registry.Client) {
// Run executes the upgrade on the given release.
func (u *Upgrade) Run(name string, chart *chart.Chart, vals map[string]interface{}) (*release.Release, error) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(context.TODO(), u.Timeout)
defer cancel()
return u.RunWithContext(ctx, name, chart, vals)
}
@ -331,51 +333,16 @@ func (u *Upgrade) performUpgrade(ctx context.Context, originalRelease, upgradedR
if err := u.cfg.Releases.Create(upgradedRelease); err != nil {
return nil, err
}
rChan := make(chan resultMessage)
ctxChan := make(chan resultMessage)
doneChan := make(chan interface{})
defer close(doneChan)
go u.releasingUpgrade(rChan, upgradedRelease, current, target, originalRelease)
go u.handleContext(ctx, doneChan, ctxChan, upgradedRelease)
select {
case result := <-rChan:
return result.r, result.e
case result := <-ctxChan:
return result.r, result.e
}
}
// Function used to lock the Mutex, this is important for the case when the atomic flag is set.
// In that case the upgrade will finish before the rollback is finished so it is necessary to wait for the rollback to finish.
// The rollback will be trigger by the function failRelease
func (u *Upgrade) reportToPerformUpgrade(c chan<- resultMessage, rel *release.Release, created kube.ResourceList, err error) {
u.Lock.Lock()
if err != nil {
rel, err = u.failRelease(rel, created, err)
}
c <- resultMessage{r: rel, e: err}
u.Lock.Unlock()
return u.releasingUpgrade(ctx, upgradedRelease, current, target, originalRelease)
}
// Setup listener for SIGINT and SIGTERM
func (u *Upgrade) handleContext(ctx context.Context, done chan interface{}, c chan<- resultMessage, upgradedRelease *release.Release) {
select {
case <-ctx.Done():
err := ctx.Err()
// when the atomic flag is set the ongoing release finish first and doesn't give time for the rollback happens.
u.reportToPerformUpgrade(c, upgradedRelease, kube.ResourceList{}, err)
case <-done:
return
}
}
func (u *Upgrade) releasingUpgrade(c chan<- resultMessage, upgradedRelease *release.Release, current kube.ResourceList, target kube.ResourceList, originalRelease *release.Release) {
func (u *Upgrade) releasingUpgrade(ctx context.Context, upgradedRelease *release.Release, current, target kube.ResourceList, originalRelease *release.Release) (*release.Release, error) {
// pre-upgrade hooks
if !u.DisableHooks {
if err := u.cfg.execHook(upgradedRelease, release.HookPreUpgrade, u.Timeout); err != nil {
u.reportToPerformUpgrade(c, upgradedRelease, kube.ResourceList{}, fmt.Errorf("pre-upgrade hooks failed: %s", err))
return
if err := u.cfg.execHook(ctx, upgradedRelease, release.HookPreUpgrade); err != nil {
return u.failRelease(upgradedRelease, kube.ResourceList{}, fmt.Errorf("pre-upgrade hooks failed: %w", err))
}
} else {
u.cfg.Log("upgrade hooks disabled for %s", upgradedRelease.Name)
@ -384,8 +351,7 @@ func (u *Upgrade) releasingUpgrade(c chan<- resultMessage, upgradedRelease *rele
results, err := u.cfg.KubeClient.Update(current, target, u.Force)
if err != nil {
u.cfg.recordRelease(originalRelease)
u.reportToPerformUpgrade(c, upgradedRelease, results.Created, err)
return
return u.failRelease(upgradedRelease, results.Created, err)
}
if u.Recreate {
@ -405,23 +371,20 @@ func (u *Upgrade) releasingUpgrade(c chan<- resultMessage, upgradedRelease *rele
if u.WaitForJobs {
if err := u.cfg.KubeClient.WaitWithJobs(target, u.Timeout); err != nil {
u.cfg.recordRelease(originalRelease)
u.reportToPerformUpgrade(c, upgradedRelease, results.Created, err)
return
return u.failRelease(upgradedRelease, results.Created, err)
}
} else {
if err := u.cfg.KubeClient.Wait(target, u.Timeout); err != nil {
u.cfg.recordRelease(originalRelease)
u.reportToPerformUpgrade(c, upgradedRelease, results.Created, err)
return
return u.failRelease(upgradedRelease, results.Created, err)
}
}
}
// post-upgrade hooks
if !u.DisableHooks {
if err := u.cfg.execHook(upgradedRelease, release.HookPostUpgrade, u.Timeout); err != nil {
u.reportToPerformUpgrade(c, upgradedRelease, results.Created, fmt.Errorf("post-upgrade hooks failed: %s", err))
return
if err := u.cfg.execHook(ctx, upgradedRelease, release.HookPostUpgrade); err != nil {
return u.failRelease(upgradedRelease, results.Created, fmt.Errorf("post-upgrade hooks failed: %w", err))
}
}
@ -434,7 +397,8 @@ func (u *Upgrade) releasingUpgrade(c chan<- resultMessage, upgradedRelease *rele
} else {
upgradedRelease.Info.Description = "Upgrade complete"
}
u.reportToPerformUpgrade(c, upgradedRelease, nil, nil)
return upgradedRelease, nil
}
func (u *Upgrade) failRelease(rel *release.Release, created kube.ResourceList, err error) (*release.Release, error) {

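performUpgrade previously raced a worker goroutine against ctx.Done() through rChan/ctxChan and a select, which could leave the worker running after the context fired. Threading ctx into releasingUpgrade (and performInstall) removes that race. The general shape, as an illustrative sketch rather than Helm API:

import "context"

// runSteps replaces the goroutine-plus-select pattern: every step receives
// ctx, and cancellation is also checked between steps, so no worker goroutine
// outlives the context.
func runSteps(ctx context.Context, steps []func(context.Context) error) error {
	for _, step := range steps {
		if err := ctx.Err(); err != nil {
			return err // ctx cancelled or deadline exceeded
		}
		if err := step(ctx); err != nil {
			return err
		}
	}
	return nil
}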
@ -275,6 +275,15 @@ func getResource(info *resource.Info) (runtime.Object, error) {
// Wait waits up to the given timeout for the specified resources to be ready.
func (c *Client) Wait(resources ResourceList, timeout time.Duration) error {
// Helm 4 TODO: remove decorator around WaitWithContext.
ctx, cancel := context.WithTimeout(context.TODO(), timeout)
defer cancel()
return c.WaitWithContext(ctx, resources)
}
// WaitWithContext waits until the ctx deadline for the specified resources to be ready.
func (c *Client) WaitWithContext(ctx context.Context, resources ResourceList) error {
cs, err := c.getKubeClient()
if err != nil {
return err
@ -283,13 +292,21 @@ func (c *Client) Wait(resources ResourceList, timeout time.Duration) error {
w := waiter{
c: checker,
log: c.Log,
timeout: timeout,
}
return w.waitForResources(resources)
return w.waitForResources(ctx, resources)
}
// WaitWithJobs waits up to the given timeout for the specified resources to be ready, including jobs.
func (c *Client) WaitWithJobs(resources ResourceList, timeout time.Duration) error {
// Helm 4 TODO: remove decorator around WaitWithJobsContext.
ctx, cancel := context.WithTimeout(context.TODO(), timeout)
defer cancel()
return c.WaitWithJobsContext(ctx, resources)
}
// WaitWithJobsContext waits until the ctx deadline for the specified resources to be ready, including jobs.
func (c *Client) WaitWithJobsContext(ctx context.Context, resources ResourceList) error {
cs, err := c.getKubeClient()
if err != nil {
return err
@ -298,18 +315,27 @@ func (c *Client) WaitWithJobs(resources ResourceList, timeout time.Duration) err
w := waiter{
c: checker,
log: c.Log,
timeout: timeout,
}
return w.waitForResources(resources)
return w.waitForResources(ctx, resources)
}
// WaitForDelete waits up to the given timeout for the specified resources to be deleted.
func (c *Client) WaitForDelete(resources ResourceList, timeout time.Duration) error {
// Helm 4 TODO: remove decorator around WaitForDeleteWithContext.
ctx, cancel := context.WithTimeout(context.TODO(), timeout)
defer cancel()
return c.WaitForDeleteWithContext(ctx, resources)
}
// WaitForDeleteWithContext waits until the ctx deadline for the specified resources to be deleted.
func (c *Client) WaitForDeleteWithContext(ctx context.Context, resources ResourceList) error {
w := waiter{
log: c.Log,
timeout: timeout,
}
return w.waitForDeletedResources(resources)
return w.waitForDeletedResources(ctx, resources)
}
func (c *Client) namespace() string {
@ -506,9 +532,9 @@ func delete(c *Client, resources ResourceList, propagation metav1.DeletionPropag
return res, nil
}
func (c *Client) watchTimeout(t time.Duration) func(*resource.Info) error {
func (c *Client) watchTimeout(ctx context.Context) func(*resource.Info) error {
return func(info *resource.Info) error {
return c.watchUntilReady(t, info)
return c.watchUntilReady(ctx, info)
}
}
@ -527,9 +553,18 @@ func (c *Client) watchTimeout(t time.Duration) func(*resource.Info) error {
//
// Handling for other kinds will be added as necessary.
func (c *Client) WatchUntilReady(resources ResourceList, timeout time.Duration) error {
// Helm 4 TODO: remove decorator around WatchUntilReadyWithContext.
ctx, cancel := context.WithTimeout(context.TODO(), timeout)
defer cancel()
// For jobs, there's also the option to poll c.Jobs(namespace).Get():
// https://github.com/adamreese/kubernetes/blob/master/test/e2e/job.go#L291-L300
return perform(resources, c.watchTimeout(timeout))
return c.WatchUntilReadyWithContext(ctx, resources)
}
// WatchUntilReadyWithContext is the context-aware variant of WatchUntilReady.
func (c *Client) WatchUntilReadyWithContext(ctx context.Context, resources ResourceList) error {
return perform(resources, c.watchTimeout(ctx))
}
func perform(infos ResourceList, fn func(*resource.Info) error) error {
@ -594,6 +629,7 @@ func createResource(info *resource.Info) error {
if err != nil {
return err
}
return info.Refresh(obj, true)
}
@ -695,7 +731,7 @@ func updateResource(c *Client, target *resource.Info, currentObj runtime.Object,
return nil
}
func (c *Client) watchUntilReady(timeout time.Duration, info *resource.Info) error {
func (c *Client) watchUntilReady(ctx context.Context, info *resource.Info) error {
kind := info.Mapping.GroupVersionKind.Kind
switch kind {
case "Job", "Pod":
@ -703,7 +739,7 @@ func (c *Client) watchUntilReady(timeout time.Duration, info *resource.Info) err
return nil
}
c.Log("Watching for changes to %s %s with timeout of %v", kind, info.Name, timeout)
c.Log("Watching for changes to %s %s with timeout of %v", kind, info.Name, timeFromCtx(ctx))
// Use a selector on the name of the resource. This should be unique for the
// given version and kind
@ -719,8 +755,6 @@ func (c *Client) watchUntilReady(timeout time.Duration, info *resource.Info) err
// In the future, we might want to add some special logic for types
// like Ingress, Volume, etc.
ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), timeout)
defer cancel()
_, err = watchtools.UntilWithSync(ctx, lw, &unstructured.Unstructured{}, nil, func(e watch.Event) (bool, error) {
// Make sure the incoming object is versioned as we use unstructured
// objects when we build manifests
@ -811,6 +845,35 @@ func scrubValidationError(err error) error {
return err
}
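// WaitAndGetCompletedPodPhaseWithContext watches the named pod until it
// reaches a completed phase (PodSucceeded or PodFailed qualify) or ctx ends,
// and returns that phase.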
func (c *Client) WaitAndGetCompletedPodPhaseWithContext(ctx context.Context, name string) (v1.PodPhase, error) {
client, err := c.getKubeClient()
if err != nil {
return v1.PodUnknown, err
}
watcher, err := client.CoreV1().Pods(c.namespace()).Watch(ctx, metav1.ListOptions{
FieldSelector: fmt.Sprintf("metadata.name=%s", name),
})
if err != nil {
return v1.PodUnknown, err
}
for event := range watcher.ResultChan() {
p, ok := event.Object.(*v1.Pod)
if !ok {
return v1.PodUnknown, fmt.Errorf("%s not a pod", name)
}
switch p.Status.Phase {
case v1.PodFailed:
return v1.PodFailed, nil
case v1.PodSucceeded:
return v1.PodSucceeded, nil
}
}
return v1.PodUnknown, err
}
// WaitAndGetCompletedPodPhase waits up to a timeout until a pod enters a completed phase
// and returns said phase (PodSucceeded or PodFailed qualify).
func (c *Client) WaitAndGetCompletedPodPhase(name string, timeout time.Duration) (v1.PodPhase, error) {

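Because WaitAndGetCompletedPodPhaseWithContext opens its watch with ctx, client-go closes the watch's ResultChan when the context ends, so the range loop falls through and the function returns v1.PodUnknown instead of blocking forever. A small hypothetical harness, assuming c is an already configured *kube.Client:

import (
	"context"
	"time"

	"helm.sh/helm/v3/pkg/kube"
)

func podPhaseWithBudget(c *kube.Client, name string) error {
	// Bound the watch to 30s; expiry closes ResultChan and ends the loop.
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	_, err := c.WaitAndGetCompletedPodPhaseWithContext(ctx, name)
	return err
}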
@ -18,6 +18,7 @@ limitations under the License.
package fake
import (
"context"
"io"
"time"
@ -74,6 +75,15 @@ func (f *FailingKubeClient) Wait(resources kube.ResourceList, d time.Duration) e
return f.PrintingKubeClient.Wait(resources, d)
}
// WaitWithContext sleeps for f.WaitDuration, then returns the configured error if set, or prints.
func (f *FailingKubeClient) WaitWithContext(ctx context.Context, resources kube.ResourceList) error {
time.Sleep(f.WaitDuration)
if f.WaitError != nil {
return f.WaitError
}
return f.PrintingKubeClient.Wait(resources, 0)
}
// WaitWithJobs returns the configured error if set or prints
func (f *FailingKubeClient) WaitWithJobs(resources kube.ResourceList, d time.Duration) error {
if f.WaitError != nil {
@ -82,6 +92,14 @@ func (f *FailingKubeClient) WaitWithJobs(resources kube.ResourceList, d time.Dur
return f.PrintingKubeClient.WaitWithJobs(resources, d)
}
// WaitWithJobsContext returns the configured error if set or prints
func (f *FailingKubeClient) WaitWithJobsContext(ctx context.Context, resources kube.ResourceList) error {
if f.WaitError != nil {
return f.WaitError
}
return f.PrintingKubeClient.WaitWithJobsContext(ctx, resources)
}
// WaitForDelete returns the configured error if set or prints
func (f *FailingKubeClient) WaitForDelete(resources kube.ResourceList, d time.Duration) error {
if f.WaitError != nil {
@ -90,6 +108,14 @@ func (f *FailingKubeClient) WaitForDelete(resources kube.ResourceList, d time.Du
return f.PrintingKubeClient.WaitForDelete(resources, d)
}
// WaitForDeleteWithContext returns the configured error if set or prints
func (f *FailingKubeClient) WaitForDeleteWithContext(ctx context.Context, resources kube.ResourceList) error {
if f.WaitError != nil {
return f.WaitError
}
return f.PrintingKubeClient.WaitForDelete(resources, 0)
}
// Delete returns the configured error if set or prints
func (f *FailingKubeClient) Delete(resources kube.ResourceList) (*kube.Result, []error) {
if f.DeleteError != nil {
@ -141,6 +167,14 @@ func (f *FailingKubeClient) WaitAndGetCompletedPodPhase(s string, d time.Duratio
return f.PrintingKubeClient.WaitAndGetCompletedPodPhase(s, d)
}
// WaitAndGetCompletedPodPhaseWithContext returns the configured error if set or prints
func (f *FailingKubeClient) WaitAndGetCompletedPodPhaseWithContext(ctx context.Context, s string) (v1.PodPhase, error) {
if f.WaitAndGetCompletedPodPhaseError != nil {
return v1.PodSucceeded, f.WaitAndGetCompletedPodPhaseError
}
return f.PrintingKubeClient.WaitAndGetCompletedPodPhase(s, 0)
}
// DeleteWithPropagationPolicy returns the configured error if set or prints
func (f *FailingKubeClient) DeleteWithPropagationPolicy(resources kube.ResourceList, policy metav1.DeletionPropagation) (*kube.Result, []error) {
if f.DeleteWithPropagationError != nil {
@ -149,6 +183,14 @@ func (f *FailingKubeClient) DeleteWithPropagationPolicy(resources kube.ResourceL
return f.PrintingKubeClient.DeleteWithPropagationPolicy(resources, policy)
}
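// WatchUntilReadyWithContext returns the configured error if set or prints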
func (f *FailingKubeClient) WatchUntilReadyWithContext(ctx context.Context, resources kube.ResourceList) error {
if f.WatchUntilReadyError != nil {
return f.WatchUntilReadyError
}
return f.PrintingKubeClient.WatchUntilReady(resources, 0)
}
func createDummyResourceList() kube.ResourceList {
var resInfo resource.Info
resInfo.Name = "dummyName"
@ -158,3 +200,10 @@ func createDummyResourceList() kube.ResourceList {
return resourceList
}
// compile time check that FailingKubeClient satisfies our interfaces.
var (
_ kube.Interface = &FailingKubeClient{}
_ kube.ContextInterface = &FailingKubeClient{}
_ kube.InterfaceExt = &FailingKubeClient{}
)

@ -17,6 +17,7 @@ limitations under the License.
package fake
import (
"context"
"io"
"strings"
"time"
@ -62,16 +63,31 @@ func (p *PrintingKubeClient) Wait(resources kube.ResourceList, _ time.Duration)
return err
}
func (p *PrintingKubeClient) WaitWithContext(ctx context.Context, resources kube.ResourceList) error {
_, err := io.Copy(p.Out, bufferize(resources))
return err
}
func (p *PrintingKubeClient) WaitWithJobs(resources kube.ResourceList, _ time.Duration) error {
_, err := io.Copy(p.Out, bufferize(resources))
return err
}
func (p *PrintingKubeClient) WaitWithJobsContext(ctx context.Context, resources kube.ResourceList) error {
_, err := io.Copy(p.Out, bufferize(resources))
return err
}
func (p *PrintingKubeClient) WaitForDelete(resources kube.ResourceList, _ time.Duration) error {
_, err := io.Copy(p.Out, bufferize(resources))
return err
}
func (p *PrintingKubeClient) WaitForDeleteWithContext(ctx context.Context, resources kube.ResourceList) error {
_, err := io.Copy(p.Out, bufferize(resources))
return err
}
// Delete implements KubeClient delete.
//
// It only prints out the content to be deleted.
@ -89,6 +105,12 @@ func (p *PrintingKubeClient) WatchUntilReady(resources kube.ResourceList, _ time
return err
}
// WatchUntilReadyWithContext implements KubeClient WatchUntilReadyWithContext.
func (p *PrintingKubeClient) WatchUntilReadyWithContext(ctx context.Context, resources kube.ResourceList) error {
_, err := io.Copy(p.Out, bufferize(resources))
return err
}
// Update implements KubeClient Update.
func (p *PrintingKubeClient) Update(_, modified kube.ResourceList, _ bool) (*kube.Result, error) {
_, err := io.Copy(p.Out, bufferize(modified))
@ -116,6 +138,11 @@ func (p *PrintingKubeClient) WaitAndGetCompletedPodPhase(_ string, _ time.Durati
return v1.PodSucceeded, nil
}
// WaitAndGetCompletedPodPhaseWithContext implements KubeClient WaitAndGetCompletedPodPhaseWithContext.
func (p *PrintingKubeClient) WaitAndGetCompletedPodPhaseWithContext(_ context.Context, _ string) (v1.PodPhase, error) {
return v1.PodSucceeded, nil
}
// DeleteWithPropagationPolicy implements KubeClient delete.
//
// It only prints out the content to be deleted.
@ -134,3 +161,10 @@ func bufferize(resources kube.ResourceList) io.Reader {
}
return strings.NewReader(builder.String())
}
// compile time check that PrintingKubeClient satisfies our interfaces.
var (
_ kube.Interface = &PrintingKubeClient{}
_ kube.ContextInterface = &PrintingKubeClient{}
_ kube.InterfaceExt = &PrintingKubeClient{}
)

@ -17,6 +17,7 @@ limitations under the License.
package kube
import (
"context"
"io"
"time"
@ -72,6 +73,19 @@ type Interface interface {
IsReachable() error
}
// ContextInterface is introduced to avoid breaking backwards compatibility for Interface implementers.
//
// TODO Helm 4: Replace Interface methods that accept a time.Duration as an argument, with a context.
type ContextInterface interface {
// WaitWithContext waits until the ctx deadline for the specified resources to be ready.
WaitWithContext(ctx context.Context, resources ResourceList) error
// WaitWithJobsContext waits until the ctx deadline for the specified resources to be ready, including jobs.
WaitWithJobsContext(ctx context.Context, resources ResourceList) error
// WatchUntilReadyWithContext watches the given resources until they are ready or ctx ends.
WatchUntilReadyWithContext(context.Context, ResourceList) error
// WaitAndGetCompletedPodPhaseWithContext waits until ctx ends for the named pod to reach a completed phase.
WaitAndGetCompletedPodPhaseWithContext(context.Context, string) (v1.PodPhase, error)
// WaitForDeleteWithContext waits until ctx ends for the specified resources to be deleted.
WaitForDeleteWithContext(context.Context, ResourceList) error
}
// InterfaceExt is introduced to avoid breaking backwards compatibility for Interface implementers.
//
// TODO Helm 4: Remove InterfaceExt and integrate its method(s) into the Interface.
@ -112,5 +126,6 @@ type InterfaceResources interface {
var _ Interface = (*Client)(nil)
var _ InterfaceExt = (*Client)(nil)
var _ ContextInterface = (*Client)(nil)
var _ InterfaceDeletionPropagation = (*Client)(nil)
var _ InterfaceResources = (*Client)(nil)

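The blank-identifier assertions above turn a missing method into a compile-time error. The same trick lets an out-of-tree implementation verify that it satisfies the new extension; a hypothetical sketch:

import (
	"context"

	v1 "k8s.io/api/core/v1"

	"helm.sh/helm/v3/pkg/kube"
)

// myClient is a hypothetical third-party client. Embedding kube.Interface
// covers the legacy methods; the methods below opt into kube.ContextInterface.
type myClient struct{ kube.Interface }

func (m *myClient) WaitWithContext(ctx context.Context, r kube.ResourceList) error     { return nil }
func (m *myClient) WaitWithJobsContext(ctx context.Context, r kube.ResourceList) error { return nil }
func (m *myClient) WatchUntilReadyWithContext(ctx context.Context, r kube.ResourceList) error {
	return nil
}
func (m *myClient) WaitAndGetCompletedPodPhaseWithContext(ctx context.Context, name string) (v1.PodPhase, error) {
	return v1.PodSucceeded, nil
}
func (m *myClient) WaitForDeleteWithContext(ctx context.Context, r kube.ResourceList) error { return nil }

// Fails to compile if any ContextInterface method is missing.
var _ kube.ContextInterface = (*myClient)(nil)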
@ -38,17 +38,21 @@ import (
type waiter struct {
c ReadyChecker
timeout time.Duration
log func(string, ...interface{})
}
// timeFromCtx returns the time remaining until the deadline of ctx; it assumes a deadline is set.
func timeFromCtx(ctx context.Context) time.Duration {
deadline, _ := ctx.Deadline()
return time.Until(deadline)
}
// waitForResources polls to get the current status of all pods, PVCs, Services and
// Jobs(optional) until all are ready or a timeout is reached
func (w *waiter) waitForResources(created ResourceList) error {
w.log("beginning wait for %d resources with timeout of %v", len(created), w.timeout)
func (w *waiter) waitForResources(ctx context.Context, created ResourceList) error {
ctx, cancel := context.WithTimeout(context.Background(), w.timeout)
defer cancel()
w.log("beginning wait for %d resources with timeout of %v", len(created), timeFromCtx(ctx))
return wait.PollUntilContextCancel(ctx, 2*time.Second, true, func(ctx context.Context) (bool, error) {
for _, v := range created {
@ -62,11 +66,8 @@ func (w *waiter) waitForResources(created ResourceList) error {
}
// waitForDeletedResources polls to check if all the resources are deleted or a timeout is reached
func (w *waiter) waitForDeletedResources(deleted ResourceList) error {
w.log("beginning wait for %d resources to be deleted with timeout of %v", len(deleted), w.timeout)
ctx, cancel := context.WithTimeout(context.Background(), w.timeout)
defer cancel()
func (w *waiter) waitForDeletedResources(ctx context.Context, deleted ResourceList) error {
w.log("beginning wait for %d resources to be deleted with timeout of %v", len(deleted), timeFromCtx(ctx))
return wait.PollUntilContextCancel(ctx, 2*time.Second, true, func(ctx context.Context) (bool, error) {
for _, v := range deleted {

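One caveat worth noting: timeFromCtx discards the ok result of ctx.Deadline(), so a context without a deadline yields a large negative duration in log output. A defensive variant (not part of this commit) could guard for that:

import (
	"context"
	"time"
)

// timeFromCtxSafe is a hypothetical guard-rail around the same idea.
func timeFromCtxSafe(ctx context.Context) time.Duration {
	if deadline, ok := ctx.Deadline(); ok {
		return time.Until(deadline)
	}
	return 0 // no deadline set; callers may want to log "no timeout" instead
}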