pkg/kube: introduce support for custom kstatus readers
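
This lets Helm SDK users customize how resource status is computed while
waiting, by plugging cli-utils kstatus readers into the status waiter.
A minimal sketch of how an SDK consumer might opt in; myReader is a
hypothetical engine.StatusReader implementation, while the WaitOptions
field and the option constructor come from this change:

    // myReader implements the cli-utils engine.StatusReader interface.
    inst := action.NewInstall(cfg)
    inst.WaitStrategy = kube.StatusWatcherStrategy
    inst.WaitOptions = []kube.WaitOption{
        kube.WithKStatusReaders(myReader),
    }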

Signed-off-by: Matheus Pimenta <matheuscscp@gmail.com>

@@ -32,8 +32,11 @@ import (
)
// execHook executes all of the hooks for the given hook event.
func (cfg *Configuration) execHook(rl *release.Release, hook release.HookEvent, waitStrategy kube.WaitStrategy, timeout time.Duration, serverSideApply bool) error {
shutdown, err := cfg.execHookWithDelayedShutdown(rl, hook, waitStrategy, timeout, serverSideApply)
func (cfg *Configuration) execHook(rl *release.Release, hook release.HookEvent,
waitStrategy kube.WaitStrategy, waitOptions []kube.WaitOption,
timeout time.Duration, serverSideApply bool) error {
shutdown, err := cfg.execHookWithDelayedShutdown(rl, hook, waitStrategy, waitOptions, timeout, serverSideApply)
if shutdown == nil {
return err
}
@@ -53,7 +56,10 @@ func shutdownNoOp() error {
}
// execHookWithDelayedShutdown executes all of the hooks for the given hook event and returns a shutdown function to trigger deletions after other work has completed, e.g. retrieving logs.
func (cfg *Configuration) execHookWithDelayedShutdown(rl *release.Release, hook release.HookEvent, waitStrategy kube.WaitStrategy, timeout time.Duration, serverSideApply bool) (ExecuteShutdownFunc, error) {
func (cfg *Configuration) execHookWithDelayedShutdown(rl *release.Release, hook release.HookEvent,
waitStrategy kube.WaitStrategy, waitOptions []kube.WaitOption, timeout time.Duration,
serverSideApply bool) (ExecuteShutdownFunc, error) {
executingHooks := []*release.Hook{}
for _, h := range rl.Hooks {
@@ -71,7 +77,7 @@ func (cfg *Configuration) execHookWithDelayedShutdown(rl *release.Release, hook
// Set default delete policy to before-hook-creation
cfg.hookSetDeletePolicy(h)
if err := cfg.deleteHookByPolicy(h, release.HookBeforeHookCreation, waitStrategy, timeout); err != nil {
if err := cfg.deleteHookByPolicy(h, release.HookBeforeHookCreation, waitStrategy, waitOptions, timeout); err != nil {
return shutdownNoOp, err
}
@@ -101,7 +107,12 @@ func (cfg *Configuration) execHookWithDelayedShutdown(rl *release.Release, hook
return shutdownNoOp, fmt.Errorf("warning: Hook %s %s failed: %w", hook, h.Path, err)
}
waiter, err := cfg.KubeClient.GetWaiter(waitStrategy)
var waiter kube.Waiter
if c, supportsOptions := cfg.KubeClient.(kube.InterfaceWaitOptions); supportsOptions {
waiter, err = c.GetWaiterWithOptions(waitStrategy, waitOptions...)
} else {
waiter, err = cfg.KubeClient.GetWaiter(waitStrategy)
}
if err != nil {
return shutdownNoOp, fmt.Errorf("unable to get waiter: %w", err)
}
@@ -120,14 +131,14 @@ func (cfg *Configuration) execHookWithDelayedShutdown(rl *release.Release, hook
// If a hook has failed, check the hook's annotation to determine whether it should be deleted
// under the failed condition. If so, clear the corresponding resource object in the hook.
return func() error {
if errDeleting := cfg.deleteHookByPolicy(h, release.HookFailed, waitStrategy, timeout); errDeleting != nil {
if errDeleting := cfg.deleteHookByPolicy(h, release.HookFailed, waitStrategy, waitOptions, timeout); errDeleting != nil {
// We log the error here as we want to propagate the hook failure upwards to the release object.
log.Printf("error deleting the hook resource on hook failure: %v", errDeleting)
}
// If a hook has failed, check the annotations of the previously successful hooks to determine whether they
// should be deleted under the succeeded condition.
if err := cfg.deleteHooksByPolicy(executingHooks[0:i], release.HookSucceeded, waitStrategy, timeout); err != nil {
if err := cfg.deleteHooksByPolicy(executingHooks[0:i], release.HookSucceeded, waitStrategy, waitOptions, timeout); err != nil {
return err
}
return err
@@ -145,7 +156,7 @@ func (cfg *Configuration) execHookWithDelayedShutdown(rl *release.Release, hook
// We log here as we still want to attempt hook resource deletion even if output logging fails.
log.Printf("error outputting logs for hook failure: %v", err)
}
if err := cfg.deleteHookByPolicy(h, release.HookSucceeded, waitStrategy, timeout); err != nil {
if err := cfg.deleteHookByPolicy(h, release.HookSucceeded, waitStrategy, waitOptions, timeout); err != nil {
return err
}
}
@@ -166,7 +177,9 @@ func (x hookByWeight) Less(i, j int) bool {
}
// deleteHookByPolicy deletes a hook if the hook policy instructs it to
func (cfg *Configuration) deleteHookByPolicy(h *release.Hook, policy release.HookDeletePolicy, waitStrategy kube.WaitStrategy, timeout time.Duration) error {
func (cfg *Configuration) deleteHookByPolicy(h *release.Hook, policy release.HookDeletePolicy,
waitStrategy kube.WaitStrategy, waitOptions []kube.WaitOption, timeout time.Duration) error {
// Never delete CustomResourceDefinitions; this could cause lots of
// cascading garbage collection.
if h.Kind == "CustomResourceDefinition" {
@@ -182,7 +195,12 @@ func (cfg *Configuration) deleteHookByPolicy(h *release.Hook, policy release.Hoo
return joinErrors(errs, "; ")
}
waiter, err := cfg.KubeClient.GetWaiter(waitStrategy)
var waiter kube.Waiter
if c, supportsOptions := cfg.KubeClient.(kube.InterfaceWaitOptions); supportsOptions {
waiter, err = c.GetWaiterWithOptions(waitStrategy, waitOptions...)
} else {
waiter, err = cfg.KubeClient.GetWaiter(waitStrategy)
}
if err != nil {
return err
}
@@ -194,9 +212,11 @@ func (cfg *Configuration) deleteHookByPolicy(h *release.Hook, policy release.Hoo
}
// deleteHooksByPolicy deletes all hooks if the hook policy instructs it to
func (cfg *Configuration) deleteHooksByPolicy(hooks []*release.Hook, policy release.HookDeletePolicy, waitStrategy kube.WaitStrategy, timeout time.Duration) error {
func (cfg *Configuration) deleteHooksByPolicy(hooks []*release.Hook, policy release.HookDeletePolicy,
waitStrategy kube.WaitStrategy, waitOptions []kube.WaitOption, timeout time.Duration) error {
for _, h := range hooks {
if err := cfg.deleteHookByPolicy(h, policy, waitStrategy, timeout); err != nil {
if err := cfg.deleteHookByPolicy(h, policy, waitStrategy, waitOptions, timeout); err != nil {
return err
}
}

@@ -18,6 +18,7 @@ package action
import (
"bytes"
"context"
"fmt"
"io"
"reflect"
@@ -278,8 +279,8 @@ func (h *HookFailingKubeClient) Delete(resources kube.ResourceList, deletionProp
return h.PrintingKubeClient.Delete(resources, deletionPropagation)
}
func (h *HookFailingKubeClient) GetWaiter(strategy kube.WaitStrategy) (kube.Waiter, error) {
waiter, _ := h.PrintingKubeClient.GetWaiter(strategy)
func (h *HookFailingKubeClient) GetWaiterWithOptions(strategy kube.WaitStrategy, opts ...kube.WaitOption) (kube.Waiter, error) {
waiter, _ := h.PrintingKubeClient.GetWaiterWithOptions(strategy, opts...)
return &HookFailingKubeWaiter{
PrintingKubeWaiter: waiter.(*kubefake.PrintingKubeWaiter),
failOn: h.failOn,
@@ -394,7 +395,7 @@ data:
}
serverSideApply := true
err := configuration.execHook(&tc.inputRelease, hookEvent, kube.StatusWatcherStrategy, 600, serverSideApply)
err := configuration.execHook(&tc.inputRelease, hookEvent, kube.StatusWatcherStrategy, nil, 600, serverSideApply)
if !reflect.DeepEqual(kubeClient.deleteRecord, tc.expectedDeleteRecord) {
t.Fatalf("Got unexpected delete record, expected: %#v, but got: %#v", tc.expectedDeleteRecord, kubeClient.deleteRecord)
@@ -442,3 +443,51 @@ func TestConfiguration_hookSetDeletePolicy(t *testing.T) {
})
}
}
func TestExecHook_WaitOptionsPassedDownstream(t *testing.T) {
is := assert.New(t)
failer := &kubefake.FailingKubeClient{
PrintingKubeClient: kubefake.PrintingKubeClient{Out: io.Discard},
}
configuration := &Configuration{
Releases: storage.Init(driver.NewMemory()),
KubeClient: failer,
Capabilities: common.DefaultCapabilities,
}
rel := &release.Release{
Name: "test-release",
Namespace: "test",
Hooks: []*release.Hook{
{
Name: "test-hook",
Kind: "ConfigMap",
Path: "templates/hook.yaml",
Manifest: `apiVersion: v1
kind: ConfigMap
metadata:
name: test-hook
namespace: test
data:
foo: bar
`,
Weight: 0,
Events: []release.HookEvent{
release.HookPreInstall,
},
},
},
}
// Use WithWaitContext as a marker WaitOption that we can track
ctx := context.Background()
waitOptions := []kube.WaitOption{kube.WithWaitContext(ctx)}
err := configuration.execHook(rel, release.HookPreInstall, kube.StatusWatcherStrategy, waitOptions, 600, false)
is.NoError(err)
// Verify that WaitOptions were passed to GetWaiter
is.NotEmpty(failer.RecordedWaitOptions, "WaitOptions should be passed to GetWaiter")
}

@@ -95,6 +95,7 @@ type Install struct {
DisableHooks bool
Replace bool
WaitStrategy kube.WaitStrategy
WaitOptions []kube.WaitOption
WaitForJobs bool
Devel bool
DependencyUpdate bool
@@ -201,7 +202,13 @@ func (i *Install) installCRDs(crds []chart.CRD) error {
totalItems = append(totalItems, res...)
}
if len(totalItems) > 0 {
waiter, err := i.cfg.KubeClient.GetWaiter(i.WaitStrategy)
var waiter kube.Waiter
var err error
if c, supportsOptions := i.cfg.KubeClient.(kube.InterfaceWaitOptions); supportsOptions {
waiter, err = c.GetWaiterWithOptions(i.WaitStrategy, i.WaitOptions...)
} else {
waiter, err = i.cfg.KubeClient.GetWaiter(i.WaitStrategy)
}
if err != nil {
return fmt.Errorf("unable to get waiter: %w", err)
}
@@ -480,7 +487,7 @@ func (i *Install) performInstall(rel *release.Release, toBeAdopted kube.Resource
var err error
// pre-install hooks
if !i.DisableHooks {
if err := i.cfg.execHook(rel, release.HookPreInstall, i.WaitStrategy, i.Timeout, i.ServerSideApply); err != nil {
if err := i.cfg.execHook(rel, release.HookPreInstall, i.WaitStrategy, i.WaitOptions, i.Timeout, i.ServerSideApply); err != nil {
return rel, fmt.Errorf("failed pre-install: %s", err)
}
}
@@ -506,7 +513,12 @@ func (i *Install) performInstall(rel *release.Release, toBeAdopted kube.Resource
return rel, err
}
waiter, err := i.cfg.KubeClient.GetWaiter(i.WaitStrategy)
var waiter kube.Waiter
if c, supportsOptions := i.cfg.KubeClient.(kube.InterfaceWaitOptions); supportsOptions {
waiter, err = c.GetWaiterWithOptions(i.WaitStrategy, i.WaitOptions...)
} else {
waiter, err = i.cfg.KubeClient.GetWaiter(i.WaitStrategy)
}
if err != nil {
return rel, fmt.Errorf("failed to get waiter: %w", err)
}
@@ -521,7 +533,7 @@ func (i *Install) performInstall(rel *release.Release, toBeAdopted kube.Resource
}
if !i.DisableHooks {
if err := i.cfg.execHook(rel, release.HookPostInstall, i.WaitStrategy, i.Timeout, i.ServerSideApply); err != nil {
if err := i.cfg.execHook(rel, release.HookPostInstall, i.WaitStrategy, i.WaitOptions, i.Timeout, i.ServerSideApply); err != nil {
return rel, fmt.Errorf("failed post-install: %s", err)
}
}
@@ -555,6 +567,7 @@ func (i *Install) failRelease(rel *release.Release, err error) (*release.Release
uninstall.KeepHistory = false
uninstall.Timeout = i.Timeout
uninstall.WaitStrategy = i.WaitStrategy
uninstall.WaitOptions = i.WaitOptions
if _, uninstallErr := uninstall.Run(i.ReleaseName); uninstallErr != nil {
return rel, fmt.Errorf("an error occurred while uninstalling the release. original install error: %w: %w", err, uninstallErr)
}

@@ -1186,3 +1186,25 @@ func TestCheckDependencies_MissingDependency(t *testing.T) {
assert.ErrorContains(t, CheckDependencies(mockChart, []ci.Dependency{&dependency}), "missing in charts")
}
func TestInstallRelease_WaitOptionsPassedDownstream(t *testing.T) {
is := assert.New(t)
instAction := installAction(t)
instAction.ReleaseName = "wait-options-test"
instAction.WaitStrategy = kube.StatusWatcherStrategy
// Use WithWaitContext as a marker WaitOption that we can track
ctx := context.Background()
instAction.WaitOptions = []kube.WaitOption{kube.WithWaitContext(ctx)}
// Access the underlying FailingKubeClient to check recorded options
failer := instAction.cfg.KubeClient.(*kubefake.FailingKubeClient)
vals := map[string]interface{}{}
_, err := instAction.Run(buildChart(), vals)
is.NoError(err)
// Verify that WaitOptions were passed to GetWaiter
is.NotEmpty(failer.RecordedWaitOptions, "WaitOptions should be passed to GetWaiter")
}

@@ -41,8 +41,9 @@ const (
//
// It provides the implementation of 'helm test'.
type ReleaseTesting struct {
cfg *Configuration
Timeout time.Duration
cfg *Configuration
Timeout time.Duration
WaitOptions []kube.WaitOption
// Used for fetching logs from test pods
Namespace string
Filters map[string][]string
@@ -102,7 +103,7 @@ func (r *ReleaseTesting) Run(name string) (ri.Releaser, ExecuteShutdownFunc, err
}
serverSideApply := rel.ApplyMethod == string(release.ApplyMethodServerSideApply)
shutdown, err := r.cfg.execHookWithDelayedShutdown(rel, release.HookTest, kube.StatusWatcherStrategy, r.Timeout, serverSideApply)
shutdown, err := r.cfg.execHookWithDelayedShutdown(rel, release.HookTest, kube.StatusWatcherStrategy, r.WaitOptions, r.Timeout, serverSideApply)
if err != nil {
rel.Hooks = append(skippedHooks, rel.Hooks...)

@@ -18,6 +18,7 @@ package action
import (
"bytes"
"context"
"errors"
"io"
"os"
@@ -27,6 +28,7 @@ import (
"github.com/stretchr/testify/require"
"helm.sh/helm/v4/pkg/cli"
"helm.sh/helm/v4/pkg/kube"
kubefake "helm.sh/helm/v4/pkg/kube/fake"
release "helm.sh/helm/v4/pkg/release/v1"
)
@@ -89,3 +91,29 @@ func TestReleaseTestingGetPodLogs_PodRetrievalError(t *testing.T) {
require.ErrorContains(t, client.GetPodLogs(&bytes.Buffer{}, &release.Release{Hooks: hooks}), "unable to get pod logs")
}
func TestReleaseTesting_WaitOptionsPassedDownstream(t *testing.T) {
is := assert.New(t)
config := actionConfigFixture(t)
// Create a release with a test hook
rel := releaseStub()
rel.Name = "wait-options-test-release"
rel.ApplyMethod = "csa"
require.NoError(t, config.Releases.Create(rel))
client := NewReleaseTesting(config)
// Use WithWaitContext as a marker WaitOption that we can track
ctx := context.Background()
client.WaitOptions = []kube.WaitOption{kube.WithWaitContext(ctx)}
// Access the underlying FailingKubeClient to check recorded options
failer := config.KubeClient.(*kubefake.FailingKubeClient)
_, _, err := client.Run(rel.Name)
is.NoError(err)
// Verify that WaitOptions were passed to GetWaiter
is.NotEmpty(failer.RecordedWaitOptions, "WaitOptions should be passed to GetWaiter")
}

@@ -40,6 +40,7 @@ type Rollback struct {
Version int
Timeout time.Duration
WaitStrategy kube.WaitStrategy
WaitOptions []kube.WaitOption
WaitForJobs bool
DisableHooks bool
// DryRunStrategy can be set to prepare, but not execute the operation and whether or not to interact with the remote cluster
@@ -210,7 +211,7 @@ func (r *Rollback) performRollback(currentRelease, targetRelease *release.Releas
// pre-rollback hooks
if !r.DisableHooks {
if err := r.cfg.execHook(targetRelease, release.HookPreRollback, r.WaitStrategy, r.Timeout, serverSideApply); err != nil {
if err := r.cfg.execHook(targetRelease, release.HookPreRollback, r.WaitStrategy, r.WaitOptions, r.Timeout, serverSideApply); err != nil {
return targetRelease, err
}
} else {
@@ -251,7 +252,12 @@ func (r *Rollback) performRollback(currentRelease, targetRelease *release.Releas
return targetRelease, err
}
waiter, err := r.cfg.KubeClient.GetWaiter(r.WaitStrategy)
var waiter kube.Waiter
if c, supportsOptions := r.cfg.KubeClient.(kube.InterfaceWaitOptions); supportsOptions {
waiter, err = c.GetWaiterWithOptions(r.WaitStrategy, r.WaitOptions...)
} else {
waiter, err = r.cfg.KubeClient.GetWaiter(r.WaitStrategy)
}
if err != nil {
return nil, fmt.Errorf("unable to get waiter: %w", err)
}
@@ -273,7 +279,7 @@ func (r *Rollback) performRollback(currentRelease, targetRelease *release.Releas
// post-rollback hooks
if !r.DisableHooks {
if err := r.cfg.execHook(targetRelease, release.HookPostRollback, r.WaitStrategy, r.Timeout, serverSideApply); err != nil {
if err := r.cfg.execHook(targetRelease, release.HookPostRollback, r.WaitStrategy, r.WaitOptions, r.Timeout, serverSideApply); err != nil {
return targetRelease, err
}
}

@@ -17,12 +17,15 @@ limitations under the License.
package action
import (
"context"
"errors"
"io"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"helm.sh/helm/v4/pkg/kube"
kubefake "helm.sh/helm/v4/pkg/kube/fake"
)
@@ -43,3 +46,40 @@ func TestRollbackRun_UnreachableKubeClient(t *testing.T) {
client := NewRollback(config)
assert.Error(t, client.Run(""))
}
func TestRollback_WaitOptionsPassedDownstream(t *testing.T) {
is := assert.New(t)
config := actionConfigFixture(t)
// Create a deployed release and a second version to roll back to
rel := releaseStub()
rel.Name = "wait-options-rollback"
rel.Info.Status = "deployed"
rel.ApplyMethod = "csa"
require.NoError(t, config.Releases.Create(rel))
rel2 := releaseStub()
rel2.Name = "wait-options-rollback"
rel2.Version = 2
rel2.Info.Status = "deployed"
rel2.ApplyMethod = "csa"
require.NoError(t, config.Releases.Create(rel2))
client := NewRollback(config)
client.Version = 1
client.WaitStrategy = kube.StatusWatcherStrategy
client.ServerSideApply = "auto"
// Use WithWaitContext as a marker WaitOption that we can track
ctx := context.Background()
client.WaitOptions = []kube.WaitOption{kube.WithWaitContext(ctx)}
// Access the underlying FailingKubeClient to check recorded options
failer := config.KubeClient.(*kubefake.FailingKubeClient)
err := client.Run(rel.Name)
is.NoError(err)
// Verify that WaitOptions were passed to GetWaiter
is.NotEmpty(failer.RecordedWaitOptions, "WaitOptions should be passed to GetWaiter")
}

@@ -45,6 +45,7 @@ type Uninstall struct {
IgnoreNotFound bool
KeepHistory bool
WaitStrategy kube.WaitStrategy
WaitOptions []kube.WaitOption
DeletionPropagation string
Timeout time.Duration
Description string
@@ -63,7 +64,13 @@ func (u *Uninstall) Run(name string) (*releasei.UninstallReleaseResponse, error)
return nil, err
}
waiter, err := u.cfg.KubeClient.GetWaiter(u.WaitStrategy)
var waiter kube.Waiter
var err error
if c, supportsOptions := u.cfg.KubeClient.(kube.InterfaceWaitOptions); supportsOptions {
waiter, err = c.GetWaiterWithOptions(u.WaitStrategy, u.WaitOptions...)
} else {
waiter, err = u.cfg.KubeClient.GetWaiter(u.WaitStrategy)
}
if err != nil {
return nil, err
}
@@ -127,7 +134,7 @@ func (u *Uninstall) Run(name string) (*releasei.UninstallReleaseResponse, error)
if !u.DisableHooks {
serverSideApply := true
if err := u.cfg.execHook(rel, release.HookPreDelete, u.WaitStrategy, u.Timeout, serverSideApply); err != nil {
if err := u.cfg.execHook(rel, release.HookPreDelete, u.WaitStrategy, u.WaitOptions, u.Timeout, serverSideApply); err != nil {
return res, err
}
} else {
@@ -157,7 +164,7 @@ func (u *Uninstall) Run(name string) (*releasei.UninstallReleaseResponse, error)
if !u.DisableHooks {
serverSideApply := true
if err := u.cfg.execHook(rel, release.HookPostDelete, u.WaitStrategy, u.Timeout, serverSideApply); err != nil {
if err := u.cfg.execHook(rel, release.HookPostDelete, u.WaitStrategy, u.WaitOptions, u.Timeout, serverSideApply); err != nil {
errs = append(errs, err)
}
}

@@ -17,6 +17,7 @@ limitations under the License.
package action
import (
"context"
"errors"
"fmt"
"io"
@@ -169,3 +170,40 @@ func TestUninstallRun_UnreachableKubeClient(t *testing.T) {
assert.Nil(t, result)
assert.ErrorContains(t, err, "connection refused")
}
func TestUninstall_WaitOptionsPassedDownstream(t *testing.T) {
is := assert.New(t)
unAction := uninstallAction(t)
unAction.DisableHooks = true
unAction.DryRun = false
unAction.WaitStrategy = kube.StatusWatcherStrategy
// Use WithWaitContext as a marker WaitOption that we can track
ctx := context.Background()
unAction.WaitOptions = []kube.WaitOption{kube.WithWaitContext(ctx)}
rel := releaseStub()
rel.Name = "wait-options-uninstall"
rel.Manifest = `{
"apiVersion": "v1",
"kind": "Secret",
"metadata": {
"name": "secret"
},
"type": "Opaque",
"data": {
"password": "password"
}
}`
require.NoError(t, unAction.cfg.Releases.Create(rel))
// Access the underlying FailingKubeClient to check recorded options
failer := unAction.cfg.KubeClient.(*kubefake.FailingKubeClient)
_, err := unAction.Run(rel.Name)
is.NoError(err)
// Verify that WaitOptions were passed to GetWaiter
is.NotEmpty(failer.RecordedWaitOptions, "WaitOptions should be passed to GetWaiter")
}

@@ -72,6 +72,8 @@ type Upgrade struct {
Timeout time.Duration
// WaitStrategy determines what type of waiting should be done
WaitStrategy kube.WaitStrategy
// WaitOptions are additional options for waiting on resources
WaitOptions []kube.WaitOption
// WaitForJobs determines whether the wait operation for the Jobs should be performed after the upgrade is requested.
WaitForJobs bool
// DisableHooks disables hook processing if set to true.
@@ -452,7 +454,7 @@ func (u *Upgrade) releasingUpgrade(c chan<- resultMessage, upgradedRelease *rele
// pre-upgrade hooks
if !u.DisableHooks {
if err := u.cfg.execHook(upgradedRelease, release.HookPreUpgrade, u.WaitStrategy, u.Timeout, serverSideApply); err != nil {
if err := u.cfg.execHook(upgradedRelease, release.HookPreUpgrade, u.WaitStrategy, u.WaitOptions, u.Timeout, serverSideApply); err != nil {
u.reportToPerformUpgrade(c, upgradedRelease, kube.ResourceList{}, fmt.Errorf("pre-upgrade hooks failed: %s", err))
return
}
@@ -473,7 +475,12 @@ func (u *Upgrade) releasingUpgrade(c chan<- resultMessage, upgradedRelease *rele
return
}
waiter, err := u.cfg.KubeClient.GetWaiter(u.WaitStrategy)
var waiter kube.Waiter
if c, supportsOptions := u.cfg.KubeClient.(kube.InterfaceWaitOptions); supportsOptions {
waiter, err = c.GetWaiterWithOptions(u.WaitStrategy, u.WaitOptions...)
} else {
waiter, err = u.cfg.KubeClient.GetWaiter(u.WaitStrategy)
}
if err != nil {
u.cfg.recordRelease(originalRelease)
u.reportToPerformUpgrade(c, upgradedRelease, results.Created, err)
@@ -495,7 +502,7 @@ func (u *Upgrade) releasingUpgrade(c chan<- resultMessage, upgradedRelease *rele
// post-upgrade hooks
if !u.DisableHooks {
if err := u.cfg.execHook(upgradedRelease, release.HookPostUpgrade, u.WaitStrategy, u.Timeout, serverSideApply); err != nil {
if err := u.cfg.execHook(upgradedRelease, release.HookPostUpgrade, u.WaitStrategy, u.WaitOptions, u.Timeout, serverSideApply); err != nil {
u.reportToPerformUpgrade(c, upgradedRelease, results.Created, fmt.Errorf("post-upgrade hooks failed: %s", err))
return
}
@@ -570,6 +577,7 @@ func (u *Upgrade) failRelease(rel *release.Release, created kube.ResourceList, e
rollin := NewRollback(u.cfg)
rollin.Version = filteredHistory[0].Version
rollin.WaitStrategy = u.WaitStrategy
rollin.WaitOptions = u.WaitOptions
rollin.WaitForJobs = u.WaitForJobs
rollin.DisableHooks = u.DisableHooks
rollin.ForceReplace = u.ForceReplace

@@ -775,3 +775,30 @@ func TestObjectKey(t *testing.T) {
assert.Equal(t, "apps/v1/Deployment/namespace/name", objectKey(&info))
}
func TestUpgradeRelease_WaitOptionsPassedDownstream(t *testing.T) {
is := assert.New(t)
req := require.New(t)
upAction := upgradeAction(t)
rel := releaseStub()
rel.Name = "wait-options-test"
rel.Info.Status = common.StatusDeployed
req.NoError(upAction.cfg.Releases.Create(rel))
upAction.WaitStrategy = kube.StatusWatcherStrategy
// Use WithWaitContext as a marker WaitOption that we can track
ctx := context.Background()
upAction.WaitOptions = []kube.WaitOption{kube.WithWaitContext(ctx)}
// Access the underlying FailingKubeClient to check recorded options
failer := upAction.cfg.KubeClient.(*kubefake.FailingKubeClient)
vals := map[string]interface{}{}
_, err := upAction.Run(rel.Name, buildChart(), vals)
req.NoError(err)
// Verify that WaitOptions were passed to GetWaiter
is.NotEmpty(failer.RecordedWaitOptions, "WaitOptions should be passed to GetWaiter")
}

@@ -87,6 +87,8 @@ type Client struct {
// WaitContext is an optional context to use for wait operations.
// If not set, a context will be created internally using the
// timeout provided to the wait functions.
//
// Deprecated: Use WithWaitContext wait option when getting a Waiter instead.
WaitContext context.Context
Waiter
@@ -139,7 +141,11 @@ func init() {
}
}
func (c *Client) newStatusWatcher() (*statusWaiter, error) {
func (c *Client) newStatusWatcher(opts ...WaitOption) (*statusWaiter, error) {
var o waitOptions
for _, opt := range opts {
opt(&o)
}
cfg, err := c.Factory.ToRESTConfig()
if err != nil {
return nil, err
@@ -156,14 +162,23 @@
if err != nil {
return nil, err
}
waitContext := o.ctx
if waitContext == nil {
waitContext = c.WaitContext
}
return &statusWaiter{
restMapper: restMapper,
client: dynamicClient,
ctx: c.WaitContext,
ctx: waitContext,
readers: o.statusReaders,
}, nil
}
func (c *Client) GetWaiter(strategy WaitStrategy) (Waiter, error) {
func (c *Client) GetWaiter(ws WaitStrategy) (Waiter, error) {
return c.GetWaiterWithOptions(ws)
}
func (c *Client) GetWaiterWithOptions(strategy WaitStrategy, opts ...WaitOption) (Waiter, error) {
switch strategy {
case LegacyStrategy:
kc, err := c.Factory.KubernetesClientSet()
@@ -172,9 +187,9 @@ func (c *Client) GetWaiter(strategy WaitStrategy) (Waiter, error) {
}
return &legacyWaiter{kubeClient: kc, ctx: c.WaitContext}, nil
case StatusWatcherStrategy:
return c.newStatusWatcher()
return c.newStatusWatcher(opts...)
case HookOnlyStrategy:
sw, err := c.newStatusWatcher()
sw, err := c.newStatusWatcher(opts...)
if err != nil {
return nil, err
}
@@ -187,8 +202,12 @@ func (c *Client) GetWaiter(strategy WaitStrategy) (Waiter, error) {
}
func (c *Client) SetWaiter(ws WaitStrategy) error {
return c.SetWaiterWithOptions(ws)
}
func (c *Client) SetWaiterWithOptions(ws WaitStrategy, opts ...WaitOption) error {
var err error
c.Waiter, err = c.GetWaiter(ws)
c.Waiter, err = c.GetWaiterWithOptions(ws, opts...)
if err != nil {
return err
}
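
Since Client.WaitContext is now deprecated in favor of the wait option, a
short migration sketch for SDK users (c is a *kube.Client and ctx is
assumed to be in scope):

    // Before, using the deprecated field:
    c.WaitContext = ctx
    waiter, err := c.GetWaiter(kube.StatusWatcherStrategy)

    // After, using the wait option:
    waiter, err = c.GetWaiterWithOptions(kube.StatusWatcherStrategy, kube.WithWaitContext(ctx))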

@@ -28,6 +28,10 @@ import (
"testing"
"time"
"github.com/fluxcd/cli-utils/pkg/kstatus/polling/engine"
"github.com/fluxcd/cli-utils/pkg/kstatus/polling/event"
"github.com/fluxcd/cli-utils/pkg/kstatus/status"
"github.com/fluxcd/cli-utils/pkg/object"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -722,7 +726,7 @@ func TestWait(t *testing.T) {
}),
}
var err error
c.Waiter, err = c.GetWaiter(LegacyStrategy)
c.Waiter, err = c.GetWaiterWithOptions(LegacyStrategy)
if err != nil {
t.Fatal(err)
}
@@ -783,7 +787,7 @@ func TestWaitJob(t *testing.T) {
}),
}
var err error
c.Waiter, err = c.GetWaiter(LegacyStrategy)
c.Waiter, err = c.GetWaiterWithOptions(LegacyStrategy)
if err != nil {
t.Fatal(err)
}
@@ -845,7 +849,7 @@ func TestWaitDelete(t *testing.T) {
}),
}
var err error
c.Waiter, err = c.GetWaiter(LegacyStrategy)
c.Waiter, err = c.GetWaiterWithOptions(LegacyStrategy)
if err != nil {
t.Fatal(err)
}
@@ -1852,7 +1856,7 @@ func TestClientWaitContextCancellationLegacy(t *testing.T) {
}
var err error
c.Waiter, err = c.GetWaiter(LegacyStrategy)
c.Waiter, err = c.GetWaiterWithOptions(LegacyStrategy)
require.NoError(t, err)
resources, err := c.Build(objBody(&podList), false)
@@ -1907,7 +1911,7 @@ func TestClientWaitWithJobsContextCancellationLegacy(t *testing.T) {
}
var err error
c.Waiter, err = c.GetWaiter(LegacyStrategy)
c.Waiter, err = c.GetWaiterWithOptions(LegacyStrategy)
require.NoError(t, err)
resources, err := c.Build(objBody(job), false)
@@ -1968,7 +1972,7 @@ func TestClientWaitForDeleteContextCancellationLegacy(t *testing.T) {
}
var err error
c.Waiter, err = c.GetWaiter(LegacyStrategy)
c.Waiter, err = c.GetWaiterWithOptions(LegacyStrategy)
require.NoError(t, err)
resources, err := c.Build(objBody(&pod), false)
@@ -2030,7 +2034,7 @@ func TestClientWaitContextNilDoesNotPanic(t *testing.T) {
}
var err error
c.Waiter, err = c.GetWaiter(LegacyStrategy)
c.Waiter, err = c.GetWaiterWithOptions(LegacyStrategy)
require.NoError(t, err)
resources, err := c.Build(objBody(&podList), false)
@@ -2080,7 +2084,7 @@ func TestClientWaitContextPreCancelledLegacy(t *testing.T) {
}
var err error
c.Waiter, err = c.GetWaiter(LegacyStrategy)
c.Waiter, err = c.GetWaiterWithOptions(LegacyStrategy)
require.NoError(t, err)
resources, err := c.Build(objBody(&podList), false)
@@ -2111,7 +2115,7 @@ metadata:
namespace: default
`
var err error
c.Waiter, err = c.GetWaiter(StatusWatcherStrategy)
c.Waiter, err = c.GetWaiterWithOptions(StatusWatcherStrategy)
require.NoError(t, err)
resources, err := c.Build(strings.NewReader(podManifest), false)
@@ -2138,7 +2142,7 @@ metadata:
namespace: default
`
var err error
c.Waiter, err = c.GetWaiter(StatusWatcherStrategy)
c.Waiter, err = c.GetWaiterWithOptions(StatusWatcherStrategy)
require.NoError(t, err)
resources, err := c.Build(strings.NewReader(jobManifest), false)
@@ -2170,7 +2174,7 @@ status:
phase: Running
`
var err error
c.Waiter, err = c.GetWaiter(StatusWatcherStrategy)
c.Waiter, err = c.GetWaiterWithOptions(StatusWatcherStrategy)
require.NoError(t, err)
resources, err := c.Build(strings.NewReader(podManifest), false)
@@ -2182,3 +2186,100 @@ status:
require.Error(t, err)
assert.Contains(t, err.Error(), "context canceled", "expected context canceled error, got: %v", err)
}
// testStatusReader is a custom status reader for testing that returns a configurable status.
type testStatusReader struct {
supportedGK schema.GroupKind
status status.Status
}
func (r *testStatusReader) Supports(gk schema.GroupKind) bool {
return gk == r.supportedGK
}
func (r *testStatusReader) ReadStatus(_ context.Context, _ engine.ClusterReader, id object.ObjMetadata) (*event.ResourceStatus, error) {
return &event.ResourceStatus{
Identifier: id,
Status: r.status,
Message: "test status reader",
}, nil
}
func (r *testStatusReader) ReadStatusForObject(_ context.Context, _ engine.ClusterReader, u *unstructured.Unstructured) (*event.ResourceStatus, error) {
id := object.ObjMetadata{
Namespace: u.GetNamespace(),
Name: u.GetName(),
GroupKind: u.GroupVersionKind().GroupKind(),
}
return &event.ResourceStatus{
Identifier: id,
Status: r.status,
Message: "test status reader",
}, nil
}
func TestClientStatusReadersPassedToStatusWaiter(t *testing.T) {
// This test verifies that Client.StatusReaders is correctly passed through
// to the statusWaiter when using the StatusWatcherStrategy.
// We use a custom status reader that immediately returns CurrentStatus for pods,
// which allows a pod without Ready condition to pass the wait.
podManifest := `
apiVersion: v1
kind: Pod
metadata:
name: test-pod
namespace: default
`
c := newTestClient(t)
statusReaders := []engine.StatusReader{
&testStatusReader{
supportedGK: v1.SchemeGroupVersion.WithKind("Pod").GroupKind(),
status: status.CurrentStatus,
},
}
var err error
c.Waiter, err = c.GetWaiterWithOptions(StatusWatcherStrategy, WithKStatusReaders(statusReaders...))
require.NoError(t, err)
resources, err := c.Build(strings.NewReader(podManifest), false)
require.NoError(t, err)
// The pod has no Ready condition, but our custom reader returns CurrentStatus,
// so the wait should succeed immediately without timeout.
err = c.Wait(resources, time.Second*3)
require.NoError(t, err)
}
func TestClientStatusReadersWithWaitWithJobs(t *testing.T) {
// This test verifies that Client.StatusReaders is correctly passed through
// to the statusWaiter when using WaitWithJobs.
jobManifest := `
apiVersion: batch/v1
kind: Job
metadata:
name: test-job
namespace: default
`
c := newTestClient(t)
statusReaders := []engine.StatusReader{
&testStatusReader{
supportedGK: schema.GroupKind{Group: "batch", Kind: "Job"},
status: status.CurrentStatus,
},
}
var err error
c.Waiter, err = c.GetWaiterWithOptions(StatusWatcherStrategy, WithKStatusReaders(statusReaders...))
require.NoError(t, err)
resources, err := c.Build(strings.NewReader(jobManifest), false)
require.NoError(t, err)
// The job has no Complete condition, but our custom reader returns CurrentStatus,
// so the wait should succeed immediately without timeout.
err = c.WaitWithJobs(resources, time.Second*3)
require.NoError(t, err)
}

@@ -47,6 +47,8 @@ type FailingKubeClient struct {
WaitForDeleteError error
WatchUntilReadyError error
WaitDuration time.Duration
// RecordedWaitOptions stores the WaitOptions passed to GetWaiterWithOptions for testing
RecordedWaitOptions []kube.WaitOption
}
var _ kube.Interface = &FailingKubeClient{}
@@ -153,7 +155,13 @@ func (f *FailingKubeClient) BuildTable(r io.Reader, _ bool) (kube.ResourceList,
}
func (f *FailingKubeClient) GetWaiter(ws kube.WaitStrategy) (kube.Waiter, error) {
waiter, _ := f.PrintingKubeClient.GetWaiter(ws)
return f.GetWaiterWithOptions(ws)
}
func (f *FailingKubeClient) GetWaiterWithOptions(ws kube.WaitStrategy, opts ...kube.WaitOption) (kube.Waiter, error) {
// Record the WaitOptions for testing
f.RecordedWaitOptions = append(f.RecordedWaitOptions, opts...)
waiter, _ := f.PrintingKubeClient.GetWaiterWithOptions(ws, opts...)
printingKubeWaiter, _ := waiter.(*PrintingKubeWaiter)
return &FailingKubeWaiter{
PrintingKubeWaiter: printingKubeWaiter,

@@ -148,7 +148,11 @@ func (p *PrintingKubeClient) DeleteWithPropagationPolicy(resources kube.Resource
return &kube.Result{Deleted: resources}, nil
}
func (p *PrintingKubeClient) GetWaiter(_ kube.WaitStrategy) (kube.Waiter, error) {
func (p *PrintingKubeClient) GetWaiter(ws kube.WaitStrategy) (kube.Waiter, error) {
return p.GetWaiterWithOptions(ws)
}
func (p *PrintingKubeClient) GetWaiterWithOptions(_ kube.WaitStrategy, _ ...kube.WaitOption) (kube.Waiter, error) {
return &PrintingKubeWaiter{Out: p.Out, LogOutput: p.LogOutput}, nil
}

@@ -56,7 +56,7 @@ type Interface interface {
// IsReachable checks whether the client is able to connect to the cluster.
IsReachable() error
// Get Waiter gets the Kube.Waiter
// GetWaiter gets the Kube.Waiter.
GetWaiter(ws WaitStrategy) (Waiter, error)
// GetPodList lists all pods that match the specified listOptions
@@ -99,3 +99,14 @@ type Waiter interface {
// error.
WatchUntilReady(resources ResourceList, timeout time.Duration) error
}
// InterfaceWaitOptions defines an interface that extends Interface with
// methods that accept wait options.
//
// TODO Helm 5: Remove InterfaceWaitOptions and integrate its method(s) into the Interface.
type InterfaceWaitOptions interface {
// GetWaiterWithOptions gets the Kube.Waiter with options.
GetWaiterWithOptions(ws WaitStrategy, opts ...WaitOption) (Waiter, error)
}
var _ InterfaceWaitOptions = (*Client)(nil)
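
Callers keep compatibility with third-party kube.Interface implementations
through a guarded type assertion that falls back to GetWaiter; this is the
pattern used at every call site in this change (kubeClient, strategy, and
opts assumed in scope):

    var waiter kube.Waiter
    var err error
    if c, ok := kubeClient.(kube.InterfaceWaitOptions); ok {
        // Client implements the extension interface; pass options through.
        waiter, err = c.GetWaiterWithOptions(strategy, opts...)
    } else {
        // Fall back to the Helm 4 Interface method; options are dropped.
        waiter, err = kubeClient.GetWaiter(strategy)
    }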

@@ -0,0 +1,45 @@
/*
Copyright The Helm Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kube
import (
"context"
"github.com/fluxcd/cli-utils/pkg/kstatus/polling/engine"
)
// WaitOption is a function that configures an option for waiting on resources.
type WaitOption func(*waitOptions)
// WithWaitContext sets the context for waiting on resources.
func WithWaitContext(ctx context.Context) WaitOption {
return func(wo *waitOptions) {
wo.ctx = ctx
}
}
// WithKStatusReaders sets the status readers to be used while waiting on resources.
func WithKStatusReaders(readers ...engine.StatusReader) WaitOption {
return func(wo *waitOptions) {
wo.statusReaders = readers
}
}
type waitOptions struct {
ctx context.Context
statusReaders []engine.StatusReader
}
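
Both options are plain functional options and compose at the call site; a
short usage sketch (client, ctx, and readers are assumed in scope, with
client implementing kube.InterfaceWaitOptions):

    waiter, err := client.GetWaiterWithOptions(
        kube.StatusWatcherStrategy,
        kube.WithWaitContext(ctx),
        kube.WithKStatusReaders(readers...),
    )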

@@ -45,6 +45,7 @@ type statusWaiter struct {
client dynamic.Interface
restMapper meta.RESTMapper
ctx context.Context
readers []engine.StatusReader
}
// DefaultStatusWatcherTimeout is the timeout used by the status waiter when a
@@ -71,15 +72,13 @@ func (w *statusWaiter) WatchUntilReady(resourceList ResourceList, timeout time.D
sw := watcher.NewDefaultStatusWatcher(w.client, w.restMapper)
jobSR := helmStatusReaders.NewCustomJobStatusReader(w.restMapper)
podSR := helmStatusReaders.NewCustomPodStatusReader(w.restMapper)
// We don't want to wait on any other resources as watchUntilReady is only for Helm hooks
// We don't want to wait on any other resources as watchUntilReady is only for Helm hooks.
// If custom readers are defined they can be used, as Helm hooks support any resource.
// We put them in front since the DelegatingStatusReader uses the first reader that matches.
genericSR := statusreaders.NewGenericStatusReader(w.restMapper, alwaysReady)
sr := &statusreaders.DelegatingStatusReader{
StatusReaders: []engine.StatusReader{
jobSR,
podSR,
genericSR,
},
StatusReaders: append(w.readers, jobSR, podSR, genericSR),
}
sw.StatusReader = sr
return w.wait(ctx, resourceList, sw)
@@ -93,6 +92,7 @@ func (w *statusWaiter) Wait(resourceList ResourceList, timeout time.Duration) er
defer cancel()
slog.Debug("waiting for resources", "count", len(resourceList), "timeout", timeout)
sw := watcher.NewDefaultStatusWatcher(w.client, w.restMapper)
sw.StatusReader = statusreaders.NewStatusReader(w.restMapper, w.readers...)
return w.wait(ctx, resourceList, sw)
}
@@ -105,7 +105,8 @@ func (w *statusWaiter) WaitWithJobs(resourceList ResourceList, timeout time.Dura
slog.Debug("waiting for resources", "count", len(resourceList), "timeout", timeout)
sw := watcher.NewDefaultStatusWatcher(w.client, w.restMapper)
newCustomJobStatusReader := helmStatusReaders.NewCustomJobStatusReader(w.restMapper)
customSR := statusreaders.NewStatusReader(w.restMapper, newCustomJobStatusReader)
readers := append(w.readers, newCustomJobStatusReader)
customSR := statusreaders.NewStatusReader(w.restMapper, readers...)
sw.StatusReader = customSR
return w.wait(ctx, resourceList, sw)
}

@@ -14,15 +14,21 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package kube // import "helm.sh/helm/v3/pkg/kube"
package kube // import "helm.sh/helm/v4/pkg/kube"
import (
"context"
"errors"
"fmt"
"strings"
"sync/atomic"
"testing"
"time"
"github.com/fluxcd/cli-utils/pkg/kstatus/polling/engine"
"github.com/fluxcd/cli-utils/pkg/kstatus/polling/event"
"github.com/fluxcd/cli-utils/pkg/kstatus/status"
"github.com/fluxcd/cli-utils/pkg/object"
"github.com/fluxcd/cli-utils/pkg/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -933,3 +939,272 @@ func TestStatusWaitMixedResources(t *testing.T) {
})
}
}
// mockStatusReader is a custom status reader for testing that tracks when it's used
// and returns a configurable status for resources it supports.
type mockStatusReader struct {
supportedGK schema.GroupKind
status status.Status
callCount atomic.Int32
}
func (m *mockStatusReader) Supports(gk schema.GroupKind) bool {
return gk == m.supportedGK
}
func (m *mockStatusReader) ReadStatus(_ context.Context, _ engine.ClusterReader, id object.ObjMetadata) (*event.ResourceStatus, error) {
m.callCount.Add(1)
return &event.ResourceStatus{
Identifier: id,
Status: m.status,
Message: "mock status reader",
}, nil
}
func (m *mockStatusReader) ReadStatusForObject(_ context.Context, _ engine.ClusterReader, u *unstructured.Unstructured) (*event.ResourceStatus, error) {
m.callCount.Add(1)
id := object.ObjMetadata{
Namespace: u.GetNamespace(),
Name: u.GetName(),
GroupKind: u.GroupVersionKind().GroupKind(),
}
return &event.ResourceStatus{
Identifier: id,
Status: m.status,
Message: "mock status reader",
}, nil
}
func TestStatusWaitWithCustomReaders(t *testing.T) {
t.Parallel()
tests := []struct {
name string
objManifests []string
customReader *mockStatusReader
expectErrs []error
}{
{
name: "custom reader makes pod immediately current",
objManifests: []string{podNoStatusManifest},
customReader: &mockStatusReader{
supportedGK: v1.SchemeGroupVersion.WithKind("Pod").GroupKind(),
status: status.CurrentStatus,
},
expectErrs: nil,
},
{
name: "custom reader returns in-progress status",
objManifests: []string{podCurrentManifest},
customReader: &mockStatusReader{
supportedGK: v1.SchemeGroupVersion.WithKind("Pod").GroupKind(),
status: status.InProgressStatus,
},
expectErrs: []error{errors.New("resource not ready, name: current-pod, kind: Pod, status: InProgress"), errors.New("context deadline exceeded")},
},
{
name: "custom reader for different resource type is not used",
objManifests: []string{podCurrentManifest},
customReader: &mockStatusReader{
supportedGK: batchv1.SchemeGroupVersion.WithKind("Job").GroupKind(),
status: status.InProgressStatus,
},
expectErrs: nil,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
c := newTestClient(t)
fakeClient := dynamicfake.NewSimpleDynamicClient(scheme.Scheme)
fakeMapper := testutil.NewFakeRESTMapper(
v1.SchemeGroupVersion.WithKind("Pod"),
batchv1.SchemeGroupVersion.WithKind("Job"),
)
statusWaiter := statusWaiter{
client: fakeClient,
restMapper: fakeMapper,
readers: []engine.StatusReader{tt.customReader},
}
objs := getRuntimeObjFromManifests(t, tt.objManifests)
for _, obj := range objs {
u := obj.(*unstructured.Unstructured)
gvr := getGVR(t, fakeMapper, u)
err := fakeClient.Tracker().Create(gvr, u, u.GetNamespace())
assert.NoError(t, err)
}
resourceList := getResourceListFromRuntimeObjs(t, c, objs)
err := statusWaiter.Wait(resourceList, time.Second*3)
if tt.expectErrs != nil {
assert.EqualError(t, err, errors.Join(tt.expectErrs...).Error())
return
}
assert.NoError(t, err)
})
}
}
func TestStatusWaitWithJobsAndCustomReaders(t *testing.T) {
t.Parallel()
tests := []struct {
name string
objManifests []string
customReader *mockStatusReader
expectErrs []error
}{
{
name: "custom reader makes job immediately current",
objManifests: []string{jobNoStatusManifest},
customReader: &mockStatusReader{
supportedGK: batchv1.SchemeGroupVersion.WithKind("Job").GroupKind(),
status: status.CurrentStatus,
},
expectErrs: nil,
},
{
name: "custom reader for pod works with WaitWithJobs",
objManifests: []string{podNoStatusManifest},
customReader: &mockStatusReader{
supportedGK: v1.SchemeGroupVersion.WithKind("Pod").GroupKind(),
status: status.CurrentStatus,
},
expectErrs: nil,
},
{
name: "built-in job reader is still appended after custom readers",
objManifests: []string{jobCompleteManifest},
customReader: &mockStatusReader{
supportedGK: v1.SchemeGroupVersion.WithKind("Pod").GroupKind(),
status: status.CurrentStatus,
},
expectErrs: nil,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
c := newTestClient(t)
fakeClient := dynamicfake.NewSimpleDynamicClient(scheme.Scheme)
fakeMapper := testutil.NewFakeRESTMapper(
v1.SchemeGroupVersion.WithKind("Pod"),
batchv1.SchemeGroupVersion.WithKind("Job"),
)
statusWaiter := statusWaiter{
client: fakeClient,
restMapper: fakeMapper,
readers: []engine.StatusReader{tt.customReader},
}
objs := getRuntimeObjFromManifests(t, tt.objManifests)
for _, obj := range objs {
u := obj.(*unstructured.Unstructured)
gvr := getGVR(t, fakeMapper, u)
err := fakeClient.Tracker().Create(gvr, u, u.GetNamespace())
assert.NoError(t, err)
}
resourceList := getResourceListFromRuntimeObjs(t, c, objs)
err := statusWaiter.WaitWithJobs(resourceList, time.Second*3)
if tt.expectErrs != nil {
assert.EqualError(t, err, errors.Join(tt.expectErrs...).Error())
return
}
assert.NoError(t, err)
})
}
}
func TestWatchUntilReadyWithCustomReaders(t *testing.T) {
t.Parallel()
tests := []struct {
name string
objManifests []string
customReader *mockStatusReader
expectErrs []error
}{
{
name: "custom reader makes job immediately current for hooks",
objManifests: []string{jobNoStatusManifest},
customReader: &mockStatusReader{
supportedGK: batchv1.SchemeGroupVersion.WithKind("Job").GroupKind(),
status: status.CurrentStatus,
},
expectErrs: nil,
},
{
name: "custom reader makes pod immediately current for hooks",
objManifests: []string{podCurrentManifest},
customReader: &mockStatusReader{
supportedGK: v1.SchemeGroupVersion.WithKind("Pod").GroupKind(),
status: status.CurrentStatus,
},
expectErrs: nil,
},
{
name: "custom reader takes precedence over built-in pod reader",
objManifests: []string{podCompleteManifest},
customReader: &mockStatusReader{
supportedGK: v1.SchemeGroupVersion.WithKind("Pod").GroupKind(),
status: status.InProgressStatus,
},
expectErrs: []error{errors.New("resource not ready, name: good-pod, kind: Pod, status: InProgress"), errors.New("context deadline exceeded")},
},
{
name: "custom reader takes precedence over built-in job reader",
objManifests: []string{jobCompleteManifest},
customReader: &mockStatusReader{
supportedGK: batchv1.SchemeGroupVersion.WithKind("Job").GroupKind(),
status: status.InProgressStatus,
},
expectErrs: []error{errors.New("resource not ready, name: test, kind: Job, status: InProgress"), errors.New("context deadline exceeded")},
},
{
name: "custom reader for different resource type does not affect pods",
objManifests: []string{podCompleteManifest},
customReader: &mockStatusReader{
supportedGK: batchv1.SchemeGroupVersion.WithKind("Job").GroupKind(),
status: status.InProgressStatus,
},
expectErrs: nil,
},
{
name: "built-in readers still work when custom reader does not match",
objManifests: []string{jobCompleteManifest},
customReader: &mockStatusReader{
supportedGK: v1.SchemeGroupVersion.WithKind("Pod").GroupKind(),
status: status.InProgressStatus,
},
expectErrs: nil,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
c := newTestClient(t)
fakeClient := dynamicfake.NewSimpleDynamicClient(scheme.Scheme)
fakeMapper := testutil.NewFakeRESTMapper(
v1.SchemeGroupVersion.WithKind("Pod"),
batchv1.SchemeGroupVersion.WithKind("Job"),
)
statusWaiter := statusWaiter{
client: fakeClient,
restMapper: fakeMapper,
readers: []engine.StatusReader{tt.customReader},
}
objs := getRuntimeObjFromManifests(t, tt.objManifests)
for _, obj := range objs {
u := obj.(*unstructured.Unstructured)
gvr := getGVR(t, fakeMapper, u)
err := fakeClient.Tracker().Create(gvr, u, u.GetNamespace())
assert.NoError(t, err)
}
resourceList := getResourceListFromRuntimeObjs(t, c, objs)
err := statusWaiter.WatchUntilReady(resourceList, time.Second*3)
if tt.expectErrs != nil {
assert.EqualError(t, err, errors.Join(tt.expectErrs...).Error())
return
}
assert.NoError(t, err)
})
}
}
