fix(helm): support waiting for jobs with ttlSecondsAfterFinished

Rely on job events instead of polling while waiting for job readiness. This way, job completion can be observed even if the job was removed because its TTL expired.

Signed-off-by: Vladislav Koriakov <dkfl12@yahoo.com>
pull/11348/head
Vladislav Koriakov 3 years ago
parent f3099cdb67
commit 96867dcf29

@ -29,8 +29,10 @@ import (
apiextv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" apiextv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apiextv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" apiextv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/cli-runtime/pkg/resource" "k8s.io/cli-runtime/pkg/resource"
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/kubernetes/scheme"
@ -82,6 +84,89 @@ type ReadyChecker struct {
pausedAsReady bool pausedAsReady bool
} }
// getResourceWatch attempts to get a watch.Interface for the supplied resource.
// The resource is expected to have a non-empty name; the watch is narrowed to
// that single object with a metadata.name field selector and starts at the
// resource's ResourceVersion so no events are missed between list and watch.
// Only jobs are supported; other kinds return a "not implemented" error so
// callers can fall back to polling.
func (c *ReadyChecker) getResourceWatch(ctx context.Context, resource *resource.Info) (watch.Interface, error) {
	if resource.Name == "" {
		return nil, fmt.Errorf("can't get watch for resource with empty name")
	}
	// Escape the name in case it contains field-selector special characters.
	fieldNameSelector := fmt.Sprintf("metadata.name=%s", fields.EscapeValue(resource.Name))
	listOpts := metav1.ListOptions{
		FieldSelector:   fieldNameSelector,
		ResourceVersion: resource.ResourceVersion,
	}
	switch resourceType := AsVersioned(resource).(type) {
	case *batchv1.Job:
		return c.client.BatchV1().Jobs(resource.Namespace).Watch(ctx, listOpts)
	default:
		return nil, fmt.Errorf("can't get watch for resource of type %T - not implemented", resourceType)
	}
}
// eventObjectAsJob converts event.Object to *batchv1.Job or returns an error.
func eventObjectAsJob(event watch.Event) (*batchv1.Job, error) {
	job, ok := event.Object.(*batchv1.Job)
	if !ok {
		// Report the type of the object actually received. The original code
		// passed `job` to %T, but after a failed type assertion `job` is a
		// typed nil *batchv1.Job, so the message would always claim the
		// expected type instead of the offending one.
		return nil, fmt.Errorf(
			"expected runtime.Object of type *batchv1.Job, got %T",
			event.Object,
		)
	}
	return job, nil
}
// waitJobsReady waits until all jobs in the resource list are ready. It
// observes job events rather than polling, so completion is seen even if the
// job object is deleted (e.g. by ttlSecondsAfterFinished) before a poll would
// have run. Returns an error if the context is done or a job was deleted
// before becoming ready. If the resource list contains no jobs it returns
// true without an error.
func (c *ReadyChecker) waitJobsReady(ctx context.Context, resources ResourceList) (bool, error) {
	for _, res := range resources {
		switch job := AsVersioned(res).(type) {
		case *batchv1.Job:
			// Per-job waiting lives in a helper so its deferred watch Stop
			// runs when that job finishes, not at the end of this function.
			// (A defer inside this loop would keep every watch open until all
			// jobs had been processed.)
			isReady, err := c.waitJobReady(ctx, res, job)
			if err != nil || !isReady {
				return isReady, err
			}
		}
	}
	return true, nil
}

// waitJobReady watches a single job until it becomes ready, the watch channel
// closes, or ctx is done. If a watch cannot be established it falls back to a
// one-shot readiness check. The watch is always stopped before returning.
func (c *ReadyChecker) waitJobReady(ctx context.Context, res *resource.Info, job *batchv1.Job) (bool, error) {
	lastSeen := job
	jobWatch, err := c.getResourceWatch(ctx, res)
	if err != nil {
		c.log("Falling back to polling, error watching job events: %v", err)
		return c.IsReady(ctx, res)
	}
	defer jobWatch.Stop()
	// The job may already be complete before any event arrives.
	isReady, err := c.jobReady(lastSeen)
	if err != nil || isReady {
		return isReady, err
	}
	for {
		select {
		case <-ctx.Done():
			// Context expired: report the last observed state alongside the
			// context error so the caller knows the wait was cut short.
			isReady, err = c.jobReady(lastSeen)
			if err != nil {
				return isReady, err
			}
			return isReady, ctx.Err()
		case event, ok := <-jobWatch.ResultChan():
			if !ok {
				// Watch channel closed; fall back to the last observed state.
				return c.jobReady(lastSeen)
			}
			lastSeen, err = eventObjectAsJob(event)
			if err != nil {
				return isReady, err
			}
			isReady, err = c.jobReady(lastSeen)
			if err != nil {
				return isReady, err
			}
			if isReady {
				return true, nil
			}
			// A deletion event for a job that never became ready is terminal:
			// the job can no longer succeed.
			if event.Type == watch.Deleted {
				return false, fmt.Errorf(
					"job %v/%v is deleted but wasn't ready",
					lastSeen.GetNamespace(),
					lastSeen.GetName(),
				)
			}
		}
	}
}
// IsReady checks if v is ready. It supports checking readiness for pods, // IsReady checks if v is ready. It supports checking readiness for pods,
// deployments, persistent volume claims, services, daemon sets, custom // deployments, persistent volume claims, services, daemon sets, custom
// resource definitions, stateful sets, replication controllers, jobs (optional), // resource definitions, stateful sets, replication controllers, jobs (optional),

