diff --git a/pkg/kube/statuswait.go b/pkg/kube/statuswait.go
index 01024afa6..3db7b127b 100644
--- a/pkg/kube/statuswait.go
+++ b/pkg/kube/statuswait.go
@@ -33,6 +33,7 @@ import (
 	"github.com/fluxcd/cli-utils/pkg/kstatus/watcher"
 	"github.com/fluxcd/cli-utils/pkg/object"
 	appsv1 "k8s.io/api/apps/v1"
+	batchv1 "k8s.io/api/batch/v1"
 	"k8s.io/apimachinery/pkg/api/meta"
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
 	"k8s.io/client-go/dynamic"
@@ -144,7 +145,7 @@ func (w *statusWaiter) waitForDelete(ctx context.Context, resourceList ResourceL
 		RESTScopeStrategy: watcher.RESTScopeNamespace,
 	})
 	statusCollector := collector.NewResourceStatusCollector(resources)
-	done := statusCollector.ListenWithObserver(eventCh, statusObserver(cancel, status.NotFoundStatus, w.Logger()))
+	done := statusCollector.ListenWithObserver(eventCh, statusObserver(cancel, status.NotFoundStatus, nil, w.Logger()))
 	<-done
 
 	if statusCollector.Error != nil {
@@ -173,16 +174,21 @@ func (w *statusWaiter) wait(ctx context.Context, resourceList ResourceList, sw w
 	cancelCtx, cancel := context.WithCancel(ctx)
 	defer cancel()
 	resources := []object.ObjMetadata{}
+	jobsWithTTL := make(map[object.ObjMetadata]struct{})
 	for _, resource := range resourceList {
+		obj, err := object.RuntimeToObjMeta(resource.Object)
+		if err != nil {
+			return err
+		}
 		switch value := AsVersioned(resource).(type) {
 		case *appsv1.Deployment:
 			if value.Spec.Paused {
 				continue
 			}
-		}
-		obj, err := object.RuntimeToObjMeta(resource.Object)
-		if err != nil {
-			return err
+		case *batchv1.Job:
+			if value.Spec.TTLSecondsAfterFinished != nil {
+				jobsWithTTL[obj] = struct{}{}
+			}
 		}
 		resources = append(resources, obj)
 	}
@@ -191,7 +197,7 @@ func (w *statusWaiter) wait(ctx context.Context, resourceList ResourceList, sw w
 		RESTScopeStrategy: watcher.RESTScopeNamespace,
 	})
 	statusCollector := collector.NewResourceStatusCollector(resources)
-	done := statusCollector.ListenWithObserver(eventCh, statusObserver(cancel, status.CurrentStatus, w.Logger()))
+	done := statusCollector.ListenWithObserver(eventCh, statusObserver(cancel, status.CurrentStatus, jobsWithTTL, w.Logger()))
 	<-done
 
 	if statusCollector.Error != nil {
@@ -201,7 +207,15 @@ func (w *statusWaiter) wait(ctx context.Context, resourceList ResourceList, sw w
 	errs := []error{}
 	for _, id := range resources {
 		rs := statusCollector.ResourceStatuses[id]
-		if rs.Status == status.CurrentStatus {
+		effectiveStatus := rs.Status
+		// Treat NotFound Jobs with TTL as Current for aggregation purposes.
+		// This handles the case where the TTL controller deletes the Job after it completes.
+		if rs.Status == status.NotFoundStatus {
+			if _, hasTTL := jobsWithTTL[id]; hasTTL {
+				effectiveStatus = status.CurrentStatus
+			}
+		}
+		if effectiveStatus == status.CurrentStatus {
 			continue
 		}
 		errs = append(errs, fmt.Errorf("resource %s/%s/%s not ready. status: %s, message: %s",
@@ -230,7 +244,7 @@ func contextWithTimeout(ctx context.Context, timeout time.Duration) (context.Con
 	return watchtools.ContextWithOptionalTimeout(ctx, timeout)
 }
 
-func statusObserver(cancel context.CancelFunc, desired status.Status, logger *slog.Logger) collector.ObserverFunc {
+func statusObserver(cancel context.CancelFunc, desired status.Status, jobsWithTTL map[object.ObjMetadata]struct{}, logger *slog.Logger) collector.ObserverFunc {
 	return func(statusCollector *collector.ResourceStatusCollector, _ event.Event) {
 		var rss []*event.ResourceStatus
 		var nonDesiredResources []*event.ResourceStatus
@@ -239,17 +253,29 @@ func statusObserver(cancel context.CancelFunc, desired status.Status, logger *sl
 				continue
 			}
 			// If a resource is already deleted before waiting has started, it will show as unknown.
-			// This check ensures we don't wait forever for a resource that is already deleted.
+			// This check ensures we do not wait forever for a resource that is already deleted.
 			if rs.Status == status.UnknownStatus && desired == status.NotFoundStatus {
 				continue
 			}
-			// Failed is a terminal state. This check ensures we don't wait forever for a resource
+			// Failed is a terminal state. This check ensures we do not wait forever for a resource
 			// that has already failed, as intervention is required to resolve the failure.
 			if rs.Status == status.FailedStatus && desired == status.CurrentStatus {
 				continue
 			}
-			rss = append(rss, rs)
-			if rs.Status != desired {
+			// Treat NotFound Jobs with TTL as Current for aggregation purposes.
+			// This handles the case where the TTL controller deletes the Job after it completes.
+			effectiveStatus := rs.Status
+			if rs.Status == status.NotFoundStatus && desired == status.CurrentStatus {
+				if _, hasTTL := jobsWithTTL[rs.Identifier]; hasTTL {
+					effectiveStatus = status.CurrentStatus
+				}
+			}
+			rss = append(rss, &event.ResourceStatus{
+				Identifier: rs.Identifier,
+				Status:     effectiveStatus,
+				Message:    rs.Message,
+			})
+			if effectiveStatus != desired {
 				nonDesiredResources = append(nonDesiredResources, rs)
 			}
 		}
@@ -261,7 +287,7 @@
 		}
 
 		if len(nonDesiredResources) > 0 {
-			// Log a single resource so the user knows what they're waiting for without an overwhelming amount of output
+			// Log a single resource so the user knows what they are waiting for without an overwhelming amount of output
			sort.Slice(nonDesiredResources, func(i, j int) bool {
 				return nonDesiredResources[i].Identifier.Name < nonDesiredResources[j].Identifier.Name
 			})
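The core of this change is a single status-mapping rule: when the waiter wants resources to become Current, a Job that declared `ttlSecondsAfterFinished` and is later observed as NotFound is counted as Current, because the TTL controller is expected to delete such a Job once it completes. The sketch below restates that rule standalone; it is not code from the patch, and the helper name `effectiveJobStatus` and the sample identifiers are invented for illustration:

```go
package main

import (
	"fmt"

	"github.com/fluxcd/cli-utils/pkg/kstatus/status"
	"github.com/fluxcd/cli-utils/pkg/object"
)

// effectiveJobStatus mirrors the patch's rule: while waiting for resources to
// become Current, a NotFound status for a Job known to have
// ttlSecondsAfterFinished set is treated as Current rather than as a failure.
func effectiveJobStatus(observed, desired status.Status, id object.ObjMetadata, jobsWithTTL map[object.ObjMetadata]struct{}) status.Status {
	if observed == status.NotFoundStatus && desired == status.CurrentStatus {
		if _, hasTTL := jobsWithTTL[id]; hasTTL {
			return status.CurrentStatus
		}
	}
	return observed
}

func main() {
	jobID := object.ObjMetadata{Namespace: "default", Name: "job-with-ttl"}
	jobsWithTTL := map[object.ObjMetadata]struct{}{jobID: {}}
	// A TTL-deleted Job no longer blocks the wait: prints "Current".
	fmt.Println(effectiveJobStatus(status.NotFoundStatus, status.CurrentStatus, jobID, jobsWithTTL))
}
```

Note that `waitForDelete` passes `nil` for the new parameter: when the desired state is NotFound, deletion by the TTL controller is success, so no special-casing is needed there.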
status: %s, message: %s", @@ -230,7 +244,7 @@ func contextWithTimeout(ctx context.Context, timeout time.Duration) (context.Con return watchtools.ContextWithOptionalTimeout(ctx, timeout) } -func statusObserver(cancel context.CancelFunc, desired status.Status, logger *slog.Logger) collector.ObserverFunc { +func statusObserver(cancel context.CancelFunc, desired status.Status, jobsWithTTL map[object.ObjMetadata]struct{}, logger *slog.Logger) collector.ObserverFunc { return func(statusCollector *collector.ResourceStatusCollector, _ event.Event) { var rss []*event.ResourceStatus var nonDesiredResources []*event.ResourceStatus @@ -239,17 +253,29 @@ func statusObserver(cancel context.CancelFunc, desired status.Status, logger *sl continue } // If a resource is already deleted before waiting has started, it will show as unknown. - // This check ensures we don't wait forever for a resource that is already deleted. + // This check ensures we do not wait forever for a resource that is already deleted. if rs.Status == status.UnknownStatus && desired == status.NotFoundStatus { continue } - // Failed is a terminal state. This check ensures we don't wait forever for a resource + // Failed is a terminal state. This check ensures we do not wait forever for a resource // that has already failed, as intervention is required to resolve the failure. if rs.Status == status.FailedStatus && desired == status.CurrentStatus { continue } - rss = append(rss, rs) - if rs.Status != desired { + // Treat NotFound Jobs with TTL as Current for aggregation purposes. + // This handles the case where the TTL controller deletes the Job after it completes. + effectiveStatus := rs.Status + if rs.Status == status.NotFoundStatus && desired == status.CurrentStatus { + if _, hasTTL := jobsWithTTL[rs.Identifier]; hasTTL { + effectiveStatus = status.CurrentStatus + } + } + rss = append(rss, &event.ResourceStatus{ + Identifier: rs.Identifier, + Status: effectiveStatus, + Message: rs.Message, + }) + if effectiveStatus != desired { nonDesiredResources = append(nonDesiredResources, rs) } } @@ -261,7 +287,7 @@ func statusObserver(cancel context.CancelFunc, desired status.Status, logger *sl } if len(nonDesiredResources) > 0 { - // Log a single resource so the user knows what they're waiting for without an overwhelming amount of output + // Log a single resource so the user knows what they are waiting for without an overwhelming amount of output sort.Slice(nonDesiredResources, func(i, j int) bool { return nonDesiredResources[i].Identifier.Name < nonDesiredResources[j].Identifier.Name }) diff --git a/pkg/kube/statuswait_test.go b/pkg/kube/statuswait_test.go index d2dd57872..0ef13ed72 100644 --- a/pkg/kube/statuswait_test.go +++ b/pkg/kube/statuswait_test.go @@ -22,6 +22,7 @@ import ( "fmt" "log/slog" "strings" + "sync" "sync/atomic" "testing" "time" @@ -123,6 +124,23 @@ status: message: "Job has reached the specified backoff limit" ` +var jobWithTTLManifest = ` +apiVersion: batch/v1 +kind: Job +metadata: + name: job-with-ttl + namespace: default + generation: 1 +spec: + ttlSecondsAfterFinished: 0 +status: + succeeded: 1 + active: 0 + conditions: + - type: Complete + status: "True" +` + var podCompleteManifest = ` apiVersion: v1 kind: Pod @@ -1818,3 +1836,104 @@ func TestWatchUntilReadyWithCustomReaders(t *testing.T) { }) } } + +// TestJobWithTTLDeletedDuringWait tests that Jobs with ttlSecondsAfterFinished +// are properly handled when they get deleted by the TTL controller after completion. 
+// This simulates the regression scenario from issue #31786.
+func TestJobWithTTLDeletedDuringWait(t *testing.T) {
+	t.Parallel()
+	tests := []struct {
+		name          string
+		objManifests  []string
+		deleteDuring  bool // If true, delete the Job during the wait
+		expectErrStrs []string
+		testFunc      func(*statusWaiter, ResourceList, time.Duration) error
+	}{
+		{
+			name:         "Job with TTL completes and is deleted during WatchUntilReady",
+			objManifests: []string{jobWithTTLManifest},
+			deleteDuring: true,
+			testFunc: func(sw *statusWaiter, rl ResourceList, timeout time.Duration) error {
+				return sw.WatchUntilReady(rl, timeout)
+			},
+		},
+		{
+			name:         "Job with TTL completes and is deleted during WaitWithJobs",
+			objManifests: []string{jobWithTTLManifest},
+			deleteDuring: true,
+			testFunc: func(sw *statusWaiter, rl ResourceList, timeout time.Duration) error {
+				return sw.WaitWithJobs(rl, timeout)
+			},
+		},
+		{
+			name:         "Job with TTL completes and is deleted during Wait",
+			objManifests: []string{jobWithTTLManifest},
+			deleteDuring: true,
+			testFunc: func(sw *statusWaiter, rl ResourceList, timeout time.Duration) error {
+				return sw.Wait(rl, timeout)
+			},
+		},
+		{
+			name:         "Job with TTL exists and is complete (not deleted)",
+			objManifests: []string{jobWithTTLManifest},
+			deleteDuring: false,
+			testFunc: func(sw *statusWaiter, rl ResourceList, timeout time.Duration) error {
+				return sw.WatchUntilReady(rl, timeout)
+			},
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			t.Parallel()
+			c := newTestClient(t)
+			fakeClient := dynamicfake.NewSimpleDynamicClient(scheme.Scheme)
+			fakeMapper := testutil.NewFakeRESTMapper(
+				batchv1.SchemeGroupVersion.WithKind("Job"),
+			)
+			statusWaiter := statusWaiter{
+				client:     fakeClient,
+				restMapper: fakeMapper,
+			}
+			statusWaiter.SetLogger(slog.Default().Handler())
+			objs := getRuntimeObjFromManifests(t, tt.objManifests)
+
+			// Create the objects initially
+			for _, obj := range objs {
+				u := obj.(*unstructured.Unstructured)
+				gvr := getGVR(t, fakeMapper, u)
+				err := fakeClient.Tracker().Create(gvr, u, u.GetNamespace())
+				require.NoError(t, err)
+			}
+
+			var wg sync.WaitGroup
+			// If deleteDuring is true, delete the Job after a short delay.
+			// This simulates the TTL controller deleting the Job after it completes.
+			if tt.deleteDuring {
+				wg.Add(len(objs))
+				for _, obj := range objs {
+					u := obj.(*unstructured.Unstructured)
+					gvr := getGVR(t, fakeMapper, u)
+					go func(gvr schema.GroupVersionResource, u *unstructured.Unstructured) {
+						defer wg.Done()
+						time.Sleep(time.Millisecond * 100)
+						_ = fakeClient.Tracker().Delete(gvr, u.GetNamespace(), u.GetName())
+					}(gvr, u)
+				}
+			}
+
+			resourceList := getResourceListFromRuntimeObjs(t, c, objs)
+			err := tt.testFunc(&statusWaiter, resourceList, time.Second*3)
+			wg.Wait() // Ensure goroutines complete before test ends
+
+			if tt.expectErrStrs != nil {
+				require.Error(t, err)
+				for _, expectedErrStr := range tt.expectErrStrs {
+					assert.Contains(t, err.Error(), expectedErrStr)
+				}
+				return
+			}
+			assert.NoError(t, err)
+		})
+	}
+}
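The test simulates the TTL controller by deleting the tracked object out from under the watcher after a short delay, then joining the goroutine before the subtest returns. A minimal standalone sketch of that mechanism, assuming a hard-coded batch/v1 `jobs` GVR (the real test derives the GVR from its fake REST mapper via `getGVR`):

```go
package main

import (
	"time"

	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/runtime/schema"
	dynamicfake "k8s.io/client-go/dynamic/fake"
	"k8s.io/client-go/kubernetes/scheme"
)

func main() {
	fakeClient := dynamicfake.NewSimpleDynamicClient(scheme.Scheme)
	jobGVR := schema.GroupVersionResource{Group: "batch", Version: "v1", Resource: "jobs"}

	job := &unstructured.Unstructured{Object: map[string]interface{}{
		"apiVersion": "batch/v1",
		"kind":       "Job",
		"metadata": map[string]interface{}{
			"name":      "job-with-ttl",
			"namespace": "default",
		},
	}}
	if err := fakeClient.Tracker().Create(jobGVR, job, "default"); err != nil {
		panic(err)
	}

	done := make(chan struct{})
	go func() {
		defer close(done)
		// Stand-in for the TTL controller: remove the Job shortly after
		// "completion", while a waiter would still be watching it.
		time.Sleep(100 * time.Millisecond)
		_ = fakeClient.Tracker().Delete(jobGVR, "default", "job-with-ttl")
	}()
	<-done
}
```

Deleting through the fake client's tracker emits a watch event, which is what drives the waiter's observer down the NotFound path exercised by the patch.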