add wait-for-jobs flag

Signed-off-by: zh168654 <zhangye.168@163.com>
pull/8363/head
zh168654 4 years ago
parent bd03e1b5c7
commit 957d2a2bf9

@ -140,6 +140,7 @@ func addInstallFlags(cmd *cobra.Command, f *pflag.FlagSet, client *action.Instal
f.BoolVar(&client.Replace, "replace", false, "re-use the given name, only if that name is a deleted release which remains in the history. This is unsafe in production")
f.DurationVar(&client.Timeout, "timeout", 300*time.Second, "time to wait for any individual Kubernetes operation (like Jobs for hooks)")
f.BoolVar(&client.Wait, "wait", false, "if set, will wait until all Pods, PVCs, Services, and minimum number of Pods of a Deployment, StatefulSet, or ReplicaSet are in a ready state before marking the release as successful. It will wait for as long as --timeout")
f.BoolVar(&client.WaitForJobs, "wait-for-jobs", false, "if set and --wait enabled, will wait until all Jobs have been completed before marking the release as successful. It will wait for as long as --timeout")
f.BoolVarP(&client.GenerateName, "generate-name", "g", false, "generate the name (and omit the NAME parameter)")
f.StringVar(&client.NameTemplate, "name-template", "", "specify template used to name the release")
f.StringVar(&client.Description, "description", "", "add a custom description")

@ -85,6 +85,12 @@ func TestInstall(t *testing.T) {
cmd: "install apollo testdata/testcharts/empty --wait",
golden: "output/install-with-wait.txt",
},
// Install, with wait-for-jobs
{
name: "install with wait-for-jobs",
cmd: "install apollo testdata/testcharts/empty --wait --wait-for-jobs",
golden: "output/install-with-wait-for-jobs.txt",
},
// Install, using the name-template
{
name: "install with name-template",

@ -82,6 +82,7 @@ func newRollbackCmd(cfg *action.Configuration, out io.Writer) *cobra.Command {
f.BoolVar(&client.DisableHooks, "no-hooks", false, "prevent hooks from running during rollback")
f.DurationVar(&client.Timeout, "timeout", 300*time.Second, "time to wait for any individual Kubernetes operation (like Jobs for hooks)")
f.BoolVar(&client.Wait, "wait", false, "if set, will wait until all Pods, PVCs, Services, and minimum number of Pods of a Deployment, StatefulSet, or ReplicaSet are in a ready state before marking the release as successful. It will wait for as long as --timeout")
f.BoolVar(&client.WaitForJobs, "wait-for-jobs", false, "if set and --wait enabled, will wait until all Jobs have been completed before marking the release as successful. It will wait for as long as --timeout")
f.BoolVar(&client.CleanupOnFail, "cleanup-on-fail", false, "allow deletion of new resources created in this rollback when rollback fails")
f.IntVar(&client.MaxHistory, "history-max", settings.MaxHistory, "limit the maximum number of revisions saved per release. Use 0 for no limit")

@ -54,6 +54,11 @@ func TestRollbackCmd(t *testing.T) {
cmd: "rollback funny-honey 1 --wait",
golden: "output/rollback-wait.txt",
rels: rels,
}, {
name: "rollback a release with wait-for-jobs",
cmd: "rollback funny-honey 1 --wait --wait-for-jobs",
golden: "output/rollback-wait-for-jobs.txt",
rels: rels,
}, {
name: "rollback a release without revision",
cmd: "rollback funny-honey",

@ -0,0 +1,6 @@
NAME: apollo
LAST DEPLOYED: Fri Sep 2 22:04:05 1977
NAMESPACE: default
STATUS: deployed
REVISION: 1
TEST SUITE: None

@ -0,0 +1 @@
Rollback was a success! Happy Helming!

@ -0,0 +1,7 @@
Release "crazy-bunny" has been upgraded. Happy Helming!
NAME: crazy-bunny
LAST DEPLOYED: Fri Sep 2 22:04:05 1977
NAMESPACE: default
STATUS: deployed
REVISION: 3
TEST SUITE: None

@ -103,6 +103,7 @@ func newUpgradeCmd(cfg *action.Configuration, out io.Writer) *cobra.Command {
instClient.SkipCRDs = client.SkipCRDs
instClient.Timeout = client.Timeout
instClient.Wait = client.Wait
instClient.WaitForJobs = client.WaitForJobs
instClient.Devel = client.Devel
instClient.Namespace = client.Namespace
instClient.Atomic = client.Atomic
@ -179,6 +180,7 @@ func newUpgradeCmd(cfg *action.Configuration, out io.Writer) *cobra.Command {
f.BoolVar(&client.ResetValues, "reset-values", false, "when upgrading, reset the values to the ones built into the chart")
f.BoolVar(&client.ReuseValues, "reuse-values", false, "when upgrading, reuse the last release's values and merge in any overrides from the command line via --set and -f. If '--reset-values' is specified, this is ignored")
f.BoolVar(&client.Wait, "wait", false, "if set, will wait until all Pods, PVCs, Services, and minimum number of Pods of a Deployment, StatefulSet, or ReplicaSet are in a ready state before marking the release as successful. It will wait for as long as --timeout")
f.BoolVar(&client.WaitForJobs, "wait-for-jobs", false, "if set and --wait enabled, will wait until all Jobs have been completed before marking the release as successful. It will wait for as long as --timeout")
f.BoolVar(&client.Atomic, "atomic", false, "if set, upgrade process rolls back changes made in case of failed upgrade. The --wait flag will be set automatically if --atomic is used")
f.IntVar(&client.MaxHistory, "history-max", settings.MaxHistory, "limit the maximum number of revisions saved per release. Use 0 for no limit")
f.BoolVar(&client.CleanupOnFail, "cleanup-on-fail", false, "allow deletion of new resources created in this upgrade when upgrade fails")

@ -131,6 +131,12 @@ func TestUpgradeCmd(t *testing.T) {
golden: "output/upgrade-with-wait.txt",
rels: []*release.Release{relMock("crazy-bunny", 2, ch2)},
},
{
name: "upgrade a release with wait-for-jobs",
cmd: fmt.Sprintf("upgrade crazy-bunny --wait --wait-for-jobs '%s'", chartPath),
golden: "output/upgrade-with-wait-for-jobs.txt",
rels: []*release.Release{relMock("crazy-bunny", 2, ch2)},
},
{
name: "upgrade a release with missing dependencies",
cmd: fmt.Sprintf("upgrade bonkers-bunny %s", missingDepsPath),

@ -77,6 +77,7 @@ type Install struct {
DisableHooks bool
Replace bool
Wait bool
WaitForJobs bool
Devel bool
DependencyUpdate bool
Timeout time.Duration
@ -156,7 +157,7 @@ func (i *Install) installCRDs(crds []chart.CRD) error {
discoveryClient.Invalidate()
// Give time for the CRD to be recognized.
if err := i.cfg.KubeClient.Wait(totalItems, 60*time.Second); err != nil {
if err := i.cfg.KubeClient.Wait(totalItems, 60*time.Second, false); err != nil {
return err
}
@ -345,10 +346,9 @@ func (i *Install) Run(chrt *chart.Chart, vals map[string]interface{}) (*release.
}
if i.Wait {
if err := i.cfg.KubeClient.Wait(resources, i.Timeout); err != nil {
if err := i.cfg.KubeClient.Wait(resources, i.Timeout, i.WaitForJobs); err != nil {
return i.failRelease(rel, err)
}
}
if !i.DisableHooks {

@ -362,6 +362,23 @@ func TestInstallRelease_Wait(t *testing.T) {
is.Equal(res.Info.Status, release.StatusFailed)
}
func TestInstallRelease_WaitForJobs(t *testing.T) {
is := assert.New(t)
instAction := installAction(t)
instAction.ReleaseName = "come-fail-away"
failer := instAction.cfg.KubeClient.(*kubefake.FailingKubeClient)
failer.WaitError = fmt.Errorf("I timed out")
instAction.cfg.KubeClient = failer
instAction.Wait = true
instAction.WaitForJobs = true
vals := map[string]interface{}{}
res, err := instAction.Run(buildChart(), vals)
is.Error(err)
is.Contains(res.Info.Description, "I timed out")
is.Equal(res.Info.Status, release.StatusFailed)
}
func TestInstallRelease_Atomic(t *testing.T) {
is := assert.New(t)

@ -38,6 +38,7 @@ type Rollback struct {
Version int
Timeout time.Duration
Wait bool
WaitForJobs bool
DisableHooks bool
DryRun bool
Recreate bool // will (if true) recreate pods after a rollback.
@ -199,7 +200,7 @@ func (r *Rollback) performRollback(currentRelease, targetRelease *release.Releas
}
if r.Wait {
if err := r.cfg.KubeClient.Wait(target, r.Timeout); err != nil {
if err := r.cfg.KubeClient.Wait(target, r.Timeout, r.WaitForJobs); err != nil {
targetRelease.SetStatus(release.StatusFailed, fmt.Sprintf("Release %q failed: %s", targetRelease.Name, err.Error()))
r.cfg.recordRelease(currentRelease)
r.cfg.recordRelease(targetRelease)

@ -64,6 +64,8 @@ type Upgrade struct {
Timeout time.Duration
// Wait determines whether the wait operation should be performed after the upgrade is requested.
Wait bool
// WaitForJobs determines whether the wait operation for the Jobs should be performed after the upgrade is requested.
WaitForJobs bool
// DisableHooks disables hook processing if set to true.
DisableHooks bool
// DryRun controls whether the operation is prepared, but not executed.
@ -329,7 +331,7 @@ func (u *Upgrade) performUpgrade(originalRelease, upgradedRelease *release.Relea
}
if u.Wait {
if err := u.cfg.KubeClient.Wait(target, u.Timeout); err != nil {
if err := u.cfg.KubeClient.Wait(target, u.Timeout, u.WaitForJobs); err != nil {
u.cfg.recordRelease(originalRelease)
return u.failRelease(upgradedRelease, results.Created, err)
}
@ -400,6 +402,7 @@ func (u *Upgrade) failRelease(rel *release.Release, created kube.ResourceList, e
rollin := NewRollback(u.cfg)
rollin.Version = filteredHistory[0].Version
rollin.Wait = true
rollin.WaitForJobs = u.WaitForJobs
rollin.DisableHooks = u.DisableHooks
rollin.Recreate = u.Recreate
rollin.Force = u.Force

@ -60,6 +60,29 @@ func TestUpgradeRelease_Wait(t *testing.T) {
is.Equal(res.Info.Status, release.StatusFailed)
}
func TestUpgradeRelease_WaitForJobs(t *testing.T) {
is := assert.New(t)
req := require.New(t)
upAction := upgradeAction(t)
rel := releaseStub()
rel.Name = "come-fail-away"
rel.Info.Status = release.StatusDeployed
upAction.cfg.Releases.Create(rel)
failer := upAction.cfg.KubeClient.(*kubefake.FailingKubeClient)
failer.WaitError = fmt.Errorf("I timed out")
upAction.cfg.KubeClient = failer
upAction.Wait = true
upAction.WaitForJobs = true
vals := map[string]interface{}{}
res, err := upAction.Run(rel.Name, buildChart(), vals)
req.Error(err)
is.Contains(res.Info.Description, "I timed out")
is.Equal(res.Info.Status, release.StatusFailed)
}
func TestUpgradeRelease_CleanupOnFail(t *testing.T) {
is := assert.New(t)
req := require.New(t)

@ -127,7 +127,7 @@ func (c *Client) Create(resources ResourceList) (*Result, error) {
}
// Wait up to the given timeout for the specified resources to be ready
func (c *Client) Wait(resources ResourceList, timeout time.Duration) error {
func (c *Client) Wait(resources ResourceList, timeout time.Duration, waitForJobsEnabled bool) error {
cs, err := c.getKubeClient()
if err != nil {
return err
@ -137,7 +137,7 @@ func (c *Client) Wait(resources ResourceList, timeout time.Duration) error {
log: c.Log,
timeout: timeout,
}
return w.waitForResources(resources)
return w.waitForResources(resources, waitForJobsEnabled)
}
func (c *Client) namespace() string {

@ -51,11 +51,11 @@ func (f *FailingKubeClient) Create(resources kube.ResourceList) (*kube.Result, e
}
// Wait returns the configured error if set or prints
func (f *FailingKubeClient) Wait(resources kube.ResourceList, d time.Duration) error {
func (f *FailingKubeClient) Wait(resources kube.ResourceList, d time.Duration, waitForJobsEnabled bool) error {
if f.WaitError != nil {
return f.WaitError
}
return f.PrintingKubeClient.Wait(resources, d)
return f.PrintingKubeClient.Wait(resources, d, waitForJobsEnabled)
}
// Delete returns the configured error if set or prints

@ -47,7 +47,7 @@ func (p *PrintingKubeClient) Create(resources kube.ResourceList) (*kube.Result,
return &kube.Result{Created: resources}, nil
}
func (p *PrintingKubeClient) Wait(resources kube.ResourceList, _ time.Duration) error {
func (p *PrintingKubeClient) Wait(resources kube.ResourceList, _ time.Duration, _ bool) error {
_, err := io.Copy(p.Out, bufferize(resources))
return err
}

@ -30,7 +30,7 @@ type Interface interface {
// Create creates one or more resources.
Create(resources ResourceList) (*Result, error)
Wait(resources ResourceList, timeout time.Duration) error
Wait(resources ResourceList, timeout time.Duration, waitForJobsEnabled bool) error
// Delete destroys one or more resources.
Delete(resources ResourceList) (*Result, []error)

@ -47,9 +47,9 @@ type waiter struct {
log func(string, ...interface{})
}
// waitForResources polls to get the current status of all pods, PVCs, and Services
// until all are ready or a timeout is reached
func (w *waiter) waitForResources(created ResourceList) error {
// waitForResources polls to get the current status of all pods, PVCs, Services and
// Jobs(optional) until all are ready or a timeout is reached
func (w *waiter) waitForResources(created ResourceList, waitForJobsEnabled bool) error {
w.log("beginning wait for %d resources with timeout of %v", len(created), w.timeout)
return wait.Poll(2*time.Second, w.timeout, func() (bool, error) {
@ -68,10 +68,12 @@ func (w *waiter) waitForResources(created ResourceList) error {
return false, err
}
case *batchv1.Job:
if waitForJobsEnabled {
job, err := w.c.BatchV1().Jobs(v.Namespace).Get(context.Background(), v.Name, metav1.GetOptions{})
if err != nil || !w.jobReady(job) {
return false, err
}
}
case *appsv1.Deployment, *appsv1beta1.Deployment, *appsv1beta2.Deployment, *extensionsv1beta1.Deployment:
currentDeployment, err := w.c.AppsV1().Deployments(v.Namespace).Get(context.Background(), v.Name, metav1.GetOptions{})
if err != nil {

Loading…
Cancel
Save