From 4c1758143fd5bfed4ed42fa73fd051ae6e90f642 Mon Sep 17 00:00:00 2001 From: Austin Abro Date: Thu, 26 Dec 2024 16:09:54 +0000 Subject: [PATCH] basic design up and balling Signed-off-by: Austin Abro --- pkg/action/action.go | 3 +- pkg/kube/client.go | 99 ++++++++++++++++++++--------------------- pkg/kube/client_test.go | 36 +++++++++++++-- pkg/kube/interface.go | 32 +++++++------ pkg/kube/kready.go | 80 +++++++++++++++++++++++++++++++++ pkg/kube/wait.go | 13 ++++++ 6 files changed, 193 insertions(+), 70 deletions(-) diff --git a/pkg/action/action.go b/pkg/action/action.go index 45f1a14e2..8fa3ae289 100644 --- a/pkg/action/action.go +++ b/pkg/action/action.go @@ -371,7 +371,8 @@ func (cfg *Configuration) recordRelease(r *release.Release) { // Init initializes the action configuration func (cfg *Configuration) Init(getter genericclioptions.RESTClientGetter, namespace, helmDriver string, log DebugLog) error { - kc := kube.New(getter) + // TODO I don't love that this ends up using nil instead of a real watcher + kc := kube.New(getter, nil) kc.Log = log lazyClient := &lazyClient{ diff --git a/pkg/kube/client.go b/pkg/kube/client.go index b38b4b094..b1b1d4835 100644 --- a/pkg/kube/client.go +++ b/pkg/kube/client.go @@ -92,6 +92,11 @@ type Client struct { Namespace string kubeClient *kubernetes.Clientset + // Another potential option rather than having the waiter as a field + // would be to have a field that decides what type of waiter to use + // then instantiate it during the method + // of course the fields could take a waiter as well + waiter Waiter } func init() { @@ -105,14 +110,53 @@ func init() { } } +func getStatusWatcher(factory Factory) (watcher.StatusWatcher, error) { + cfg, err := factory.ToRESTConfig() + if err != nil { + return nil, err + } + // factory.DynamicClient() may be a better choice here + dynamicClient, err := dynamic.NewForConfig(cfg) + if err != nil { + return nil, err + } + // Not sure if I should use factory methods to get this http client or I should do this + // For example, I could likely use this as well, but it seems like I should use the factory methods instead + // httpClient, err := rest.HTTPClientFor(cfg) + // if err != nil { + // return err + // } + client, err := factory.RESTClient() + if err != nil { + return nil, err + } + restMapper, err := apiutil.NewDynamicRESTMapper(cfg, client.Client) + if err != nil { + return nil, err + } + sw := watcher.NewDefaultStatusWatcher(dynamicClient, restMapper) + return sw, nil +} + // New creates a new Client. -func New(getter genericclioptions.RESTClientGetter) *Client { +func New(getter genericclioptions.RESTClientGetter, waiter Waiter) *Client { if getter == nil { getter = genericclioptions.NewConfigFlags(true) + } + factory := cmdutil.NewFactory(getter) + if waiter == nil { + sw, err := getStatusWatcher(factory) + if err != nil { + // TODO, likely will move how the stats watcher is created so it doesn't need to be created + // unless it's going to be used + panic(err) + } + waiter = &kstatusWaiter{sw, nopLogger} } return &Client{ - Factory: cmdutil.NewFactory(getter), + Factory: factory, Log: nopLogger, + waiter: waiter, } } @@ -291,44 +335,7 @@ func getResource(info *resource.Info) (runtime.Object, error) { // Wait waits up to the given timeout for the specified resources to be ready. func (c *Client) Wait(resources ResourceList, timeout time.Duration) error { - // cs, err := c.getKubeClient() - // if err != nil { - // return err - // } - // checker := NewReadyChecker(cs, c.Log, PausedAsReady(true)) - // w := waiter{ - // c: checker, - // log: c.Log, - // timeout: timeout, - // } - // w.waitForResources() - cfg, err := c.Factory.ToRESTConfig() - if err != nil { - return err - } - dynamicClient, err := dynamic.NewForConfig(cfg) - if err != nil { - return err - } - // Not sure if I should use factory methods to get this http client or I should do this - // For example, I could likely use this as well, but it seems like I should use the factory methods instead - // httpClient, err := rest.HTTPClientFor(cfg) - // if err != nil { - // return err - // } - client, err := c.Factory.RESTClient() - if err != nil { - return err - } - restMapper, err := apiutil.NewDynamicRESTMapper(cfg, client.Client) - if err != nil { - return err - } - sw := watcher.NewDefaultStatusWatcher(dynamicClient, restMapper) - // return sw, nil - ctx, cancel := context.WithTimeout(context.TODO(), timeout) - defer cancel() - return WaitForReady(ctx, sw, resources) + return c.waiter.Wait(resources, timeout) } // WaitForReady waits for all of the objects to reach a ready state. @@ -389,17 +396,7 @@ func WaitForReady(ctx context.Context, sw watcher.StatusWatcher, resourceList Re // WaitWithJobs wait up to the given timeout for the specified resources to be ready, including jobs. func (c *Client) WaitWithJobs(resources ResourceList, timeout time.Duration) error { - cs, err := c.getKubeClient() - if err != nil { - return err - } - checker := NewReadyChecker(cs, c.Log, PausedAsReady(true), CheckJobs(true)) - w := waiter{ - c: checker, - log: c.Log, - timeout: timeout, - } - return w.waitForResources(resources) + return c.waiter.WaitWithJobs(resources, timeout) } // WaitForDelete wait up to the given timeout for the specified resources to be deleted. diff --git a/pkg/kube/client_test.go b/pkg/kube/client_test.go index 7f3ba65be..b12897121 100644 --- a/pkg/kube/client_test.go +++ b/pkg/kube/client_test.go @@ -24,6 +24,7 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -453,10 +454,10 @@ func TestPerform(t *testing.T) { } } -// Likely it is not possible to get this test to work with kstatus given that it seems +// Likely it is not possible to get this test to work with kstatus given that it seems // kstatus is not making constant get checks on the resources and is instead waiting for events // Potentially the test could be reworked to make the pods after five seconds -// would need this -> +// would need this -> func TestWait(t *testing.T) { podList := newPodList("starfish", "otter", "squid") @@ -517,6 +518,15 @@ func TestWait(t *testing.T) { } }), } + cs, err := c.getKubeClient() + require.NoError(t, err) + checker := NewReadyChecker(cs, c.Log, PausedAsReady(true)) + w := &waiter{ + c: checker, + log: c.Log, + timeout: time.Second * 30, + } + c.waiter = w resources, err := c.Build(objBody(&podList), false) if err != nil { t.Fatal(err) @@ -569,6 +579,15 @@ func TestWaitJob(t *testing.T) { } }), } + cs, err := c.getKubeClient() + require.NoError(t, err) + checker := NewReadyChecker(cs, c.Log, PausedAsReady(true), CheckJobs(true)) + w := &waiter{ + c: checker, + log: c.Log, + timeout: time.Second * 30, + } + c.waiter = w resources, err := c.Build(objBody(job), false) if err != nil { t.Fatal(err) @@ -623,6 +642,15 @@ func TestWaitDelete(t *testing.T) { } }), } + cs, err := c.getKubeClient() + require.NoError(t, err) + checker := NewReadyChecker(cs, c.Log, PausedAsReady(true)) + w := &waiter{ + c: checker, + log: c.Log, + timeout: time.Second * 30, + } + c.waiter = w resources, err := c.Build(objBody(&pod), false) if err != nil { t.Fatal(err) @@ -649,7 +677,7 @@ func TestWaitDelete(t *testing.T) { func TestReal(t *testing.T) { t.Skip("This is a live test, comment this line to run") - c := New(nil) + c := New(nil, nil) resources, err := c.Build(strings.NewReader(guestbookManifest), false) if err != nil { t.Fatal(err) @@ -659,7 +687,7 @@ func TestReal(t *testing.T) { } testSvcEndpointManifest := testServiceManifest + "\n---\n" + testEndpointManifest - c = New(nil) + c = New(nil, nil) resources, err = c.Build(strings.NewReader(testSvcEndpointManifest), false) if err != nil { t.Fatal(err) diff --git a/pkg/kube/interface.go b/pkg/kube/interface.go index af3823a3e..40880005a 100644 --- a/pkg/kube/interface.go +++ b/pkg/kube/interface.go @@ -32,16 +32,13 @@ type Interface interface { // Create creates one or more resources. Create(resources ResourceList) (*Result, error) - // Wait waits up to the given timeout for the specified resources to be ready. - // TODO introduce another interface for the waiting of the KubeClient - Wait(resources ResourceList, timeout time.Duration) error - - // WaitWithJobs wait up to the given timeout for the specified resources to be ready, including jobs. - WaitWithJobs(resources ResourceList, timeout time.Duration) error - // Delete destroys one or more resources. Delete(resources ResourceList) (*Result, []error) + // Update updates one or more resources or creates the resource + // if it doesn't exist. + Update(original, target ResourceList, force bool) (*Result, error) + // WatchUntilReady watches the resources given and waits until it is ready. // // This method is mainly for hook implementations. It watches for a resource to @@ -51,11 +48,12 @@ type Interface interface { // For Pods, "ready" means the Pod phase is marked "succeeded". // For all other kinds, it means the kind was created or modified without // error. + // TODO: Is watch until ready really behavior we want over the resources actually being ready? WatchUntilReady(resources ResourceList, timeout time.Duration) error - // Update updates one or more resources or creates the resource - // if it doesn't exist. - Update(original, target ResourceList, force bool) (*Result, error) + // WaitAndGetCompletedPodPhase waits up to a timeout until a pod enters a completed phase + // and returns said phase (PodSucceeded or PodFailed qualify). + WaitAndGetCompletedPodPhase(name string, timeout time.Duration) (v1.PodPhase, error) // Build creates a resource list from a Reader. // @@ -65,12 +63,18 @@ type Interface interface { // Validates against OpenAPI schema if validate is true. Build(reader io.Reader, validate bool) (ResourceList, error) - // WaitAndGetCompletedPodPhase waits up to a timeout until a pod enters a completed phase - // and returns said phase (PodSucceeded or PodFailed qualify). - WaitAndGetCompletedPodPhase(name string, timeout time.Duration) (v1.PodPhase, error) - // IsReachable checks whether the client is able to connect to the cluster. IsReachable() error + Waiter +} + +// Waiter defines methods related to waiting for resource states. +type Waiter interface { + // Wait waits up to the given timeout for the specified resources to be ready. + Wait(resources ResourceList, timeout time.Duration) error + + // WaitWithJobs wait up to the given timeout for the specified resources to be ready, including jobs. + WaitWithJobs(resources ResourceList, timeout time.Duration) error } // InterfaceExt is introduced to avoid breaking backwards compatibility for Interface implementers. diff --git a/pkg/kube/kready.go b/pkg/kube/kready.go index 0752ba481..c199eecc6 100644 --- a/pkg/kube/kready.go +++ b/pkg/kube/kready.go @@ -16,3 +16,83 @@ limitations under the License. package kube // import "helm.sh/helm/v3/pkg/kube" +import ( + "context" + "fmt" + "time" + + "k8s.io/apimachinery/pkg/runtime" + "sigs.k8s.io/cli-utils/pkg/kstatus/polling/aggregator" + "sigs.k8s.io/cli-utils/pkg/kstatus/polling/collector" + "sigs.k8s.io/cli-utils/pkg/kstatus/polling/event" + "sigs.k8s.io/cli-utils/pkg/kstatus/status" + "sigs.k8s.io/cli-utils/pkg/kstatus/watcher" + "sigs.k8s.io/cli-utils/pkg/object" +) + +type kstatusWaiter struct { + // Add any necessary dependencies, e.g., Kubernetes API client. + sw watcher.StatusWatcher + log func(string, ...interface{}) +} + +func (w *kstatusWaiter) Wait(resourceList ResourceList, timeout time.Duration) error { + ctx := context.TODO() + cancelCtx, cancel := context.WithCancel(ctx) + defer cancel() + // TODO maybe a simpler way to transfer the objects + runtimeObjs := []runtime.Object{} + for _, resource := range resourceList { + runtimeObjs = append(runtimeObjs, resource.Object) + } + resources := []object.ObjMetadata{} + for _, runtimeObj := range runtimeObjs { + obj, err := object.RuntimeToObjMeta(runtimeObj) + if err != nil { + return err + } + resources = append(resources, obj) + } + eventCh := w.sw.Watch(cancelCtx, resources, watcher.Options{}) + statusCollector := collector.NewResourceStatusCollector(resources) + done := statusCollector.ListenWithObserver(eventCh, collector.ObserverFunc( + func(statusCollector *collector.ResourceStatusCollector, _ event.Event) { + rss := []*event.ResourceStatus{} + for _, rs := range statusCollector.ResourceStatuses { + if rs == nil { + continue + } + rss = append(rss, rs) + } + desired := status.CurrentStatus + if aggregator.AggregateStatus(rss, desired) == desired { + cancel() + return + } + }), + ) + <-done + + if statusCollector.Error != nil { + return statusCollector.Error + } + + // Only check parent context error, otherwise we would error when desired status is achieved. + if ctx.Err() != nil { + var err error + for _, id := range resources { + rs := statusCollector.ResourceStatuses[id] + if rs.Status == status.CurrentStatus { + continue + } + err = fmt.Errorf("%s: %s not ready, status: %s", rs.Identifier.Name, rs.Identifier.GroupKind.Kind, rs.Status) + } + return fmt.Errorf("not all resources ready: %w: %w", ctx.Err(), err) + } + return nil +} + +func (w *kstatusWaiter) WaitWithJobs(resources ResourceList, timeout time.Duration) error { + // Implementation + panic("not implemented") +} diff --git a/pkg/kube/wait.go b/pkg/kube/wait.go index bdafc8255..de00aae47 100644 --- a/pkg/kube/wait.go +++ b/pkg/kube/wait.go @@ -44,6 +44,19 @@ type waiter struct { log func(string, ...interface{}) } +func (w *waiter) Wait(resources ResourceList, timeout time.Duration) error { + w.timeout = timeout + return w.waitForResources(resources) +} + +func (w *waiter) WaitWithJobs(resources ResourceList, timeout time.Duration) error { + // Implementation + // TODO this function doesn't make sense unless you pass a readyChecker to it + // TODO pass context instead + w.timeout = timeout + return w.waitForResources(resources) +} + // waitForResources polls to get the current status of all pods, PVCs, Services and // Jobs(optional) until all are ready or a timeout is reached func (w *waiter) waitForResources(created ResourceList) error {