From 14391dea5bf98c54ca0f9d87c82a5328f4bff063 Mon Sep 17 00:00:00 2001 From: Austin Abro Date: Mon, 10 Feb 2025 15:06:16 +0000 Subject: [PATCH] pods and jobs working Signed-off-by: Austin Abro --- pkg/kube/pod_status_reader.go | 12 ++- pkg/kube/statuswait.go | 75 +++++++++++------- pkg/kube/statuswait_test.go | 141 ++++++++++++++++++++++++++-------- 3 files changed, 159 insertions(+), 69 deletions(-) diff --git a/pkg/kube/pod_status_reader.go b/pkg/kube/pod_status_reader.go index 752f73ac1..c44af542e 100644 --- a/pkg/kube/pod_status_reader.go +++ b/pkg/kube/pod_status_reader.go @@ -22,9 +22,7 @@ import ( "context" "fmt" - batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" - v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" @@ -42,13 +40,13 @@ type customPodStatusReader struct { func NewCustomPodStatusReader(mapper meta.RESTMapper) engine.StatusReader { genericStatusReader := statusreaders.NewGenericStatusReader(mapper, podConditions) - return &customJobStatusReader{ + return &customPodStatusReader{ genericStatusReader: genericStatusReader, } } func (j *customPodStatusReader) Supports(gk schema.GroupKind) bool { - return gk == batchv1.SchemeGroupVersion.WithKind("Job").GroupKind() + return gk == corev1.SchemeGroupVersion.WithKind("Pod").GroupKind() } func (j *customPodStatusReader) ReadStatus(ctx context.Context, reader engine.ClusterReader, resource object.ObjMetadata) (*event.ResourceStatus, error) { @@ -62,8 +60,8 @@ func (j *customPodStatusReader) ReadStatusForObject(ctx context.Context, reader func podConditions(u *unstructured.Unstructured) (*status.Result, error) { obj := u.UnstructuredContent() phase := status.GetStringField(obj, ".status.phase", "") - switch v1.PodPhase(phase) { - case v1.PodSucceeded: + switch corev1.PodPhase(phase) { + case corev1.PodSucceeded: message := fmt.Sprintf("pod %s succeeded", u.GetName()) return &status.Result{ Status: status.CurrentStatus, @@ -76,7 +74,7 @@ func podConditions(u *unstructured.Unstructured) (*status.Result, error) { }, }, }, nil - case v1.PodFailed: + case corev1.PodFailed: message := fmt.Sprintf("pod %s failed", u.GetName()) return &status.Result{ Status: status.FailedStatus, diff --git a/pkg/kube/statuswait.go b/pkg/kube/statuswait.go index 16751abba..4aff42ff2 100644 --- a/pkg/kube/statuswait.go +++ b/pkg/kube/statuswait.go @@ -20,13 +20,16 @@ import ( "context" "errors" "fmt" + "sort" "time" appsv1 "k8s.io/api/apps/v1" "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/client-go/dynamic" "sigs.k8s.io/cli-utils/pkg/kstatus/polling/aggregator" "sigs.k8s.io/cli-utils/pkg/kstatus/polling/collector" + "sigs.k8s.io/cli-utils/pkg/kstatus/polling/engine" "sigs.k8s.io/cli-utils/pkg/kstatus/polling/event" "sigs.k8s.io/cli-utils/pkg/kstatus/polling/statusreaders" "sigs.k8s.io/cli-utils/pkg/kstatus/status" @@ -40,9 +43,32 @@ type statusWaiter struct { log func(string, ...interface{}) } -func (w *statusWaiter) WatchUntilReady(resources ResourceList, timeout time.Duration) error { - - return nil +func alwaysReady(u *unstructured.Unstructured) (*status.Result, error) { + return &status.Result{ + Status: status.CurrentStatus, + Message: "Resource is current", + }, nil +} + +func (w *statusWaiter) WatchUntilReady(resourceList ResourceList, timeout time.Duration) error { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + w.log("waiting for %d pods and jobs to 
complete with a timeout of %s", len(resourceList), timeout) + sw := watcher.NewDefaultStatusWatcher(w.client, w.restMapper) + jobSR := NewCustomJobStatusReader(w.restMapper) + podSR := NewCustomPodStatusReader(w.restMapper) + // We don't want to wait on any other resources as watchUntilReady is only for Helm hooks + genericSR := statusreaders.NewGenericStatusReader(w.restMapper, alwaysReady) + + sr := &statusreaders.DelegatingStatusReader{ + StatusReaders: []engine.StatusReader{ + jobSR, + podSR, + genericSR, + }, + } + sw.StatusReader = sr + return w.wait(ctx, resourceList, sw) } func (w *statusWaiter) Wait(resourceList ResourceList, timeout time.Duration) error { @@ -85,8 +111,7 @@ func (w *statusWaiter) waitForDelete(ctx context.Context, resourceList ResourceL } eventCh := sw.Watch(cancelCtx, resources, watcher.Options{}) statusCollector := collector.NewResourceStatusCollector(resources) - go logResourceStatus(ctx, resources, statusCollector, status.NotFoundStatus, w.log) - done := statusCollector.ListenWithObserver(eventCh, statusObserver(cancel, status.NotFoundStatus)) + done := statusCollector.ListenWithObserver(eventCh, statusObserver(cancel, status.NotFoundStatus, w.log)) <-done if statusCollector.Error != nil { @@ -129,8 +154,7 @@ func (w *statusWaiter) wait(ctx context.Context, resourceList ResourceList, sw w eventCh := sw.Watch(cancelCtx, resources, watcher.Options{}) statusCollector := collector.NewResourceStatusCollector(resources) - go logResourceStatus(cancelCtx, resources, statusCollector, status.CurrentStatus, w.log) - done := statusCollector.ListenWithObserver(eventCh, statusObserver(cancel, status.CurrentStatus)) + done := statusCollector.ListenWithObserver(eventCh, statusObserver(cancel, status.CurrentStatus, w.log)) <-done if statusCollector.Error != nil { @@ -153,38 +177,33 @@ func (w *statusWaiter) wait(ctx context.Context, resourceList ResourceList, sw w return nil } -func statusObserver(cancel context.CancelFunc, desired status.Status) collector.ObserverFunc { - return func(statusCollector *collector.ResourceStatusCollector, _ event.Event) { - rss := []*event.ResourceStatus{} +func statusObserver(cancel context.CancelFunc, desired status.Status, logFn func(string, ...interface{})) collector.ObserverFunc { + return func(statusCollector *collector.ResourceStatusCollector, e event.Event) { + var rss []*event.ResourceStatus + var nonDesiredResources []*event.ResourceStatus for _, rs := range statusCollector.ResourceStatuses { if rs == nil { continue } rss = append(rss, rs) + if rs.Status != desired { + nonDesiredResources = append(nonDesiredResources, rs) + } } + if aggregator.AggregateStatus(rss, desired) == desired { cancel() return } - } -} -func logResourceStatus(ctx context.Context, resources []object.ObjMetadata, sc *collector.ResourceStatusCollector, desiredStatus status.Status, log func(string, ...interface{})) { - ticker := time.NewTicker(1 * time.Second) - defer ticker.Stop() - for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - for _, id := range resources { - rs := sc.ResourceStatuses[id] - if rs.Status != desiredStatus { - log("waiting for resource, name: %s, kind: %s, desired status: %s, actual status: %s", rs.Identifier.Name, rs.Identifier.GroupKind.Kind, desiredStatus, rs.Status) - // only log one resource to not overwhelm the logs - break - } - } + if len(nonDesiredResources) > 0 { + // Log only the first resource so the user knows what they're waiting for without being overwhelmed + sort.Slice(nonDesiredResources, func(i, j int) bool { 
+ return nonDesiredResources[i].Identifier.Name < nonDesiredResources[j].Identifier.Name + }) + first := nonDesiredResources[0] + logFn("waiting for resource: name: %s, kind: %s, desired status: %s, actual status: %s", + first.Identifier.Name, first.Identifier.GroupKind.Kind, desired, first.Status) } } } diff --git a/pkg/kube/statuswait_test.go b/pkg/kube/statuswait_test.go index 131224e8b..df16bf7e9 100644 --- a/pkg/kube/statuswait_test.go +++ b/pkg/kube/statuswait_test.go @@ -17,9 +17,7 @@ limitations under the License. package kube // import "helm.sh/helm/v3/pkg/kube" import ( - "context" "errors" - "fmt" "testing" "time" @@ -35,10 +33,6 @@ import ( "k8s.io/apimachinery/pkg/util/yaml" dynamicfake "k8s.io/client-go/dynamic/fake" "k8s.io/kubectl/pkg/scheme" - "sigs.k8s.io/cli-utils/pkg/kstatus/polling/collector" - "sigs.k8s.io/cli-utils/pkg/kstatus/polling/event" - "sigs.k8s.io/cli-utils/pkg/kstatus/status" - "sigs.k8s.io/cli-utils/pkg/object" "sigs.k8s.io/cli-utils/pkg/testutil" ) @@ -46,7 +40,7 @@ var podCurrentManifest = ` apiVersion: v1 kind: Pod metadata: - name: good-pod + name: current-pod namespace: ns status: conditions: @@ -100,11 +94,21 @@ status: status: "True" ` +var podCompleteManifest = ` +apiVersion: v1 +kind: Pod +metadata: + name: good-pod + namespace: ns +status: + phase: Succeeded +` + var pausedDeploymentManifest = ` apiVersion: apps/v1 kind: Deployment metadata: - name: nginx + name: paused namespace: ns-1 generation: 1 spec: @@ -125,6 +129,30 @@ spec: - containerPort: 80 ` +var notReadyDeploymentManifest = ` +apiVersion: apps/v1 +kind: Deployment +metadata: + name: not-ready + namespace: ns-1 + generation: 1 +spec: + replicas: 1 + selector: + matchLabels: + app: nginx + template: + metadata: + labels: + app: nginx + spec: + containers: + - name: nginx + image: nginx:1.19.6 + ports: + - containerPort: 80 +` + func getGVR(t *testing.T, mapper meta.RESTMapper, obj *unstructured.Unstructured) schema.GroupVersionResource { gvk := obj.GroupVersionKind() mapping, err := mapper.RESTMapping(gvk.GroupKind(), gvk.Version) @@ -132,31 +160,6 @@ func getGVR(t *testing.T, mapper meta.RESTMapper, obj *unstructured.Unstructured return mapping.Resource } -func TestStatusLogger(t *testing.T) { - t.Parallel() - ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*1500) - defer cancel() - readyPod := object.ObjMetadata{ - Name: "readyPod", - GroupKind: schema.GroupKind{Kind: "Pod"}, - } - notReadyPod := object.ObjMetadata{ - Name: "notReadyPod", - GroupKind: schema.GroupKind{Kind: "Pod"}, - } - objs := []object.ObjMetadata{readyPod, notReadyPod} - resourceStatusCollector := collector.NewResourceStatusCollector(objs) - resourceStatusCollector.ResourceStatuses[readyPod] = &event.ResourceStatus{ - Identifier: readyPod, - Status: status.CurrentStatus, - } - expectedMessage := "waiting for resource, name: notReadyPod, kind: Pod, desired status: Current, actual status: Unknown" - testLogger := func(message string, args ...interface{}) { - assert.Equal(t, expectedMessage, fmt.Sprintf(message, args...)) - } - logResourceStatus(ctx, objs, resourceStatusCollector, status.CurrentStatus, testLogger) -} - func TestStatusWaitForDelete(t *testing.T) { t.Parallel() tests := []struct { @@ -175,7 +178,7 @@ func TestStatusWaitForDelete(t *testing.T) { name: "error when not all objects are deleted", manifestsToCreate: []string{jobCompleteManifest, podCurrentManifest}, manifestsToDelete: []string{jobCompleteManifest}, - expectErrs: []error{errors.New("resource still exists, name: 
good-pod, kind: Pod, status: Current"), errors.New("context deadline exceeded")}, + expectErrs: []error{errors.New("resource still exists, name: current-pod, kind: Pod, status: Current"), errors.New("context deadline exceeded")}, }, } for _, tt := range tests { @@ -378,3 +381,73 @@ func TestWaitForJobComplete(t *testing.T) { }) } } + +func TestWatchForReady(t *testing.T) { + t.Parallel() + tests := []struct { + name string + objManifests []string + expectErrs []error + }{ + { + name: "succeeds if pod and job are complete", + objManifests: []string{jobCompleteManifest, podCompleteManifest}, + }, + { + name: "succeeds even when a resource that's not a pod or job is complete", + objManifests: []string{notReadyDeploymentManifest}, + }, + { + name: "Fails if job is not complete", + objManifests: []string{jobReadyManifest}, + expectErrs: []error{errors.New("resource not ready, name: ready-not-complete, kind: Job, status: InProgress"), errors.New("context deadline exceeded")}, + }, + { + name: "Fails if pod is not complete", + objManifests: []string{podCurrentManifest}, + expectErrs: []error{errors.New("resource not ready, name: current-pod, kind: Pod, status: InProgress"), errors.New("context deadline exceeded")}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + c := newTestClient(t) + fakeClient := dynamicfake.NewSimpleDynamicClient(scheme.Scheme) + fakeMapper := testutil.NewFakeRESTMapper( + v1.SchemeGroupVersion.WithKind("Pod"), + appsv1.SchemeGroupVersion.WithKind("Deployment"), + batchv1.SchemeGroupVersion.WithKind("Job"), + ) + statusWaiter := statusWaiter{ + client: fakeClient, + restMapper: fakeMapper, + log: t.Logf, + } + objs := []runtime.Object{} + for _, podYaml := range tt.objManifests { + m := make(map[string]interface{}) + err := yaml.Unmarshal([]byte(podYaml), &m) + assert.NoError(t, err) + resource := &unstructured.Unstructured{Object: m} + objs = append(objs, resource) + gvr := getGVR(t, fakeMapper, resource) + err = fakeClient.Tracker().Create(gvr, resource, resource.GetNamespace()) + assert.NoError(t, err) + } + resourceList := ResourceList{} + for _, obj := range objs { + list, err := c.Build(objBody(obj), false) + assert.NoError(t, err) + resourceList = append(resourceList, list...) + } + + err := statusWaiter.WatchUntilReady(resourceList, time.Second*3) + if tt.expectErrs != nil { + assert.EqualError(t, err, errors.Join(tt.expectErrs...).Error()) + return + } + assert.NoError(t, err) + }) + } +}
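
Not part of the patch: a minimal sketch of how the podConditions mapping added in pod_status_reader.go classifies pod phases, assuming it sits in a _test.go file in the same kube package. The pod name and namespace below are illustrative only; only the podConditions function and the kstatus Current status come from the change above.

package kube

import (
	"fmt"

	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
)

// Example_podConditions: a pod whose .status.phase is "Succeeded" is reported
// as the kstatus Current status, which is what lets WatchUntilReady treat a
// completed hook pod as done (Failed maps to the Failed status, anything else
// stays InProgress).
func Example_podConditions() {
	pod := &unstructured.Unstructured{Object: map[string]interface{}{
		"apiVersion": "v1",
		"kind":       "Pod",
		"metadata": map[string]interface{}{
			"name":      "hook-pod", // illustrative name, not from the patch
			"namespace": "ns",
		},
		"status": map[string]interface{}{
			"phase": "Succeeded",
		},
	}}
	result, err := podConditions(pod)
	if err != nil {
		panic(err)
	}
	fmt.Println(result.Status)
	// Output: Current
}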