From a5909993231c0826a7c5c139241d0e053ce9d03e Mon Sep 17 00:00:00 2001 From: Austin Abro Date: Thu, 6 Feb 2025 19:53:42 +0000 Subject: [PATCH] switch client Signed-off-by: Austin Abro --- pkg/kube/client.go | 11 +++-------- pkg/kube/statuswait.go | 33 +++++++++++++++++++-------------- pkg/kube/statuswait_test.go | 22 ++++++++++++---------- 3 files changed, 34 insertions(+), 32 deletions(-) diff --git a/pkg/kube/client.go b/pkg/kube/client.go index b4164a8ff..3753998ff 100644 --- a/pkg/kube/client.go +++ b/pkg/kube/client.go @@ -37,7 +37,6 @@ import ( apiextv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" - "sigs.k8s.io/cli-utils/pkg/kstatus/watcher" "sigs.k8s.io/controller-runtime/pkg/client/apiutil" multierror "github.com/hashicorp/go-multierror" @@ -59,7 +58,6 @@ import ( watchtools "k8s.io/client-go/tools/watch" "k8s.io/client-go/util/retry" cmdutil "k8s.io/kubectl/pkg/cmd/util" - "sigs.k8s.io/cli-utils/pkg/kstatus/polling/statusreaders" ) // ErrNoObjectsVisited indicates that during a visit operation, no matching objects were found. 
@@ -133,13 +131,10 @@ func (c *Client) newWaiter(strategy WaitStrategy) (Waiter, error) { if err != nil { return nil, err } - sw := watcher.NewDefaultStatusWatcher(dynamicClient, restMapper) - newCustomJobStatusReader := NewCustomJobStatusReader(restMapper) - customSR := statusreaders.NewStatusReader(restMapper, newCustomJobStatusReader) - sw.StatusReader = customSR return &statusWaiter{ - sw: sw, - log: c.Log, + restMapper: restMapper, + client: dynamicClient, + log: c.Log, }, nil default: return nil, errors.New("unknown wait strategy") diff --git a/pkg/kube/statuswait.go b/pkg/kube/statuswait.go index 7ac4706ee..1aa424c4c 100644 --- a/pkg/kube/statuswait.go +++ b/pkg/kube/statuswait.go @@ -23,42 +23,51 @@ import ( "time" appsv1 "k8s.io/api/apps/v1" - batchv1 "k8s.io/api/batch/v1" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/client-go/dynamic" "sigs.k8s.io/cli-utils/pkg/kstatus/polling/aggregator" "sigs.k8s.io/cli-utils/pkg/kstatus/polling/collector" "sigs.k8s.io/cli-utils/pkg/kstatus/polling/event" + "sigs.k8s.io/cli-utils/pkg/kstatus/polling/statusreaders" "sigs.k8s.io/cli-utils/pkg/kstatus/status" "sigs.k8s.io/cli-utils/pkg/kstatus/watcher" "sigs.k8s.io/cli-utils/pkg/object" ) type statusWaiter struct { - sw watcher.StatusWatcher - log func(string, ...interface{}) + client dynamic.Interface + restMapper meta.RESTMapper + log func(string, ...interface{}) } func (w *statusWaiter) Wait(resourceList ResourceList, timeout time.Duration) error { ctx, cancel := context.WithTimeout(context.TODO(), timeout) defer cancel() w.log("beginning wait for %d resources with timeout of %s", len(resourceList), timeout) - return w.wait(ctx, resourceList, false) + sw := watcher.NewDefaultStatusWatcher(w.client, w.restMapper) + return w.wait(ctx, resourceList, sw) } func (w *statusWaiter) WaitWithJobs(resourceList ResourceList, timeout time.Duration) error { ctx, cancel := context.WithTimeout(context.TODO(), timeout) defer cancel() w.log("beginning wait for %d resources with 
timeout of %s", len(resourceList), timeout) - return w.wait(ctx, resourceList, true) + sw := watcher.NewDefaultStatusWatcher(w.client, w.restMapper) + newCustomJobStatusReader := NewCustomJobStatusReader(w.restMapper) + customSR := statusreaders.NewStatusReader(w.restMapper, newCustomJobStatusReader) + sw.StatusReader = customSR + return w.wait(ctx, resourceList, sw) } func (w *statusWaiter) WaitForDelete(resourceList ResourceList, timeout time.Duration) error { ctx, cancel := context.WithTimeout(context.TODO(), timeout) defer cancel() w.log("beginning wait for %d resources to be deleted with timeout of %s", len(resourceList), timeout) - return w.waitForDelete(ctx, resourceList) + sw := watcher.NewDefaultStatusWatcher(w.client, w.restMapper) + return w.waitForDelete(ctx, resourceList, sw) } -func (w *statusWaiter) waitForDelete(ctx context.Context, resourceList ResourceList) error { +func (w *statusWaiter) waitForDelete(ctx context.Context, resourceList ResourceList, sw watcher.StatusWatcher) error { cancelCtx, cancel := context.WithCancel(ctx) defer cancel() resources := []object.ObjMetadata{} @@ -69,7 +78,7 @@ func (w *statusWaiter) waitForDelete(ctx context.Context, resourceList ResourceL } resources = append(resources, obj) } - eventCh := w.sw.Watch(cancelCtx, resources, watcher.Options{}) + eventCh := sw.Watch(cancelCtx, resources, watcher.Options{}) statusCollector := collector.NewResourceStatusCollector(resources) go logResourceStatus(ctx, resources, statusCollector, status.NotFoundStatus, w.log) done := statusCollector.ListenWithObserver(eventCh, statusObserver(cancel, status.NotFoundStatus)) @@ -95,16 +104,12 @@ func (w *statusWaiter) waitForDelete(ctx context.Context, resourceList ResourceL return nil } -func (w *statusWaiter) wait(ctx context.Context, resourceList ResourceList, waitForJobs bool) error { +func (w *statusWaiter) wait(ctx context.Context, resourceList ResourceList, sw watcher.StatusWatcher) error { cancelCtx, cancel := 
context.WithCancel(ctx) defer cancel() resources := []object.ObjMetadata{} for _, resource := range resourceList { switch value := AsVersioned(resource).(type) { - case *batchv1.Job: - if !waitForJobs { - continue - } case *appsv1.Deployment: if value.Spec.Paused { continue @@ -117,7 +122,7 @@ func (w *statusWaiter) wait(ctx context.Context, resourceList ResourceList, wait resources = append(resources, obj) } - eventCh := w.sw.Watch(cancelCtx, resources, watcher.Options{}) + eventCh := sw.Watch(cancelCtx, resources, watcher.Options{}) statusCollector := collector.NewResourceStatusCollector(resources) go logResourceStatus(cancelCtx, resources, statusCollector, status.CurrentStatus, w.log) done := statusCollector.ListenWithObserver(eventCh, statusObserver(cancel, status.CurrentStatus)) diff --git a/pkg/kube/statuswait_test.go b/pkg/kube/statuswait_test.go index d853e0012..f3694953c 100644 --- a/pkg/kube/statuswait_test.go +++ b/pkg/kube/statuswait_test.go @@ -38,7 +38,6 @@ import ( "sigs.k8s.io/cli-utils/pkg/kstatus/polling/collector" "sigs.k8s.io/cli-utils/pkg/kstatus/polling/event" "sigs.k8s.io/cli-utils/pkg/kstatus/status" - "sigs.k8s.io/cli-utils/pkg/kstatus/watcher" "sigs.k8s.io/cli-utils/pkg/object" "sigs.k8s.io/cli-utils/pkg/testutil" ) @@ -178,10 +177,10 @@ func TestStatusWaitForDelete(t *testing.T) { appsv1.SchemeGroupVersion.WithKind("Deployment"), batchv1.SchemeGroupVersion.WithKind("Job"), ) - statusWatcher := watcher.NewDefaultStatusWatcher(fakeClient, fakeMapper) statusWaiter := statusWaiter{ - sw: statusWatcher, - log: t.Logf, + restMapper: fakeMapper, + client: fakeClient, + log: t.Logf, } createdObjs := []runtime.Object{} for _, manifest := range tt.manifestsToCreate { @@ -275,10 +274,10 @@ func TestStatusWait(t *testing.T) { appsv1.SchemeGroupVersion.WithKind("Deployment"), batchv1.SchemeGroupVersion.WithKind("Job"), ) - statusWatcher := watcher.NewDefaultStatusWatcher(fakeClient, fakeMapper) statusWaiter := statusWaiter{ - sw: statusWatcher, - log: 
t.Logf, + client: fakeClient, + restMapper: fakeMapper, + log: t.Logf, } objs := []runtime.Object{} @@ -299,9 +298,12 @@ func TestStatusWait(t *testing.T) { resourceList = append(resourceList, list...) } - ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) - defer cancel() - err := statusWaiter.wait(ctx, resourceList, tt.waitForJobs) + var err error + if tt.waitForJobs { + err = statusWaiter.WaitWithJobs(resourceList, time.Second*3) + } else { + err = statusWaiter.Wait(resourceList, time.Second*3) + } if tt.expectErrs != nil { assert.EqualError(t, err, errors.Join(tt.expectErrs...).Error()) return