From e11a73b34ef8455a44d5e087ec1f9e3aa994e201 Mon Sep 17 00:00:00 2001 From: mehrdadbn9 Date: Sat, 21 Feb 2026 07:43:19 +0330 Subject: [PATCH] fix: handle Jobs with TTL in status watcher Jobs with ttlSecondsAfterFinished set will be automatically deleted by the TTL controller after completion. The kstatus watcher would get stuck waiting for these Jobs when they become NotFound after the TTL controller deletes them. This fix: - Collects Jobs with TTL during wait setup - Computes obj once per resource to avoid duplicated work - Passes TTL Jobs context to the statusObserver - Treats NotFound status for Jobs with TTL as Current status - This allows the wait to complete successfully when the Job is deleted This approach keeps the Jobs in the watch list, so Helm still waits for the Job to complete, but handles the case where the TTL controller deletes the Job after completion. Fixes #31786 Signed-off-by: mehrdadbn9 --- pkg/kube/statuswait.go | 52 ++++++++++++---- pkg/kube/statuswait_test.go | 119 ++++++++++++++++++++++++++++++++++++ 2 files changed, 158 insertions(+), 13 deletions(-) diff --git a/pkg/kube/statuswait.go b/pkg/kube/statuswait.go index 01024afa6..3db7b127b 100644 --- a/pkg/kube/statuswait.go +++ b/pkg/kube/statuswait.go @@ -33,6 +33,7 @@ import ( "github.com/fluxcd/cli-utils/pkg/kstatus/watcher" "github.com/fluxcd/cli-utils/pkg/object" appsv1 "k8s.io/api/apps/v1" + batchv1 "k8s.io/api/batch/v1" "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/client-go/dynamic" @@ -144,7 +145,7 @@ func (w *statusWaiter) waitForDelete(ctx context.Context, resourceList ResourceL RESTScopeStrategy: watcher.RESTScopeNamespace, }) statusCollector := collector.NewResourceStatusCollector(resources) - done := statusCollector.ListenWithObserver(eventCh, statusObserver(cancel, status.NotFoundStatus, w.Logger())) + done := statusCollector.ListenWithObserver(eventCh, statusObserver(cancel, status.NotFoundStatus, nil, w.Logger())) <-done if statusCollector.Error != nil { @@ -173,16 +174,21 @@ func (w *statusWaiter) wait(ctx context.Context, resourceList ResourceList, sw w cancelCtx, cancel := context.WithCancel(ctx) defer cancel() resources := []object.ObjMetadata{} + jobsWithTTL := make(map[object.ObjMetadata]struct{}) for _, resource := range resourceList { + obj, err := object.RuntimeToObjMeta(resource.Object) + if err != nil { + return err + } switch value := AsVersioned(resource).(type) { case *appsv1.Deployment: if value.Spec.Paused { continue } - } - obj, err := object.RuntimeToObjMeta(resource.Object) - if err != nil { - return err + case *batchv1.Job: + if value.Spec.TTLSecondsAfterFinished != nil { + jobsWithTTL[obj] = struct{}{} + } } resources = append(resources, obj) } @@ -191,7 +197,7 @@ func (w *statusWaiter) wait(ctx context.Context, resourceList ResourceList, sw w RESTScopeStrategy: watcher.RESTScopeNamespace, }) statusCollector := collector.NewResourceStatusCollector(resources) - done := statusCollector.ListenWithObserver(eventCh, statusObserver(cancel, status.CurrentStatus, w.Logger())) + done := statusCollector.ListenWithObserver(eventCh, statusObserver(cancel, status.CurrentStatus, jobsWithTTL, w.Logger())) <-done if statusCollector.Error != nil { @@ -201,7 +207,15 @@ func (w *statusWaiter) wait(ctx context.Context, resourceList ResourceList, sw w errs := []error{} for _, id := range resources { rs := statusCollector.ResourceStatuses[id] - if rs.Status == status.CurrentStatus { + effectiveStatus := rs.Status + // Treat NotFound Jobs with TTL as Current for aggregation purposes. + // This handles the case where the TTL controller deletes the Job after it completes. + if rs.Status == status.NotFoundStatus { + if _, hasTTL := jobsWithTTL[id]; hasTTL { + effectiveStatus = status.CurrentStatus + } + } + if effectiveStatus == status.CurrentStatus { continue } errs = append(errs, fmt.Errorf("resource %s/%s/%s not ready. status: %s, message: %s", @@ -230,7 +244,7 @@ func contextWithTimeout(ctx context.Context, timeout time.Duration) (context.Con return watchtools.ContextWithOptionalTimeout(ctx, timeout) } -func statusObserver(cancel context.CancelFunc, desired status.Status, logger *slog.Logger) collector.ObserverFunc { +func statusObserver(cancel context.CancelFunc, desired status.Status, jobsWithTTL map[object.ObjMetadata]struct{}, logger *slog.Logger) collector.ObserverFunc { return func(statusCollector *collector.ResourceStatusCollector, _ event.Event) { var rss []*event.ResourceStatus var nonDesiredResources []*event.ResourceStatus @@ -239,17 +253,29 @@ func statusObserver(cancel context.CancelFunc, desired status.Status, logger *sl continue } // If a resource is already deleted before waiting has started, it will show as unknown. - // This check ensures we don't wait forever for a resource that is already deleted. + // This check ensures we do not wait forever for a resource that is already deleted. if rs.Status == status.UnknownStatus && desired == status.NotFoundStatus { continue } - // Failed is a terminal state. This check ensures we don't wait forever for a resource + // Failed is a terminal state. This check ensures we do not wait forever for a resource // that has already failed, as intervention is required to resolve the failure. if rs.Status == status.FailedStatus && desired == status.CurrentStatus { continue } - rss = append(rss, rs) - if rs.Status != desired { + // Treat NotFound Jobs with TTL as Current for aggregation purposes. + // This handles the case where the TTL controller deletes the Job after it completes. + effectiveStatus := rs.Status + if rs.Status == status.NotFoundStatus && desired == status.CurrentStatus { + if _, hasTTL := jobsWithTTL[rs.Identifier]; hasTTL { + effectiveStatus = status.CurrentStatus + } + } + rss = append(rss, &event.ResourceStatus{ + Identifier: rs.Identifier, + Status: effectiveStatus, + Message: rs.Message, + }) + if effectiveStatus != desired { nonDesiredResources = append(nonDesiredResources, rs) } } @@ -261,7 +287,7 @@ func statusObserver(cancel context.CancelFunc, desired status.Status, logger *sl } if len(nonDesiredResources) > 0 { - // Log a single resource so the user knows what they're waiting for without an overwhelming amount of output + // Log a single resource so the user knows what they are waiting for without an overwhelming amount of output sort.Slice(nonDesiredResources, func(i, j int) bool { return nonDesiredResources[i].Identifier.Name < nonDesiredResources[j].Identifier.Name }) diff --git a/pkg/kube/statuswait_test.go b/pkg/kube/statuswait_test.go index d2dd57872..0ef13ed72 100644 --- a/pkg/kube/statuswait_test.go +++ b/pkg/kube/statuswait_test.go @@ -22,6 +22,7 @@ import ( "fmt" "log/slog" "strings" + "sync" "sync/atomic" "testing" "time" @@ -123,6 +124,23 @@ status: message: "Job has reached the specified backoff limit" ` +var jobWithTTLManifest = ` +apiVersion: batch/v1 +kind: Job +metadata: + name: job-with-ttl + namespace: default + generation: 1 +spec: + ttlSecondsAfterFinished: 0 +status: + succeeded: 1 + active: 0 + conditions: + - type: Complete + status: "True" +` + var podCompleteManifest = ` apiVersion: v1 kind: Pod @@ -1818,3 +1836,104 @@ func TestWatchUntilReadyWithCustomReaders(t *testing.T) { }) } } + +// TestJobWithTTLDeletedDuringWait tests that Jobs with ttlSecondsAfterFinished +// are properly handled when they get deleted by the TTL controller after completion. +// This simulates the regression scenario from issue #31786. +func TestJobWithTTLDeletedDuringWait(t *testing.T) { + t.Parallel() + tests := []struct { + name string + objManifests []string + deleteDuring bool // If true, delete the Job during the wait + expectErrStrs []string + testFunc func(*statusWaiter, ResourceList, time.Duration) error + }{ + { + name: "Job with TTL completes and is deleted during WatchUntilReady", + objManifests: []string{jobWithTTLManifest}, + deleteDuring: true, + testFunc: func(sw *statusWaiter, rl ResourceList, timeout time.Duration) error { + return sw.WatchUntilReady(rl, timeout) + }, + }, + { + name: "Job with TTL completes and is deleted during WaitWithJobs", + objManifests: []string{jobWithTTLManifest}, + deleteDuring: true, + testFunc: func(sw *statusWaiter, rl ResourceList, timeout time.Duration) error { + return sw.WaitWithJobs(rl, timeout) + }, + }, + { + name: "Job with TTL completes and is deleted during Wait", + objManifests: []string{jobWithTTLManifest}, + deleteDuring: true, + testFunc: func(sw *statusWaiter, rl ResourceList, timeout time.Duration) error { + return sw.Wait(rl, timeout) + }, + }, + { + name: "Job with TTL exists and is complete (not deleted)", + objManifests: []string{jobWithTTLManifest}, + deleteDuring: false, + testFunc: func(sw *statusWaiter, rl ResourceList, timeout time.Duration) error { + return sw.WatchUntilReady(rl, timeout) + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + c := newTestClient(t) + fakeClient := dynamicfake.NewSimpleDynamicClient(scheme.Scheme) + fakeMapper := testutil.NewFakeRESTMapper( + batchv1.SchemeGroupVersion.WithKind("Job"), + ) + statusWaiter := statusWaiter{ + client: fakeClient, + restMapper: fakeMapper, + } + statusWaiter.SetLogger(slog.Default().Handler()) + objs := getRuntimeObjFromManifests(t, tt.objManifests) + + // Create the objects initially + for _, obj := range objs { + u := obj.(*unstructured.Unstructured) + gvr := getGVR(t, fakeMapper, u) + err := fakeClient.Tracker().Create(gvr, u, u.GetNamespace()) + require.NoError(t, err) + } + + var wg sync.WaitGroup + // If deleteDuring is true, delete the Job after a short delay + // This simulates the TTL controller deleting the Job after it completes + if tt.deleteDuring { + wg.Add(len(objs)) + for _, obj := range objs { + u := obj.(*unstructured.Unstructured) + gvr := getGVR(t, fakeMapper, u) + go func(gvr schema.GroupVersionResource, u *unstructured.Unstructured) { + defer wg.Done() + time.Sleep(time.Millisecond * 100) + _ = fakeClient.Tracker().Delete(gvr, u.GetNamespace(), u.GetName()) + }(gvr, u) + } + } + + resourceList := getResourceListFromRuntimeObjs(t, c, objs) + err := tt.testFunc(&statusWaiter, resourceList, time.Second*3) + wg.Wait() // Ensure goroutines complete before test ends + + if tt.expectErrStrs != nil { + require.Error(t, err) + for _, expectedErrStr := range tt.expectErrStrs { + assert.Contains(t, err.Error(), expectedErrStr) + } + return + } + assert.NoError(t, err) + }) + } +}