pods and jobs working

Signed-off-by: Austin Abro <AustinAbro321@gmail.com>
pull/13604/head
Austin Abro 7 months ago
parent d1cc9b39a3
commit 14391dea5b

@@ -22,9 +22,7 @@ import (
"context"
"fmt"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
@@ -42,13 +40,13 @@ type customPodStatusReader struct {
func NewCustomPodStatusReader(mapper meta.RESTMapper) engine.StatusReader {
genericStatusReader := statusreaders.NewGenericStatusReader(mapper, podConditions)
return &customJobStatusReader{
return &customPodStatusReader{
genericStatusReader: genericStatusReader,
}
}
func (j *customPodStatusReader) Supports(gk schema.GroupKind) bool {
return gk == batchv1.SchemeGroupVersion.WithKind("Job").GroupKind()
return gk == corev1.SchemeGroupVersion.WithKind("Pod").GroupKind()
}
func (j *customPodStatusReader) ReadStatus(ctx context.Context, reader engine.ClusterReader, resource object.ObjMetadata) (*event.ResourceStatus, error) {
@@ -62,8 +60,8 @@ func (j *customPodStatusReader) ReadStatusForObject(ctx context.Context, reader
func podConditions(u *unstructured.Unstructured) (*status.Result, error) {
obj := u.UnstructuredContent()
phase := status.GetStringField(obj, ".status.phase", "")
switch v1.PodPhase(phase) {
case v1.PodSucceeded:
switch corev1.PodPhase(phase) {
case corev1.PodSucceeded:
message := fmt.Sprintf("pod %s succeeded", u.GetName())
return &status.Result{
Status: status.CurrentStatus,
@@ -76,7 +74,7 @@ func podConditions(u *unstructured.Unstructured) (*status.Result, error) {
},
},
}, nil
case v1.PodFailed:
case corev1.PodFailed:
message := fmt.Sprintf("pod %s failed", u.GetName())
return &status.Result{
Status: status.FailedStatus,
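The reader above maps a Pod's phase directly onto a kstatus result: Succeeded becomes Current and Failed becomes Failed, which is what lets WatchUntilReady treat a finished hook pod as done. A minimal, self-contained sketch of that mapping, assuming only the cli-utils kstatus API already used above (checkPodPhase, its default branch, and the sample pod are illustrative, not part of this commit):

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"sigs.k8s.io/cli-utils/pkg/kstatus/status"
)

// checkPodPhase mirrors the phase mapping above: Succeeded -> Current,
// Failed -> Failed, anything else is assumed to still be in progress.
func checkPodPhase(u *unstructured.Unstructured) (*status.Result, error) {
	phase := status.GetStringField(u.UnstructuredContent(), ".status.phase", "")
	switch corev1.PodPhase(phase) {
	case corev1.PodSucceeded:
		return &status.Result{Status: status.CurrentStatus, Message: fmt.Sprintf("pod %s succeeded", u.GetName())}, nil
	case corev1.PodFailed:
		return &status.Result{Status: status.FailedStatus, Message: fmt.Sprintf("pod %s failed", u.GetName())}, nil
	default:
		return &status.Result{Status: status.InProgressStatus, Message: fmt.Sprintf("pod %s is in progress", u.GetName())}, nil
	}
}

func main() {
	pod := &unstructured.Unstructured{Object: map[string]interface{}{
		"apiVersion": "v1",
		"kind":       "Pod",
		"metadata":   map[string]interface{}{"name": "hook-pod", "namespace": "ns"},
		"status":     map[string]interface{}{"phase": "Succeeded"},
	}}
	res, err := checkPodPhase(pod)
	if err != nil {
		panic(err)
	}
	fmt.Println(res.Status, "-", res.Message) // Current - pod hook-pod succeeded
}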

@@ -20,13 +20,16 @@ import (
"context"
"errors"
"fmt"
"sort"
"time"
appsv1 "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/client-go/dynamic"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling/aggregator"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling/collector"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling/engine"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling/event"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling/statusreaders"
"sigs.k8s.io/cli-utils/pkg/kstatus/status"
@@ -40,9 +43,32 @@ type statusWaiter struct {
log func(string, ...interface{})
}
func (w *statusWaiter) WatchUntilReady(resources ResourceList, timeout time.Duration) error {
func alwaysReady(u *unstructured.Unstructured) (*status.Result, error) {
return &status.Result{
Status: status.CurrentStatus,
Message: "Resource is current",
}, nil
}
return nil
func (w *statusWaiter) WatchUntilReady(resourceList ResourceList, timeout time.Duration) error {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
w.log("waiting for %d pods and jobs to complete with a timeout of %s", len(resourceList), timeout)
sw := watcher.NewDefaultStatusWatcher(w.client, w.restMapper)
jobSR := NewCustomJobStatusReader(w.restMapper)
podSR := NewCustomPodStatusReader(w.restMapper)
// We don't want to wait on any other resources as WatchUntilReady is only used for Helm hooks
genericSR := statusreaders.NewGenericStatusReader(w.restMapper, alwaysReady)
sr := &statusreaders.DelegatingStatusReader{
StatusReaders: []engine.StatusReader{
jobSR,
podSR,
genericSR,
},
}
sw.StatusReader = sr
return w.wait(ctx, resourceList, sw)
}
func (w *statusWaiter) Wait(resourceList ResourceList, timeout time.Duration) error {
@@ -85,8 +111,7 @@ func (w *statusWaiter) waitForDelete(ctx context.Context, resourceList ResourceL
}
eventCh := sw.Watch(cancelCtx, resources, watcher.Options{})
statusCollector := collector.NewResourceStatusCollector(resources)
go logResourceStatus(ctx, resources, statusCollector, status.NotFoundStatus, w.log)
done := statusCollector.ListenWithObserver(eventCh, statusObserver(cancel, status.NotFoundStatus))
done := statusCollector.ListenWithObserver(eventCh, statusObserver(cancel, status.NotFoundStatus, w.log))
<-done
if statusCollector.Error != nil {
@@ -129,8 +154,7 @@ func (w *statusWaiter) wait(ctx context.Context, resourceList ResourceList, sw w
eventCh := sw.Watch(cancelCtx, resources, watcher.Options{})
statusCollector := collector.NewResourceStatusCollector(resources)
go logResourceStatus(cancelCtx, resources, statusCollector, status.CurrentStatus, w.log)
done := statusCollector.ListenWithObserver(eventCh, statusObserver(cancel, status.CurrentStatus))
done := statusCollector.ListenWithObserver(eventCh, statusObserver(cancel, status.CurrentStatus, w.log))
<-done
if statusCollector.Error != nil {
@@ -153,38 +177,33 @@ func (w *statusWaiter) wait(ctx context.Context, resourceList ResourceList, sw w
return nil
}
func statusObserver(cancel context.CancelFunc, desired status.Status) collector.ObserverFunc {
return func(statusCollector *collector.ResourceStatusCollector, _ event.Event) {
rss := []*event.ResourceStatus{}
func statusObserver(cancel context.CancelFunc, desired status.Status, logFn func(string, ...interface{})) collector.ObserverFunc {
return func(statusCollector *collector.ResourceStatusCollector, e event.Event) {
var rss []*event.ResourceStatus
var nonDesiredResources []*event.ResourceStatus
for _, rs := range statusCollector.ResourceStatuses {
if rs == nil {
continue
}
rss = append(rss, rs)
if rs.Status != desired {
nonDesiredResources = append(nonDesiredResources, rs)
}
}
if aggregator.AggregateStatus(rss, desired) == desired {
cancel()
return
}
}
}
func logResourceStatus(ctx context.Context, resources []object.ObjMetadata, sc *collector.ResourceStatusCollector, desiredStatus status.Status, log func(string, ...interface{})) {
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
for _, id := range resources {
rs := sc.ResourceStatuses[id]
if rs.Status != desiredStatus {
log("waiting for resource, name: %s, kind: %s, desired status: %s, actual status: %s", rs.Identifier.Name, rs.Identifier.GroupKind.Kind, desiredStatus, rs.Status)
// only log one resource to not overwhelm the logs
break
}
}
if len(nonDesiredResources) > 0 {
// Log only the first resource so the user knows what they're waiting for without being overwhelmed
sort.Slice(nonDesiredResources, func(i, j int) bool {
return nonDesiredResources[i].Identifier.Name < nonDesiredResources[j].Identifier.Name
})
first := nonDesiredResources[0]
logFn("waiting for resource: name: %s, kind: %s, desired status: %s, actual status: %s",
first.Identifier.Name, first.Identifier.GroupKind.Kind, desired, first.Status)
}
}
}
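Pulling the reader wiring in WatchUntilReady out into a helper makes the hook semantics easier to see: Pods and Jobs go through the custom readers, and every other kind falls through to alwaysReady, so it can never block a hook wait. A sketch of that composition in the same package, using only identifiers that already exist in this file (newHookStatusReader is an illustrative name, not part of this commit):

package kube

import (
	"k8s.io/apimachinery/pkg/api/meta"
	"sigs.k8s.io/cli-utils/pkg/kstatus/polling/engine"
	"sigs.k8s.io/cli-utils/pkg/kstatus/polling/statusreaders"
)

// newHookStatusReader builds the delegating reader used for hook waits:
// the custom Pod and Job readers are consulted first, then a catch-all
// generic reader that reports every other resource as already Current.
func newHookStatusReader(restMapper meta.RESTMapper) engine.StatusReader {
	return &statusreaders.DelegatingStatusReader{
		StatusReaders: []engine.StatusReader{
			NewCustomJobStatusReader(restMapper),                          // Job: Current only once complete
			NewCustomPodStatusReader(restMapper),                          // Pod: Current only once Succeeded
			statusreaders.NewGenericStatusReader(restMapper, alwaysReady), // anything else: Current immediately
		},
	}
}

WatchUntilReady would then read sw := watcher.NewDefaultStatusWatcher(w.client, w.restMapper) followed by sw.StatusReader = newHookStatusReader(w.restMapper), equivalent to the inline wiring in the diff above.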

@@ -17,9 +17,7 @@ limitations under the License.
package kube // import "helm.sh/helm/v3/pkg/kube"
import (
"context"
"errors"
"fmt"
"testing"
"time"
@@ -35,10 +33,6 @@ import (
"k8s.io/apimachinery/pkg/util/yaml"
dynamicfake "k8s.io/client-go/dynamic/fake"
"k8s.io/kubectl/pkg/scheme"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling/collector"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling/event"
"sigs.k8s.io/cli-utils/pkg/kstatus/status"
"sigs.k8s.io/cli-utils/pkg/object"
"sigs.k8s.io/cli-utils/pkg/testutil"
)
@@ -46,7 +40,7 @@ var podCurrentManifest = `
apiVersion: v1
kind: Pod
metadata:
name: good-pod
name: current-pod
namespace: ns
status:
conditions:
@@ -100,11 +94,21 @@ status:
status: "True"
`
var podCompleteManifest = `
apiVersion: v1
kind: Pod
metadata:
name: good-pod
namespace: ns
status:
phase: Succeeded
`
var pausedDeploymentManifest = `
apiVersion: apps/v1
kind: Deployment
metadata:
name: nginx
name: paused
namespace: ns-1
generation: 1
spec:
@@ -125,6 +129,30 @@ spec:
- containerPort: 80
`
var notReadyDeploymentManifest = `
apiVersion: apps/v1
kind: Deployment
metadata:
name: not-ready
namespace: ns-1
generation: 1
spec:
replicas: 1
selector:
matchLabels:
app: nginx
template:
metadata:
labels:
app: nginx
spec:
containers:
- name: nginx
image: nginx:1.19.6
ports:
- containerPort: 80
`
func getGVR(t *testing.T, mapper meta.RESTMapper, obj *unstructured.Unstructured) schema.GroupVersionResource {
gvk := obj.GroupVersionKind()
mapping, err := mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
@@ -132,31 +160,6 @@ func getGVR(t *testing.T, mapper meta.RESTMapper, obj *unstructured.Unstructured
return mapping.Resource
}
func TestStatusLogger(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*1500)
defer cancel()
readyPod := object.ObjMetadata{
Name: "readyPod",
GroupKind: schema.GroupKind{Kind: "Pod"},
}
notReadyPod := object.ObjMetadata{
Name: "notReadyPod",
GroupKind: schema.GroupKind{Kind: "Pod"},
}
objs := []object.ObjMetadata{readyPod, notReadyPod}
resourceStatusCollector := collector.NewResourceStatusCollector(objs)
resourceStatusCollector.ResourceStatuses[readyPod] = &event.ResourceStatus{
Identifier: readyPod,
Status: status.CurrentStatus,
}
expectedMessage := "waiting for resource, name: notReadyPod, kind: Pod, desired status: Current, actual status: Unknown"
testLogger := func(message string, args ...interface{}) {
assert.Equal(t, expectedMessage, fmt.Sprintf(message, args...))
}
logResourceStatus(ctx, objs, resourceStatusCollector, status.CurrentStatus, testLogger)
}
func TestStatusWaitForDelete(t *testing.T) {
t.Parallel()
tests := []struct {
@@ -175,7 +178,7 @@ func TestStatusWaitForDelete(t *testing.T) {
name: "error when not all objects are deleted",
manifestsToCreate: []string{jobCompleteManifest, podCurrentManifest},
manifestsToDelete: []string{jobCompleteManifest},
expectErrs: []error{errors.New("resource still exists, name: good-pod, kind: Pod, status: Current"), errors.New("context deadline exceeded")},
expectErrs: []error{errors.New("resource still exists, name: current-pod, kind: Pod, status: Current"), errors.New("context deadline exceeded")},
},
}
for _, tt := range tests {
@@ -378,3 +381,73 @@ func TestWaitForJobComplete(t *testing.T) {
})
}
}
func TestWatchForReady(t *testing.T) {
t.Parallel()
tests := []struct {
name string
objManifests []string
expectErrs []error
}{
{
name: "succeeds if pod and job are complete",
objManifests: []string{jobCompleteManifest, podCompleteManifest},
},
{
name: "succeeds even when a resource that's not a pod or job is complete",
objManifests: []string{notReadyDeploymentManifest},
},
{
name: "Fails if job is not complete",
objManifests: []string{jobReadyManifest},
expectErrs: []error{errors.New("resource not ready, name: ready-not-complete, kind: Job, status: InProgress"), errors.New("context deadline exceeded")},
},
{
name: "Fails if pod is not complete",
objManifests: []string{podCurrentManifest},
expectErrs: []error{errors.New("resource not ready, name: current-pod, kind: Pod, status: InProgress"), errors.New("context deadline exceeded")},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
c := newTestClient(t)
fakeClient := dynamicfake.NewSimpleDynamicClient(scheme.Scheme)
fakeMapper := testutil.NewFakeRESTMapper(
v1.SchemeGroupVersion.WithKind("Pod"),
appsv1.SchemeGroupVersion.WithKind("Deployment"),
batchv1.SchemeGroupVersion.WithKind("Job"),
)
statusWaiter := statusWaiter{
client: fakeClient,
restMapper: fakeMapper,
log: t.Logf,
}
objs := []runtime.Object{}
for _, podYaml := range tt.objManifests {
m := make(map[string]interface{})
err := yaml.Unmarshal([]byte(podYaml), &m)
assert.NoError(t, err)
resource := &unstructured.Unstructured{Object: m}
objs = append(objs, resource)
gvr := getGVR(t, fakeMapper, resource)
err = fakeClient.Tracker().Create(gvr, resource, resource.GetNamespace())
assert.NoError(t, err)
}
resourceList := ResourceList{}
for _, obj := range objs {
list, err := c.Build(objBody(obj), false)
assert.NoError(t, err)
resourceList = append(resourceList, list...)
}
err := statusWaiter.WatchUntilReady(resourceList, time.Second*3)
if tt.expectErrs != nil {
assert.EqualError(t, err, errors.Join(tt.expectErrs...).Error())
return
}
assert.NoError(t, err)
})
}
}
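A note on the expected errors in these tests: the waiter reports one error per resource that never reached the desired status and joins it with the context error that ended the wait, which is why the assertions compare against errors.Join. A small standard-library illustration of what a caller sees (the error strings simply mirror the test expectations):

package main

import (
	"context"
	"errors"
	"fmt"
)

func main() {
	// Mirrors the joined error a timed-out wait produces in the tests above.
	notReady := errors.New("resource not ready, name: current-pod, kind: Pod, status: InProgress")
	err := errors.Join(notReady, context.DeadlineExceeded)

	fmt.Println(err) // prints both messages, one per line
	// errors.Is still finds the sentinel inside the joined error, assuming
	// the waiter wraps ctx.Err() directly rather than a re-created error.
	fmt.Println(errors.Is(err, context.DeadlineExceeded)) // true
}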