@ -17,7 +17,9 @@ package kube // import "helm.sh/helm/v3/pkg/kube"
import ( import (
"context" "context"
"fmt"
"testing" "testing"
"time"
appsv1 "k8s.io/api/apps/v1" appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1" batchv1 "k8s.io/api/batch/v1"
@ -25,7 +27,10 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/cli-runtime/pkg/resource"
"k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/kubernetes/fake"
k8stest "k8s.io/client-go/testing"
) )
const defaultNamespace = metav1.NamespaceDefault const defaultNamespace = metav1.NamespaceDefault
@ -264,6 +269,158 @@ func Test_ReadyChecker_podsReadyForObject(t *testing.T) {
} }
} }
// Test_ReadyChecker_waitJobReady exercises waitJobsReady against a fake
// clientset whose watch channel is driven by a scripted sequence of events,
// covering success, deletion-after-success (the TTL case), deletion before
// success, and context timeout.
func Test_ReadyChecker_waitJobReady(t *testing.T) {
	// jobEventAction takes the current job state (by value), mutates the copy,
	// and returns it along with the watch event type to emit.
	type jobEventAction func(job batchv1.Job) (batchv1.Job, watch.EventType)
	type args struct {
		job        *batchv1.Job    // initial job stored in the fake clientset
		actions    []jobEventAction // events fed through the fake watcher, in order
		wantErr    error            // expected error, nil if none
		ctxTimeout *time.Duration   // optional context timeout for the wait
	}
	makeDuration := func(timeout time.Duration) *time.Duration {
		return &timeout
	}
	tests := []struct {
		name string
		args args
		want bool
	}{
		{
			// TTL case: the job reaches Succeeded in the same event that
			// deletes it, and the waiter must still report it as ready.
			name: "job is succeeded on deletion",
			args: args{
				job: newJob("foo", 1, intToInt32(1), 0, 0),
				actions: []jobEventAction{
					func(job batchv1.Job) (batchv1.Job, watch.EventType) {
						return job, watch.Added
					},
					func(job batchv1.Job) (batchv1.Job, watch.EventType) {
						job.Status.Succeeded = 1
						return job, watch.Deleted
					},
				},
			},
			want: true,
		},
		{
			name: "job is succeeded on modification",
			args: args{
				job: newJob("foo", 1, intToInt32(1), 0, 0),
				actions: []jobEventAction{
					func(job batchv1.Job) (batchv1.Job, watch.EventType) {
						return job, watch.Added
					},
					func(job batchv1.Job) (batchv1.Job, watch.EventType) {
						job.Status.Succeeded = 1
						return job, watch.Modified
					},
				},
			},
			want: true,
		},
		{
			// No events: the initial readiness check sees the failure.
			name: "job is failed",
			args: args{job: newJob("foo", 1, intToInt32(1), 0, 1)},
			want: false,
		},
		{
			// Deletion before success must surface an error, not hang.
			name: "job deleted never succeeded",
			args: args{
				job: newJob("foo", 1, intToInt32(1), 0, 0),
				actions: []jobEventAction{
					func(job batchv1.Job) (batchv1.Job, watch.EventType) {
						return job, watch.Added
					},
					func(job batchv1.Job) (batchv1.Job, watch.EventType) {
						return job, watch.Deleted
					},
				},
				wantErr: fmt.Errorf("job default/foo is deleted but wasn't ready"),
			},
		},
		{
			name: "job is succeeded and deleted",
			args: args{
				job: newJob("foo", 0, intToInt32(1), 0, 0),
				actions: []jobEventAction{
					func(job batchv1.Job) (batchv1.Job, watch.EventType) {
						job.Status.Succeeded = 1
						return job, watch.Deleted
					},
				},
			},
			want: true,
		},
		{
			// Already ready before any event; no actions needed.
			name: "job with null completions",
			args: args{job: newJob("foo", 0, nil, 1, 0)},
			want: true,
		},
		{
			name: "job is succeeded",
			args: args{job: newJob("foo", 1, intToInt32(1), 1, 0)},
			want: true,
		},
		{
			// No events arrive and the job never becomes ready, so the wait
			// must end via context timeout with want == false.
			name: "wait times out",
			args: args{
				job:        newJob("foo", 1, intToInt32(1), 0, 0),
				ctxTimeout: makeDuration(time.Millisecond * 10),
			},
			want: false,
		},
	}
	for _, tt := range tests {
		testArgs := tt.args
		t.Run(tt.name, func(t *testing.T) {
			fakeClient := fake.NewSimpleClientset(testArgs.job)
			watcher := watch.NewFake()
			fakeClient.PrependWatchReactor(
				"jobs",
				k8stest.DefaultWatchReactor(watcher, nil),
			)
			// Feed the scripted events from a separate goroutine:
			// watch.FakeWatcher.Action blocks until the event is consumed.
			// Stopping the watcher closes the result channel, ending the wait.
			go func(job batchv1.Job, eventActions []jobEventAction) {
				defer watcher.Stop()
				for _, jobAction := range eventActions {
					job, event := jobAction(job)
					watcher.Action(event, &job)
				}
			}(*testArgs.job, testArgs.actions)
			checker := NewReadyChecker(fakeClient, nil, CheckJobs(true))
			res := &resource.Info{
				Object:    testArgs.job,
				Namespace: testArgs.job.Namespace,
				Name:      testArgs.job.Name,
			}
			var (
				ctx    = context.Background()
				cancel context.CancelFunc
			)
			if testArgs.ctxTimeout != nil {
				ctx, cancel = context.WithTimeout(ctx, *testArgs.ctxTimeout)
				defer cancel()
			}
			got, err := checker.waitJobsReady(ctx, ResourceList{res})
			if err != nil {
				if testArgs.wantErr == nil {
					t.Errorf("waitJobsReady() wanted no error, got '%v'", err)
				} else if testArgs.wantErr.Error() != err.Error() {
					t.Errorf("waitJobsReady() wanted error '%v', got '%v'", testArgs.wantErr, err)
				}
			} else if testArgs.wantErr != nil {
				t.Errorf("waitJobsReady() wanted error '%v', got none", testArgs.wantErr)
			}
			if got != tt.want {
				t.Errorf("waitJobsReady() = %v, want %v", got, tt.want)
			}
		})
	}
}
func Test_ReadyChecker_jobReady(t *testing.T) { func Test_ReadyChecker_jobReady(t *testing.T) {
type args struct { type args struct {
job *batchv1.Job job *batchv1.Job
@ -551,6 +708,10 @@ func newPersistentVolumeClaim(name string, phase corev1.PersistentVolumeClaimPha
func newJob(name string, backoffLimit int, completions *int32, succeeded int, failed int) *batchv1.Job { func newJob(name string, backoffLimit int, completions *int32, succeeded int, failed int) *batchv1.Job {
return &batchv1.Job{ return &batchv1.Job{
TypeMeta: metav1.TypeMeta{
Kind: "Job",
APIVersion: "batch/v1",
},
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: name, Name: name,
Namespace: defaultNamespace, Namespace: defaultNamespace,

@ -42,15 +42,15 @@ type waiter struct {
log func(string, ...interface{}) log func(string, ...interface{})
} }
// waitForResources polls to get the current status of all pods, PVCs, Services and // waitForResources polls to get the current status of all pods, PVCs, Services and until all are ready or a timeout is reached.
// Jobs(optional) until all are ready or a timeout is reached // For jobs (optional) it watches for events and checks if the job is (or was) ready at the time of event.
func (w *waiter) waitForResources(created ResourceList) error { func (w *waiter) waitForResources(created ResourceList) error {
w.log("beginning wait for %d resources with timeout of %v", len(created), w.timeout) w.log("beginning wait for %d resources with timeout of %v", len(created), w.timeout)
ctx, cancel := context.WithTimeout(context.Background(), w.timeout) ctx, cancel := context.WithTimeout(context.Background(), w.timeout)
defer cancel() defer cancel()
return wait.PollUntilContextCancel(ctx, 2*time.Second, true, func(ctx context.Context) (bool, error) { err := wait.PollUntilContextCancel(ctx, 2*time.Second, true, func(ctx context.Context) (bool, error) {
for _, v := range created { for _, v := range created {
ready, err := w.c.IsReady(ctx, v) ready, err := w.c.IsReady(ctx, v)
if !ready || err != nil { if !ready || err != nil {
@ -59,6 +59,21 @@ func (w *waiter) waitForResources(created ResourceList) error {
} }
return true, nil return true, nil
}) })
if err != nil {
return err
}
if w.c.checkJobs {
jobsReady, err := w.c.waitJobsReady(ctx, created)
if err != nil {
return err
}
if !jobsReady {
return errors.New("jobs not ready")
}
}
return nil
} }
// waitForDeletedResources polls to check if all the resources are deleted or a timeout is reached // waitForDeletedResources polls to check if all the resources are deleted or a timeout is reached

Loading…
Cancel
Save