diff --git a/pkg/action/hooks.go b/pkg/action/hooks.go index 49849a27d..28033395b 100644 --- a/pkg/action/hooks.go +++ b/pkg/action/hooks.go @@ -32,8 +32,11 @@ import ( ) // execHook executes all of the hooks for the given hook event. -func (cfg *Configuration) execHook(rl *release.Release, hook release.HookEvent, waitStrategy kube.WaitStrategy, timeout time.Duration, serverSideApply bool) error { - shutdown, err := cfg.execHookWithDelayedShutdown(rl, hook, waitStrategy, timeout, serverSideApply) +func (cfg *Configuration) execHook(rl *release.Release, hook release.HookEvent, + waitStrategy kube.WaitStrategy, waitOptions []kube.WaitOption, + timeout time.Duration, serverSideApply bool) error { + + shutdown, err := cfg.execHookWithDelayedShutdown(rl, hook, waitStrategy, waitOptions, timeout, serverSideApply) if shutdown == nil { return err } @@ -53,7 +56,10 @@ func shutdownNoOp() error { } // execHookWithDelayedShutdown executes all of the hooks for the given hook event and returns a shutdownHook function to trigger deletions after doing other things like e.g. retrieving logs. -func (cfg *Configuration) execHookWithDelayedShutdown(rl *release.Release, hook release.HookEvent, waitStrategy kube.WaitStrategy, timeout time.Duration, serverSideApply bool) (ExecuteShutdownFunc, error) { +func (cfg *Configuration) execHookWithDelayedShutdown(rl *release.Release, hook release.HookEvent, + waitStrategy kube.WaitStrategy, waitOptions []kube.WaitOption, timeout time.Duration, + serverSideApply bool) (ExecuteShutdownFunc, error) { + executingHooks := []*release.Hook{} for _, h := range rl.Hooks { @@ -71,7 +77,7 @@ func (cfg *Configuration) execHookWithDelayedShutdown(rl *release.Release, hook // Set default delete policy to before-hook-creation cfg.hookSetDeletePolicy(h) - if err := cfg.deleteHookByPolicy(h, release.HookBeforeHookCreation, waitStrategy, timeout); err != nil { + if err := cfg.deleteHookByPolicy(h, release.HookBeforeHookCreation, waitStrategy, waitOptions, timeout); err != nil { return shutdownNoOp, err } @@ -101,7 +107,12 @@ func (cfg *Configuration) execHookWithDelayedShutdown(rl *release.Release, hook return shutdownNoOp, fmt.Errorf("warning: Hook %s %s failed: %w", hook, h.Path, err) } - waiter, err := cfg.KubeClient.GetWaiter(waitStrategy) + var waiter kube.Waiter + if c, supportsOptions := cfg.KubeClient.(kube.InterfaceWaitOptions); supportsOptions { + waiter, err = c.GetWaiterWithOptions(waitStrategy, waitOptions...) + } else { + waiter, err = cfg.KubeClient.GetWaiter(waitStrategy) + } if err != nil { return shutdownNoOp, fmt.Errorf("unable to get waiter: %w", err) } @@ -120,14 +131,14 @@ func (cfg *Configuration) execHookWithDelayedShutdown(rl *release.Release, hook // If a hook is failed, check the annotation of the hook to determine whether the hook should be deleted // under failed condition. If so, then clear the corresponding resource object in the hook return func() error { - if errDeleting := cfg.deleteHookByPolicy(h, release.HookFailed, waitStrategy, timeout); errDeleting != nil { + if errDeleting := cfg.deleteHookByPolicy(h, release.HookFailed, waitStrategy, waitOptions, timeout); errDeleting != nil { // We log the error here as we want to propagate the hook failure upwards to the release object. log.Printf("error deleting the hook resource on hook failure: %v", errDeleting) } // If a hook is failed, check the annotation of the previous successful hooks to determine whether the hooks // should be deleted under succeeded condition. - if err := cfg.deleteHooksByPolicy(executingHooks[0:i], release.HookSucceeded, waitStrategy, timeout); err != nil { + if err := cfg.deleteHooksByPolicy(executingHooks[0:i], release.HookSucceeded, waitStrategy, waitOptions, timeout); err != nil { return err } return err @@ -145,7 +156,7 @@ func (cfg *Configuration) execHookWithDelayedShutdown(rl *release.Release, hook // We log here as we still want to attempt hook resource deletion even if output logging fails. log.Printf("error outputting logs for hook failure: %v", err) } - if err := cfg.deleteHookByPolicy(h, release.HookSucceeded, waitStrategy, timeout); err != nil { + if err := cfg.deleteHookByPolicy(h, release.HookSucceeded, waitStrategy, waitOptions, timeout); err != nil { return err } } @@ -166,7 +177,9 @@ func (x hookByWeight) Less(i, j int) bool { } // deleteHookByPolicy deletes a hook if the hook policy instructs it to -func (cfg *Configuration) deleteHookByPolicy(h *release.Hook, policy release.HookDeletePolicy, waitStrategy kube.WaitStrategy, timeout time.Duration) error { +func (cfg *Configuration) deleteHookByPolicy(h *release.Hook, policy release.HookDeletePolicy, + waitStrategy kube.WaitStrategy, waitOptions []kube.WaitOption, timeout time.Duration) error { + // Never delete CustomResourceDefinitions; this could cause lots of // cascading garbage collection. if h.Kind == "CustomResourceDefinition" { @@ -182,7 +195,12 @@ func (cfg *Configuration) deleteHookByPolicy(h *release.Hook, policy release.Hoo return joinErrors(errs, "; ") } - waiter, err := cfg.KubeClient.GetWaiter(waitStrategy) + var waiter kube.Waiter + if c, supportsOptions := cfg.KubeClient.(kube.InterfaceWaitOptions); supportsOptions { + waiter, err = c.GetWaiterWithOptions(waitStrategy, waitOptions...) + } else { + waiter, err = cfg.KubeClient.GetWaiter(waitStrategy) + } if err != nil { return err } @@ -194,9 +212,11 @@ func (cfg *Configuration) deleteHookByPolicy(h *release.Hook, policy release.Hoo } // deleteHooksByPolicy deletes all hooks if the hook policy instructs it to -func (cfg *Configuration) deleteHooksByPolicy(hooks []*release.Hook, policy release.HookDeletePolicy, waitStrategy kube.WaitStrategy, timeout time.Duration) error { +func (cfg *Configuration) deleteHooksByPolicy(hooks []*release.Hook, policy release.HookDeletePolicy, + waitStrategy kube.WaitStrategy, waitOptions []kube.WaitOption, timeout time.Duration) error { + for _, h := range hooks { - if err := cfg.deleteHookByPolicy(h, policy, waitStrategy, timeout); err != nil { + if err := cfg.deleteHookByPolicy(h, policy, waitStrategy, waitOptions, timeout); err != nil { return err } } diff --git a/pkg/action/hooks_test.go b/pkg/action/hooks_test.go index 02b70dda1..0270a0630 100644 --- a/pkg/action/hooks_test.go +++ b/pkg/action/hooks_test.go @@ -18,6 +18,7 @@ package action import ( "bytes" + "context" "fmt" "io" "reflect" @@ -278,8 +279,8 @@ func (h *HookFailingKubeClient) Delete(resources kube.ResourceList, deletionProp return h.PrintingKubeClient.Delete(resources, deletionPropagation) } -func (h *HookFailingKubeClient) GetWaiter(strategy kube.WaitStrategy) (kube.Waiter, error) { - waiter, _ := h.PrintingKubeClient.GetWaiter(strategy) +func (h *HookFailingKubeClient) GetWaiterWithOptions(strategy kube.WaitStrategy, opts ...kube.WaitOption) (kube.Waiter, error) { + waiter, _ := h.PrintingKubeClient.GetWaiterWithOptions(strategy, opts...) return &HookFailingKubeWaiter{ PrintingKubeWaiter: waiter.(*kubefake.PrintingKubeWaiter), failOn: h.failOn, @@ -394,7 +395,7 @@ data: } serverSideApply := true - err := configuration.execHook(&tc.inputRelease, hookEvent, kube.StatusWatcherStrategy, 600, serverSideApply) + err := configuration.execHook(&tc.inputRelease, hookEvent, kube.StatusWatcherStrategy, nil, 600, serverSideApply) if !reflect.DeepEqual(kubeClient.deleteRecord, tc.expectedDeleteRecord) { t.Fatalf("Got unexpected delete record, expected: %#v, but got: %#v", kubeClient.deleteRecord, tc.expectedDeleteRecord) @@ -442,3 +443,51 @@ func TestConfiguration_hookSetDeletePolicy(t *testing.T) { }) } } + +func TestExecHook_WaitOptionsPassedDownstream(t *testing.T) { + is := assert.New(t) + + failer := &kubefake.FailingKubeClient{ + PrintingKubeClient: kubefake.PrintingKubeClient{Out: io.Discard}, + } + + configuration := &Configuration{ + Releases: storage.Init(driver.NewMemory()), + KubeClient: failer, + Capabilities: common.DefaultCapabilities, + } + + rel := &release.Release{ + Name: "test-release", + Namespace: "test", + Hooks: []*release.Hook{ + { + Name: "test-hook", + Kind: "ConfigMap", + Path: "templates/hook.yaml", + Manifest: `apiVersion: v1 +kind: ConfigMap +metadata: + name: test-hook + namespace: test +data: + foo: bar +`, + Weight: 0, + Events: []release.HookEvent{ + release.HookPreInstall, + }, + }, + }, + } + + // Use WithWaitContext as a marker WaitOption that we can track + ctx := context.Background() + waitOptions := []kube.WaitOption{kube.WithWaitContext(ctx)} + + err := configuration.execHook(rel, release.HookPreInstall, kube.StatusWatcherStrategy, waitOptions, 600, false) + is.NoError(err) + + // Verify that WaitOptions were passed to GetWaiter + is.NotEmpty(failer.RecordedWaitOptions, "WaitOptions should be passed to GetWaiter") +} diff --git a/pkg/action/install.go b/pkg/action/install.go index b379d6873..38355491a 100644 --- a/pkg/action/install.go +++ b/pkg/action/install.go @@ -95,6 +95,7 @@ type Install struct { DisableHooks bool Replace bool WaitStrategy kube.WaitStrategy + WaitOptions []kube.WaitOption WaitForJobs bool Devel bool DependencyUpdate bool @@ -201,7 +202,13 @@ func (i *Install) installCRDs(crds []chart.CRD) error { totalItems = append(totalItems, res...) } if len(totalItems) > 0 { - waiter, err := i.cfg.KubeClient.GetWaiter(i.WaitStrategy) + var waiter kube.Waiter + var err error + if c, supportsOptions := i.cfg.KubeClient.(kube.InterfaceWaitOptions); supportsOptions { + waiter, err = c.GetWaiterWithOptions(i.WaitStrategy, i.WaitOptions...) + } else { + waiter, err = i.cfg.KubeClient.GetWaiter(i.WaitStrategy) + } if err != nil { return fmt.Errorf("unable to get waiter: %w", err) } @@ -480,7 +487,7 @@ func (i *Install) performInstall(rel *release.Release, toBeAdopted kube.Resource var err error // pre-install hooks if !i.DisableHooks { - if err := i.cfg.execHook(rel, release.HookPreInstall, i.WaitStrategy, i.Timeout, i.ServerSideApply); err != nil { + if err := i.cfg.execHook(rel, release.HookPreInstall, i.WaitStrategy, i.WaitOptions, i.Timeout, i.ServerSideApply); err != nil { return rel, fmt.Errorf("failed pre-install: %s", err) } } @@ -506,7 +513,12 @@ func (i *Install) performInstall(rel *release.Release, toBeAdopted kube.Resource return rel, err } - waiter, err := i.cfg.KubeClient.GetWaiter(i.WaitStrategy) + var waiter kube.Waiter + if c, supportsOptions := i.cfg.KubeClient.(kube.InterfaceWaitOptions); supportsOptions { + waiter, err = c.GetWaiterWithOptions(i.WaitStrategy, i.WaitOptions...) + } else { + waiter, err = i.cfg.KubeClient.GetWaiter(i.WaitStrategy) + } if err != nil { return rel, fmt.Errorf("failed to get waiter: %w", err) } @@ -521,7 +533,7 @@ func (i *Install) performInstall(rel *release.Release, toBeAdopted kube.Resource } if !i.DisableHooks { - if err := i.cfg.execHook(rel, release.HookPostInstall, i.WaitStrategy, i.Timeout, i.ServerSideApply); err != nil { + if err := i.cfg.execHook(rel, release.HookPostInstall, i.WaitStrategy, i.WaitOptions, i.Timeout, i.ServerSideApply); err != nil { return rel, fmt.Errorf("failed post-install: %s", err) } } @@ -555,6 +567,7 @@ func (i *Install) failRelease(rel *release.Release, err error) (*release.Release uninstall.KeepHistory = false uninstall.Timeout = i.Timeout uninstall.WaitStrategy = i.WaitStrategy + uninstall.WaitOptions = i.WaitOptions if _, uninstallErr := uninstall.Run(i.ReleaseName); uninstallErr != nil { return rel, fmt.Errorf("an error occurred while uninstalling the release. original install error: %w: %w", err, uninstallErr) } diff --git a/pkg/action/install_test.go b/pkg/action/install_test.go index 9a0ca8d22..47080aef8 100644 --- a/pkg/action/install_test.go +++ b/pkg/action/install_test.go @@ -1186,3 +1186,25 @@ func TestCheckDependencies_MissingDependency(t *testing.T) { assert.ErrorContains(t, CheckDependencies(mockChart, []ci.Dependency{&dependency}), "missing in charts") } + +func TestInstallRelease_WaitOptionsPassedDownstream(t *testing.T) { + is := assert.New(t) + + instAction := installAction(t) + instAction.ReleaseName = "wait-options-test" + instAction.WaitStrategy = kube.StatusWatcherStrategy + + // Use WithWaitContext as a marker WaitOption that we can track + ctx := context.Background() + instAction.WaitOptions = []kube.WaitOption{kube.WithWaitContext(ctx)} + + // Access the underlying FailingKubeClient to check recorded options + failer := instAction.cfg.KubeClient.(*kubefake.FailingKubeClient) + + vals := map[string]interface{}{} + _, err := instAction.Run(buildChart(), vals) + is.NoError(err) + + // Verify that WaitOptions were passed to GetWaiter + is.NotEmpty(failer.RecordedWaitOptions, "WaitOptions should be passed to GetWaiter") +} diff --git a/pkg/action/release_testing.go b/pkg/action/release_testing.go index 992cdd701..043a41236 100644 --- a/pkg/action/release_testing.go +++ b/pkg/action/release_testing.go @@ -41,8 +41,9 @@ const ( // // It provides the implementation of 'helm test'. type ReleaseTesting struct { - cfg *Configuration - Timeout time.Duration + cfg *Configuration + Timeout time.Duration + WaitOptions []kube.WaitOption // Used for fetching logs from test pods Namespace string Filters map[string][]string @@ -102,7 +103,7 @@ func (r *ReleaseTesting) Run(name string) (ri.Releaser, ExecuteShutdownFunc, err } serverSideApply := rel.ApplyMethod == string(release.ApplyMethodServerSideApply) - shutdown, err := r.cfg.execHookWithDelayedShutdown(rel, release.HookTest, kube.StatusWatcherStrategy, r.Timeout, serverSideApply) + shutdown, err := r.cfg.execHookWithDelayedShutdown(rel, release.HookTest, kube.StatusWatcherStrategy, r.WaitOptions, r.Timeout, serverSideApply) if err != nil { rel.Hooks = append(skippedHooks, rel.Hooks...) diff --git a/pkg/action/release_testing_test.go b/pkg/action/release_testing_test.go index bece0b475..ab35e104a 100644 --- a/pkg/action/release_testing_test.go +++ b/pkg/action/release_testing_test.go @@ -18,6 +18,7 @@ package action import ( "bytes" + "context" "errors" "io" "os" @@ -27,6 +28,7 @@ import ( "github.com/stretchr/testify/require" "helm.sh/helm/v4/pkg/cli" + "helm.sh/helm/v4/pkg/kube" kubefake "helm.sh/helm/v4/pkg/kube/fake" release "helm.sh/helm/v4/pkg/release/v1" ) @@ -89,3 +91,29 @@ func TestReleaseTestingGetPodLogs_PodRetrievalError(t *testing.T) { require.ErrorContains(t, client.GetPodLogs(&bytes.Buffer{}, &release.Release{Hooks: hooks}), "unable to get pod logs") } + +func TestReleaseTesting_WaitOptionsPassedDownstream(t *testing.T) { + is := assert.New(t) + config := actionConfigFixture(t) + + // Create a release with a test hook + rel := releaseStub() + rel.Name = "wait-options-test-release" + rel.ApplyMethod = "csa" + require.NoError(t, config.Releases.Create(rel)) + + client := NewReleaseTesting(config) + + // Use WithWaitContext as a marker WaitOption that we can track + ctx := context.Background() + client.WaitOptions = []kube.WaitOption{kube.WithWaitContext(ctx)} + + // Access the underlying FailingKubeClient to check recorded options + failer := config.KubeClient.(*kubefake.FailingKubeClient) + + _, _, err := client.Run(rel.Name) + is.NoError(err) + + // Verify that WaitOptions were passed to GetWaiter + is.NotEmpty(failer.RecordedWaitOptions, "WaitOptions should be passed to GetWaiter") +} diff --git a/pkg/action/rollback.go b/pkg/action/rollback.go index 4cdb2d33b..03150532e 100644 --- a/pkg/action/rollback.go +++ b/pkg/action/rollback.go @@ -40,6 +40,7 @@ type Rollback struct { Version int Timeout time.Duration WaitStrategy kube.WaitStrategy + WaitOptions []kube.WaitOption WaitForJobs bool DisableHooks bool // DryRunStrategy can be set to prepare, but not execute the operation and whether or not to interact with the remote cluster @@ -210,7 +211,7 @@ func (r *Rollback) performRollback(currentRelease, targetRelease *release.Releas // pre-rollback hooks if !r.DisableHooks { - if err := r.cfg.execHook(targetRelease, release.HookPreRollback, r.WaitStrategy, r.Timeout, serverSideApply); err != nil { + if err := r.cfg.execHook(targetRelease, release.HookPreRollback, r.WaitStrategy, r.WaitOptions, r.Timeout, serverSideApply); err != nil { return targetRelease, err } } else { @@ -251,7 +252,12 @@ func (r *Rollback) performRollback(currentRelease, targetRelease *release.Releas return targetRelease, err } - waiter, err := r.cfg.KubeClient.GetWaiter(r.WaitStrategy) + var waiter kube.Waiter + if c, supportsOptions := r.cfg.KubeClient.(kube.InterfaceWaitOptions); supportsOptions { + waiter, err = c.GetWaiterWithOptions(r.WaitStrategy, r.WaitOptions...) + } else { + waiter, err = r.cfg.KubeClient.GetWaiter(r.WaitStrategy) + } if err != nil { return nil, fmt.Errorf("unable to get waiter: %w", err) } @@ -273,7 +279,7 @@ func (r *Rollback) performRollback(currentRelease, targetRelease *release.Releas // post-rollback hooks if !r.DisableHooks { - if err := r.cfg.execHook(targetRelease, release.HookPostRollback, r.WaitStrategy, r.Timeout, serverSideApply); err != nil { + if err := r.cfg.execHook(targetRelease, release.HookPostRollback, r.WaitStrategy, r.WaitOptions, r.Timeout, serverSideApply); err != nil { return targetRelease, err } } diff --git a/pkg/action/rollback_test.go b/pkg/action/rollback_test.go index 5158bee26..deb6c7c80 100644 --- a/pkg/action/rollback_test.go +++ b/pkg/action/rollback_test.go @@ -17,12 +17,15 @@ limitations under the License. package action import ( + "context" "errors" "io" "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "helm.sh/helm/v4/pkg/kube" kubefake "helm.sh/helm/v4/pkg/kube/fake" ) @@ -43,3 +46,40 @@ func TestRollbackRun_UnreachableKubeClient(t *testing.T) { client := NewRollback(config) assert.Error(t, client.Run("")) } + +func TestRollback_WaitOptionsPassedDownstream(t *testing.T) { + is := assert.New(t) + config := actionConfigFixture(t) + + // Create a deployed release and a second version to roll back to + rel := releaseStub() + rel.Name = "wait-options-rollback" + rel.Info.Status = "deployed" + rel.ApplyMethod = "csa" + require.NoError(t, config.Releases.Create(rel)) + + rel2 := releaseStub() + rel2.Name = "wait-options-rollback" + rel2.Version = 2 + rel2.Info.Status = "deployed" + rel2.ApplyMethod = "csa" + require.NoError(t, config.Releases.Create(rel2)) + + client := NewRollback(config) + client.Version = 1 + client.WaitStrategy = kube.StatusWatcherStrategy + client.ServerSideApply = "auto" + + // Use WithWaitContext as a marker WaitOption that we can track + ctx := context.Background() + client.WaitOptions = []kube.WaitOption{kube.WithWaitContext(ctx)} + + // Access the underlying FailingKubeClient to check recorded options + failer := config.KubeClient.(*kubefake.FailingKubeClient) + + err := client.Run(rel.Name) + is.NoError(err) + + // Verify that WaitOptions were passed to GetWaiter + is.NotEmpty(failer.RecordedWaitOptions, "WaitOptions should be passed to GetWaiter") +} diff --git a/pkg/action/uninstall.go b/pkg/action/uninstall.go index efbc72fef..79156991c 100644 --- a/pkg/action/uninstall.go +++ b/pkg/action/uninstall.go @@ -45,6 +45,7 @@ type Uninstall struct { IgnoreNotFound bool KeepHistory bool WaitStrategy kube.WaitStrategy + WaitOptions []kube.WaitOption DeletionPropagation string Timeout time.Duration Description string @@ -63,7 +64,13 @@ func (u *Uninstall) Run(name string) (*releasei.UninstallReleaseResponse, error) return nil, err } - waiter, err := u.cfg.KubeClient.GetWaiter(u.WaitStrategy) + var waiter kube.Waiter + var err error + if c, supportsOptions := u.cfg.KubeClient.(kube.InterfaceWaitOptions); supportsOptions { + waiter, err = c.GetWaiterWithOptions(u.WaitStrategy, u.WaitOptions...) + } else { + waiter, err = u.cfg.KubeClient.GetWaiter(u.WaitStrategy) + } if err != nil { return nil, err } @@ -127,7 +134,7 @@ func (u *Uninstall) Run(name string) (*releasei.UninstallReleaseResponse, error) if !u.DisableHooks { serverSideApply := true - if err := u.cfg.execHook(rel, release.HookPreDelete, u.WaitStrategy, u.Timeout, serverSideApply); err != nil { + if err := u.cfg.execHook(rel, release.HookPreDelete, u.WaitStrategy, u.WaitOptions, u.Timeout, serverSideApply); err != nil { return res, err } } else { @@ -157,7 +164,7 @@ func (u *Uninstall) Run(name string) (*releasei.UninstallReleaseResponse, error) if !u.DisableHooks { serverSideApply := true - if err := u.cfg.execHook(rel, release.HookPostDelete, u.WaitStrategy, u.Timeout, serverSideApply); err != nil { + if err := u.cfg.execHook(rel, release.HookPostDelete, u.WaitStrategy, u.WaitOptions, u.Timeout, serverSideApply); err != nil { errs = append(errs, err) } } diff --git a/pkg/action/uninstall_test.go b/pkg/action/uninstall_test.go index 6c4ad977f..b5a76d983 100644 --- a/pkg/action/uninstall_test.go +++ b/pkg/action/uninstall_test.go @@ -17,6 +17,7 @@ limitations under the License. package action import ( + "context" "errors" "fmt" "io" @@ -169,3 +170,40 @@ func TestUninstallRun_UnreachableKubeClient(t *testing.T) { assert.Nil(t, result) assert.ErrorContains(t, err, "connection refused") } + +func TestUninstall_WaitOptionsPassedDownstream(t *testing.T) { + is := assert.New(t) + + unAction := uninstallAction(t) + unAction.DisableHooks = true + unAction.DryRun = false + unAction.WaitStrategy = kube.StatusWatcherStrategy + + // Use WithWaitContext as a marker WaitOption that we can track + ctx := context.Background() + unAction.WaitOptions = []kube.WaitOption{kube.WithWaitContext(ctx)} + + rel := releaseStub() + rel.Name = "wait-options-uninstall" + rel.Manifest = `{ + "apiVersion": "v1", + "kind": "Secret", + "metadata": { + "name": "secret" + }, + "type": "Opaque", + "data": { + "password": "password" + } + }` + require.NoError(t, unAction.cfg.Releases.Create(rel)) + + // Access the underlying FailingKubeClient to check recorded options + failer := unAction.cfg.KubeClient.(*kubefake.FailingKubeClient) + + _, err := unAction.Run(rel.Name) + is.NoError(err) + + // Verify that WaitOptions were passed to GetWaiter + is.NotEmpty(failer.RecordedWaitOptions, "WaitOptions should be passed to GetWaiter") +} diff --git a/pkg/action/upgrade.go b/pkg/action/upgrade.go index 13d28fd4d..4b99be603 100644 --- a/pkg/action/upgrade.go +++ b/pkg/action/upgrade.go @@ -72,6 +72,8 @@ type Upgrade struct { Timeout time.Duration // WaitStrategy determines what type of waiting should be done WaitStrategy kube.WaitStrategy + // WaitOptions are additional options for waiting on resources + WaitOptions []kube.WaitOption // WaitForJobs determines whether the wait operation for the Jobs should be performed after the upgrade is requested. WaitForJobs bool // DisableHooks disables hook processing if set to true. @@ -452,7 +454,7 @@ func (u *Upgrade) releasingUpgrade(c chan<- resultMessage, upgradedRelease *rele // pre-upgrade hooks if !u.DisableHooks { - if err := u.cfg.execHook(upgradedRelease, release.HookPreUpgrade, u.WaitStrategy, u.Timeout, serverSideApply); err != nil { + if err := u.cfg.execHook(upgradedRelease, release.HookPreUpgrade, u.WaitStrategy, u.WaitOptions, u.Timeout, serverSideApply); err != nil { u.reportToPerformUpgrade(c, upgradedRelease, kube.ResourceList{}, fmt.Errorf("pre-upgrade hooks failed: %s", err)) return } @@ -473,7 +475,12 @@ func (u *Upgrade) releasingUpgrade(c chan<- resultMessage, upgradedRelease *rele return } - waiter, err := u.cfg.KubeClient.GetWaiter(u.WaitStrategy) + var waiter kube.Waiter + if c, supportsOptions := u.cfg.KubeClient.(kube.InterfaceWaitOptions); supportsOptions { + waiter, err = c.GetWaiterWithOptions(u.WaitStrategy, u.WaitOptions...) + } else { + waiter, err = u.cfg.KubeClient.GetWaiter(u.WaitStrategy) + } if err != nil { u.cfg.recordRelease(originalRelease) u.reportToPerformUpgrade(c, upgradedRelease, results.Created, err) @@ -495,7 +502,7 @@ func (u *Upgrade) releasingUpgrade(c chan<- resultMessage, upgradedRelease *rele // post-upgrade hooks if !u.DisableHooks { - if err := u.cfg.execHook(upgradedRelease, release.HookPostUpgrade, u.WaitStrategy, u.Timeout, serverSideApply); err != nil { + if err := u.cfg.execHook(upgradedRelease, release.HookPostUpgrade, u.WaitStrategy, u.WaitOptions, u.Timeout, serverSideApply); err != nil { u.reportToPerformUpgrade(c, upgradedRelease, results.Created, fmt.Errorf("post-upgrade hooks failed: %s", err)) return } @@ -570,6 +577,7 @@ func (u *Upgrade) failRelease(rel *release.Release, created kube.ResourceList, e rollin := NewRollback(u.cfg) rollin.Version = filteredHistory[0].Version rollin.WaitStrategy = u.WaitStrategy + rollin.WaitOptions = u.WaitOptions rollin.WaitForJobs = u.WaitForJobs rollin.DisableHooks = u.DisableHooks rollin.ForceReplace = u.ForceReplace diff --git a/pkg/action/upgrade_test.go b/pkg/action/upgrade_test.go index 17c4927cc..848e8a682 100644 --- a/pkg/action/upgrade_test.go +++ b/pkg/action/upgrade_test.go @@ -775,3 +775,30 @@ func TestObjectKey(t *testing.T) { assert.Equal(t, "apps/v1/Deployment/namespace/name", objectKey(&info)) } + +func TestUpgradeRelease_WaitOptionsPassedDownstream(t *testing.T) { + is := assert.New(t) + req := require.New(t) + + upAction := upgradeAction(t) + rel := releaseStub() + rel.Name = "wait-options-test" + rel.Info.Status = common.StatusDeployed + req.NoError(upAction.cfg.Releases.Create(rel)) + + upAction.WaitStrategy = kube.StatusWatcherStrategy + + // Use WithWaitContext as a marker WaitOption that we can track + ctx := context.Background() + upAction.WaitOptions = []kube.WaitOption{kube.WithWaitContext(ctx)} + + // Access the underlying FailingKubeClient to check recorded options + failer := upAction.cfg.KubeClient.(*kubefake.FailingKubeClient) + + vals := map[string]interface{}{} + _, err := upAction.Run(rel.Name, buildChart(), vals) + req.NoError(err) + + // Verify that WaitOptions were passed to GetWaiter + is.NotEmpty(failer.RecordedWaitOptions, "WaitOptions should be passed to GetWaiter") +} diff --git a/pkg/kube/client.go b/pkg/kube/client.go index 6e09fceac..2fa855c1e 100644 --- a/pkg/kube/client.go +++ b/pkg/kube/client.go @@ -87,6 +87,8 @@ type Client struct { // WaitContext is an optional context to use for wait operations. // If not set, a context will be created internally using the // timeout provided to the wait functions. + // + // Deprecated: Use WithWaitContext wait option when getting a Waiter instead. WaitContext context.Context Waiter @@ -139,7 +141,11 @@ func init() { } } -func (c *Client) newStatusWatcher() (*statusWaiter, error) { +func (c *Client) newStatusWatcher(opts ...WaitOption) (*statusWaiter, error) { + var o waitOptions + for _, opt := range opts { + opt(&o) + } cfg, err := c.Factory.ToRESTConfig() if err != nil { return nil, err @@ -156,14 +162,23 @@ func (c *Client) newStatusWatcher() (*statusWaiter, error) { if err != nil { return nil, err } + waitContext := o.ctx + if waitContext == nil { + waitContext = c.WaitContext + } return &statusWaiter{ restMapper: restMapper, client: dynamicClient, - ctx: c.WaitContext, + ctx: waitContext, + readers: o.statusReaders, }, nil } -func (c *Client) GetWaiter(strategy WaitStrategy) (Waiter, error) { +func (c *Client) GetWaiter(ws WaitStrategy) (Waiter, error) { + return c.GetWaiterWithOptions(ws) +} + +func (c *Client) GetWaiterWithOptions(strategy WaitStrategy, opts ...WaitOption) (Waiter, error) { switch strategy { case LegacyStrategy: kc, err := c.Factory.KubernetesClientSet() @@ -172,9 +187,9 @@ func (c *Client) GetWaiter(strategy WaitStrategy) (Waiter, error) { } return &legacyWaiter{kubeClient: kc, ctx: c.WaitContext}, nil case StatusWatcherStrategy: - return c.newStatusWatcher() + return c.newStatusWatcher(opts...) case HookOnlyStrategy: - sw, err := c.newStatusWatcher() + sw, err := c.newStatusWatcher(opts...) if err != nil { return nil, err } @@ -187,8 +202,12 @@ func (c *Client) GetWaiter(strategy WaitStrategy) (Waiter, error) { } func (c *Client) SetWaiter(ws WaitStrategy) error { + return c.SetWaiterWithOptions(ws) +} + +func (c *Client) SetWaiterWithOptions(ws WaitStrategy, opts ...WaitOption) error { var err error - c.Waiter, err = c.GetWaiter(ws) + c.Waiter, err = c.GetWaiterWithOptions(ws, opts...) if err != nil { return err } diff --git a/pkg/kube/client_test.go b/pkg/kube/client_test.go index d442864f8..fcfd16ee7 100644 --- a/pkg/kube/client_test.go +++ b/pkg/kube/client_test.go @@ -28,6 +28,10 @@ import ( "testing" "time" + "github.com/fluxcd/cli-utils/pkg/kstatus/polling/engine" + "github.com/fluxcd/cli-utils/pkg/kstatus/polling/event" + "github.com/fluxcd/cli-utils/pkg/kstatus/status" + "github.com/fluxcd/cli-utils/pkg/object" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -722,7 +726,7 @@ func TestWait(t *testing.T) { }), } var err error - c.Waiter, err = c.GetWaiter(LegacyStrategy) + c.Waiter, err = c.GetWaiterWithOptions(LegacyStrategy) if err != nil { t.Fatal(err) } @@ -783,7 +787,7 @@ func TestWaitJob(t *testing.T) { }), } var err error - c.Waiter, err = c.GetWaiter(LegacyStrategy) + c.Waiter, err = c.GetWaiterWithOptions(LegacyStrategy) if err != nil { t.Fatal(err) } @@ -845,7 +849,7 @@ func TestWaitDelete(t *testing.T) { }), } var err error - c.Waiter, err = c.GetWaiter(LegacyStrategy) + c.Waiter, err = c.GetWaiterWithOptions(LegacyStrategy) if err != nil { t.Fatal(err) } @@ -1852,7 +1856,7 @@ func TestClientWaitContextCancellationLegacy(t *testing.T) { } var err error - c.Waiter, err = c.GetWaiter(LegacyStrategy) + c.Waiter, err = c.GetWaiterWithOptions(LegacyStrategy) require.NoError(t, err) resources, err := c.Build(objBody(&podList), false) @@ -1907,7 +1911,7 @@ func TestClientWaitWithJobsContextCancellationLegacy(t *testing.T) { } var err error - c.Waiter, err = c.GetWaiter(LegacyStrategy) + c.Waiter, err = c.GetWaiterWithOptions(LegacyStrategy) require.NoError(t, err) resources, err := c.Build(objBody(job), false) @@ -1968,7 +1972,7 @@ func TestClientWaitForDeleteContextCancellationLegacy(t *testing.T) { } var err error - c.Waiter, err = c.GetWaiter(LegacyStrategy) + c.Waiter, err = c.GetWaiterWithOptions(LegacyStrategy) require.NoError(t, err) resources, err := c.Build(objBody(&pod), false) @@ -2030,7 +2034,7 @@ func TestClientWaitContextNilDoesNotPanic(t *testing.T) { } var err error - c.Waiter, err = c.GetWaiter(LegacyStrategy) + c.Waiter, err = c.GetWaiterWithOptions(LegacyStrategy) require.NoError(t, err) resources, err := c.Build(objBody(&podList), false) @@ -2080,7 +2084,7 @@ func TestClientWaitContextPreCancelledLegacy(t *testing.T) { } var err error - c.Waiter, err = c.GetWaiter(LegacyStrategy) + c.Waiter, err = c.GetWaiterWithOptions(LegacyStrategy) require.NoError(t, err) resources, err := c.Build(objBody(&podList), false) @@ -2111,7 +2115,7 @@ metadata: namespace: default ` var err error - c.Waiter, err = c.GetWaiter(StatusWatcherStrategy) + c.Waiter, err = c.GetWaiterWithOptions(StatusWatcherStrategy) require.NoError(t, err) resources, err := c.Build(strings.NewReader(podManifest), false) @@ -2138,7 +2142,7 @@ metadata: namespace: default ` var err error - c.Waiter, err = c.GetWaiter(StatusWatcherStrategy) + c.Waiter, err = c.GetWaiterWithOptions(StatusWatcherStrategy) require.NoError(t, err) resources, err := c.Build(strings.NewReader(jobManifest), false) @@ -2170,7 +2174,7 @@ status: phase: Running ` var err error - c.Waiter, err = c.GetWaiter(StatusWatcherStrategy) + c.Waiter, err = c.GetWaiterWithOptions(StatusWatcherStrategy) require.NoError(t, err) resources, err := c.Build(strings.NewReader(podManifest), false) @@ -2182,3 +2186,100 @@ status: require.Error(t, err) assert.Contains(t, err.Error(), "context canceled", "expected context canceled error, got: %v", err) } + +// testStatusReader is a custom status reader for testing that returns a configurable status. +type testStatusReader struct { + supportedGK schema.GroupKind + status status.Status +} + +func (r *testStatusReader) Supports(gk schema.GroupKind) bool { + return gk == r.supportedGK +} + +func (r *testStatusReader) ReadStatus(_ context.Context, _ engine.ClusterReader, id object.ObjMetadata) (*event.ResourceStatus, error) { + return &event.ResourceStatus{ + Identifier: id, + Status: r.status, + Message: "test status reader", + }, nil +} + +func (r *testStatusReader) ReadStatusForObject(_ context.Context, _ engine.ClusterReader, u *unstructured.Unstructured) (*event.ResourceStatus, error) { + id := object.ObjMetadata{ + Namespace: u.GetNamespace(), + Name: u.GetName(), + GroupKind: u.GroupVersionKind().GroupKind(), + } + return &event.ResourceStatus{ + Identifier: id, + Status: r.status, + Message: "test status reader", + }, nil +} + +func TestClientStatusReadersPassedToStatusWaiter(t *testing.T) { + // This test verifies that Client.StatusReaders is correctly passed through + // to the statusWaiter when using the StatusWatcherStrategy. + // We use a custom status reader that immediately returns CurrentStatus for pods, + // which allows a pod without Ready condition to pass the wait. + podManifest := ` +apiVersion: v1 +kind: Pod +metadata: + name: test-pod + namespace: default +` + + c := newTestClient(t) + statusReaders := []engine.StatusReader{ + &testStatusReader{ + supportedGK: v1.SchemeGroupVersion.WithKind("Pod").GroupKind(), + status: status.CurrentStatus, + }, + } + + var err error + c.Waiter, err = c.GetWaiterWithOptions(StatusWatcherStrategy, WithKStatusReaders(statusReaders...)) + require.NoError(t, err) + + resources, err := c.Build(strings.NewReader(podManifest), false) + require.NoError(t, err) + + // The pod has no Ready condition, but our custom reader returns CurrentStatus, + // so the wait should succeed immediately without timeout. + err = c.Wait(resources, time.Second*3) + require.NoError(t, err) +} + +func TestClientStatusReadersWithWaitWithJobs(t *testing.T) { + // This test verifies that Client.StatusReaders is correctly passed through + // to the statusWaiter when using WaitWithJobs. + jobManifest := ` +apiVersion: batch/v1 +kind: Job +metadata: + name: test-job + namespace: default +` + + c := newTestClient(t) + statusReaders := []engine.StatusReader{ + &testStatusReader{ + supportedGK: schema.GroupKind{Group: "batch", Kind: "Job"}, + status: status.CurrentStatus, + }, + } + + var err error + c.Waiter, err = c.GetWaiterWithOptions(StatusWatcherStrategy, WithKStatusReaders(statusReaders...)) + require.NoError(t, err) + + resources, err := c.Build(strings.NewReader(jobManifest), false) + require.NoError(t, err) + + // The job has no Complete condition, but our custom reader returns CurrentStatus, + // so the wait should succeed immediately without timeout. + err = c.WaitWithJobs(resources, time.Second*3) + require.NoError(t, err) +} diff --git a/pkg/kube/fake/failing_kube_client.go b/pkg/kube/fake/failing_kube_client.go index 31d0082cc..0f7787f79 100644 --- a/pkg/kube/fake/failing_kube_client.go +++ b/pkg/kube/fake/failing_kube_client.go @@ -47,6 +47,8 @@ type FailingKubeClient struct { WaitForDeleteError error WatchUntilReadyError error WaitDuration time.Duration + // RecordedWaitOptions stores the WaitOptions passed to GetWaiter for testing + RecordedWaitOptions []kube.WaitOption } var _ kube.Interface = &FailingKubeClient{} @@ -153,7 +155,13 @@ func (f *FailingKubeClient) BuildTable(r io.Reader, _ bool) (kube.ResourceList, } func (f *FailingKubeClient) GetWaiter(ws kube.WaitStrategy) (kube.Waiter, error) { - waiter, _ := f.PrintingKubeClient.GetWaiter(ws) + return f.GetWaiterWithOptions(ws) +} + +func (f *FailingKubeClient) GetWaiterWithOptions(ws kube.WaitStrategy, opts ...kube.WaitOption) (kube.Waiter, error) { + // Record the WaitOptions for testing + f.RecordedWaitOptions = append(f.RecordedWaitOptions, opts...) + waiter, _ := f.PrintingKubeClient.GetWaiterWithOptions(ws, opts...) printingKubeWaiter, _ := waiter.(*PrintingKubeWaiter) return &FailingKubeWaiter{ PrintingKubeWaiter: printingKubeWaiter, diff --git a/pkg/kube/fake/printer.go b/pkg/kube/fake/printer.go index a7aad1dac..e3fa11576 100644 --- a/pkg/kube/fake/printer.go +++ b/pkg/kube/fake/printer.go @@ -148,7 +148,11 @@ func (p *PrintingKubeClient) DeleteWithPropagationPolicy(resources kube.Resource return &kube.Result{Deleted: resources}, nil } -func (p *PrintingKubeClient) GetWaiter(_ kube.WaitStrategy) (kube.Waiter, error) { +func (p *PrintingKubeClient) GetWaiter(ws kube.WaitStrategy) (kube.Waiter, error) { + return p.GetWaiterWithOptions(ws) +} + +func (p *PrintingKubeClient) GetWaiterWithOptions(_ kube.WaitStrategy, _ ...kube.WaitOption) (kube.Waiter, error) { return &PrintingKubeWaiter{Out: p.Out, LogOutput: p.LogOutput}, nil } diff --git a/pkg/kube/interface.go b/pkg/kube/interface.go index cc934ae1e..63c784751 100644 --- a/pkg/kube/interface.go +++ b/pkg/kube/interface.go @@ -56,7 +56,7 @@ type Interface interface { // IsReachable checks whether the client is able to connect to the cluster. IsReachable() error - // Get Waiter gets the Kube.Waiter + // GetWaiter gets the Kube.Waiter. GetWaiter(ws WaitStrategy) (Waiter, error) // GetPodList lists all pods that match the specified listOptions @@ -99,3 +99,14 @@ type Waiter interface { // error. WatchUntilReady(resources ResourceList, timeout time.Duration) error } + +// InterfaceWaitOptions defines an interface that extends Interface with +// methods that accept wait options. +// +// TODO Helm 5: Remove InterfaceWaitOptions and integrate its method(s) into the Interface. +type InterfaceWaitOptions interface { + // GetWaiter gets the Kube.Waiter with options. + GetWaiterWithOptions(ws WaitStrategy, opts ...WaitOption) (Waiter, error) +} + +var _ InterfaceWaitOptions = (*Client)(nil) diff --git a/pkg/kube/options.go b/pkg/kube/options.go new file mode 100644 index 000000000..49c6229ba --- /dev/null +++ b/pkg/kube/options.go @@ -0,0 +1,45 @@ +/* +Copyright The Helm Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kube + +import ( + "context" + + "github.com/fluxcd/cli-utils/pkg/kstatus/polling/engine" +) + +// WaitOption is a function that configures an option for waiting on resources. +type WaitOption func(*waitOptions) + +// WithWaitContext sets the context for waiting on resources. +func WithWaitContext(ctx context.Context) WaitOption { + return func(wo *waitOptions) { + wo.ctx = ctx + } +} + +// WithKStatusReaders sets the status readers to be used while waiting on resources. +func WithKStatusReaders(readers ...engine.StatusReader) WaitOption { + return func(wo *waitOptions) { + wo.statusReaders = readers + } +} + +type waitOptions struct { + ctx context.Context + statusReaders []engine.StatusReader +} diff --git a/pkg/kube/statuswait.go b/pkg/kube/statuswait.go index a518f0c04..5acc6f19d 100644 --- a/pkg/kube/statuswait.go +++ b/pkg/kube/statuswait.go @@ -45,6 +45,7 @@ type statusWaiter struct { client dynamic.Interface restMapper meta.RESTMapper ctx context.Context + readers []engine.StatusReader } // DefaultStatusWatcherTimeout is the timeout used by the status waiter when a @@ -71,15 +72,13 @@ func (w *statusWaiter) WatchUntilReady(resourceList ResourceList, timeout time.D sw := watcher.NewDefaultStatusWatcher(w.client, w.restMapper) jobSR := helmStatusReaders.NewCustomJobStatusReader(w.restMapper) podSR := helmStatusReaders.NewCustomPodStatusReader(w.restMapper) - // We don't want to wait on any other resources as watchUntilReady is only for Helm hooks + // We don't want to wait on any other resources as watchUntilReady is only for Helm hooks. + // If custom readers are defined they can be used as Helm hooks support any resource. + // We put them in front since the DelegatingStatusReader uses the first reader that matches. genericSR := statusreaders.NewGenericStatusReader(w.restMapper, alwaysReady) sr := &statusreaders.DelegatingStatusReader{ - StatusReaders: []engine.StatusReader{ - jobSR, - podSR, - genericSR, - }, + StatusReaders: append(w.readers, jobSR, podSR, genericSR), } sw.StatusReader = sr return w.wait(ctx, resourceList, sw) @@ -93,6 +92,7 @@ func (w *statusWaiter) Wait(resourceList ResourceList, timeout time.Duration) er defer cancel() slog.Debug("waiting for resources", "count", len(resourceList), "timeout", timeout) sw := watcher.NewDefaultStatusWatcher(w.client, w.restMapper) + sw.StatusReader = statusreaders.NewStatusReader(w.restMapper, w.readers...) return w.wait(ctx, resourceList, sw) } @@ -105,7 +105,8 @@ func (w *statusWaiter) WaitWithJobs(resourceList ResourceList, timeout time.Dura slog.Debug("waiting for resources", "count", len(resourceList), "timeout", timeout) sw := watcher.NewDefaultStatusWatcher(w.client, w.restMapper) newCustomJobStatusReader := helmStatusReaders.NewCustomJobStatusReader(w.restMapper) - customSR := statusreaders.NewStatusReader(w.restMapper, newCustomJobStatusReader) + readers := append(w.readers, newCustomJobStatusReader) + customSR := statusreaders.NewStatusReader(w.restMapper, readers...) sw.StatusReader = customSR return w.wait(ctx, resourceList, sw) } diff --git a/pkg/kube/statuswait_test.go b/pkg/kube/statuswait_test.go index a8ff4e0e6..a4e7ff62c 100644 --- a/pkg/kube/statuswait_test.go +++ b/pkg/kube/statuswait_test.go @@ -14,15 +14,21 @@ See the License for the specific language governing permissions and limitations under the License. */ -package kube // import "helm.sh/helm/v3/pkg/kube" +package kube // import "helm.sh/helm/v4/pkg/kube" import ( + "context" "errors" "fmt" "strings" + "sync/atomic" "testing" "time" + "github.com/fluxcd/cli-utils/pkg/kstatus/polling/engine" + "github.com/fluxcd/cli-utils/pkg/kstatus/polling/event" + "github.com/fluxcd/cli-utils/pkg/kstatus/status" + "github.com/fluxcd/cli-utils/pkg/object" "github.com/fluxcd/cli-utils/pkg/testutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -933,3 +939,272 @@ func TestStatusWaitMixedResources(t *testing.T) { }) } } + +// mockStatusReader is a custom status reader for testing that tracks when it's used +// and returns a configurable status for resources it supports. +type mockStatusReader struct { + supportedGK schema.GroupKind + status status.Status + callCount atomic.Int32 +} + +func (m *mockStatusReader) Supports(gk schema.GroupKind) bool { + return gk == m.supportedGK +} + +func (m *mockStatusReader) ReadStatus(_ context.Context, _ engine.ClusterReader, id object.ObjMetadata) (*event.ResourceStatus, error) { + m.callCount.Add(1) + return &event.ResourceStatus{ + Identifier: id, + Status: m.status, + Message: "mock status reader", + }, nil +} + +func (m *mockStatusReader) ReadStatusForObject(_ context.Context, _ engine.ClusterReader, u *unstructured.Unstructured) (*event.ResourceStatus, error) { + m.callCount.Add(1) + id := object.ObjMetadata{ + Namespace: u.GetNamespace(), + Name: u.GetName(), + GroupKind: u.GroupVersionKind().GroupKind(), + } + return &event.ResourceStatus{ + Identifier: id, + Status: m.status, + Message: "mock status reader", + }, nil +} + +func TestStatusWaitWithCustomReaders(t *testing.T) { + t.Parallel() + tests := []struct { + name string + objManifests []string + customReader *mockStatusReader + expectErrs []error + }{ + { + name: "custom reader makes pod immediately current", + objManifests: []string{podNoStatusManifest}, + customReader: &mockStatusReader{ + supportedGK: v1.SchemeGroupVersion.WithKind("Pod").GroupKind(), + status: status.CurrentStatus, + }, + expectErrs: nil, + }, + { + name: "custom reader returns in-progress status", + objManifests: []string{podCurrentManifest}, + customReader: &mockStatusReader{ + supportedGK: v1.SchemeGroupVersion.WithKind("Pod").GroupKind(), + status: status.InProgressStatus, + }, + expectErrs: []error{errors.New("resource not ready, name: current-pod, kind: Pod, status: InProgress"), errors.New("context deadline exceeded")}, + }, + { + name: "custom reader for different resource type is not used", + objManifests: []string{podCurrentManifest}, + customReader: &mockStatusReader{ + supportedGK: batchv1.SchemeGroupVersion.WithKind("Job").GroupKind(), + status: status.InProgressStatus, + }, + expectErrs: nil, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + c := newTestClient(t) + fakeClient := dynamicfake.NewSimpleDynamicClient(scheme.Scheme) + fakeMapper := testutil.NewFakeRESTMapper( + v1.SchemeGroupVersion.WithKind("Pod"), + batchv1.SchemeGroupVersion.WithKind("Job"), + ) + statusWaiter := statusWaiter{ + client: fakeClient, + restMapper: fakeMapper, + readers: []engine.StatusReader{tt.customReader}, + } + objs := getRuntimeObjFromManifests(t, tt.objManifests) + for _, obj := range objs { + u := obj.(*unstructured.Unstructured) + gvr := getGVR(t, fakeMapper, u) + err := fakeClient.Tracker().Create(gvr, u, u.GetNamespace()) + assert.NoError(t, err) + } + resourceList := getResourceListFromRuntimeObjs(t, c, objs) + err := statusWaiter.Wait(resourceList, time.Second*3) + if tt.expectErrs != nil { + assert.EqualError(t, err, errors.Join(tt.expectErrs...).Error()) + return + } + assert.NoError(t, err) + }) + } +} + +func TestStatusWaitWithJobsAndCustomReaders(t *testing.T) { + t.Parallel() + tests := []struct { + name string + objManifests []string + customReader *mockStatusReader + expectErrs []error + }{ + { + name: "custom reader makes job immediately current", + objManifests: []string{jobNoStatusManifest}, + customReader: &mockStatusReader{ + supportedGK: batchv1.SchemeGroupVersion.WithKind("Job").GroupKind(), + status: status.CurrentStatus, + }, + expectErrs: nil, + }, + { + name: "custom reader for pod works with WaitWithJobs", + objManifests: []string{podNoStatusManifest}, + customReader: &mockStatusReader{ + supportedGK: v1.SchemeGroupVersion.WithKind("Pod").GroupKind(), + status: status.CurrentStatus, + }, + expectErrs: nil, + }, + { + name: "built-in job reader is still appended after custom readers", + objManifests: []string{jobCompleteManifest}, + customReader: &mockStatusReader{ + supportedGK: v1.SchemeGroupVersion.WithKind("Pod").GroupKind(), + status: status.CurrentStatus, + }, + expectErrs: nil, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + c := newTestClient(t) + fakeClient := dynamicfake.NewSimpleDynamicClient(scheme.Scheme) + fakeMapper := testutil.NewFakeRESTMapper( + v1.SchemeGroupVersion.WithKind("Pod"), + batchv1.SchemeGroupVersion.WithKind("Job"), + ) + statusWaiter := statusWaiter{ + client: fakeClient, + restMapper: fakeMapper, + readers: []engine.StatusReader{tt.customReader}, + } + objs := getRuntimeObjFromManifests(t, tt.objManifests) + for _, obj := range objs { + u := obj.(*unstructured.Unstructured) + gvr := getGVR(t, fakeMapper, u) + err := fakeClient.Tracker().Create(gvr, u, u.GetNamespace()) + assert.NoError(t, err) + } + resourceList := getResourceListFromRuntimeObjs(t, c, objs) + err := statusWaiter.WaitWithJobs(resourceList, time.Second*3) + if tt.expectErrs != nil { + assert.EqualError(t, err, errors.Join(tt.expectErrs...).Error()) + return + } + assert.NoError(t, err) + }) + } +} + +func TestWatchUntilReadyWithCustomReaders(t *testing.T) { + t.Parallel() + tests := []struct { + name string + objManifests []string + customReader *mockStatusReader + expectErrs []error + }{ + { + name: "custom reader makes job immediately current for hooks", + objManifests: []string{jobNoStatusManifest}, + customReader: &mockStatusReader{ + supportedGK: batchv1.SchemeGroupVersion.WithKind("Job").GroupKind(), + status: status.CurrentStatus, + }, + expectErrs: nil, + }, + { + name: "custom reader makes pod immediately current for hooks", + objManifests: []string{podCurrentManifest}, + customReader: &mockStatusReader{ + supportedGK: v1.SchemeGroupVersion.WithKind("Pod").GroupKind(), + status: status.CurrentStatus, + }, + expectErrs: nil, + }, + { + name: "custom reader takes precedence over built-in pod reader", + objManifests: []string{podCompleteManifest}, + customReader: &mockStatusReader{ + supportedGK: v1.SchemeGroupVersion.WithKind("Pod").GroupKind(), + status: status.InProgressStatus, + }, + expectErrs: []error{errors.New("resource not ready, name: good-pod, kind: Pod, status: InProgress"), errors.New("context deadline exceeded")}, + }, + { + name: "custom reader takes precedence over built-in job reader", + objManifests: []string{jobCompleteManifest}, + customReader: &mockStatusReader{ + supportedGK: batchv1.SchemeGroupVersion.WithKind("Job").GroupKind(), + status: status.InProgressStatus, + }, + expectErrs: []error{errors.New("resource not ready, name: test, kind: Job, status: InProgress"), errors.New("context deadline exceeded")}, + }, + { + name: "custom reader for different resource type does not affect pods", + objManifests: []string{podCompleteManifest}, + customReader: &mockStatusReader{ + supportedGK: batchv1.SchemeGroupVersion.WithKind("Job").GroupKind(), + status: status.InProgressStatus, + }, + expectErrs: nil, + }, + { + name: "built-in readers still work when custom reader does not match", + objManifests: []string{jobCompleteManifest}, + customReader: &mockStatusReader{ + supportedGK: v1.SchemeGroupVersion.WithKind("Pod").GroupKind(), + status: status.InProgressStatus, + }, + expectErrs: nil, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + c := newTestClient(t) + fakeClient := dynamicfake.NewSimpleDynamicClient(scheme.Scheme) + fakeMapper := testutil.NewFakeRESTMapper( + v1.SchemeGroupVersion.WithKind("Pod"), + batchv1.SchemeGroupVersion.WithKind("Job"), + ) + statusWaiter := statusWaiter{ + client: fakeClient, + restMapper: fakeMapper, + readers: []engine.StatusReader{tt.customReader}, + } + objs := getRuntimeObjFromManifests(t, tt.objManifests) + for _, obj := range objs { + u := obj.(*unstructured.Unstructured) + gvr := getGVR(t, fakeMapper, u) + err := fakeClient.Tracker().Create(gvr, u, u.GetNamespace()) + assert.NoError(t, err) + } + resourceList := getResourceListFromRuntimeObjs(t, c, objs) + err := statusWaiter.WatchUntilReady(resourceList, time.Second*3) + if tt.expectErrs != nil { + assert.EqualError(t, err, errors.Join(tt.expectErrs...).Error()) + return + } + assert.NoError(t, err) + }) + } +}