From a8f53f98ee2206dababbbc0bb8f1037c529488e7 Mon Sep 17 00:00:00 2001 From: Austin Abro Date: Thu, 6 Feb 2025 15:22:14 +0000 Subject: [PATCH] WIP custom status reader Signed-off-by: Austin Abro --- pkg/kube/client.go | 6 +- pkg/kube/job_status_reader.go | 120 +++++++++++++++++++++++++++++ pkg/kube/job_status_reader_test.go | 79 +++++++++++++++++++ 3 files changed, 204 insertions(+), 1 deletion(-) create mode 100644 pkg/kube/job_status_reader.go create mode 100644 pkg/kube/job_status_reader_test.go diff --git a/pkg/kube/client.go b/pkg/kube/client.go index e3fdb8b3b..b4164a8ff 100644 --- a/pkg/kube/client.go +++ b/pkg/kube/client.go @@ -59,6 +59,7 @@ import ( watchtools "k8s.io/client-go/tools/watch" "k8s.io/client-go/util/retry" cmdutil "k8s.io/kubectl/pkg/cmd/util" + "sigs.k8s.io/cli-utils/pkg/kstatus/polling/statusreaders" ) // ErrNoObjectsVisited indicates that during a visit operation, no matching objects were found. @@ -133,6 +134,9 @@ func (c *Client) newWaiter(strategy WaitStrategy) (Waiter, error) { return nil, err } sw := watcher.NewDefaultStatusWatcher(dynamicClient, restMapper) + newCustomJobStatusReader := NewCustomJobStatusReader(restMapper) + customSR := statusreaders.NewStatusReader(restMapper, newCustomJobStatusReader) + sw.StatusReader = customSR return &statusWaiter{ sw: sw, log: c.Log, @@ -148,7 +152,7 @@ func New(getter genericclioptions.RESTClientGetter, ws WaitStrategy) (*Client, e getter = genericclioptions.NewConfigFlags(true) } factory := cmdutil.NewFactory(getter) - c := &Client{ + c := &Client{ Factory: factory, Log: nopLogger, } diff --git a/pkg/kube/job_status_reader.go b/pkg/kube/job_status_reader.go new file mode 100644 index 000000000..f6eb8d3d9 --- /dev/null +++ b/pkg/kube/job_status_reader.go @@ -0,0 +1,120 @@ +/* +Copyright The Helm Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kube + +// This file was copied and modified from https://github.com/fluxcd/kustomize-controller/blob/main/internal/statusreaders/job.go + +import ( + "context" + "fmt" + + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + + "sigs.k8s.io/cli-utils/pkg/kstatus/polling/engine" + "sigs.k8s.io/cli-utils/pkg/kstatus/polling/event" + "sigs.k8s.io/cli-utils/pkg/kstatus/polling/statusreaders" + "sigs.k8s.io/cli-utils/pkg/kstatus/status" + "sigs.k8s.io/cli-utils/pkg/object" +) + +type customJobStatusReader struct { + genericStatusReader engine.StatusReader +} + +func NewCustomJobStatusReader(mapper meta.RESTMapper) engine.StatusReader { + genericStatusReader := statusreaders.NewGenericStatusReader(mapper, jobConditions) + return &customJobStatusReader{ + genericStatusReader: genericStatusReader, + } +} + +func (j *customJobStatusReader) Supports(gk schema.GroupKind) bool { + return gk == batchv1.SchemeGroupVersion.WithKind("Job").GroupKind() +} + +func (j *customJobStatusReader) ReadStatus(ctx context.Context, reader engine.ClusterReader, resource object.ObjMetadata) (*event.ResourceStatus, error) { + return j.genericStatusReader.ReadStatus(ctx, reader, resource) +} + +func (j *customJobStatusReader) ReadStatusForObject(ctx context.Context, reader engine.ClusterReader, resource *unstructured.Unstructured) (*event.ResourceStatus, error) { + return j.genericStatusReader.ReadStatusForObject(ctx, reader, resource) +} + +// Ref: https://github.com/kubernetes-sigs/cli-utils/blob/v0.29.4/pkg/kstatus/status/core.go +// Modified to return Current status only when the Job has completed as opposed to when it's in progress. +func jobConditions(u *unstructured.Unstructured) (*status.Result, error) { + obj := u.UnstructuredContent() + + parallelism := status.GetIntField(obj, ".spec.parallelism", 1) + completions := status.GetIntField(obj, ".spec.completions", parallelism) + succeeded := status.GetIntField(obj, ".status.succeeded", 0) + failed := status.GetIntField(obj, ".status.failed", 0) + + // Conditions + // https://github.com/kubernetes/kubernetes/blob/master/pkg/controller/job/utils.go#L24 + objc, err := status.GetObjectWithConditions(obj) + if err != nil { + return nil, err + } + for _, c := range objc.Status.Conditions { + switch c.Type { + case "Complete": + if c.Status == corev1.ConditionTrue { + message := fmt.Sprintf("Job Completed. succeeded: %d/%d", succeeded, completions) + return &status.Result{ + Status: status.CurrentStatus, + Message: message, + Conditions: []status.Condition{}, + }, nil + } + case "Failed": + message := fmt.Sprintf("Job Failed. failed: %d/%d", failed, completions) + if c.Status == corev1.ConditionTrue { + return &status.Result{ + Status: status.FailedStatus, + Message: message, + Conditions: []status.Condition{ + { + Type: status.ConditionStalled, + Status: corev1.ConditionTrue, + Reason: "JobFailed", + Message: message, + }, + }, + }, nil + } + } + } + + message := "Job in progress" + return &status.Result{ + Status: status.InProgressStatus, + Message: message, + Conditions: []status.Condition{ + { + Type: status.ConditionReconciling, + Status: corev1.ConditionTrue, + Reason: "JobInProgress", + Message: message, + }, + }, + }, nil +} diff --git a/pkg/kube/job_status_reader_test.go b/pkg/kube/job_status_reader_test.go new file mode 100644 index 000000000..af372c8b3 --- /dev/null +++ b/pkg/kube/job_status_reader_test.go @@ -0,0 +1,79 @@ +/* +Copyright The Helm Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kube + +// This file was copied and modified from https://github.com/fluxcd/kustomize-controller/blob/main/internal/statusreaders/job.go +import ( + "testing" + + "github.com/stretchr/testify/assert" + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + + "sigs.k8s.io/cli-utils/pkg/kstatus/status" +) + +func toUnstructured(obj runtime.Object) (*unstructured.Unstructured, error) { + // If the incoming object is already unstructured, perform a deep copy first + // otherwise DefaultUnstructuredConverter ends up returning the inner map without + // making a copy. + if _, ok := obj.(runtime.Unstructured); ok { + obj = obj.DeepCopyObject() + } + rawMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj) + if err != nil { + return nil, err + } + return &unstructured.Unstructured{Object: rawMap}, nil +} + +func TestJobConditions(t *testing.T) { + job := &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: "job", + }, + Spec: batchv1.JobSpec{}, + Status: batchv1.JobStatus{}, + } + + t.Run("job without Complete condition returns InProgress status", func(t *testing.T) { + us, err := toUnstructured(job) + assert.NoError(t, err) + result, err := jobConditions(us) + assert.NoError(t, err) + assert.Equal(t, status.InProgressStatus, result) + }) + + t.Run("job with Complete condition as True returns Current status", func(t *testing.T) { + job.Status = batchv1.JobStatus{ + Conditions: []batchv1.JobCondition{ + { + Type: batchv1.JobComplete, + Status: corev1.ConditionTrue, + }, + }, + } + us, err := toUnstructured(job) + assert.NoError(t, err) + result, err := jobConditions(us) + assert.NoError(t, err) + assert.Equal(t, status.CurrentStatus, result.Status) + }) +}