Merge pull request #8363 from zh168654/master

helm upgrade with --wait support jobs in manifest to be completed
pull/9139/head
Matt Farina 4 years ago committed by GitHub
commit f5ef87b96e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -140,6 +140,7 @@ func addInstallFlags(cmd *cobra.Command, f *pflag.FlagSet, client *action.Instal
f.BoolVar(&client.Replace, "replace", false, "re-use the given name, only if that name is a deleted release which remains in the history. This is unsafe in production")
f.DurationVar(&client.Timeout, "timeout", 300*time.Second, "time to wait for any individual Kubernetes operation (like Jobs for hooks)")
f.BoolVar(&client.Wait, "wait", false, "if set, will wait until all Pods, PVCs, Services, and minimum number of Pods of a Deployment, StatefulSet, or ReplicaSet are in a ready state before marking the release as successful. It will wait for as long as --timeout")
f.BoolVar(&client.WaitForJobs, "wait-for-jobs", false, "if set and --wait enabled, will wait until all Jobs have been completed before marking the release as successful. It will wait for as long as --timeout")
f.BoolVarP(&client.GenerateName, "generate-name", "g", false, "generate the name (and omit the NAME parameter)")
f.StringVar(&client.NameTemplate, "name-template", "", "specify template used to name the release")
f.StringVar(&client.Description, "description", "", "add a custom description")

@ -85,6 +85,12 @@ func TestInstall(t *testing.T) {
cmd: "install apollo testdata/testcharts/empty --wait",
golden: "output/install-with-wait.txt",
},
// Install, with wait-for-jobs
{
name: "install with wait-for-jobs",
cmd: "install apollo testdata/testcharts/empty --wait --wait-for-jobs",
golden: "output/install-with-wait-for-jobs.txt",
},
// Install, using the name-template
{
name: "install with name-template",

@ -82,6 +82,7 @@ func newRollbackCmd(cfg *action.Configuration, out io.Writer) *cobra.Command {
f.BoolVar(&client.DisableHooks, "no-hooks", false, "prevent hooks from running during rollback")
f.DurationVar(&client.Timeout, "timeout", 300*time.Second, "time to wait for any individual Kubernetes operation (like Jobs for hooks)")
f.BoolVar(&client.Wait, "wait", false, "if set, will wait until all Pods, PVCs, Services, and minimum number of Pods of a Deployment, StatefulSet, or ReplicaSet are in a ready state before marking the release as successful. It will wait for as long as --timeout")
f.BoolVar(&client.WaitForJobs, "wait-for-jobs", false, "if set and --wait enabled, will wait until all Jobs have been completed before marking the release as successful. It will wait for as long as --timeout")
f.BoolVar(&client.CleanupOnFail, "cleanup-on-fail", false, "allow deletion of new resources created in this rollback when rollback fails")
f.IntVar(&client.MaxHistory, "history-max", settings.MaxHistory, "limit the maximum number of revisions saved per release. Use 0 for no limit")

@ -54,6 +54,11 @@ func TestRollbackCmd(t *testing.T) {
cmd: "rollback funny-honey 1 --wait",
golden: "output/rollback-wait.txt",
rels: rels,
}, {
name: "rollback a release with wait-for-jobs",
cmd: "rollback funny-honey 1 --wait --wait-for-jobs",
golden: "output/rollback-wait-for-jobs.txt",
rels: rels,
}, {
name: "rollback a release without revision",
cmd: "rollback funny-honey",

@ -0,0 +1,6 @@
NAME: apollo
LAST DEPLOYED: Fri Sep 2 22:04:05 1977
NAMESPACE: default
STATUS: deployed
REVISION: 1
TEST SUITE: None

@ -0,0 +1 @@
Rollback was a success! Happy Helming!

@ -0,0 +1,7 @@
Release "crazy-bunny" has been upgraded. Happy Helming!
NAME: crazy-bunny
LAST DEPLOYED: Fri Sep 2 22:04:05 1977
NAMESPACE: default
STATUS: deployed
REVISION: 3
TEST SUITE: None

@ -103,6 +103,7 @@ func newUpgradeCmd(cfg *action.Configuration, out io.Writer) *cobra.Command {
instClient.SkipCRDs = client.SkipCRDs
instClient.Timeout = client.Timeout
instClient.Wait = client.Wait
instClient.WaitForJobs = client.WaitForJobs
instClient.Devel = client.Devel
instClient.Namespace = client.Namespace
instClient.Atomic = client.Atomic
@ -179,6 +180,7 @@ func newUpgradeCmd(cfg *action.Configuration, out io.Writer) *cobra.Command {
f.BoolVar(&client.ResetValues, "reset-values", false, "when upgrading, reset the values to the ones built into the chart")
f.BoolVar(&client.ReuseValues, "reuse-values", false, "when upgrading, reuse the last release's values and merge in any overrides from the command line via --set and -f. If '--reset-values' is specified, this is ignored")
f.BoolVar(&client.Wait, "wait", false, "if set, will wait until all Pods, PVCs, Services, and minimum number of Pods of a Deployment, StatefulSet, or ReplicaSet are in a ready state before marking the release as successful. It will wait for as long as --timeout")
f.BoolVar(&client.WaitForJobs, "wait-for-jobs", false, "if set and --wait enabled, will wait until all Jobs have been completed before marking the release as successful. It will wait for as long as --timeout")
f.BoolVar(&client.Atomic, "atomic", false, "if set, upgrade process rolls back changes made in case of failed upgrade. The --wait flag will be set automatically if --atomic is used")
f.IntVar(&client.MaxHistory, "history-max", settings.MaxHistory, "limit the maximum number of revisions saved per release. Use 0 for no limit")
f.BoolVar(&client.CleanupOnFail, "cleanup-on-fail", false, "allow deletion of new resources created in this upgrade when upgrade fails")

@ -131,6 +131,12 @@ func TestUpgradeCmd(t *testing.T) {
golden: "output/upgrade-with-wait.txt",
rels: []*release.Release{relMock("crazy-bunny", 2, ch2)},
},
{
name: "upgrade a release with wait-for-jobs",
cmd: fmt.Sprintf("upgrade crazy-bunny --wait --wait-for-jobs '%s'", chartPath),
golden: "output/upgrade-with-wait-for-jobs.txt",
rels: []*release.Release{relMock("crazy-bunny", 2, ch2)},
},
{
name: "upgrade a release with missing dependencies",
cmd: fmt.Sprintf("upgrade bonkers-bunny %s", missingDepsPath),

@ -77,6 +77,7 @@ type Install struct {
DisableHooks bool
Replace bool
Wait bool
WaitForJobs bool
Devel bool
DependencyUpdate bool
Timeout time.Duration
@ -345,10 +346,15 @@ func (i *Install) Run(chrt *chart.Chart, vals map[string]interface{}) (*release.
}
if i.Wait {
if err := i.cfg.KubeClient.Wait(resources, i.Timeout); err != nil {
return i.failRelease(rel, err)
if i.WaitForJobs {
if err := i.cfg.KubeClient.WaitWithJobs(resources, i.Timeout); err != nil {
return i.failRelease(rel, err)
}
} else {
if err := i.cfg.KubeClient.Wait(resources, i.Timeout); err != nil {
return i.failRelease(rel, err)
}
}
}
if !i.DisableHooks {

@ -362,6 +362,23 @@ func TestInstallRelease_Wait(t *testing.T) {
is.Equal(res.Info.Status, release.StatusFailed)
}
func TestInstallRelease_WaitForJobs(t *testing.T) {
is := assert.New(t)
instAction := installAction(t)
instAction.ReleaseName = "come-fail-away"
failer := instAction.cfg.KubeClient.(*kubefake.FailingKubeClient)
failer.WaitError = fmt.Errorf("I timed out")
instAction.cfg.KubeClient = failer
instAction.Wait = true
instAction.WaitForJobs = true
vals := map[string]interface{}{}
res, err := instAction.Run(buildChart(), vals)
is.Error(err)
is.Contains(res.Info.Description, "I timed out")
is.Equal(res.Info.Status, release.StatusFailed)
}
func TestInstallRelease_Atomic(t *testing.T) {
is := assert.New(t)

@ -38,6 +38,7 @@ type Rollback struct {
Version int
Timeout time.Duration
Wait bool
WaitForJobs bool
DisableHooks bool
DryRun bool
Recreate bool // will (if true) recreate pods after a rollback.
@ -199,11 +200,20 @@ func (r *Rollback) performRollback(currentRelease, targetRelease *release.Releas
}
if r.Wait {
if err := r.cfg.KubeClient.Wait(target, r.Timeout); err != nil {
targetRelease.SetStatus(release.StatusFailed, fmt.Sprintf("Release %q failed: %s", targetRelease.Name, err.Error()))
r.cfg.recordRelease(currentRelease)
r.cfg.recordRelease(targetRelease)
return targetRelease, errors.Wrapf(err, "release %s failed", targetRelease.Name)
if r.WaitForJobs {
if err := r.cfg.KubeClient.WaitWithJobs(target, r.Timeout); err != nil {
targetRelease.SetStatus(release.StatusFailed, fmt.Sprintf("Release %q failed: %s", targetRelease.Name, err.Error()))
r.cfg.recordRelease(currentRelease)
r.cfg.recordRelease(targetRelease)
return targetRelease, errors.Wrapf(err, "release %s failed", targetRelease.Name)
}
} else {
if err := r.cfg.KubeClient.Wait(target, r.Timeout); err != nil {
targetRelease.SetStatus(release.StatusFailed, fmt.Sprintf("Release %q failed: %s", targetRelease.Name, err.Error()))
r.cfg.recordRelease(currentRelease)
r.cfg.recordRelease(targetRelease)
return targetRelease, errors.Wrapf(err, "release %s failed", targetRelease.Name)
}
}
}

@ -64,6 +64,8 @@ type Upgrade struct {
Timeout time.Duration
// Wait determines whether the wait operation should be performed after the upgrade is requested.
Wait bool
// WaitForJobs determines whether the wait operation for the Jobs should be performed after the upgrade is requested.
WaitForJobs bool
// DisableHooks disables hook processing if set to true.
DisableHooks bool
// DryRun controls whether the operation is prepared, but not executed.
@ -329,9 +331,16 @@ func (u *Upgrade) performUpgrade(originalRelease, upgradedRelease *release.Relea
}
if u.Wait {
if err := u.cfg.KubeClient.Wait(target, u.Timeout); err != nil {
u.cfg.recordRelease(originalRelease)
return u.failRelease(upgradedRelease, results.Created, err)
if u.WaitForJobs {
if err := u.cfg.KubeClient.WaitWithJobs(target, u.Timeout); err != nil {
u.cfg.recordRelease(originalRelease)
return u.failRelease(upgradedRelease, results.Created, err)
}
} else {
if err := u.cfg.KubeClient.Wait(target, u.Timeout); err != nil {
u.cfg.recordRelease(originalRelease)
return u.failRelease(upgradedRelease, results.Created, err)
}
}
}
@ -400,6 +409,7 @@ func (u *Upgrade) failRelease(rel *release.Release, created kube.ResourceList, e
rollin := NewRollback(u.cfg)
rollin.Version = filteredHistory[0].Version
rollin.Wait = true
rollin.WaitForJobs = u.WaitForJobs
rollin.DisableHooks = u.DisableHooks
rollin.Recreate = u.Recreate
rollin.Force = u.Force

@ -60,6 +60,29 @@ func TestUpgradeRelease_Wait(t *testing.T) {
is.Equal(res.Info.Status, release.StatusFailed)
}
func TestUpgradeRelease_WaitForJobs(t *testing.T) {
is := assert.New(t)
req := require.New(t)
upAction := upgradeAction(t)
rel := releaseStub()
rel.Name = "come-fail-away"
rel.Info.Status = release.StatusDeployed
upAction.cfg.Releases.Create(rel)
failer := upAction.cfg.KubeClient.(*kubefake.FailingKubeClient)
failer.WaitError = fmt.Errorf("I timed out")
upAction.cfg.KubeClient = failer
upAction.Wait = true
upAction.WaitForJobs = true
vals := map[string]interface{}{}
res, err := upAction.Run(rel.Name, buildChart(), vals)
req.Error(err)
is.Contains(res.Info.Description, "I timed out")
is.Equal(res.Info.Status, release.StatusFailed)
}
func TestUpgradeRelease_CleanupOnFail(t *testing.T) {
is := assert.New(t)
req := require.New(t)

@ -137,7 +137,21 @@ func (c *Client) Wait(resources ResourceList, timeout time.Duration) error {
log: c.Log,
timeout: timeout,
}
return w.waitForResources(resources)
return w.waitForResources(resources, false)
}
// WaitWithJobs wait up to the given timeout for the specified resources to be ready, including jobs.
func (c *Client) WaitWithJobs(resources ResourceList, timeout time.Duration) error {
cs, err := c.getKubeClient()
if err != nil {
return err
}
w := waiter{
c: cs,
log: c.Log,
timeout: timeout,
}
return w.waitForResources(resources, true)
}
func (c *Client) namespace() string {

@ -58,6 +58,14 @@ func (f *FailingKubeClient) Wait(resources kube.ResourceList, d time.Duration) e
return f.PrintingKubeClient.Wait(resources, d)
}
// WaitWithJobs returns the configured error if set or prints
func (f *FailingKubeClient) WaitWithJobs(resources kube.ResourceList, d time.Duration) error {
if f.WaitError != nil {
return f.WaitError
}
return f.PrintingKubeClient.Wait(resources, d)
}
// Delete returns the configured error if set or prints
func (f *FailingKubeClient) Delete(resources kube.ResourceList) (*kube.Result, []error) {
if f.DeleteError != nil {

@ -52,6 +52,11 @@ func (p *PrintingKubeClient) Wait(resources kube.ResourceList, _ time.Duration)
return err
}
func (p *PrintingKubeClient) WaitWithJobs(resources kube.ResourceList, _ time.Duration) error {
_, err := io.Copy(p.Out, bufferize(resources))
return err
}
// Delete implements KubeClient delete.
//
// It only prints out the content to be deleted.

@ -32,6 +32,8 @@ type Interface interface {
Wait(resources ResourceList, timeout time.Duration) error
WaitWithJobs(resources ResourceList, timeout time.Duration) error
// Delete destroys one or more resources.
Delete(resources ResourceList) (*Result, []error)

@ -47,9 +47,9 @@ type waiter struct {
log func(string, ...interface{})
}
// waitForResources polls to get the current status of all pods, PVCs, and Services
// until all are ready or a timeout is reached
func (w *waiter) waitForResources(created ResourceList) error {
// waitForResources polls to get the current status of all pods, PVCs, Services and
// Jobs(optional) until all are ready or a timeout is reached
func (w *waiter) waitForResources(created ResourceList, waitForJobsEnabled bool) error {
w.log("beginning wait for %d resources with timeout of %v", len(created), w.timeout)
return wait.Poll(2*time.Second, w.timeout, func() (bool, error) {
@ -67,6 +67,13 @@ func (w *waiter) waitForResources(created ResourceList) error {
if err != nil || !w.isPodReady(pod) {
return false, err
}
case *batchv1.Job:
if waitForJobsEnabled {
job, err := w.c.BatchV1().Jobs(v.Namespace).Get(context.Background(), v.Name, metav1.GetOptions{})
if err != nil || !w.jobReady(job) {
return false, err
}
}
case *appsv1.Deployment, *appsv1beta1.Deployment, *appsv1beta2.Deployment, *extensionsv1beta1.Deployment:
currentDeployment, err := w.c.AppsV1().Deployments(v.Namespace).Get(context.Background(), v.Name, metav1.GetOptions{})
if err != nil {
@ -182,6 +189,18 @@ func (w *waiter) isPodReady(pod *corev1.Pod) bool {
return false
}
func (w *waiter) jobReady(job *batchv1.Job) bool {
if job.Status.Failed >= *job.Spec.BackoffLimit {
w.log("Job is failed: %s/%s", job.GetNamespace(), job.GetName())
return false
}
if job.Status.Succeeded < *job.Spec.Completions {
w.log("Job is not completed: %s/%s", job.GetNamespace(), job.GetName())
return false
}
return true
}
func (w *waiter) serviceReady(s *corev1.Service) bool {
// ExternalName Services are external to cluster so helm shouldn't be checking to see if they're 'ready' (i.e. have an IP Set)
if s.Spec.Type == corev1.ServiceTypeExternalName {

@ -0,0 +1,515 @@
/*
Copyright The Helm Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kube // import "helm.sh/helm/v3/pkg/kube"
import (
"context"
"testing"
appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/kubernetes/fake"
)
const defaultNamespace = metav1.NamespaceDefault
func Test_waiter_deploymentReady(t *testing.T) {
type args struct {
rs *appsv1.ReplicaSet
dep *appsv1.Deployment
}
tests := []struct {
name string
args args
want bool
}{
{
name: "deployment is ready",
args: args{
rs: newReplicaSet("foo", 1, 1),
dep: newDeployment("foo", 1, 1, 0),
},
want: true,
},
{
name: "deployment is not ready",
args: args{
rs: newReplicaSet("foo", 0, 0),
dep: newDeployment("foo", 1, 1, 0),
},
want: false,
},
{
name: "deployment is ready when maxUnavailable is set",
args: args{
rs: newReplicaSet("foo", 2, 1),
dep: newDeployment("foo", 2, 1, 1),
},
want: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
w := &waiter{
c: fake.NewSimpleClientset(),
log: nopLogger,
}
if got := w.deploymentReady(tt.args.rs, tt.args.dep); got != tt.want {
t.Errorf("deploymentReady() = %v, want %v", got, tt.want)
}
})
}
}
func Test_waiter_daemonSetReady(t *testing.T) {
type args struct {
ds *appsv1.DaemonSet
}
tests := []struct {
name string
args args
want bool
}{
{
name: "daemonset is ready",
args: args{
ds: newDaemonSet("foo", 0, 1, 1, 1),
},
want: true,
},
{
name: "daemonset is not ready",
args: args{
ds: newDaemonSet("foo", 0, 0, 1, 1),
},
want: false,
},
{
name: "daemonset pods have not been scheduled successfully",
args: args{
ds: newDaemonSet("foo", 0, 0, 1, 0),
},
want: false,
},
{
name: "daemonset is ready when maxUnavailable is set",
args: args{
ds: newDaemonSet("foo", 1, 1, 2, 2),
},
want: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
w := &waiter{
c: fake.NewSimpleClientset(),
log: nopLogger,
}
if got := w.daemonSetReady(tt.args.ds); got != tt.want {
t.Errorf("daemonSetReady() = %v, want %v", got, tt.want)
}
})
}
}
func Test_waiter_statefulSetReady(t *testing.T) {
type args struct {
sts *appsv1.StatefulSet
}
tests := []struct {
name string
args args
want bool
}{
{
name: "statefulset is ready",
args: args{
sts: newStatefulSet("foo", 1, 0, 1, 1),
},
want: true,
},
{
name: "statefulset is not ready",
args: args{
sts: newStatefulSet("foo", 1, 0, 0, 1),
},
want: false,
},
{
name: "statefulset is ready when partition is specified",
args: args{
sts: newStatefulSet("foo", 2, 1, 2, 1),
},
want: true,
},
{
name: "statefulset is not ready when partition is set",
args: args{
sts: newStatefulSet("foo", 1, 1, 1, 1),
},
want: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
w := &waiter{
c: fake.NewSimpleClientset(),
log: nopLogger,
}
if got := w.statefulSetReady(tt.args.sts); got != tt.want {
t.Errorf("statefulSetReady() = %v, want %v", got, tt.want)
}
})
}
}
func Test_waiter_podsReadyForObject(t *testing.T) {
type args struct {
namespace string
obj runtime.Object
}
tests := []struct {
name string
args args
existPods []corev1.Pod
want bool
wantErr bool
}{
{
name: "pods ready for a replicaset",
args: args{
namespace: defaultNamespace,
obj: newReplicaSet("foo", 1, 1),
},
existPods: []corev1.Pod{
*newPodWithCondition("foo", corev1.ConditionTrue),
},
want: true,
wantErr: false,
},
{
name: "pods not ready for a replicaset",
args: args{
namespace: defaultNamespace,
obj: newReplicaSet("foo", 1, 1),
},
existPods: []corev1.Pod{
*newPodWithCondition("foo", corev1.ConditionFalse),
},
want: false,
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
w := &waiter{
c: fake.NewSimpleClientset(),
log: nopLogger,
}
for _, pod := range tt.existPods {
if _, err := w.c.CoreV1().Pods(defaultNamespace).Create(context.TODO(), &pod, metav1.CreateOptions{}); err != nil {
t.Errorf("Failed to create Pod error: %v", err)
return
}
}
got, err := w.podsReadyForObject(tt.args.namespace, tt.args.obj)
if (err != nil) != tt.wantErr {
t.Errorf("podsReadyForObject() error = %v, wantErr %v", err, tt.wantErr)
return
}
if got != tt.want {
t.Errorf("podsReadyForObject() got = %v, want %v", got, tt.want)
}
})
}
}
func Test_waiter_jobReady(t *testing.T) {
type args struct {
job *batchv1.Job
}
tests := []struct {
name string
args args
want bool
}{
{
name: "job is completed",
args: args{job: newJob("foo", 1, 1, 1, 0)},
want: true,
},
{
name: "job is incomplete",
args: args{job: newJob("foo", 1, 1, 0, 0)},
want: false,
},
{
name: "job is failed",
args: args{job: newJob("foo", 1, 1, 0, 1)},
want: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
w := &waiter{
c: fake.NewSimpleClientset(),
log: nopLogger,
}
if got := w.jobReady(tt.args.job); got != tt.want {
t.Errorf("jobReady() = %v, want %v", got, tt.want)
}
})
}
}
func Test_waiter_volumeReady(t *testing.T) {
type args struct {
v *corev1.PersistentVolumeClaim
}
tests := []struct {
name string
args args
want bool
}{
{
name: "pvc is bound",
args: args{
v: newPersistentVolumeClaim("foo", corev1.ClaimBound),
},
want: true,
},
{
name: "pvc is not ready",
args: args{
v: newPersistentVolumeClaim("foo", corev1.ClaimPending),
},
want: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
w := &waiter{
c: fake.NewSimpleClientset(),
log: nopLogger,
}
if got := w.volumeReady(tt.args.v); got != tt.want {
t.Errorf("volumeReady() = %v, want %v", got, tt.want)
}
})
}
}
func newDaemonSet(name string, maxUnavailable, numberReady, desiredNumberScheduled, updatedNumberScheduled int) *appsv1.DaemonSet {
return &appsv1.DaemonSet{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: defaultNamespace,
},
Spec: appsv1.DaemonSetSpec{
UpdateStrategy: appsv1.DaemonSetUpdateStrategy{
Type: appsv1.RollingUpdateDaemonSetStrategyType,
RollingUpdate: &appsv1.RollingUpdateDaemonSet{
MaxUnavailable: func() *intstr.IntOrString { i := intstr.FromInt(maxUnavailable); return &i }(),
},
},
Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"name": name}},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Labels: map[string]string{"name": name},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Image: "nginx",
},
},
},
},
},
Status: appsv1.DaemonSetStatus{
DesiredNumberScheduled: int32(desiredNumberScheduled),
NumberReady: int32(numberReady),
UpdatedNumberScheduled: int32(updatedNumberScheduled),
},
}
}
func newStatefulSet(name string, replicas, partition, readyReplicas, updatedReplicas int) *appsv1.StatefulSet {
return &appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: defaultNamespace,
},
Spec: appsv1.StatefulSetSpec{
UpdateStrategy: appsv1.StatefulSetUpdateStrategy{
Type: appsv1.RollingUpdateStatefulSetStrategyType,
RollingUpdate: &appsv1.RollingUpdateStatefulSetStrategy{
Partition: intToInt32(partition),
},
},
Replicas: intToInt32(replicas),
Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"name": name}},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Labels: map[string]string{"name": name},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Image: "nginx",
},
},
},
},
},
Status: appsv1.StatefulSetStatus{
UpdatedReplicas: int32(updatedReplicas),
ReadyReplicas: int32(readyReplicas),
},
}
}
func newDeployment(name string, replicas, maxSurge, maxUnavailable int) *appsv1.Deployment {
return &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: defaultNamespace,
},
Spec: appsv1.DeploymentSpec{
Strategy: appsv1.DeploymentStrategy{
Type: appsv1.RollingUpdateDeploymentStrategyType,
RollingUpdate: &appsv1.RollingUpdateDeployment{
MaxUnavailable: func() *intstr.IntOrString { i := intstr.FromInt(maxUnavailable); return &i }(),
MaxSurge: func() *intstr.IntOrString { i := intstr.FromInt(maxSurge); return &i }(),
},
},
Replicas: intToInt32(replicas),
Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"name": name}},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Labels: map[string]string{"name": name},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Image: "nginx",
},
},
},
},
},
}
}
func newReplicaSet(name string, replicas int, readyReplicas int) *appsv1.ReplicaSet {
d := newDeployment(name, replicas, 0, 0)
return &appsv1.ReplicaSet{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: defaultNamespace,
Labels: d.Spec.Selector.MatchLabels,
OwnerReferences: []metav1.OwnerReference{*metav1.NewControllerRef(d, d.GroupVersionKind())},
},
Spec: appsv1.ReplicaSetSpec{
Selector: d.Spec.Selector,
Replicas: intToInt32(replicas),
Template: d.Spec.Template,
},
Status: appsv1.ReplicaSetStatus{
ReadyReplicas: int32(readyReplicas),
},
}
}
func newPodWithCondition(name string, podReadyCondition corev1.ConditionStatus) *corev1.Pod {
return &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: defaultNamespace,
Labels: map[string]string{"name": name},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Image: "nginx",
},
},
},
Status: corev1.PodStatus{
Conditions: []corev1.PodCondition{
{
Type: corev1.PodReady,
Status: podReadyCondition,
},
},
},
}
}
func newPersistentVolumeClaim(name string, phase corev1.PersistentVolumeClaimPhase) *corev1.PersistentVolumeClaim {
return &corev1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: defaultNamespace,
},
Status: corev1.PersistentVolumeClaimStatus{
Phase: phase,
},
}
}
func newJob(name string, backoffLimit, completions, succeeded, failed int) *batchv1.Job {
return &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: defaultNamespace,
},
Spec: batchv1.JobSpec{
BackoffLimit: intToInt32(backoffLimit),
Completions: intToInt32(completions),
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Labels: map[string]string{"name": name},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Image: "nginx",
},
},
},
},
},
Status: batchv1.JobStatus{
Succeeded: int32(succeeded),
Failed: int32(failed),
},
}
}
func intToInt32(i int) *int32 {
i32 := int32(i)
return &i32
}
Loading…
Cancel
Save