fix: handle Jobs with TTL in status watcher

Jobs with ttlSecondsAfterFinished set will be automatically deleted
by the TTL controller after completion. The kstatus watcher would get
stuck waiting for these Jobs when they become NotFound after the TTL
controller deletes them.

This fix:
- Collects Jobs with TTL during wait setup
- Computes obj once per resource to avoid duplicated work
- Passes TTL Jobs context to the statusObserver
- Treats NotFound status for Jobs with TTL as Current status, which
  allows the wait to complete successfully when the Job is deleted

This approach keeps the Jobs in the watch list, so Helm still waits
for the Job to complete, but handles the case where the TTL controller
deletes the Job after completion.

Fixes #31786

Signed-off-by: mehrdadbn9 <mehrdadbiukian@gmail.com>
pull/31851/head
mehrdadbn9 1 month ago
parent ee018608f6
commit e11a73b34e

@ -33,6 +33,7 @@ import (
"github.com/fluxcd/cli-utils/pkg/kstatus/watcher"
"github.com/fluxcd/cli-utils/pkg/object"
appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/client-go/dynamic"
@ -144,7 +145,7 @@ func (w *statusWaiter) waitForDelete(ctx context.Context, resourceList ResourceL
RESTScopeStrategy: watcher.RESTScopeNamespace,
})
statusCollector := collector.NewResourceStatusCollector(resources)
done := statusCollector.ListenWithObserver(eventCh, statusObserver(cancel, status.NotFoundStatus, w.Logger()))
done := statusCollector.ListenWithObserver(eventCh, statusObserver(cancel, status.NotFoundStatus, nil, w.Logger()))
<-done
if statusCollector.Error != nil {
@ -173,16 +174,21 @@ func (w *statusWaiter) wait(ctx context.Context, resourceList ResourceList, sw w
cancelCtx, cancel := context.WithCancel(ctx)
defer cancel()
resources := []object.ObjMetadata{}
// Track Jobs that set ttlSecondsAfterFinished: the TTL controller may delete
// them after completion, so a later NotFound status for these Jobs must be
// treated as success rather than as a wait failure.
jobsWithTTL := make(map[object.ObjMetadata]struct{})
for _, resource := range resourceList {
	// Compute the object metadata once per resource; it is needed both for
	// the TTL bookkeeping below and for the watch list.
	obj, err := object.RuntimeToObjMeta(resource.Object)
	if err != nil {
		return err
	}
	switch value := AsVersioned(resource).(type) {
	case *appsv1.Deployment:
		// Paused Deployments never make progress; skip waiting on them.
		if value.Spec.Paused {
			continue
		}
	case *batchv1.Job:
		if value.Spec.TTLSecondsAfterFinished != nil {
			jobsWithTTL[obj] = struct{}{}
		}
	}
	resources = append(resources, obj)
}
@ -191,7 +197,7 @@ func (w *statusWaiter) wait(ctx context.Context, resourceList ResourceList, sw w
RESTScopeStrategy: watcher.RESTScopeNamespace,
})
statusCollector := collector.NewResourceStatusCollector(resources)
done := statusCollector.ListenWithObserver(eventCh, statusObserver(cancel, status.CurrentStatus, w.Logger()))
done := statusCollector.ListenWithObserver(eventCh, statusObserver(cancel, status.CurrentStatus, jobsWithTTL, w.Logger()))
<-done
if statusCollector.Error != nil {
@ -201,7 +207,15 @@ func (w *statusWaiter) wait(ctx context.Context, resourceList ResourceList, sw w
errs := []error{}
for _, id := range resources {
rs := statusCollector.ResourceStatuses[id]
if rs.Status == status.CurrentStatus {
effectiveStatus := rs.Status
// Treat NotFound Jobs with TTL as Current for aggregation purposes.
// This handles the case where the TTL controller deletes the Job after it completes.
if rs.Status == status.NotFoundStatus {
if _, hasTTL := jobsWithTTL[id]; hasTTL {
effectiveStatus = status.CurrentStatus
}
}
if effectiveStatus == status.CurrentStatus {
continue
}
errs = append(errs, fmt.Errorf("resource %s/%s/%s not ready. status: %s, message: %s",
@ -230,7 +244,7 @@ func contextWithTimeout(ctx context.Context, timeout time.Duration) (context.Con
return watchtools.ContextWithOptionalTimeout(ctx, timeout)
}
func statusObserver(cancel context.CancelFunc, desired status.Status, logger *slog.Logger) collector.ObserverFunc {
func statusObserver(cancel context.CancelFunc, desired status.Status, jobsWithTTL map[object.ObjMetadata]struct{}, logger *slog.Logger) collector.ObserverFunc {
return func(statusCollector *collector.ResourceStatusCollector, _ event.Event) {
var rss []*event.ResourceStatus
var nonDesiredResources []*event.ResourceStatus
@ -239,17 +253,29 @@ func statusObserver(cancel context.CancelFunc, desired status.Status, logger *sl
continue
}
// If a resource is already deleted before waiting has started, it will show as unknown.
// This check ensures we don't wait forever for a resource that is already deleted.
// This check ensures we do not wait forever for a resource that is already deleted.
if rs.Status == status.UnknownStatus && desired == status.NotFoundStatus {
continue
}
// Failed is a terminal state. This check ensures we don't wait forever for a resource
// Failed is a terminal state. This check ensures we do not wait forever for a resource
// that has already failed, as intervention is required to resolve the failure.
if rs.Status == status.FailedStatus && desired == status.CurrentStatus {
continue
}
rss = append(rss, rs)
if rs.Status != desired {
// Treat NotFound Jobs with TTL as Current for aggregation purposes.
// This handles the case where the TTL controller deletes the Job after it completes.
effectiveStatus := rs.Status
if rs.Status == status.NotFoundStatus && desired == status.CurrentStatus {
if _, hasTTL := jobsWithTTL[rs.Identifier]; hasTTL {
effectiveStatus = status.CurrentStatus
}
}
rss = append(rss, &event.ResourceStatus{
Identifier: rs.Identifier,
Status: effectiveStatus,
Message: rs.Message,
})
if effectiveStatus != desired {
nonDesiredResources = append(nonDesiredResources, rs)
}
}
@ -261,7 +287,7 @@ func statusObserver(cancel context.CancelFunc, desired status.Status, logger *sl
}
if len(nonDesiredResources) > 0 {
// Log a single resource so the user knows what they're waiting for without an overwhelming amount of output
// Log a single resource so the user knows what they are waiting for without an overwhelming amount of output
sort.Slice(nonDesiredResources, func(i, j int) bool {
return nonDesiredResources[i].Identifier.Name < nonDesiredResources[j].Identifier.Name
})

@ -22,6 +22,7 @@ import (
"fmt"
"log/slog"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
@ -123,6 +124,23 @@ status:
message: "Job has reached the specified backoff limit"
`
var jobWithTTLManifest = `
apiVersion: batch/v1
kind: Job
metadata:
name: job-with-ttl
namespace: default
generation: 1
spec:
ttlSecondsAfterFinished: 0
status:
succeeded: 1
active: 0
conditions:
- type: Complete
status: "True"
`
var podCompleteManifest = `
apiVersion: v1
kind: Pod
@ -1818,3 +1836,104 @@ func TestWatchUntilReadyWithCustomReaders(t *testing.T) {
})
}
}
// TestJobWithTTLDeletedDuringWait tests that Jobs with ttlSecondsAfterFinished
// are properly handled when they get deleted by the TTL controller after completion.
// This simulates the regression scenario from issue #31786.
//
// Each case runs one of the waiter entry points (Wait, WaitWithJobs,
// WatchUntilReady) against a fake dynamic client; when deleteDuring is set,
// a goroutine deletes the Job partway through the wait to stand in for the
// TTL controller.
func TestJobWithTTLDeletedDuringWait(t *testing.T) {
	t.Parallel()
	tests := []struct {
		name         string
		objManifests []string
		deleteDuring bool // If true, delete the Job during the wait
		// expectErrStrs, when non-nil, lists substrings the returned error
		// must contain; nil means the wait is expected to succeed.
		expectErrStrs []string
		// testFunc selects which waiter entry point the case exercises.
		testFunc func(*statusWaiter, ResourceList, time.Duration) error
	}{
		{
			name:         "Job with TTL completes and is deleted during WatchUntilReady",
			objManifests: []string{jobWithTTLManifest},
			deleteDuring: true,
			testFunc: func(sw *statusWaiter, rl ResourceList, timeout time.Duration) error {
				return sw.WatchUntilReady(rl, timeout)
			},
		},
		{
			name:         "Job with TTL completes and is deleted during WaitWithJobs",
			objManifests: []string{jobWithTTLManifest},
			deleteDuring: true,
			testFunc: func(sw *statusWaiter, rl ResourceList, timeout time.Duration) error {
				return sw.WaitWithJobs(rl, timeout)
			},
		},
		{
			name:         "Job with TTL completes and is deleted during Wait",
			objManifests: []string{jobWithTTLManifest},
			deleteDuring: true,
			testFunc: func(sw *statusWaiter, rl ResourceList, timeout time.Duration) error {
				return sw.Wait(rl, timeout)
			},
		},
		{
			// Control case: the Job stays present, so the wait must still
			// succeed via the normal Current-status path.
			name:         "Job with TTL exists and is complete (not deleted)",
			objManifests: []string{jobWithTTLManifest},
			deleteDuring: false,
			testFunc: func(sw *statusWaiter, rl ResourceList, timeout time.Duration) error {
				return sw.WatchUntilReady(rl, timeout)
			},
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			t.Parallel()
			c := newTestClient(t)
			fakeClient := dynamicfake.NewSimpleDynamicClient(scheme.Scheme)
			fakeMapper := testutil.NewFakeRESTMapper(
				batchv1.SchemeGroupVersion.WithKind("Job"),
			)
			statusWaiter := statusWaiter{
				client:     fakeClient,
				restMapper: fakeMapper,
			}
			statusWaiter.SetLogger(slog.Default().Handler())
			objs := getRuntimeObjFromManifests(t, tt.objManifests)
			// Create the objects initially
			for _, obj := range objs {
				u := obj.(*unstructured.Unstructured)
				gvr := getGVR(t, fakeMapper, u)
				err := fakeClient.Tracker().Create(gvr, u, u.GetNamespace())
				require.NoError(t, err)
			}
			var wg sync.WaitGroup
			// If deleteDuring is true, delete the Job after a short delay
			// This simulates the TTL controller deleting the Job after it completes
			if tt.deleteDuring {
				wg.Add(len(objs))
				for _, obj := range objs {
					u := obj.(*unstructured.Unstructured)
					gvr := getGVR(t, fakeMapper, u)
					// gvr and u are passed as arguments so each goroutine
					// captures its own copies rather than the loop variables.
					go func(gvr schema.GroupVersionResource, u *unstructured.Unstructured) {
						defer wg.Done()
						// Short delay so the waiter is already watching when
						// the deletion event arrives.
						time.Sleep(time.Millisecond * 100)
						// Best-effort delete: the error is intentionally
						// ignored since the object may already be gone.
						_ = fakeClient.Tracker().Delete(gvr, u.GetNamespace(), u.GetName())
					}(gvr, u)
				}
			}
			resourceList := getResourceListFromRuntimeObjs(t, c, objs)
			err := tt.testFunc(&statusWaiter, resourceList, time.Second*3)
			wg.Wait() // Ensure goroutines complete before test ends
			if tt.expectErrStrs != nil {
				require.Error(t, err)
				for _, expectedErrStr := range tt.expectErrStrs {
					assert.Contains(t, err.Error(), expectedErrStr)
				}
				return
			}
			assert.NoError(t, err)
		})
	}
}

Loading…
Cancel
Save