From ac9012577a8fccd13371966539fb953d4ff043ea Mon Sep 17 00:00:00 2001 From: Austin Abro Date: Mon, 6 Jan 2025 13:06:54 +0000 Subject: [PATCH] status function Signed-off-by: Austin Abro --- pkg/kube/statuswait.go | 50 +++++++++++++------------------------ pkg/kube/statuswait_test.go | 3 +-- 2 files changed, 19 insertions(+), 34 deletions(-) diff --git a/pkg/kube/statuswait.go b/pkg/kube/statuswait.go index bbc92292d..bec38f7c9 100644 --- a/pkg/kube/statuswait.go +++ b/pkg/kube/statuswait.go @@ -69,22 +69,7 @@ func (w *statusWaiter) waitForDelete(ctx context.Context, resourceList ResourceL } eventCh := w.sw.Watch(cancelCtx, resources, watcher.Options{}) statusCollector := collector.NewResourceStatusCollector(resources) - done := statusCollector.ListenWithObserver(eventCh, collector.ObserverFunc( - func(statusCollector *collector.ResourceStatusCollector, _ event.Event) { - rss := []*event.ResourceStatus{} - for _, rs := range statusCollector.ResourceStatuses { - if rs == nil { - continue - } - rss = append(rss, rs) - } - desired := status.NotFoundStatus - if aggregator.AggregateStatus(rss, desired) == desired { - cancel() - return - } - }), - ) + done := statusCollector.ListenWithObserver(eventCh, statusObserver(cancel, status.NotFoundStatus)) <-done if statusCollector.Error != nil { @@ -140,22 +125,7 @@ func (w *statusWaiter) wait(ctx context.Context, resourceList ResourceList, wait } eventCh := w.sw.Watch(cancelCtx, resources, watcher.Options{}) statusCollector := collector.NewResourceStatusCollector(resources) - done := statusCollector.ListenWithObserver(eventCh, collector.ObserverFunc( - func(statusCollector *collector.ResourceStatusCollector, _ event.Event) { - rss := []*event.ResourceStatus{} - for _, rs := range statusCollector.ResourceStatuses { - if rs == nil { - continue - } - rss = append(rss, rs) - } - desired := status.CurrentStatus - if aggregator.AggregateStatus(rss, desired) == desired { - cancel() - return - } - }), - ) + done := statusCollector.ListenWithObserver(eventCh, statusObserver(cancel, status.CurrentStatus)) <-done if statusCollector.Error != nil { @@ -177,3 +147,19 @@ func (w *statusWaiter) wait(ctx context.Context, resourceList ResourceList, wait } return nil } + +func statusObserver(cancel context.CancelFunc, desired status.Status) collector.ObserverFunc { + return func(statusCollector *collector.ResourceStatusCollector, _ event.Event) { + rss := []*event.ResourceStatus{} + for _, rs := range statusCollector.ResourceStatuses { + if rs == nil { + continue + } + rss = append(rss, rs) + } + if aggregator.AggregateStatus(rss, desired) == desired { + cancel() + return + } + } +} diff --git a/pkg/kube/statuswait_test.go b/pkg/kube/statuswait_test.go index b018691cd..822204dfe 100644 --- a/pkg/kube/statuswait_test.go +++ b/pkg/kube/statuswait_test.go @@ -19,7 +19,6 @@ package kube // import "helm.sh/helm/v3/pkg/kube" import ( "context" "errors" - "log" "testing" "time" @@ -246,7 +245,7 @@ func TestStatusWait(t *testing.T) { } kwaiter := statusWaiter{ sw: statusWatcher, - log: log.Printf, + log: t.Logf, } resourceList := ResourceList{}