switch client

Signed-off-by: Austin Abro <AustinAbro321@gmail.com>
pull/13604/head
Austin Abro 7 months ago
parent fc10174fa0
commit a590999323
No known key found for this signature in database
GPG Key ID: 92EB5159E403F9D6

@ -37,7 +37,6 @@ import (
apiextv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/cli-utils/pkg/kstatus/watcher"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
multierror "github.com/hashicorp/go-multierror"
@ -59,7 +58,6 @@ import (
watchtools "k8s.io/client-go/tools/watch"
"k8s.io/client-go/util/retry"
cmdutil "k8s.io/kubectl/pkg/cmd/util"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling/statusreaders"
)
// ErrNoObjectsVisited indicates that during a visit operation, no matching objects were found.
@ -133,12 +131,9 @@ func (c *Client) newWaiter(strategy WaitStrategy) (Waiter, error) {
if err != nil {
return nil, err
}
sw := watcher.NewDefaultStatusWatcher(dynamicClient, restMapper)
newCustomJobStatusReader := NewCustomJobStatusReader(restMapper)
customSR := statusreaders.NewStatusReader(restMapper, newCustomJobStatusReader)
sw.StatusReader = customSR
return &statusWaiter{
sw: sw,
restMapper: restMapper,
client: dynamicClient,
log: c.Log,
}, nil
default:

@ -23,17 +23,20 @@ import (
"time"
appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/client-go/dynamic"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling/aggregator"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling/collector"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling/event"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling/statusreaders"
"sigs.k8s.io/cli-utils/pkg/kstatus/status"
"sigs.k8s.io/cli-utils/pkg/kstatus/watcher"
"sigs.k8s.io/cli-utils/pkg/object"
)
type statusWaiter struct {
sw watcher.StatusWatcher
client dynamic.Interface
restMapper meta.RESTMapper
log func(string, ...interface{})
}
@ -41,24 +44,30 @@ func (w *statusWaiter) Wait(resourceList ResourceList, timeout time.Duration) er
ctx, cancel := context.WithTimeout(context.TODO(), timeout)
defer cancel()
w.log("beginning wait for %d resources with timeout of %s", len(resourceList), timeout)
return w.wait(ctx, resourceList, false)
sw := watcher.NewDefaultStatusWatcher(w.client, w.restMapper)
return w.wait(ctx, resourceList, sw)
}
// WaitWithJobs blocks until every resource in resourceList reaches
// current status, treating Jobs as ready on completion via a custom
// Job status reader, or until timeout elapses.
func (w *statusWaiter) WaitWithJobs(resourceList ResourceList, timeout time.Duration) error {
	ctx, cancel := context.WithTimeout(context.TODO(), timeout)
	defer cancel()
	w.log("beginning wait for %d resources with timeout of %s", len(resourceList), timeout)
	// Build the watcher per call and swap in a custom Job status reader
	// so Jobs are considered done on completion rather than default
	// kstatus Job semantics.
	sw := watcher.NewDefaultStatusWatcher(w.client, w.restMapper)
	jobSR := NewCustomJobStatusReader(w.restMapper)
	sw.StatusReader = statusreaders.NewStatusReader(w.restMapper, jobSR)
	return w.wait(ctx, resourceList, sw)
}
// WaitForDelete blocks until every resource in resourceList has been
// deleted from the cluster (observed as NotFound by the status
// watcher), or until timeout elapses.
func (w *statusWaiter) WaitForDelete(resourceList ResourceList, timeout time.Duration) error {
	ctx, cancel := context.WithTimeout(context.TODO(), timeout)
	defer cancel()
	w.log("beginning wait for %d resources to be deleted with timeout of %s", len(resourceList), timeout)
	// The watcher is built per call; waitForDelete drives it until all
	// objects report NotFound.
	sw := watcher.NewDefaultStatusWatcher(w.client, w.restMapper)
	return w.waitForDelete(ctx, resourceList, sw)
}
func (w *statusWaiter) waitForDelete(ctx context.Context, resourceList ResourceList) error {
func (w *statusWaiter) waitForDelete(ctx context.Context, resourceList ResourceList, sw watcher.StatusWatcher) error {
cancelCtx, cancel := context.WithCancel(ctx)
defer cancel()
resources := []object.ObjMetadata{}
@ -69,7 +78,7 @@ func (w *statusWaiter) waitForDelete(ctx context.Context, resourceList ResourceL
}
resources = append(resources, obj)
}
eventCh := w.sw.Watch(cancelCtx, resources, watcher.Options{})
eventCh := sw.Watch(cancelCtx, resources, watcher.Options{})
statusCollector := collector.NewResourceStatusCollector(resources)
go logResourceStatus(ctx, resources, statusCollector, status.NotFoundStatus, w.log)
done := statusCollector.ListenWithObserver(eventCh, statusObserver(cancel, status.NotFoundStatus))
@ -95,16 +104,12 @@ func (w *statusWaiter) waitForDelete(ctx context.Context, resourceList ResourceL
return nil
}
func (w *statusWaiter) wait(ctx context.Context, resourceList ResourceList, waitForJobs bool) error {
func (w *statusWaiter) wait(ctx context.Context, resourceList ResourceList, sw watcher.StatusWatcher) error {
cancelCtx, cancel := context.WithCancel(ctx)
defer cancel()
resources := []object.ObjMetadata{}
for _, resource := range resourceList {
switch value := AsVersioned(resource).(type) {
case *batchv1.Job:
if !waitForJobs {
continue
}
case *appsv1.Deployment:
if value.Spec.Paused {
continue
@ -117,7 +122,7 @@ func (w *statusWaiter) wait(ctx context.Context, resourceList ResourceList, wait
resources = append(resources, obj)
}
eventCh := w.sw.Watch(cancelCtx, resources, watcher.Options{})
eventCh := sw.Watch(cancelCtx, resources, watcher.Options{})
statusCollector := collector.NewResourceStatusCollector(resources)
go logResourceStatus(cancelCtx, resources, statusCollector, status.CurrentStatus, w.log)
done := statusCollector.ListenWithObserver(eventCh, statusObserver(cancel, status.CurrentStatus))

@ -38,7 +38,6 @@ import (
"sigs.k8s.io/cli-utils/pkg/kstatus/polling/collector"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling/event"
"sigs.k8s.io/cli-utils/pkg/kstatus/status"
"sigs.k8s.io/cli-utils/pkg/kstatus/watcher"
"sigs.k8s.io/cli-utils/pkg/object"
"sigs.k8s.io/cli-utils/pkg/testutil"
)
@ -178,9 +177,9 @@ func TestStatusWaitForDelete(t *testing.T) {
appsv1.SchemeGroupVersion.WithKind("Deployment"),
batchv1.SchemeGroupVersion.WithKind("Job"),
)
statusWatcher := watcher.NewDefaultStatusWatcher(fakeClient, fakeMapper)
statusWaiter := statusWaiter{
sw: statusWatcher,
restMapper: fakeMapper,
client: fakeClient,
log: t.Logf,
}
createdObjs := []runtime.Object{}
@ -275,9 +274,9 @@ func TestStatusWait(t *testing.T) {
appsv1.SchemeGroupVersion.WithKind("Deployment"),
batchv1.SchemeGroupVersion.WithKind("Job"),
)
statusWatcher := watcher.NewDefaultStatusWatcher(fakeClient, fakeMapper)
statusWaiter := statusWaiter{
sw: statusWatcher,
client: fakeClient,
restMapper: fakeMapper,
log: t.Logf,
}
objs := []runtime.Object{}
@ -299,9 +298,12 @@ func TestStatusWait(t *testing.T) {
resourceList = append(resourceList, list...)
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel()
err := statusWaiter.wait(ctx, resourceList, tt.waitForJobs)
var err error
if tt.waitForJobs {
err = statusWaiter.Wait(resourceList, time.Second*3)
} else {
err = statusWaiter.WaitWithJobs(resourceList, time.Second*3)
}
if tt.expectErrs != nil {
assert.EqualError(t, err, errors.Join(tt.expectErrs...).Error())
return

Loading…
Cancel
Save