diff --git a/pkg/kube/kwait.go b/pkg/kube/kwait.go index f173a074e..ae7fcbe43 100644 --- a/pkg/kube/kwait.go +++ b/pkg/kube/kwait.go @@ -51,7 +51,58 @@ func (w *kstatusWaiter) WaitWithJobs(resourceList ResourceList, timeout time.Dur } func (w *kstatusWaiter) waitForDelete(ctx context.Context, resourceList ResourceList) error { - _, cancel := context.WithCancel(ctx) + cancelCtx, cancel := context.WithCancel(ctx) + defer cancel() + runtimeObjs := []runtime.Object{} + for _, resource := range resourceList { + runtimeObjs = append(runtimeObjs, resource.Object) + } + resources := []object.ObjMetadata{} + for _, runtimeObj := range runtimeObjs { + obj, err := object.RuntimeToObjMeta(runtimeObj) + if err != nil { + return err + } + resources = append(resources, obj) + } + eventCh := w.sw.Watch(cancelCtx, resources, watcher.Options{}) + statusCollector := collector.NewResourceStatusCollector(resources) + done := statusCollector.ListenWithObserver(eventCh, collector.ObserverFunc( + func(statusCollector *collector.ResourceStatusCollector, _ event.Event) { + rss := []*event.ResourceStatus{} + for _, rs := range statusCollector.ResourceStatuses { + if rs == nil { + continue + } + rss = append(rss, rs) + } + desired := status.NotFoundStatus + if aggregator.AggregateStatus(rss, desired) == desired { + cancel() + return + } + }), + ) + <-done + + if statusCollector.Error != nil { + return statusCollector.Error + } + + // Only check parent context error, otherwise we would error when desired status is achieved. + if ctx.Err() != nil { + errs := []error{} + for _, id := range resources { + rs := statusCollector.ResourceStatuses[id] + if rs.Status == status.CurrentStatus { + continue + } + errs = append(errs, fmt.Errorf("%s: %s not ready, status: %s", rs.Identifier.Name, rs.Identifier.GroupKind.Kind, rs.Status)) + } + errs = append(errs, ctx.Err()) + return errors.Join(errs...) + } + return nil defer cancel() return nil } diff --git a/pkg/kube/kwait_test.go b/pkg/kube/kwait_test.go index 2301c373d..f910a4a9b 100644 --- a/pkg/kube/kwait_test.go +++ b/pkg/kube/kwait_test.go @@ -29,7 +29,6 @@ import ( batchv1 "k8s.io/api/batch/v1" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/meta" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" @@ -120,17 +119,15 @@ func getGVR(t *testing.T, mapper meta.RESTMapper, obj *unstructured.Unstructured func TestKWaitForDelete(t *testing.T) { t.Parallel() tests := []struct { - name string - objs []runtime.Object - expectErrs []error - waitForJobs bool - pausedAsReady bool + name string + objYamls []string + expectErrs []error + waitForJobs bool }{ { - name: "Pod is deleted", - objs: []runtime.Object{ - &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod", Namespace: "ns"}}, - }, + name: "wait for pod to be deleted", + objYamls: []string{podCurrent}, + expectErrs: nil, }, } for _, tt := range tests { @@ -150,14 +147,27 @@ func TestKWaitForDelete(t *testing.T) { } ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) defer cancel() + objs := []runtime.Object{} + for _, podYaml := range tt.objYamls { + m := make(map[string]interface{}) + err := yaml.Unmarshal([]byte(podYaml), &m) + assert.NoError(t, err) + resource := &unstructured.Unstructured{Object: m} + objs = append(objs, resource) + gvr := getGVR(t, fakeMapper, resource) + err = fakeClient.Tracker().Create(gvr, resource, resource.GetNamespace()) + assert.NoError(t, err) + go func(){ + time.Sleep(2 * time.Second) + err = fakeClient.Tracker().Delete(gvr, resource.GetNamespace(), resource.GetName()) + assert.NoError(t, err) + }() + } resourceList := ResourceList{} - for _, obj := range tt.objs { + for _, obj := range objs { list, err := c.Build(objBody(obj), false) assert.NoError(t, err) - // gvr := getGVR(t, fakeMapper, obj.) - // err = fakeClient.Tracker().Create(gvr, obj, ) - // assert.NoError(t, err) - // resourceList = append(resourceList, list...) + resourceList = append(resourceList, list...) } err := kwaiter.waitForDelete(ctx, resourceList) if tt.expectErrs != nil { @@ -173,11 +183,10 @@ func TestKWaitForDelete(t *testing.T) { func TestKWaitJob(t *testing.T) { t.Parallel() tests := []struct { - name string - objYamls []string - expectErrs []error - waitForJobs bool - pausedAsReady bool + name string + objYamls []string + expectErrs []error + waitForJobs bool }{ { name: "Job is complete", @@ -207,10 +216,9 @@ func TestKWaitJob(t *testing.T) { expectErrs: []error{errors.New("in-progress-pod: Pod not ready, status: InProgress"), errors.New("context deadline exceeded")}, }, { - name: "paused deployment passes", - objYamls: []string{pausedDeploymentYaml}, - expectErrs: nil, - pausedAsReady: true, + name: "paused deployment passes", + objYamls: []string{pausedDeploymentYaml}, + expectErrs: nil, }, }