From 45141451b48503565ecc13b8805bbe85a2c6d6af Mon Sep 17 00:00:00 2001 From: George Jenkins Date: Thu, 3 Jul 2025 12:59:23 -0700 Subject: [PATCH 1/4] Kube client support server-side apply Signed-off-by: George Jenkins --- pkg/action/hooks.go | 4 +- pkg/action/install.go | 24 +- pkg/action/rollback.go | 6 +- pkg/action/upgrade.go | 6 +- pkg/kube/client.go | 519 ++++++++++++++++----- pkg/kube/client_test.go | 949 +++++++++++++++++++++++++++++++-------- pkg/kube/fake/fake.go | 16 +- pkg/kube/fake/printer.go | 4 +- pkg/kube/interface.go | 12 +- pkg/kube/wait.go | 21 - 10 files changed, 1213 insertions(+), 348 deletions(-) diff --git a/pkg/action/hooks.go b/pkg/action/hooks.go index d01ec84a0..95260e0e4 100644 --- a/pkg/action/hooks.go +++ b/pkg/action/hooks.go @@ -73,7 +73,9 @@ func (cfg *Configuration) execHook(rl *release.Release, hook release.HookEvent, h.LastRun.Phase = release.HookPhaseUnknown // Create hook resources - if _, err := cfg.KubeClient.Create(resources); err != nil { + if _, err := cfg.KubeClient.Create( + resources, + kube.ClientCreateOptionServerSideApply(false)); err != nil { h.LastRun.CompletedAt = helmtime.Now() h.LastRun.Phase = release.HookPhaseFailed return fmt.Errorf("warning: Hook %s %s failed: %w", hook, h.Path, err) diff --git a/pkg/action/install.go b/pkg/action/install.go index d8efa5d5d..9a9101f5d 100644 --- a/pkg/action/install.go +++ b/pkg/action/install.go @@ -173,7 +173,9 @@ func (i *Install) installCRDs(crds []chart.CRD) error { } // Send them to Kube - if _, err := i.cfg.KubeClient.Create(res); err != nil { + if _, err := i.cfg.KubeClient.Create( + res, + kube.ClientCreateOptionServerSideApply(false)); err != nil { // If the error is CRD already exists, continue. if apierrors.IsAlreadyExists(err) { crdName := res[0].Name @@ -399,7 +401,9 @@ func (i *Install) RunWithContext(ctx context.Context, chrt *chart.Chart, vals ma if err != nil { return nil, err } - if _, err := i.cfg.KubeClient.Create(resourceList); err != nil && !apierrors.IsAlreadyExists(err) { + if _, err := i.cfg.KubeClient.Create( + resourceList, + kube.ClientCreateOptionServerSideApply(false)); err != nil && !apierrors.IsAlreadyExists(err) { return nil, err } } @@ -468,13 +472,17 @@ func (i *Install) performInstall(rel *release.Release, toBeAdopted kube.Resource // do an update, but it's not clear whether we WANT to do an update if the reuse is set // to true, since that is basically an upgrade operation. if len(toBeAdopted) == 0 && len(resources) > 0 { - _, err = i.cfg.KubeClient.Create(resources) + _, err = i.cfg.KubeClient.Create( + resources, + kube.ClientCreateOptionServerSideApply(false)) } else if len(resources) > 0 { - if i.TakeOwnership { - _, err = i.cfg.KubeClient.(kube.InterfaceThreeWayMerge).UpdateThreeWayMerge(toBeAdopted, resources, i.ForceReplace) - } else { - _, err = i.cfg.KubeClient.Update(toBeAdopted, resources, i.ForceReplace) - } + updateThreeWayMergeForUnstructured := i.TakeOwnership + _, err = i.cfg.KubeClient.Update( + toBeAdopted, + resources, + kube.ClientUpdateOptionServerSideApply(false), + kube.ClientUpdateOptionThreeWayMergeForUnstructured(updateThreeWayMergeForUnstructured), + kube.ClientUpdateOptionForceReplace(i.ForceReplace)) } if err != nil { return rel, err diff --git a/pkg/action/rollback.go b/pkg/action/rollback.go index f529fa422..f60d4f4bc 100644 --- a/pkg/action/rollback.go +++ b/pkg/action/rollback.go @@ -190,7 +190,11 @@ func (r *Rollback) performRollback(currentRelease, targetRelease *release.Releas if err != nil { return targetRelease, fmt.Errorf("unable to set metadata visitor from target release: %w", err) } - results, err := r.cfg.KubeClient.Update(current, target, r.ForceReplace) + results, err := r.cfg.KubeClient.Update( + current, + target, + kube.ClientUpdateOptionServerSideApply(false), + kube.ClientUpdateOptionForceReplace(r.ForceReplace)) if err != nil { msg := fmt.Sprintf("Rollback %q failed: %s", targetRelease.Name, err) diff --git a/pkg/action/upgrade.go b/pkg/action/upgrade.go index 0567c8de2..a32d6e78e 100644 --- a/pkg/action/upgrade.go +++ b/pkg/action/upgrade.go @@ -426,7 +426,11 @@ func (u *Upgrade) releasingUpgrade(c chan<- resultMessage, upgradedRelease *rele slog.Debug("upgrade hooks disabled", "name", upgradedRelease.Name) } - results, err := u.cfg.KubeClient.Update(current, target, u.ForceReplace) + results, err := u.cfg.KubeClient.Update( + current, + target, + kube.ClientUpdateOptionServerSideApply(false), + kube.ClientUpdateOptionForceReplace(u.ForceReplace)) if err != nil { u.cfg.recordRelease(originalRelease) u.reportToPerformUpgrade(c, upgradedRelease, results.Created, err) diff --git a/pkg/kube/client.go b/pkg/kube/client.go index 78ed4e088..aa7c86c9b 100644 --- a/pkg/kube/client.go +++ b/pkg/kube/client.go @@ -24,6 +24,7 @@ import ( "fmt" "io" "log/slog" + "net/http" "os" "path/filepath" "reflect" @@ -91,6 +92,14 @@ const ( HookOnlyStrategy WaitStrategy = "hookOnly" ) +type FieldValidationDirective string + +const ( + FieldValidationDirectiveIgnore FieldValidationDirective = "Ignore" + FieldValidationDirectiveWarn FieldValidationDirective = "Warn" + FieldValidationDirectiveStrict FieldValidationDirective = "Strict" +) + func init() { // Add CRDs to the scheme. They are missing by default. if err := apiextv1.AddToScheme(scheme.Scheme); err != nil { @@ -194,10 +203,101 @@ func (c *Client) IsReachable() error { return nil } +type clientCreateOptions struct { + serverSideApply bool + forceConflicts bool + dryRun bool + fieldValidationDirective FieldValidationDirective +} + +type ClientCreateOption func(*clientCreateOptions) error + +// ClientUpdateOptionServerSideApply enables performing object apply server-side +// see: https://kubernetes.io/docs/reference/using-api/server-side-apply/ +func ClientCreateOptionServerSideApply(serverSideApply bool) ClientCreateOption { + return func(o *clientCreateOptions) error { + o.serverSideApply = serverSideApply + + return nil + } +} + +// ClientCreateOptionForceConflicts forces field conflicts to be resolved +// see: https://kubernetes.io/docs/reference/using-api/server-side-apply/#conflicts +// Only valid when ClientUpdateOptionServerSideApply enabled +func ClientCreateOptionForceConflicts(forceConflicts bool) ClientCreateOption { + return func(o *clientCreateOptions) error { + o.forceConflicts = forceConflicts + + return nil + } +} + +// ClientCreateOptionDryRun performs non-mutating operations only +func ClientCreateOptionDryRun(dryRun bool) ClientCreateOption { + return func(o *clientCreateOptions) error { + o.dryRun = dryRun + + return nil + } +} + +// ClientCreateOptionFieldValidationDirective specifies show API operations validate object's schema +// - For client-side apply: this is ignored +// - For server-side apply: the directive is sent to the server to perform the validation +// +// Defaults to `FieldValidationDirectiveStrict` +func ClientCreateOptionFieldValidationDirective(fieldValidationDirective FieldValidationDirective) ClientCreateOption { + return func(o *clientCreateOptions) error { + o.fieldValidationDirective = fieldValidationDirective + + return nil + } +} + // Create creates Kubernetes resources specified in the resource list. -func (c *Client) Create(resources ResourceList) (*Result, error) { +func (c *Client) Create(resources ResourceList, options ...ClientCreateOption) (*Result, error) { slog.Debug("creating resource(s)", "resources", len(resources)) - if err := perform(resources, createResource); err != nil { + + createOptions := clientCreateOptions{ + serverSideApply: true, // Default to server-side apply + fieldValidationDirective: FieldValidationDirectiveStrict, + } + + for _, o := range options { + o(&createOptions) + } + + if createOptions.forceConflicts && !createOptions.serverSideApply { + return nil, fmt.Errorf("invalid operation: force conflicts can only be used with server-side apply") + } + + makeCreateApplyFunc := func() func(target *resource.Info) error { + if createOptions.serverSideApply { + slog.Debug("using server-side apply for resource creation", slog.Bool("forceConflicts", createOptions.forceConflicts), slog.Bool("dryRun", createOptions.dryRun), slog.String("fieldValidationDirective", string(createOptions.fieldValidationDirective))) + return func(target *resource.Info) error { + err := patchResourceServerSide(target, createOptions.dryRun, createOptions.forceConflicts, createOptions.fieldValidationDirective) + + logger := slog.With( + slog.String("namespace", target.Namespace), + slog.String("name", target.Name), + slog.String("gvk", target.Mapping.GroupVersionKind.String())) + if err != nil { + logger.Debug("Error patching resource", slog.Any("error", err)) + return err + } + + logger.Debug("Patched resource") + + return nil + } + } + + slog.Debug("using client-side apply for resource creation") + return createResource + } + + if err := perform(resources, makeCreateApplyFunc()); err != nil { return nil, err } return &Result{Created: resources}, nil @@ -348,96 +448,98 @@ func (c *Client) namespace() string { return v1.NamespaceDefault } -// newBuilder returns a new resource builder for structured api objects. -func (c *Client) newBuilder() *resource.Builder { - return c.Factory.NewBuilder(). - ContinueOnError(). - NamespaceParam(c.namespace()). - DefaultNamespace(). - Flatten() -} - -// Build validates for Kubernetes objects and returns unstructured infos. -func (c *Client) Build(reader io.Reader, validate bool) (ResourceList, error) { - validationDirective := metav1.FieldValidationIgnore +func determineFieldValidationDirective(validate bool) FieldValidationDirective { if validate { - validationDirective = metav1.FieldValidationStrict + return FieldValidationDirectiveStrict } - schema, err := c.Factory.Validator(validationDirective) + return FieldValidationDirectiveIgnore +} + +func buildResourceList(f Factory, namespace string, validationDirective FieldValidationDirective, reader io.Reader, transformRequest resource.RequestTransform) (ResourceList, error) { + + schema, err := f.Validator(string(validationDirective)) if err != nil { return nil, err } - result, err := c.newBuilder(). + + builder := f.NewBuilder(). + ContinueOnError(). + NamespaceParam(namespace). + DefaultNamespace(). + Flatten(). Unstructured(). Schema(schema). - Stream(reader, ""). - Do().Infos() + Stream(reader, "") + if transformRequest != nil { + builder.TransformRequests(transformRequest) + } + result, err := builder.Do().Infos() return result, scrubValidationError(err) } +// Build validates for Kubernetes objects and returns unstructured infos. +func (c *Client) Build(reader io.Reader, validate bool) (ResourceList, error) { + return buildResourceList( + c.Factory, + c.namespace(), + determineFieldValidationDirective(validate), + reader, + nil) +} + // BuildTable validates for Kubernetes objects and returns unstructured infos. // The returned kind is a Table. func (c *Client) BuildTable(reader io.Reader, validate bool) (ResourceList, error) { - validationDirective := metav1.FieldValidationIgnore - if validate { - validationDirective = metav1.FieldValidationStrict - } - - schema, err := c.Factory.Validator(validationDirective) - if err != nil { - return nil, err - } - result, err := c.newBuilder(). - Unstructured(). - Schema(schema). - Stream(reader, ""). - TransformRequests(transformRequests). - Do().Infos() - return result, scrubValidationError(err) + return buildResourceList( + c.Factory, + c.namespace(), + determineFieldValidationDirective(validate), + reader, + transformRequests) } -func (c *Client) update(original, target ResourceList, force, threeWayMerge bool) (*Result, error) { +func (c *Client) update(target, original ResourceList, updateApplyFunc func(target, original *resource.Info) error) (*Result, error) { updateErrors := []error{} res := &Result{} slog.Debug("checking resources for changes", "resources", len(target)) - err := target.Visit(func(info *resource.Info, err error) error { + err := target.Visit(func(target *resource.Info, err error) error { if err != nil { return err } - helper := resource.NewHelper(info.Client, info.Mapping).WithFieldManager(getManagedFieldsManager()) - if _, err := helper.Get(info.Namespace, info.Name); err != nil { + helper := resource.NewHelper(target.Client, target.Mapping).WithFieldManager(getManagedFieldsManager()) + if _, err := helper.Get(target.Namespace, target.Name); err != nil { if !apierrors.IsNotFound(err) { return fmt.Errorf("could not get information about the resource: %w", err) } // Append the created resource to the results, even if something fails - res.Created = append(res.Created, info) + res.Created = append(res.Created, target) // Since the resource does not exist, create it. - if err := createResource(info); err != nil { + if err := createResource(target); err != nil { return fmt.Errorf("failed to create resource: %w", err) } - kind := info.Mapping.GroupVersionKind.Kind - slog.Debug("created a new resource", "namespace", info.Namespace, "name", info.Name, "kind", kind) + kind := target.Mapping.GroupVersionKind.Kind + slog.Debug("created a new resource", "namespace", target.Namespace, "name", target.Name, "kind", kind) return nil } - originalInfo := original.Get(info) - if originalInfo == nil { - kind := info.Mapping.GroupVersionKind.Kind - return fmt.Errorf("no %s with the name %q found", kind, info.Name) + original := original.Get(target) + if original == nil { + kind := target.Mapping.GroupVersionKind.Kind + return fmt.Errorf("original object %s with the name %q not found", kind, target.Name) } - if err := updateResource(c, info, originalInfo.Object, force, threeWayMerge); err != nil { - slog.Debug("error updating the resource", "namespace", info.Namespace, "name", info.Name, "kind", info.Mapping.GroupVersionKind.Kind, slog.Any("error", err)) + if err := updateApplyFunc(target, original); err != nil { updateErrors = append(updateErrors, err) } + // Because we check for errors later, append the info regardless - res.Updated = append(res.Updated, info) + res.Updated = append(res.Updated, target) return nil }) @@ -473,18 +575,81 @@ func (c *Client) update(original, target ResourceList, force, threeWayMerge bool return res, nil } -// Update takes the current list of objects and target list of objects and -// creates resources that don't already exist, updates resources that have been -// modified in the target configuration, and deletes resources from the current -// configuration that are not present in the target configuration. If an error -// occurs, a Result will still be returned with the error, containing all -// resource updates, creations, and deletions that were attempted. These can be -// used for cleanup or other logging purposes. +type clientUpdateOptions struct { + threeWayMergeForUnstructured bool + serverSideApply bool + forceReplace bool + forceConflicts bool + dryRun bool + fieldValidationDirective FieldValidationDirective +} + +type ClientUpdateOption func(*clientUpdateOptions) error + +// ClientUpdateOptionThreeWayMergeForUnstructured enables performing three-way merge for unstructured objects +// Must not be enabled when ClientUpdateOptionServerSideApply is enabled +func ClientUpdateOptionThreeWayMergeForUnstructured(threeWayMergeForUnstructured bool) ClientUpdateOption { + return func(o *clientUpdateOptions) error { + o.threeWayMergeForUnstructured = threeWayMergeForUnstructured + + return nil + } +} + +// ClientUpdateOptionServerSideApply enables performing object apply server-side (default) +// see: https://kubernetes.io/docs/reference/using-api/server-side-apply/ +// Must not be enabled when ClientUpdateOptionThreeWayMerge is enabled +func ClientUpdateOptionServerSideApply(serverSideApply bool) ClientUpdateOption { + return func(o *clientUpdateOptions) error { + o.serverSideApply = serverSideApply + + return nil + } +} + +// ClientUpdateOptionForceReplace forces objects to be replaced rather than updated via patch +// Must not be enabled when ClientUpdateOptionForceConflicts is enabled +func ClientUpdateOptionForceReplace(forceReplace bool) ClientUpdateOption { + return func(o *clientUpdateOptions) error { + o.forceReplace = forceReplace + + return nil + } +} + +// ClientUpdateOptionForceConflicts forces field conflicts to be resolved +// see: https://kubernetes.io/docs/reference/using-api/server-side-apply/#conflicts +// Must not be enabled when ClientUpdateOptionForceReplace is enabled +func ClientUpdateOptionForceConflicts(forceConflicts bool) ClientUpdateOption { + return func(o *clientUpdateOptions) error { + o.forceConflicts = forceConflicts + + return nil + } +} + +// ClientUpdateOptionForceConflicts forces field conflicts to be resolved +// see: https://kubernetes.io/docs/reference/using-api/server-side-apply/#conflicts +// Must not be enabled when ClientUpdateOptionForceReplace is enabled +func ClientUpdateOptionDryRun(dryRun bool) ClientUpdateOption { + return func(o *clientUpdateOptions) error { + o.dryRun = dryRun + + return nil + } +} + +// ClientUpdateOptionFieldValidationDirective specifies show API operations validate object's schema +// - For client-side apply: this is ignored +// - For server-side apply: the directive is sent to the server to perform the validation // -// The difference to Update is that UpdateThreeWayMerge does a three-way-merge -// for unstructured objects. -func (c *Client) UpdateThreeWayMerge(original, target ResourceList, force bool) (*Result, error) { - return c.update(original, target, force, true) +// Defaults to `FieldValidationDirectiveStrict` +func ClientUpdateOptionFieldValidationDirective(fieldValidationDirective FieldValidationDirective) ClientCreateOption { + return func(o *clientCreateOptions) error { + o.fieldValidationDirective = fieldValidationDirective + + return nil + } } // Update takes the current list of objects and target list of objects and @@ -494,8 +659,78 @@ func (c *Client) UpdateThreeWayMerge(original, target ResourceList, force bool) // occurs, a Result will still be returned with the error, containing all // resource updates, creations, and deletions that were attempted. These can be // used for cleanup or other logging purposes. -func (c *Client) Update(original, target ResourceList, force bool) (*Result, error) { - return c.update(original, target, force, false) +// +// The default is to use server-side apply, equivalent to: `ClientUpdateOptionServerSideApply(true)` +func (c *Client) Update(original, target ResourceList, options ...ClientUpdateOption) (*Result, error) { + updateOptions := clientUpdateOptions{ + serverSideApply: true, // Default to server-side apply + fieldValidationDirective: FieldValidationDirectiveStrict, + } + + for _, o := range options { + o(&updateOptions) + } + + if updateOptions.threeWayMergeForUnstructured && updateOptions.serverSideApply { + return nil, fmt.Errorf("invalid operation: cannot use three-way merge for unstructured and server-side apply together") + } + + if updateOptions.forceConflicts && updateOptions.forceReplace { + return nil, fmt.Errorf("invalid operation: cannot use force conflicts and force replace together") + } + + if updateOptions.serverSideApply && updateOptions.forceReplace { + return nil, fmt.Errorf("invalid operation: cannot use server-side apply and force replace together") + } + + makeUpdateApplyFunc := func() func(target, original *resource.Info) error { + if updateOptions.forceReplace { + slog.Debug( + "using resource replace update strategy", + slog.String("fieldValidationDirective", string(updateOptions.fieldValidationDirective))) + return func(target, original *resource.Info) error { + if err := replaceResource(target, updateOptions.fieldValidationDirective); err != nil { + slog.Debug("error replacing the resource", "namespace", target.Namespace, "name", target.Name, "kind", target.Mapping.GroupVersionKind.Kind, slog.Any("error", err)) + return err + } + + originalObject := original.Object + kind := target.Mapping.GroupVersionKind.Kind + slog.Debug("replace succeeded", "name", original.Name, "initialKind", originalObject.GetObjectKind().GroupVersionKind().Kind, "kind", kind) + + return nil + } + } else if updateOptions.serverSideApply { + slog.Debug( + "using server-side apply for resource update", + slog.Bool("forceConflicts", updateOptions.forceConflicts), + slog.Bool("dryRun", updateOptions.dryRun), + slog.String("fieldValidationDirective", string(updateOptions.fieldValidationDirective))) + return func(target, _ *resource.Info) error { + err := patchResourceServerSide(target, updateOptions.dryRun, updateOptions.forceConflicts, updateOptions.fieldValidationDirective) + + logger := slog.With( + slog.String("namespace", target.Namespace), + slog.String("name", target.Name), + slog.String("gvk", target.Mapping.GroupVersionKind.String())) + if err != nil { + logger.Debug("Error patching resource", slog.Any("error", err)) + return err + } + + logger.Debug("Patched resource") + + return nil + } + } + + slog.Debug("using client-side apply for resource update", slog.Bool("threeWayMergeForUnstructured", updateOptions.threeWayMergeForUnstructured)) + return func(target, original *resource.Info) error { + return patchResourceClientSide(target, original.Object, updateOptions.threeWayMergeForUnstructured) + } + } + + return c.update(target, original, makeUpdateApplyFunc()) } // Delete deletes Kubernetes resources specified in the resources list with @@ -503,7 +738,7 @@ func (c *Client) Update(original, target ResourceList, force bool) (*Result, err // if one or more fail and collect any errors. All successfully deleted items // will be returned in the `Deleted` ResourceList that is part of the result. func (c *Client) Delete(resources ResourceList) (*Result, []error) { - return rdelete(c, resources, metav1.DeletePropagationBackground) + return deleteResources(resources, metav1.DeletePropagationBackground) } // Delete deletes Kubernetes resources specified in the resources list with @@ -511,10 +746,10 @@ func (c *Client) Delete(resources ResourceList) (*Result, []error) { // if one or more fail and collect any errors. All successfully deleted items // will be returned in the `Deleted` ResourceList that is part of the result. func (c *Client) DeleteWithPropagationPolicy(resources ResourceList, policy metav1.DeletionPropagation) (*Result, []error) { - return rdelete(c, resources, policy) + return deleteResources(resources, policy) } -func rdelete(_ *Client, resources ResourceList, propagation metav1.DeletionPropagation) (*Result, []error) { +func deleteResources(resources ResourceList, propagation metav1.DeletionPropagation) (*Result, []error) { var errs []error res := &Result{} mtx := sync.Mutex{} @@ -548,6 +783,17 @@ func rdelete(_ *Client, resources ResourceList, propagation metav1.DeletionPropa return res, nil } +// https://github.com/kubernetes/kubectl/blob/197123726db24c61aa0f78d1f0ba6e91a2ec2f35/pkg/cmd/apply/apply.go#L439 +func isIncompatibleServerError(err error) bool { + // 415: Unsupported media type means we're talking to a server which doesn't + // support server-side apply. + if _, ok := err.(*apierrors.StatusError); !ok { + // Non-StatusError means the error isn't because the server is incompatible. + return false + } + return err.(*apierrors.StatusError).Status().Code == http.StatusUnsupportedMediaType +} + // getManagedFieldsManager returns the manager string. If one was set it will be returned. // Otherwise, one is calculated based on the name of the binary. func getManagedFieldsManager() string { @@ -568,18 +814,41 @@ func getManagedFieldsManager() string { return filepath.Base(os.Args[0]) } +func perform(infos ResourceList, fn func(*resource.Info) error) error { + var result error + + if len(infos) == 0 { + return ErrNoObjectsVisited + } + + errs := make(chan error) + go batchPerform(infos, fn, errs) + + for range infos { + err := <-errs + if err != nil { + result = errors.Join(result, err) + } + } + + return result +} + func batchPerform(infos ResourceList, fn func(*resource.Info) error, errs chan<- error) { var kind string var wg sync.WaitGroup + defer wg.Wait() + for _, info := range infos { currentKind := info.Object.GetObjectKind().GroupVersionKind().Kind if kind != currentKind { wg.Wait() kind = currentKind } + wg.Add(1) - go func(i *resource.Info) { - errs <- fn(i) + go func(info *resource.Info) { + errs <- fn(info) wg.Done() }(info) } @@ -597,6 +866,7 @@ func createResource(info *resource.Info) error { if err != nil { return err } + return info.Refresh(obj, true) }) } @@ -674,48 +944,95 @@ func createPatch(target *resource.Info, current runtime.Object, threeWayMergeFor return patch, types.StrategicMergePatchType, err } -func updateResource(_ *Client, target *resource.Info, currentObj runtime.Object, force, threeWayMergeForUnstructured bool) error { - var ( - obj runtime.Object - helper = resource.NewHelper(target.Client, target.Mapping).WithFieldManager(getManagedFieldsManager()) - kind = target.Mapping.GroupVersionKind.Kind - ) +func replaceResource(target *resource.Info, fieldValidationDirective FieldValidationDirective) error { - // if --force is applied, attempt to replace the existing resource with the new object. - if force { - var err error - obj, err = helper.Replace(target.Namespace, target.Name, true, target.Object) - if err != nil { - return fmt.Errorf("failed to replace object: %w", err) - } - slog.Debug("replace succeeded", "name", target.Name, "initialKind", currentObj.GetObjectKind().GroupVersionKind().Kind, "kind", kind) - } else { - patch, patchType, err := createPatch(target, currentObj, threeWayMergeForUnstructured) - if err != nil { - return fmt.Errorf("failed to create patch: %w", err) - } + helper := resource.NewHelper(target.Client, target.Mapping). + WithFieldValidation(string(fieldValidationDirective)). + WithFieldManager(getManagedFieldsManager()) - if patch == nil || string(patch) == "{}" { - slog.Debug("no changes detected", "kind", kind, "name", target.Name) - // This needs to happen to make sure that Helm has the latest info from the API - // Otherwise there will be no labels and other functions that use labels will panic - if err := target.Get(); err != nil { - return fmt.Errorf("failed to refresh resource information: %w", err) - } - return nil - } - // send patch to server - slog.Debug("patching resource", "kind", kind, "name", target.Name, "namespace", target.Namespace) - obj, err = helper.Patch(target.Namespace, target.Name, patchType, patch, nil) - if err != nil { - return fmt.Errorf("cannot patch %q with kind %s: %w", target.Name, kind, err) + obj, err := helper.Replace(target.Namespace, target.Name, true, target.Object) + if err != nil { + return fmt.Errorf("failed to replace object: %w", err) + } + + if err := target.Refresh(obj, true); err != nil { + return fmt.Errorf("failed to refresh object after replace: %w", err) + } + + return nil + +} + +func patchResourceClientSide(target *resource.Info, original runtime.Object, threeWayMergeForUnstructured bool) error { + + patch, patchType, err := createPatch(target, original, threeWayMergeForUnstructured) + if err != nil { + return fmt.Errorf("failed to create patch: %w", err) + } + + kind := target.Mapping.GroupVersionKind.Kind + if patch == nil || string(patch) == "{}" { + slog.Debug("no changes detected", "kind", kind, "name", target.Name) + // This needs to happen to make sure that Helm has the latest info from the API + // Otherwise there will be no labels and other functions that use labels will panic + if err := target.Get(); err != nil { + return fmt.Errorf("failed to refresh resource information: %w", err) } + return nil + } + + // send patch to server + slog.Debug("patching resource", "kind", kind, "name", target.Name, "namespace", target.Namespace) + helper := resource.NewHelper(target.Client, target.Mapping).WithFieldManager(getManagedFieldsManager()) + obj, err := helper.Patch(target.Namespace, target.Name, patchType, patch, nil) + if err != nil { + return fmt.Errorf("cannot patch %q with kind %s: %w", target.Name, kind, err) } target.Refresh(obj, true) + return nil } +// Patch reource using server-side apply +func patchResourceServerSide(info *resource.Info, dryRun bool, forceConflicts bool, fieldValidationDirective FieldValidationDirective) error { + helper := resource.NewHelper( + info.Client, + info.Mapping). + DryRun(dryRun). + WithFieldManager(ManagedFieldsManager). + WithFieldValidation(string(fieldValidationDirective)) + + // Send the full object to be applied on the server side. + data, err := runtime.Encode(unstructured.UnstructuredJSONScheme, info.Object) + if err != nil { + return fmt.Errorf("failed to encode object %s/%s with kind %s: %w", info.Namespace, info.Name, info.Mapping.GroupVersionKind.Kind, err) + } + options := metav1.PatchOptions{ + Force: &forceConflicts, + } + obj, err := helper.Patch( + info.Namespace, + info.Name, + types.ApplyPatchType, + data, + &options, + ) + if err != nil { + if isIncompatibleServerError(err) { + return fmt.Errorf("server-side apply not available on the server: %v", err) + } + + if apierrors.IsConflict(err) { + return fmt.Errorf("conflict occurred while applying %s/%s with kind %s: %w", info.Namespace, info.Name, info.Mapping.GroupVersionKind.Kind, err) + } + + return err + } + + return info.Refresh(obj, true) +} + // GetPodList uses the kubernetes interface to get the list of pods filtered by listOptions func (c *Client) GetPodList(namespace string, listOptions metav1.ListOptions) (*v1.PodList, error) { podList, err := c.kubeClient.CoreV1().Pods(namespace).List(context.Background(), listOptions) diff --git a/pkg/kube/client_test.go b/pkg/kube/client_test.go index 5ffa0972b..8de856a5a 100644 --- a/pkg/kube/client_test.go +++ b/pkg/kube/client_test.go @@ -19,15 +19,19 @@ package kube import ( "bytes" "errors" + "fmt" "io" "net/http" "strings" + "sync" "testing" "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" v1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" @@ -117,210 +121,209 @@ func newTestClient(t *testing.T) *Client { t.Cleanup(testFactory.Cleanup) return &Client{ - Factory: testFactory.WithNamespace("default"), + Factory: testFactory.WithNamespace(v1.NamespaceDefault), } } -func TestCreate(t *testing.T) { - // Note: c.Create with the fake client can currently only test creation of a single pod in the same list. When testing - // with more than one pod, c.Create will run into a data race as it calls perform->batchPerform which performs creation - // in batches. The first data race is on accessing var actions and can be fixed easily with a mutex lock in the Client - // function. The second data race though is something in the fake client itself in func (c *RESTClient) do(...) - // when it stores the req: c.Req = req and cannot (?) be fixed easily. - listA := newPodList("starfish") - listB := newPodList("dolphin") +type RequestResponseAction struct { + Request http.Request + Response http.Response + Error error +} - var actions []string - var iterationCounter int +type RoundTripperTestFunc func(previous []RequestResponseAction, req *http.Request) (*http.Response, error) - c := newTestClient(t) - c.Factory.(*cmdtesting.TestFactory).UnstructuredClient = &fake.RESTClient{ - NegotiatedSerializer: unstructuredSerializer, - Client: fake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) { - path, method := req.URL.Path, req.Method - bodyReader := new(strings.Builder) - _, _ = io.Copy(bodyReader, req.Body) - body := bodyReader.String() - actions = append(actions, path+":"+method) - t.Logf("got request %s %s", path, method) - switch { - case path == "/namespaces/default/pods" && method == http.MethodPost: - if strings.Contains(body, "starfish") { - if iterationCounter < 2 { - iterationCounter++ - return newResponseJSON(http.StatusConflict, resourceQuotaConflict) - } - return newResponse(http.StatusOK, &listA.Items[0]) - } - return newResponseJSON(http.StatusConflict, resourceQuotaConflict) - default: - t.Fatalf("unexpected request: %s %s", method, path) - return nil, nil - } - }), +func NewRequestResponseLogClient(t *testing.T, cb RoundTripperTestFunc) RequestResponseLogClient { + t.Helper() + return RequestResponseLogClient{ + t: t, + cb: cb, } +} - t.Run("Create success", func(t *testing.T) { - list, err := c.Build(objBody(&listA), false) - if err != nil { - t.Fatal(err) - } +// RequestResponseLogClient is a test client that logs requests and responses +// Satifying http.RoundTripper interface, it can be used to mock HTTP requests in tests. +// Forwarding requests to a callback function (cb) that can be used to simulate server responses. +type RequestResponseLogClient struct { + t *testing.T + cb RoundTripperTestFunc + actionsLock sync.Mutex + Actions []RequestResponseAction +} - result, err := c.Create(list) - if err != nil { - t.Fatal(err) - } +func (r *RequestResponseLogClient) Do(req *http.Request) (*http.Response, error) { + t := r.t + t.Helper() - if len(result.Created) != 1 { - t.Errorf("expected 1 resource created, got %d", len(result.Created)) + readBodyBytes := func(body io.ReadCloser) []byte { + if body == nil { + return []byte{} } - expectedActions := []string{ - "/namespaces/default/pods:POST", - "/namespaces/default/pods:POST", - "/namespaces/default/pods:POST", - } - if len(expectedActions) != len(actions) { - t.Fatalf("unexpected number of requests, expected %d, got %d", len(expectedActions), len(actions)) - } - for k, v := range expectedActions { - if actions[k] != v { - t.Errorf("expected %s request got %s", v, actions[k]) - } - } - }) + defer body.Close() + bodyBytes, err := io.ReadAll(body) + require.NoError(t, err) - t.Run("Create failure", func(t *testing.T) { - list, err := c.Build(objBody(&listB), false) - if err != nil { - t.Fatal(err) - } + return bodyBytes + } - _, err = c.Create(list) - if err == nil { - t.Errorf("expected error") - } + reqBytes := readBodyBytes(req.Body) - expectedString := "Operation cannot be fulfilled on resourcequotas \"quota\": the object has been modified; " + - "please apply your changes to the latest version and try again" - if !strings.Contains(err.Error(), expectedString) { - t.Errorf("Unexpected error message: %q", err) - } + t.Logf("Request: %s %s %s", req.Method, req.URL.String(), reqBytes) + if req.Body != nil { + req.Body = io.NopCloser(bytes.NewReader(reqBytes)) + } - expectedActions := []string{ - "/namespaces/default/pods:POST", - } - for k, v := range actions { - if expectedActions[0] != v { - t.Errorf("expected %s request got %s", v, actions[k]) - } - } + resp, err := r.cb(r.Actions, req) + + respBytes := readBodyBytes(resp.Body) + t.Logf("Response: %d %s", resp.StatusCode, string(respBytes)) + if resp.Body != nil { + resp.Body = io.NopCloser(bytes.NewReader(respBytes)) + } + + r.actionsLock.Lock() + defer r.actionsLock.Unlock() + r.Actions = append(r.Actions, RequestResponseAction{ + Request: *req, + Response: *resp, + Error: err, }) + + return resp, err } -func testUpdate(t *testing.T, threeWayMerge bool) { - t.Helper() - listA := newPodList("starfish", "otter", "squid") - listB := newPodList("starfish", "otter", "dolphin") - listC := newPodList("starfish", "otter", "dolphin") - listB.Items[0].Spec.Containers[0].Ports = []v1.ContainerPort{{Name: "https", ContainerPort: 443}} - listC.Items[0].Spec.Containers[0].Ports = []v1.ContainerPort{{Name: "https", ContainerPort: 443}} +func TestCreate(t *testing.T) { + // Note: c.Create with the fake client can currently only test creation of a single pod/object in the same list. When testing + // with more than one pod, c.Create will run into a data race as it calls perform->batchPerform which performs creation + // in batches. The race is something in the fake client itself in `func (c *RESTClient) do(...)` + // when it stores the req: c.Req = req and cannot (?) be fixed easily. - var actions []string - var iterationCounter int + type testCase struct { + Name string + Pods v1.PodList + Callback func(t *testing.T, tc testCase, previous []RequestResponseAction, req *http.Request) (*http.Response, error) + ServerSideApply bool + ExpectedActions []string + ExpectedErrorContains string + } - c := newTestClient(t) - c.Factory.(*cmdtesting.TestFactory).UnstructuredClient = &fake.RESTClient{ - NegotiatedSerializer: unstructuredSerializer, - Client: fake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) { - p, m := req.URL.Path, req.Method - actions = append(actions, p+":"+m) - t.Logf("got request %s %s", p, m) - switch { - case p == "/namespaces/default/pods/starfish" && m == http.MethodGet: - return newResponse(http.StatusOK, &listA.Items[0]) - case p == "/namespaces/default/pods/otter" && m == http.MethodGet: - return newResponse(http.StatusOK, &listA.Items[1]) - case p == "/namespaces/default/pods/otter" && m == http.MethodPatch: - data, err := io.ReadAll(req.Body) - if err != nil { - t.Fatalf("could not dump request: %s", err) - } - req.Body.Close() - expected := `{}` - if string(data) != expected { - t.Errorf("expected patch\n%s\ngot\n%s", expected, string(data)) - } - return newResponse(http.StatusOK, &listB.Items[0]) - case p == "/namespaces/default/pods/dolphin" && m == http.MethodGet: - return newResponse(http.StatusNotFound, notFoundBody()) - case p == "/namespaces/default/pods/starfish" && m == http.MethodPatch: - data, err := io.ReadAll(req.Body) - if err != nil { - t.Fatalf("could not dump request: %s", err) - } - req.Body.Close() - expected := `{"spec":{"$setElementOrder/containers":[{"name":"app:v4"}],"containers":[{"$setElementOrder/ports":[{"containerPort":443}],"name":"app:v4","ports":[{"containerPort":443,"name":"https"},{"$patch":"delete","containerPort":80}]}]}}` - if string(data) != expected { - t.Errorf("expected patch\n%s\ngot\n%s", expected, string(data)) - } - return newResponse(http.StatusOK, &listB.Items[0]) - case p == "/namespaces/default/pods" && m == http.MethodPost: - if iterationCounter < 2 { - iterationCounter++ + testCases := map[string]testCase{ + "Create success (client-side apply)": { + Pods: newPodList("starfish"), + ServerSideApply: false, + Callback: func(t *testing.T, tc testCase, previous []RequestResponseAction, _ *http.Request) (*http.Response, error) { + t.Helper() + + if len(previous) < 2 { // simulate a conflict return newResponseJSON(http.StatusConflict, resourceQuotaConflict) } - return newResponse(http.StatusOK, &listB.Items[1]) - case p == "/namespaces/default/pods/squid" && m == http.MethodDelete: - return newResponse(http.StatusOK, &listB.Items[1]) - case p == "/namespaces/default/pods/squid" && m == http.MethodGet: - return newResponse(http.StatusOK, &listB.Items[2]) - default: - t.Fatalf("unexpected request: %s %s", req.Method, req.URL.Path) - return nil, nil - } - }), - } - first, err := c.Build(objBody(&listA), false) - if err != nil { - t.Fatal(err) - } - second, err := c.Build(objBody(&listB), false) - if err != nil { - t.Fatal(err) - } - var result *Result - if threeWayMerge { - result, err = c.UpdateThreeWayMerge(first, second, false) - } else { - result, err = c.Update(first, second, false) - } - if err != nil { - t.Fatal(err) - } + return newResponse(http.StatusOK, &tc.Pods.Items[0]) + }, + ExpectedActions: []string{ + "/namespaces/default/pods:POST", + "/namespaces/default/pods:POST", + "/namespaces/default/pods:POST", + }, + }, + "Create success (server-side apply)": { + Pods: newPodList("whale"), + ServerSideApply: true, + Callback: func(t *testing.T, tc testCase, _ []RequestResponseAction, _ *http.Request) (*http.Response, error) { + t.Helper() - if len(result.Created) != 1 { - t.Errorf("expected 1 resource created, got %d", len(result.Created)) + return newResponse(http.StatusOK, &tc.Pods.Items[0]) + }, + ExpectedActions: []string{ + "/namespaces/default/pods/whale:PATCH", + }, + }, + "Create fail: incompatible server (server-side apply)": { + Pods: newPodList("lobster"), + ServerSideApply: true, + Callback: func(t *testing.T, _ testCase, _ []RequestResponseAction, req *http.Request) (*http.Response, error) { + t.Helper() + + return &http.Response{ + StatusCode: http.StatusUnsupportedMediaType, + Request: req, + }, nil + }, + ExpectedErrorContains: "server-side apply not available on the server:", + ExpectedActions: []string{ + "/namespaces/default/pods/lobster:PATCH", + }, + }, + "Create fail: quota (server-side apply)": { + Pods: newPodList("dolphin"), + ServerSideApply: true, + Callback: func(t *testing.T, _ testCase, _ []RequestResponseAction, _ *http.Request) (*http.Response, error) { + t.Helper() + + return newResponseJSON(http.StatusConflict, resourceQuotaConflict) + }, + ExpectedErrorContains: "Operation cannot be fulfilled on resourcequotas \"quota\": the object has been modified; " + + "please apply your changes to the latest version and try again", + ExpectedActions: []string{ + "/namespaces/default/pods/dolphin:PATCH", + }, + }, } - if len(result.Updated) != 2 { - t.Errorf("expected 2 resource updated, got %d", len(result.Updated)) + + c := newTestClient(t) + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + + client := NewRequestResponseLogClient(t, func(previous []RequestResponseAction, req *http.Request) (*http.Response, error) { + return tc.Callback(t, tc, previous, req) + }) + + c.Factory.(*cmdtesting.TestFactory).UnstructuredClient = &fake.RESTClient{ + NegotiatedSerializer: unstructuredSerializer, + Client: fake.CreateHTTPClient(client.Do), + } + + list, err := c.Build(objBody(&tc.Pods), false) + require.NoError(t, err) + if err != nil { + t.Fatal(err) + } + + result, err := c.Create( + list, + ClientCreateOptionServerSideApply(tc.ServerSideApply)) + if tc.ExpectedErrorContains != "" { + require.ErrorContains(t, err, tc.ExpectedErrorContains) + } else { + require.NoError(t, err) + + // See note above about limitations in supporting more than a single object + assert.Len(t, result.Created, 1, "expected 1 object created, got %d", len(result.Created)) + } + + actions := []string{} + for _, action := range client.Actions { + path, method := action.Request.URL.Path, action.Request.Method + actions = append(actions, path+":"+method) + } + + assert.Equal(t, tc.ExpectedActions, actions) + + }) } - if len(result.Deleted) != 1 { - t.Errorf("expected 1 resource deleted, got %d", len(result.Deleted)) +} + +func TestUpdate(t *testing.T) { + type testCase struct { + OriginalPods v1.PodList + TargetPods v1.PodList + ThreeWayMergeForUnstructured bool + ServerSideApply bool + ExpectedActions []string } - // TODO: Find a way to test methods that use Client Set - // Test with a wait - // if err := c.Update("test", objBody(codec, &listB), objBody(codec, &listC), false, 300, true); err != nil { - // t.Fatal(err) - // } - // Test with a wait should fail - // TODO: A way to make this not based off of an extremely short timeout? - // if err := c.Update("test", objBody(codec, &listC), objBody(codec, &listA), false, 2, true); err != nil { - // t.Fatal(err) - // } - expectedActions := []string{ + expectedActionsClientSideApply := []string{ "/namespaces/default/pods/starfish:GET", "/namespaces/default/pods/starfish:GET", "/namespaces/default/pods/starfish:PATCH", @@ -334,22 +337,152 @@ func testUpdate(t *testing.T, threeWayMerge bool) { "/namespaces/default/pods/squid:GET", "/namespaces/default/pods/squid:DELETE", } - if len(expectedActions) != len(actions) { - t.Fatalf("unexpected number of requests, expected %d, got %d", len(expectedActions), len(actions)) + + expectedActionsServerSideApply := []string{ + "/namespaces/default/pods/starfish:GET", + "/namespaces/default/pods/starfish:PATCH", + "/namespaces/default/pods/otter:GET", + "/namespaces/default/pods/otter:PATCH", + "/namespaces/default/pods/dolphin:GET", + "/namespaces/default/pods:POST", // create dolphin + "/namespaces/default/pods:POST", // retry due to 409 + "/namespaces/default/pods:POST", // retry due to 409 + "/namespaces/default/pods/squid:GET", + "/namespaces/default/pods/squid:DELETE", } - for k, v := range expectedActions { - if actions[k] != v { - t.Errorf("expected %s request got %s", v, actions[k]) - } + + testCases := map[string]testCase{ + "client-side apply": { + OriginalPods: newPodList("starfish", "otter", "squid"), + TargetPods: func() v1.PodList { + listTarget := newPodList("starfish", "otter", "dolphin") + listTarget.Items[0].Spec.Containers[0].Ports = []v1.ContainerPort{{Name: "https", ContainerPort: 443}} + + return listTarget + }(), + ThreeWayMergeForUnstructured: false, + ServerSideApply: false, + ExpectedActions: expectedActionsClientSideApply, + }, + "client-side apply (three-way merge for unstructured)": { + OriginalPods: newPodList("starfish", "otter", "squid"), + TargetPods: func() v1.PodList { + listTarget := newPodList("starfish", "otter", "dolphin") + listTarget.Items[0].Spec.Containers[0].Ports = []v1.ContainerPort{{Name: "https", ContainerPort: 443}} + + return listTarget + }(), + ThreeWayMergeForUnstructured: true, + ServerSideApply: false, + ExpectedActions: expectedActionsClientSideApply, + }, + "serverSideApply": { + OriginalPods: newPodList("starfish", "otter", "squid"), + TargetPods: func() v1.PodList { + listTarget := newPodList("starfish", "otter", "dolphin") + listTarget.Items[0].Spec.Containers[0].Ports = []v1.ContainerPort{{Name: "https", ContainerPort: 443}} + + return listTarget + }(), + ThreeWayMergeForUnstructured: false, + ServerSideApply: true, + ExpectedActions: expectedActionsServerSideApply, + }, } -} -func TestUpdate(t *testing.T) { - testUpdate(t, false) -} + c := newTestClient(t) + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + + listOriginal := tc.OriginalPods + listTarget := tc.TargetPods + + iterationCounter := 0 + cb := func(_ []RequestResponseAction, req *http.Request) (*http.Response, error) { + p, m := req.URL.Path, req.Method + + switch { + case p == "/namespaces/default/pods/starfish" && m == http.MethodGet: + return newResponse(http.StatusOK, &listOriginal.Items[0]) + case p == "/namespaces/default/pods/otter" && m == http.MethodGet: + return newResponse(http.StatusOK, &listOriginal.Items[1]) + case p == "/namespaces/default/pods/otter" && m == http.MethodPatch: + if !tc.ServerSideApply { + defer req.Body.Close() + data, err := io.ReadAll(req.Body) + require.NoError(t, err) + + assert.Equal(t, `{}`, string(data)) + } + + return newResponse(http.StatusOK, &listTarget.Items[0]) + case p == "/namespaces/default/pods/dolphin" && m == http.MethodGet: + return newResponse(http.StatusNotFound, notFoundBody()) + case p == "/namespaces/default/pods/starfish" && m == http.MethodPatch: + if !tc.ServerSideApply { + // Ensure client-side apply specifies correct patch + defer req.Body.Close() + data, err := io.ReadAll(req.Body) + require.NoError(t, err) + + expected := `{"spec":{"$setElementOrder/containers":[{"name":"app:v4"}],"containers":[{"$setElementOrder/ports":[{"containerPort":443}],"name":"app:v4","ports":[{"containerPort":443,"name":"https"},{"$patch":"delete","containerPort":80}]}]}}` + assert.Equal(t, expected, string(data)) + } + + return newResponse(http.StatusOK, &listTarget.Items[0]) + case p == "/namespaces/default/pods" && m == http.MethodPost: + if iterationCounter < 2 { + iterationCounter++ + return newResponseJSON(http.StatusConflict, resourceQuotaConflict) + } + + return newResponse(http.StatusOK, &listTarget.Items[1]) + case p == "/namespaces/default/pods/squid" && m == http.MethodDelete: + return newResponse(http.StatusOK, &listTarget.Items[1]) + case p == "/namespaces/default/pods/squid" && m == http.MethodGet: + return newResponse(http.StatusOK, &listTarget.Items[2]) + default: + } + + t.Fail() + return nil, nil + } + + client := NewRequestResponseLogClient(t, cb) + + c.Factory.(*cmdtesting.TestFactory).UnstructuredClient = &fake.RESTClient{ + NegotiatedSerializer: unstructuredSerializer, + Client: fake.CreateHTTPClient(client.Do), + } + + first, err := c.Build(objBody(&listOriginal), false) + require.NoError(t, err) + + second, err := c.Build(objBody(&listTarget), false) + require.NoError(t, err) + + result, err := c.Update( + first, + second, + ClientUpdateOptionThreeWayMergeForUnstructured(tc.ThreeWayMergeForUnstructured), + ClientUpdateOptionForceReplace(false), + ClientUpdateOptionServerSideApply(tc.ServerSideApply)) + require.NoError(t, err) -func TestUpdateThreeWayMerge(t *testing.T) { - testUpdate(t, true) + assert.Len(t, result.Created, 1, "expected 1 resource created, got %d", len(result.Created)) + assert.Len(t, result.Updated, 2, "expected 2 resource updated, got %d", len(result.Updated)) + assert.Len(t, result.Deleted, 1, "expected 1 resource deleted, got %d", len(result.Deleted)) + + actions := []string{} + for _, action := range client.Actions { + path, method := action.Request.URL.Path, action.Request.Method + actions = append(actions, path+":"+method) + } + + assert.Equal(t, tc.ExpectedActions, actions) + }) + } } func TestBuild(t *testing.T) { @@ -548,7 +681,11 @@ func TestWait(t *testing.T) { if err != nil { t.Fatal(err) } - result, err := c.Create(resources) + + result, err := c.Create( + resources, + ClientCreateOptionServerSideApply(false)) + if err != nil { t.Fatal(err) } @@ -605,7 +742,10 @@ func TestWaitJob(t *testing.T) { if err != nil { t.Fatal(err) } - result, err := c.Create(resources) + result, err := c.Create( + resources, + ClientCreateOptionServerSideApply(false)) + if err != nil { t.Fatal(err) } @@ -664,7 +804,9 @@ func TestWaitDelete(t *testing.T) { if err != nil { t.Fatal(err) } - result, err := c.Create(resources) + result, err := c.Create( + resources, + ClientCreateOptionServerSideApply(false)) if err != nil { t.Fatal(err) } @@ -1083,6 +1225,7 @@ func TestCreatePatchCustomResourceSpec(t *testing.T) { t.Run(testCase.name, testCase.run) } +<<<<<<< HEAD type errorFactory struct { *cmdtesting.TestFactory err error @@ -1183,4 +1326,428 @@ func TestIsReachable(t *testing.T) { } }) } +||||||| parent of 36a476ff4 (Kube client support server-side apply) +======= +func TestIsIncompatibleServerError(t *testing.T) { + testCases := map[string]struct { + Err error + Want bool + }{ + "Unsupported media type": { + Err: &apierrors.StatusError{ErrStatus: metav1.Status{Code: http.StatusUnsupportedMediaType}}, + Want: true, + }, + "Not found error": { + Err: &apierrors.StatusError{ErrStatus: metav1.Status{Code: http.StatusNotFound}}, + Want: false, + }, + "Generic error": { + Err: fmt.Errorf("some generic error"), + Want: false, + }, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + if got := isIncompatibleServerError(tc.Err); got != tc.Want { + t.Errorf("isIncompatibleServerError() = %v, want %v", got, tc.Want) + } + }) + } +} + +func TestReplaceResource(t *testing.T) { + type testCase struct { + Pods v1.PodList + Callback func(t *testing.T, tc testCase, previous []RequestResponseAction, req *http.Request) (*http.Response, error) + ExpectedErrorContains string + } + + testCases := map[string]testCase{ + "normal": { + Pods: newPodList("whale"), + Callback: func(t *testing.T, tc testCase, previous []RequestResponseAction, req *http.Request) (*http.Response, error) { + t.Helper() + + assert.Equal(t, "/namespaces/default/pods/whale", req.URL.Path) + switch len(previous) { + case 0: + assert.Equal(t, "GET", req.Method) + case 1: + assert.Equal(t, "PUT", req.Method) + } + + return newResponse(http.StatusOK, &tc.Pods.Items[0]) + }, + }, + "conflict": { + Pods: newPodList("whale"), + Callback: func(t *testing.T, _ testCase, _ []RequestResponseAction, req *http.Request) (*http.Response, error) { + t.Helper() + + return &http.Response{ + StatusCode: http.StatusConflict, + Request: req, + }, nil + }, + ExpectedErrorContains: "failed to replace object: the server reported a conflict", + }, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + + testFactory := cmdtesting.NewTestFactory() + t.Cleanup(testFactory.Cleanup) + + client := NewRequestResponseLogClient(t, func(previous []RequestResponseAction, req *http.Request) (*http.Response, error) { + t.Helper() + + return tc.Callback(t, tc, previous, req) + }) + + testFactory.UnstructuredClient = &fake.RESTClient{ + NegotiatedSerializer: unstructuredSerializer, + Client: fake.CreateHTTPClient(client.Do), + } + + resourceList, err := buildResourceList(testFactory, v1.NamespaceDefault, FieldValidationDirectiveStrict, objBody(&tc.Pods), nil) + require.NoError(t, err) + + require.Len(t, resourceList, 1) + info := resourceList[0] + + err = replaceResource(info, FieldValidationDirectiveStrict) + if tc.ExpectedErrorContains != "" { + require.ErrorContains(t, err, tc.ExpectedErrorContains) + } else { + require.NoError(t, err) + require.NotNil(t, info.Object) + } + }) + } +} + +func TestUpdateResourceThreeWayMerge(t *testing.T) { + type testCase struct { + OriginalPods v1.PodList + TargetPods v1.PodList + ThreeWayMergeForUnstructured bool + Callback func(t *testing.T, tc testCase, previous []RequestResponseAction, req *http.Request) (*http.Response, error) + ExpectedErrorContains string + } + + testCases := map[string]testCase{ + "normal": { + OriginalPods: newPodList("whale"), + TargetPods: func() v1.PodList { + pods := newPodList("whale") + pods.Items[0].Spec.Containers[0].Ports = []v1.ContainerPort{{Name: "https", ContainerPort: 443}} + + return pods + }(), + ThreeWayMergeForUnstructured: false, + Callback: func(t *testing.T, tc testCase, previous []RequestResponseAction, req *http.Request) (*http.Response, error) { + t.Helper() + + assert.Equal(t, "/namespaces/default/pods/whale", req.URL.Path) + switch len(previous) { + case 0: + assert.Equal(t, "GET", req.Method) + return newResponse(http.StatusOK, &tc.OriginalPods.Items[0]) + case 1: + assert.Equal(t, "PATCH", req.Method) + assert.Equal(t, "application/strategic-merge-patch+json", req.Header.Get("Content-Type")) + return newResponse(http.StatusOK, &tc.TargetPods.Items[0]) + } + + t.Fail() + return nil, nil + }, + }, + "three way merge for unstructured": { + OriginalPods: newPodList("whale"), + TargetPods: func() v1.PodList { + pods := newPodList("whale") + pods.Items[0].Spec.Containers[0].Ports = []v1.ContainerPort{{Name: "https", ContainerPort: 443}} + + return pods + }(), + ThreeWayMergeForUnstructured: true, + Callback: func(t *testing.T, tc testCase, previous []RequestResponseAction, req *http.Request) (*http.Response, error) { + t.Helper() + + assert.Equal(t, "/namespaces/default/pods/whale", req.URL.Path) + switch len(previous) { + case 0: + assert.Equal(t, "GET", req.Method) + return newResponse(http.StatusOK, &tc.OriginalPods.Items[0]) + case 1: + t.Logf("patcher: %+v", req.Header) + assert.Equal(t, "PATCH", req.Method) + assert.Equal(t, "application/strategic-merge-patch+json", req.Header.Get("Content-Type")) + return newResponse(http.StatusOK, &tc.TargetPods.Items[0]) + } + + t.Fail() + return nil, nil + }, + }, + "conflict": { + OriginalPods: newPodList("whale"), + TargetPods: func() v1.PodList { + pods := newPodList("whale") + pods.Items[0].Spec.Containers[0].Ports = []v1.ContainerPort{{Name: "https", ContainerPort: 443}} + + return pods + }(), + Callback: func(t *testing.T, tc testCase, previous []RequestResponseAction, req *http.Request) (*http.Response, error) { + t.Helper() + + assert.Equal(t, "/namespaces/default/pods/whale", req.URL.Path) + switch len(previous) { + case 0: + assert.Equal(t, "GET", req.Method) + return newResponse(http.StatusOK, &tc.OriginalPods.Items[0]) + case 1: + assert.Equal(t, "PATCH", req.Method) + return &http.Response{ + StatusCode: http.StatusConflict, + Request: req, + }, nil + } + + t.Fail() + return nil, nil + + }, + ExpectedErrorContains: "cannot patch \"whale\" with kind Pod: the server reported a conflict", + }, + "no patch": { + OriginalPods: newPodList("whale"), + TargetPods: newPodList("whale"), + Callback: func(t *testing.T, tc testCase, previous []RequestResponseAction, req *http.Request) (*http.Response, error) { + t.Helper() + + assert.Equal(t, "/namespaces/default/pods/whale", req.URL.Path) + switch len(previous) { + case 0: + assert.Equal(t, "GET", req.Method) + return newResponse(http.StatusOK, &tc.OriginalPods.Items[0]) + case 1: + assert.Equal(t, "GET", req.Method) + return newResponse(http.StatusOK, &tc.TargetPods.Items[0]) + } + + t.Fail() + return nil, nil // newResponse(http.StatusOK, &tc.TargetPods.Items[0]) + + }, + }, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + + testFactory := cmdtesting.NewTestFactory() + t.Cleanup(testFactory.Cleanup) + + client := NewRequestResponseLogClient(t, func(previous []RequestResponseAction, req *http.Request) (*http.Response, error) { + return tc.Callback(t, tc, previous, req) + }) + + testFactory.UnstructuredClient = &fake.RESTClient{ + NegotiatedSerializer: unstructuredSerializer, + Client: fake.CreateHTTPClient(client.Do), + } + + resourceListCurrent, err := buildResourceList(testFactory, v1.NamespaceDefault, FieldValidationDirectiveStrict, objBody(&tc.OriginalPods), nil) + require.NoError(t, err) + require.Len(t, resourceListCurrent, 1) + + resourceListTarget, err := buildResourceList(testFactory, v1.NamespaceDefault, FieldValidationDirectiveStrict, objBody(&tc.TargetPods), nil) + require.NoError(t, err) + require.Len(t, resourceListTarget, 1) + + current := resourceListCurrent[0] + target := resourceListTarget[0] + + err = patchResourceClientSide(target, current.Object, tc.ThreeWayMergeForUnstructured) + if tc.ExpectedErrorContains != "" { + require.ErrorContains(t, err, tc.ExpectedErrorContains) + } else { + require.NoError(t, err) + require.NotNil(t, target.Object) + } + }) + } +} + +func TestPatchResourceServerSide(t *testing.T) { + type testCase struct { + Pods v1.PodList + DryRun bool + ForceConflicts bool + FieldValidationDirective FieldValidationDirective + Callback func(t *testing.T, tc testCase, previous []RequestResponseAction, req *http.Request) (*http.Response, error) + ExpectedErrorContains string + } + + testCases := map[string]testCase{ + "normal": { + Pods: newPodList("whale"), + DryRun: false, + ForceConflicts: false, + FieldValidationDirective: FieldValidationDirectiveStrict, + Callback: func(t *testing.T, tc testCase, _ []RequestResponseAction, req *http.Request) (*http.Response, error) { + t.Helper() + + assert.Equal(t, "PATCH", req.Method) + assert.Equal(t, "application/apply-patch+yaml", req.Header.Get("Content-Type")) + assert.Equal(t, "/namespaces/default/pods/whale", req.URL.Path) + assert.Equal(t, "false", req.URL.Query().Get("force")) + assert.Equal(t, "Strict", req.URL.Query().Get("fieldValidation")) + + return newResponse(http.StatusOK, &tc.Pods.Items[0]) + }, + }, + "dry run": { + Pods: newPodList("whale"), + DryRun: true, + ForceConflicts: false, + FieldValidationDirective: FieldValidationDirectiveStrict, + Callback: func(t *testing.T, tc testCase, _ []RequestResponseAction, req *http.Request) (*http.Response, error) { + t.Helper() + + assert.Equal(t, "PATCH", req.Method) + assert.Equal(t, "application/apply-patch+yaml", req.Header.Get("Content-Type")) + assert.Equal(t, "/namespaces/default/pods/whale", req.URL.Path) + assert.Equal(t, "All", req.URL.Query().Get("dryRun")) + assert.Equal(t, "false", req.URL.Query().Get("force")) + assert.Equal(t, "Strict", req.URL.Query().Get("fieldValidation")) + + return newResponse(http.StatusOK, &tc.Pods.Items[0]) + }, + }, + "force conflicts": { + Pods: newPodList("whale"), + DryRun: false, + ForceConflicts: true, + FieldValidationDirective: FieldValidationDirectiveStrict, + Callback: func(t *testing.T, tc testCase, _ []RequestResponseAction, req *http.Request) (*http.Response, error) { + t.Helper() + + assert.Equal(t, "PATCH", req.Method) + assert.Equal(t, "application/apply-patch+yaml", req.Header.Get("Content-Type")) + assert.Equal(t, "/namespaces/default/pods/whale", req.URL.Path) + assert.Equal(t, "true", req.URL.Query().Get("force")) + assert.Equal(t, "Strict", req.URL.Query().Get("fieldValidation")) + + return newResponse(http.StatusOK, &tc.Pods.Items[0]) + }, + }, + "dry run + force conflicts": { + Pods: newPodList("whale"), + DryRun: true, + ForceConflicts: true, + FieldValidationDirective: FieldValidationDirectiveStrict, + Callback: func(t *testing.T, tc testCase, _ []RequestResponseAction, req *http.Request) (*http.Response, error) { + t.Helper() + + assert.Equal(t, "PATCH", req.Method) + assert.Equal(t, "application/apply-patch+yaml", req.Header.Get("Content-Type")) + assert.Equal(t, "/namespaces/default/pods/whale", req.URL.Path) + assert.Equal(t, "All", req.URL.Query().Get("dryRun")) + assert.Equal(t, "true", req.URL.Query().Get("force")) + assert.Equal(t, "Strict", req.URL.Query().Get("fieldValidation")) + + return newResponse(http.StatusOK, &tc.Pods.Items[0]) + }, + }, + "field validation ignore": { + Pods: newPodList("whale"), + DryRun: false, + ForceConflicts: false, + FieldValidationDirective: FieldValidationDirectiveIgnore, + Callback: func(t *testing.T, tc testCase, _ []RequestResponseAction, req *http.Request) (*http.Response, error) { + t.Helper() + + assert.Equal(t, "PATCH", req.Method) + assert.Equal(t, "application/apply-patch+yaml", req.Header.Get("Content-Type")) + assert.Equal(t, "/namespaces/default/pods/whale", req.URL.Path) + assert.Equal(t, "false", req.URL.Query().Get("force")) + assert.Equal(t, "Ignore", req.URL.Query().Get("fieldValidation")) + + return newResponse(http.StatusOK, &tc.Pods.Items[0]) + }, + }, + "incompatible server": { + Pods: newPodList("whale"), + DryRun: false, + ForceConflicts: false, + FieldValidationDirective: FieldValidationDirectiveStrict, + Callback: func(t *testing.T, _ testCase, _ []RequestResponseAction, req *http.Request) (*http.Response, error) { + t.Helper() + + return &http.Response{ + StatusCode: http.StatusUnsupportedMediaType, + Request: req, + }, nil + }, + ExpectedErrorContains: "server-side apply not available on the server:", + }, + "conflict": { + Pods: newPodList("whale"), + DryRun: false, + ForceConflicts: false, + FieldValidationDirective: FieldValidationDirectiveStrict, + Callback: func(t *testing.T, _ testCase, _ []RequestResponseAction, req *http.Request) (*http.Response, error) { + t.Helper() + + return &http.Response{ + StatusCode: http.StatusConflict, + Request: req, + }, nil + }, + ExpectedErrorContains: "the server reported a conflict", + }, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + + testFactory := cmdtesting.NewTestFactory() + t.Cleanup(testFactory.Cleanup) + + client := NewRequestResponseLogClient(t, func(previous []RequestResponseAction, req *http.Request) (*http.Response, error) { + return tc.Callback(t, tc, previous, req) + }) + + testFactory.UnstructuredClient = &fake.RESTClient{ + NegotiatedSerializer: unstructuredSerializer, + Client: fake.CreateHTTPClient(client.Do), + } + + resourceList, err := buildResourceList(testFactory, v1.NamespaceDefault, tc.FieldValidationDirective, objBody(&tc.Pods), nil) + require.NoError(t, err) + + require.Len(t, resourceList, 1) + info := resourceList[0] + + err = patchResourceServerSide(info, tc.DryRun, tc.ForceConflicts, tc.FieldValidationDirective) + if tc.ExpectedErrorContains != "" { + require.ErrorContains(t, err, tc.ExpectedErrorContains) + } else { + require.NoError(t, err) + require.NotNil(t, info.Object) + } + }) + } +} + +func TestDetermineFieldValidationDirective(t *testing.T) { + + assert.Equal(t, FieldValidationDirectiveIgnore, determineFieldValidationDirective(false)) + assert.Equal(t, FieldValidationDirectiveStrict, determineFieldValidationDirective(true)) +>>>>>>> 36a476ff4 (Kube client support server-side apply) } diff --git a/pkg/kube/fake/fake.go b/pkg/kube/fake/fake.go index a543a0f73..588bba83d 100644 --- a/pkg/kube/fake/fake.go +++ b/pkg/kube/fake/fake.go @@ -60,11 +60,11 @@ type FailingKubeWaiter struct { } // Create returns the configured error if set or prints -func (f *FailingKubeClient) Create(resources kube.ResourceList) (*kube.Result, error) { +func (f *FailingKubeClient) Create(resources kube.ResourceList, options ...kube.ClientCreateOption) (*kube.Result, error) { if f.CreateError != nil { return nil, f.CreateError } - return f.PrintingKubeClient.Create(resources) + return f.PrintingKubeClient.Create(resources, options...) } // Get returns the configured error if set or prints @@ -117,19 +117,11 @@ func (f *FailingKubeWaiter) WatchUntilReady(resources kube.ResourceList, d time. } // Update returns the configured error if set or prints -func (f *FailingKubeClient) Update(r, modified kube.ResourceList, ignoreMe bool) (*kube.Result, error) { +func (f *FailingKubeClient) Update(r, modified kube.ResourceList, options ...kube.ClientUpdateOption) (*kube.Result, error) { if f.UpdateError != nil { return &kube.Result{}, f.UpdateError } - return f.PrintingKubeClient.Update(r, modified, ignoreMe) -} - -// Update returns the configured error if set or prints -func (f *FailingKubeClient) UpdateThreeWayMerge(r, modified kube.ResourceList, ignoreMe bool) (*kube.Result, error) { - if f.UpdateError != nil { - return &kube.Result{}, f.UpdateError - } - return f.PrintingKubeClient.Update(r, modified, ignoreMe) + return f.PrintingKubeClient.Update(r, modified, options...) } // Build returns the configured error if set or prints diff --git a/pkg/kube/fake/printer.go b/pkg/kube/fake/printer.go index f6659a904..16c93615a 100644 --- a/pkg/kube/fake/printer.go +++ b/pkg/kube/fake/printer.go @@ -49,7 +49,7 @@ func (p *PrintingKubeClient) IsReachable() error { } // Create prints the values of what would be created with a real KubeClient. -func (p *PrintingKubeClient) Create(resources kube.ResourceList) (*kube.Result, error) { +func (p *PrintingKubeClient) Create(resources kube.ResourceList, _ ...kube.ClientCreateOption) (*kube.Result, error) { _, err := io.Copy(p.Out, bufferize(resources)) if err != nil { return nil, err @@ -98,7 +98,7 @@ func (p *PrintingKubeClient) Delete(resources kube.ResourceList) (*kube.Result, } // Update implements KubeClient Update. -func (p *PrintingKubeClient) Update(_, modified kube.ResourceList, _ bool) (*kube.Result, error) { +func (p *PrintingKubeClient) Update(_, modified kube.ResourceList, _ ...kube.ClientUpdateOption) (*kube.Result, error) { _, err := io.Copy(p.Out, bufferize(modified)) if err != nil { return nil, err diff --git a/pkg/kube/interface.go b/pkg/kube/interface.go index 6b945088e..7339ae0ff 100644 --- a/pkg/kube/interface.go +++ b/pkg/kube/interface.go @@ -30,14 +30,14 @@ import ( // A KubernetesClient must be concurrency safe. type Interface interface { // Create creates one or more resources. - Create(resources ResourceList) (*Result, error) + Create(resources ResourceList, options ...ClientCreateOption) (*Result, error) // Delete destroys one or more resources. Delete(resources ResourceList) (*Result, []error) // Update updates one or more resources or creates the resource // if it doesn't exist. - Update(original, target ResourceList, force bool) (*Result, error) + Update(original, target ResourceList, options ...ClientUpdateOption) (*Result, error) // Build creates a resource list from a Reader. // @@ -53,13 +53,6 @@ type Interface interface { GetWaiter(ws WaitStrategy) (Waiter, error) } -// InterfaceThreeWayMerge was introduced to avoid breaking backwards compatibility for Interface implementers. -// -// TODO Helm 4: Remove InterfaceThreeWayMerge and integrate its method(s) into the Interface. -type InterfaceThreeWayMerge interface { - UpdateThreeWayMerge(original, target ResourceList, force bool) (*Result, error) -} - // Waiter defines methods related to waiting for resource states. type Waiter interface { // Wait waits up to the given timeout for the specified resources to be ready. @@ -125,7 +118,6 @@ type InterfaceResources interface { } var _ Interface = (*Client)(nil) -var _ InterfaceThreeWayMerge = (*Client)(nil) var _ InterfaceLogs = (*Client)(nil) var _ InterfaceDeletionPropagation = (*Client)(nil) var _ InterfaceResources = (*Client)(nil) diff --git a/pkg/kube/wait.go b/pkg/kube/wait.go index 8a3bacdcc..9bfa1ef6d 100644 --- a/pkg/kube/wait.go +++ b/pkg/kube/wait.go @@ -18,7 +18,6 @@ package kube // import "helm.sh/helm/v4/pkg/kube" import ( "context" - "errors" "fmt" "log/slog" "net/http" @@ -223,26 +222,6 @@ func (hw *legacyWaiter) WatchUntilReady(resources ResourceList, timeout time.Dur return perform(resources, hw.watchTimeout(timeout)) } -func perform(infos ResourceList, fn func(*resource.Info) error) error { - var result error - - if len(infos) == 0 { - return ErrNoObjectsVisited - } - - errs := make(chan error) - go batchPerform(infos, fn, errs) - - for range infos { - err := <-errs - if err != nil { - result = errors.Join(result, err) - } - } - - return result -} - func (hw *legacyWaiter) watchUntilReady(timeout time.Duration, info *resource.Info) error { kind := info.Mapping.GroupVersionKind.Kind switch kind { From 741facca434c9ae9d4c7de4c5ea1ad71d6782790 Mon Sep 17 00:00:00 2001 From: George Jenkins Date: Mon, 7 Jul 2025 10:41:40 -0700 Subject: [PATCH 2/4] Update pkg/kube/client_test.go Signed-off-by: George Jenkins --- pkg/kube/client_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/kube/client_test.go b/pkg/kube/client_test.go index 8de856a5a..6fc2f1cc8 100644 --- a/pkg/kube/client_test.go +++ b/pkg/kube/client_test.go @@ -1428,7 +1428,7 @@ func TestReplaceResource(t *testing.T) { } } -func TestUpdateResourceThreeWayMerge(t *testing.T) { +func TestPatchResourceClientSide(t *testing.T) { type testCase struct { OriginalPods v1.PodList TargetPods v1.PodList From 99dc23f00b37624ef7070aa9059cfd5bdfcff5a2 Mon Sep 17 00:00:00 2001 From: George Jenkins Date: Mon, 28 Jul 2025 22:13:49 -0700 Subject: [PATCH 3/4] switch target<->original Signed-off-by: George Jenkins --- pkg/kube/client.go | 62 ++++++++++++++++++++--------------------- pkg/kube/client_test.go | 30 ++++++++++---------- 2 files changed, 46 insertions(+), 46 deletions(-) diff --git a/pkg/kube/client.go b/pkg/kube/client.go index aa7c86c9b..b436f518f 100644 --- a/pkg/kube/client.go +++ b/pkg/kube/client.go @@ -499,12 +499,12 @@ func (c *Client) BuildTable(reader io.Reader, validate bool) (ResourceList, erro transformRequests) } -func (c *Client) update(target, original ResourceList, updateApplyFunc func(target, original *resource.Info) error) (*Result, error) { +func (c *Client) update(originals, targets ResourceList, updateApplyFunc func(original, target *resource.Info) error) (*Result, error) { updateErrors := []error{} res := &Result{} - slog.Debug("checking resources for changes", "resources", len(target)) - err := target.Visit(func(target *resource.Info, err error) error { + slog.Debug("checking resources for changes", "resources", len(targets)) + err := targets.Visit(func(target *resource.Info, err error) error { if err != nil { return err } @@ -528,13 +528,13 @@ func (c *Client) update(target, original ResourceList, updateApplyFunc func(targ return nil } - original := original.Get(target) + original := originals.Get(target) if original == nil { kind := target.Mapping.GroupVersionKind.Kind return fmt.Errorf("original object %s with the name %q not found", kind, target.Name) } - if err := updateApplyFunc(target, original); err != nil { + if err := updateApplyFunc(original, target); err != nil { updateErrors = append(updateErrors, err) } @@ -551,7 +551,7 @@ func (c *Client) update(target, original ResourceList, updateApplyFunc func(targ return res, joinErrors(updateErrors, " && ") } - for _, info := range original.Difference(target) { + for _, info := range originals.Difference(targets) { slog.Debug("deleting resource", "namespace", info.Namespace, "name", info.Name, "kind", info.Mapping.GroupVersionKind.Kind) if err := info.Get(); err != nil { @@ -661,7 +661,7 @@ func ClientUpdateOptionFieldValidationDirective(fieldValidationDirective FieldVa // used for cleanup or other logging purposes. // // The default is to use server-side apply, equivalent to: `ClientUpdateOptionServerSideApply(true)` -func (c *Client) Update(original, target ResourceList, options ...ClientUpdateOption) (*Result, error) { +func (c *Client) Update(originals, targets ResourceList, options ...ClientUpdateOption) (*Result, error) { updateOptions := clientUpdateOptions{ serverSideApply: true, // Default to server-side apply fieldValidationDirective: FieldValidationDirectiveStrict, @@ -683,12 +683,12 @@ func (c *Client) Update(original, target ResourceList, options ...ClientUpdateOp return nil, fmt.Errorf("invalid operation: cannot use server-side apply and force replace together") } - makeUpdateApplyFunc := func() func(target, original *resource.Info) error { + makeUpdateApplyFunc := func() func(original, target *resource.Info) error { if updateOptions.forceReplace { slog.Debug( "using resource replace update strategy", slog.String("fieldValidationDirective", string(updateOptions.fieldValidationDirective))) - return func(target, original *resource.Info) error { + return func(original, target *resource.Info) error { if err := replaceResource(target, updateOptions.fieldValidationDirective); err != nil { slog.Debug("error replacing the resource", "namespace", target.Namespace, "name", target.Name, "kind", target.Mapping.GroupVersionKind.Kind, slog.Any("error", err)) return err @@ -706,7 +706,7 @@ func (c *Client) Update(original, target ResourceList, options ...ClientUpdateOp slog.Bool("forceConflicts", updateOptions.forceConflicts), slog.Bool("dryRun", updateOptions.dryRun), slog.String("fieldValidationDirective", string(updateOptions.fieldValidationDirective))) - return func(target, _ *resource.Info) error { + return func(_, target *resource.Info) error { err := patchResourceServerSide(target, updateOptions.dryRun, updateOptions.forceConflicts, updateOptions.fieldValidationDirective) logger := slog.With( @@ -725,12 +725,12 @@ func (c *Client) Update(original, target ResourceList, options ...ClientUpdateOp } slog.Debug("using client-side apply for resource update", slog.Bool("threeWayMergeForUnstructured", updateOptions.threeWayMergeForUnstructured)) - return func(target, original *resource.Info) error { - return patchResourceClientSide(target, original.Object, updateOptions.threeWayMergeForUnstructured) + return func(original, target *resource.Info) error { + return patchResourceClientSide(original.Object, target, updateOptions.threeWayMergeForUnstructured) } } - return c.update(target, original, makeUpdateApplyFunc()) + return c.update(originals, targets, makeUpdateApplyFunc()) } // Delete deletes Kubernetes resources specified in the resources list with @@ -753,16 +753,16 @@ func deleteResources(resources ResourceList, propagation metav1.DeletionPropagat var errs []error res := &Result{} mtx := sync.Mutex{} - err := perform(resources, func(info *resource.Info) error { - slog.Debug("starting delete resource", "namespace", info.Namespace, "name", info.Name, "kind", info.Mapping.GroupVersionKind.Kind) - err := deleteResource(info, propagation) + err := perform(resources, func(target *resource.Info) error { + slog.Debug("starting delete resource", "namespace", target.Namespace, "name", target.Name, "kind", target.Mapping.GroupVersionKind.Kind) + err := deleteResource(target, propagation) if err == nil || apierrors.IsNotFound(err) { if err != nil { - slog.Debug("ignoring delete failure", "namespace", info.Namespace, "name", info.Name, "kind", info.Mapping.GroupVersionKind.Kind, slog.Any("error", err)) + slog.Debug("ignoring delete failure", "namespace", target.Namespace, "name", target.Name, "kind", target.Mapping.GroupVersionKind.Kind, slog.Any("error", err)) } mtx.Lock() defer mtx.Unlock() - res.Deleted = append(res.Deleted, info) + res.Deleted = append(res.Deleted, target) return nil } mtx.Lock() @@ -881,8 +881,8 @@ func deleteResource(info *resource.Info, policy metav1.DeletionPropagation) erro }) } -func createPatch(target *resource.Info, current runtime.Object, threeWayMergeForUnstructured bool) ([]byte, types.PatchType, error) { - oldData, err := json.Marshal(current) +func createPatch(original runtime.Object, target *resource.Info, threeWayMergeForUnstructured bool) ([]byte, types.PatchType, error) { + oldData, err := json.Marshal(original) if err != nil { return nil, types.StrategicMergePatchType, fmt.Errorf("serializing current configuration: %w", err) } @@ -963,9 +963,9 @@ func replaceResource(target *resource.Info, fieldValidationDirective FieldValida } -func patchResourceClientSide(target *resource.Info, original runtime.Object, threeWayMergeForUnstructured bool) error { +func patchResourceClientSide(original runtime.Object, target *resource.Info, threeWayMergeForUnstructured bool) error { - patch, patchType, err := createPatch(target, original, threeWayMergeForUnstructured) + patch, patchType, err := createPatch(original, target, threeWayMergeForUnstructured) if err != nil { return fmt.Errorf("failed to create patch: %w", err) } @@ -995,25 +995,25 @@ func patchResourceClientSide(target *resource.Info, original runtime.Object, thr } // Patch reource using server-side apply -func patchResourceServerSide(info *resource.Info, dryRun bool, forceConflicts bool, fieldValidationDirective FieldValidationDirective) error { +func patchResourceServerSide(target *resource.Info, dryRun bool, forceConflicts bool, fieldValidationDirective FieldValidationDirective) error { helper := resource.NewHelper( - info.Client, - info.Mapping). + target.Client, + target.Mapping). DryRun(dryRun). WithFieldManager(ManagedFieldsManager). WithFieldValidation(string(fieldValidationDirective)) // Send the full object to be applied on the server side. - data, err := runtime.Encode(unstructured.UnstructuredJSONScheme, info.Object) + data, err := runtime.Encode(unstructured.UnstructuredJSONScheme, target.Object) if err != nil { - return fmt.Errorf("failed to encode object %s/%s with kind %s: %w", info.Namespace, info.Name, info.Mapping.GroupVersionKind.Kind, err) + return fmt.Errorf("failed to encode object %s/%s with kind %s: %w", target.Namespace, target.Name, target.Mapping.GroupVersionKind.Kind, err) } options := metav1.PatchOptions{ Force: &forceConflicts, } obj, err := helper.Patch( - info.Namespace, - info.Name, + target.Namespace, + target.Name, types.ApplyPatchType, data, &options, @@ -1024,13 +1024,13 @@ func patchResourceServerSide(info *resource.Info, dryRun bool, forceConflicts bo } if apierrors.IsConflict(err) { - return fmt.Errorf("conflict occurred while applying %s/%s with kind %s: %w", info.Namespace, info.Name, info.Mapping.GroupVersionKind.Kind, err) + return fmt.Errorf("conflict occurred while applying %s/%s with kind %s: %w", target.Namespace, target.Name, target.Mapping.GroupVersionKind.Kind, err) } return err } - return info.Refresh(obj, true) + return target.Refresh(obj, true) } // GetPodList uses the kubernetes interface to get the list of pods filtered by listOptions diff --git a/pkg/kube/client_test.go b/pkg/kube/client_test.go index 6fc2f1cc8..bdc5a9d7f 100644 --- a/pkg/kube/client_test.go +++ b/pkg/kube/client_test.go @@ -1083,8 +1083,8 @@ type createPatchTestCase struct { // The target state. target *unstructured.Unstructured - // The current state as it exists in the release. - current *unstructured.Unstructured + // The state as it exists in the release. + original *unstructured.Unstructured // The actual state as it exists in the cluster. actual *unstructured.Unstructured @@ -1132,15 +1132,15 @@ func (c createPatchTestCase) run(t *testing.T) { }, } - patch, patchType, err := createPatch(targetInfo, c.current, c.threeWayMergeForUnstructured) + patch, patchType, err := createPatch(c.original, targetInfo, c.threeWayMergeForUnstructured) if err != nil { t.Fatalf("Failed to create patch: %v", err) } if c.expectedPatch != string(patch) { - t.Errorf("Unexpected patch.\nTarget:\n%s\nCurrent:\n%s\nActual:\n%s\n\nExpected:\n%s\nGot:\n%s", + t.Errorf("Unexpected patch.\nTarget:\n%s\nOriginal:\n%s\nActual:\n%s\n\nExpected:\n%s\nGot:\n%s", c.target, - c.current, + c.original, c.actual, c.expectedPatch, string(patch), @@ -1182,9 +1182,9 @@ func TestCreatePatchCustomResourceMetadata(t *testing.T) { "objectset.rio.cattle.io/id": "default-foo-simple", }, nil) testCase := createPatchTestCase{ - name: "take ownership of resource", - target: target, - current: target, + name: "take ownership of resource", + target: target, + original: target, actual: newTestCustomResourceData(nil, map[string]interface{}{ "color": "red", }), @@ -1206,9 +1206,9 @@ func TestCreatePatchCustomResourceSpec(t *testing.T) { "size": "large", }) testCase := createPatchTestCase{ - name: "merge with spec of existing custom resource", - target: target, - current: target, + name: "merge with spec of existing custom resource", + target: target, + original: target, actual: newTestCustomResourceData(nil, map[string]interface{}{ "color": "red", "weight": "heavy", @@ -1561,18 +1561,18 @@ func TestPatchResourceClientSide(t *testing.T) { Client: fake.CreateHTTPClient(client.Do), } - resourceListCurrent, err := buildResourceList(testFactory, v1.NamespaceDefault, FieldValidationDirectiveStrict, objBody(&tc.OriginalPods), nil) + resourceListOriginal, err := buildResourceList(testFactory, v1.NamespaceDefault, FieldValidationDirectiveStrict, objBody(&tc.OriginalPods), nil) require.NoError(t, err) - require.Len(t, resourceListCurrent, 1) + require.Len(t, resourceListOriginal, 1) resourceListTarget, err := buildResourceList(testFactory, v1.NamespaceDefault, FieldValidationDirectiveStrict, objBody(&tc.TargetPods), nil) require.NoError(t, err) require.Len(t, resourceListTarget, 1) - current := resourceListCurrent[0] + original := resourceListOriginal[0] target := resourceListTarget[0] - err = patchResourceClientSide(target, current.Object, tc.ThreeWayMergeForUnstructured) + err = patchResourceClientSide(original.Object, target, tc.ThreeWayMergeForUnstructured) if tc.ExpectedErrorContains != "" { require.ErrorContains(t, err, tc.ExpectedErrorContains) } else { From b2dc411f9d77f3bca969fb4ad955d4a091aa0454 Mon Sep 17 00:00:00 2001 From: George Jenkins Date: Tue, 12 Aug 2025 10:49:10 -0700 Subject: [PATCH 4/4] code review (error checks, collapse forceConflicts, UpdateApplyFunc) Signed-off-by: George Jenkins --- pkg/action/hooks.go | 2 +- pkg/action/install.go | 8 +++--- pkg/action/rollback.go | 2 +- pkg/action/upgrade.go | 2 +- pkg/kube/client.go | 64 +++++++++++++++++++++-------------------- pkg/kube/client_test.go | 16 +++++------ 6 files changed, 47 insertions(+), 47 deletions(-) diff --git a/pkg/action/hooks.go b/pkg/action/hooks.go index 95260e0e4..275a1bf52 100644 --- a/pkg/action/hooks.go +++ b/pkg/action/hooks.go @@ -75,7 +75,7 @@ func (cfg *Configuration) execHook(rl *release.Release, hook release.HookEvent, // Create hook resources if _, err := cfg.KubeClient.Create( resources, - kube.ClientCreateOptionServerSideApply(false)); err != nil { + kube.ClientCreateOptionServerSideApply(false, false)); err != nil { h.LastRun.CompletedAt = helmtime.Now() h.LastRun.Phase = release.HookPhaseFailed return fmt.Errorf("warning: Hook %s %s failed: %w", hook, h.Path, err) diff --git a/pkg/action/install.go b/pkg/action/install.go index 9a9101f5d..b46b4446b 100644 --- a/pkg/action/install.go +++ b/pkg/action/install.go @@ -175,7 +175,7 @@ func (i *Install) installCRDs(crds []chart.CRD) error { // Send them to Kube if _, err := i.cfg.KubeClient.Create( res, - kube.ClientCreateOptionServerSideApply(false)); err != nil { + kube.ClientCreateOptionServerSideApply(false, false)); err != nil { // If the error is CRD already exists, continue. if apierrors.IsAlreadyExists(err) { crdName := res[0].Name @@ -403,7 +403,7 @@ func (i *Install) RunWithContext(ctx context.Context, chrt *chart.Chart, vals ma } if _, err := i.cfg.KubeClient.Create( resourceList, - kube.ClientCreateOptionServerSideApply(false)); err != nil && !apierrors.IsAlreadyExists(err) { + kube.ClientCreateOptionServerSideApply(false, false)); err != nil && !apierrors.IsAlreadyExists(err) { return nil, err } } @@ -474,13 +474,13 @@ func (i *Install) performInstall(rel *release.Release, toBeAdopted kube.Resource if len(toBeAdopted) == 0 && len(resources) > 0 { _, err = i.cfg.KubeClient.Create( resources, - kube.ClientCreateOptionServerSideApply(false)) + kube.ClientCreateOptionServerSideApply(false, false)) } else if len(resources) > 0 { updateThreeWayMergeForUnstructured := i.TakeOwnership _, err = i.cfg.KubeClient.Update( toBeAdopted, resources, - kube.ClientUpdateOptionServerSideApply(false), + kube.ClientUpdateOptionServerSideApply(false, false), kube.ClientUpdateOptionThreeWayMergeForUnstructured(updateThreeWayMergeForUnstructured), kube.ClientUpdateOptionForceReplace(i.ForceReplace)) } diff --git a/pkg/action/rollback.go b/pkg/action/rollback.go index f60d4f4bc..dd1f8c390 100644 --- a/pkg/action/rollback.go +++ b/pkg/action/rollback.go @@ -193,7 +193,7 @@ func (r *Rollback) performRollback(currentRelease, targetRelease *release.Releas results, err := r.cfg.KubeClient.Update( current, target, - kube.ClientUpdateOptionServerSideApply(false), + kube.ClientUpdateOptionServerSideApply(false, false), kube.ClientUpdateOptionForceReplace(r.ForceReplace)) if err != nil { diff --git a/pkg/action/upgrade.go b/pkg/action/upgrade.go index a32d6e78e..abf4342d3 100644 --- a/pkg/action/upgrade.go +++ b/pkg/action/upgrade.go @@ -429,7 +429,7 @@ func (u *Upgrade) releasingUpgrade(c chan<- resultMessage, upgradedRelease *rele results, err := u.cfg.KubeClient.Update( current, target, - kube.ClientUpdateOptionServerSideApply(false), + kube.ClientUpdateOptionServerSideApply(false, false), kube.ClientUpdateOptionForceReplace(u.ForceReplace)) if err != nil { u.cfg.recordRelease(originalRelease) diff --git a/pkg/kube/client.go b/pkg/kube/client.go index b436f518f..016055392 100644 --- a/pkg/kube/client.go +++ b/pkg/kube/client.go @@ -214,26 +214,23 @@ type ClientCreateOption func(*clientCreateOptions) error // ClientUpdateOptionServerSideApply enables performing object apply server-side // see: https://kubernetes.io/docs/reference/using-api/server-side-apply/ -func ClientCreateOptionServerSideApply(serverSideApply bool) ClientCreateOption { - return func(o *clientCreateOptions) error { - o.serverSideApply = serverSideApply - - return nil - } -} - -// ClientCreateOptionForceConflicts forces field conflicts to be resolved +// +// `forceConflicts` forces conflicts to be resolved (may be when serverSideApply enabled only) // see: https://kubernetes.io/docs/reference/using-api/server-side-apply/#conflicts -// Only valid when ClientUpdateOptionServerSideApply enabled -func ClientCreateOptionForceConflicts(forceConflicts bool) ClientCreateOption { +func ClientCreateOptionServerSideApply(serverSideApply, forceConflicts bool) ClientCreateOption { return func(o *clientCreateOptions) error { + if !serverSideApply && forceConflicts { + return fmt.Errorf("forceConflicts enabled when serverSideApply disabled") + } + + o.serverSideApply = serverSideApply o.forceConflicts = forceConflicts return nil } } -// ClientCreateOptionDryRun performs non-mutating operations only +// ClientCreateOptionDryRun requests the server to perform non-mutating operations only func ClientCreateOptionDryRun(dryRun bool) ClientCreateOption { return func(o *clientCreateOptions) error { o.dryRun = dryRun @@ -264,8 +261,12 @@ func (c *Client) Create(resources ResourceList, options ...ClientCreateOption) ( fieldValidationDirective: FieldValidationDirectiveStrict, } + errs := make([]error, 0, len(options)) for _, o := range options { - o(&createOptions) + errs = append(errs, o(&createOptions)) + } + if err := errors.Join(errs...); err != nil { + return nil, fmt.Errorf("invalid client create option(s): %w", err) } if createOptions.forceConflicts && !createOptions.serverSideApply { @@ -499,7 +500,7 @@ func (c *Client) BuildTable(reader io.Reader, validate bool) (ResourceList, erro transformRequests) } -func (c *Client) update(originals, targets ResourceList, updateApplyFunc func(original, target *resource.Info) error) (*Result, error) { +func (c *Client) update(originals, targets ResourceList, updateApplyFunc UpdateApplyFunc) (*Result, error) { updateErrors := []error{} res := &Result{} @@ -599,9 +600,17 @@ func ClientUpdateOptionThreeWayMergeForUnstructured(threeWayMergeForUnstructured // ClientUpdateOptionServerSideApply enables performing object apply server-side (default) // see: https://kubernetes.io/docs/reference/using-api/server-side-apply/ // Must not be enabled when ClientUpdateOptionThreeWayMerge is enabled -func ClientUpdateOptionServerSideApply(serverSideApply bool) ClientUpdateOption { +// +// `forceConflicts` forces conflicts to be resolved (may be enabled when serverSideApply enabled only) +// see: https://kubernetes.io/docs/reference/using-api/server-side-apply/#conflicts +func ClientUpdateOptionServerSideApply(serverSideApply, forceConflicts bool) ClientUpdateOption { return func(o *clientUpdateOptions) error { + if !serverSideApply && forceConflicts { + return fmt.Errorf("forceConflicts enabled when serverSideApply disabled") + } + o.serverSideApply = serverSideApply + o.forceConflicts = forceConflicts return nil } @@ -617,20 +626,7 @@ func ClientUpdateOptionForceReplace(forceReplace bool) ClientUpdateOption { } } -// ClientUpdateOptionForceConflicts forces field conflicts to be resolved -// see: https://kubernetes.io/docs/reference/using-api/server-side-apply/#conflicts -// Must not be enabled when ClientUpdateOptionForceReplace is enabled -func ClientUpdateOptionForceConflicts(forceConflicts bool) ClientUpdateOption { - return func(o *clientUpdateOptions) error { - o.forceConflicts = forceConflicts - - return nil - } -} - -// ClientUpdateOptionForceConflicts forces field conflicts to be resolved -// see: https://kubernetes.io/docs/reference/using-api/server-side-apply/#conflicts -// Must not be enabled when ClientUpdateOptionForceReplace is enabled +// ClientUpdateOptionDryRun requests the server to perform non-mutating operations only func ClientUpdateOptionDryRun(dryRun bool) ClientUpdateOption { return func(o *clientUpdateOptions) error { o.dryRun = dryRun @@ -652,6 +648,8 @@ func ClientUpdateOptionFieldValidationDirective(fieldValidationDirective FieldVa } } +type UpdateApplyFunc func(original, target *resource.Info) error + // Update takes the current list of objects and target list of objects and // creates resources that don't already exist, updates resources that have been // modified in the target configuration, and deletes resources from the current @@ -667,8 +665,12 @@ func (c *Client) Update(originals, targets ResourceList, options ...ClientUpdate fieldValidationDirective: FieldValidationDirectiveStrict, } + errs := make([]error, 0, len(options)) for _, o := range options { - o(&updateOptions) + errs = append(errs, o(&updateOptions)) + } + if err := errors.Join(errs...); err != nil { + return nil, fmt.Errorf("invalid client update option(s): %w", err) } if updateOptions.threeWayMergeForUnstructured && updateOptions.serverSideApply { @@ -683,7 +685,7 @@ func (c *Client) Update(originals, targets ResourceList, options ...ClientUpdate return nil, fmt.Errorf("invalid operation: cannot use server-side apply and force replace together") } - makeUpdateApplyFunc := func() func(original, target *resource.Info) error { + makeUpdateApplyFunc := func() UpdateApplyFunc { if updateOptions.forceReplace { slog.Debug( "using resource replace update strategy", diff --git a/pkg/kube/client_test.go b/pkg/kube/client_test.go index bdc5a9d7f..5060a5fc2 100644 --- a/pkg/kube/client_test.go +++ b/pkg/kube/client_test.go @@ -292,7 +292,7 @@ func TestCreate(t *testing.T) { result, err := c.Create( list, - ClientCreateOptionServerSideApply(tc.ServerSideApply)) + ClientCreateOptionServerSideApply(tc.ServerSideApply, false)) if tc.ExpectedErrorContains != "" { require.ErrorContains(t, err, tc.ExpectedErrorContains) } else { @@ -467,7 +467,7 @@ func TestUpdate(t *testing.T) { second, ClientUpdateOptionThreeWayMergeForUnstructured(tc.ThreeWayMergeForUnstructured), ClientUpdateOptionForceReplace(false), - ClientUpdateOptionServerSideApply(tc.ServerSideApply)) + ClientUpdateOptionServerSideApply(tc.ServerSideApply, false)) require.NoError(t, err) assert.Len(t, result.Created, 1, "expected 1 resource created, got %d", len(result.Created)) @@ -684,7 +684,7 @@ func TestWait(t *testing.T) { result, err := c.Create( resources, - ClientCreateOptionServerSideApply(false)) + ClientCreateOptionServerSideApply(false, false)) if err != nil { t.Fatal(err) @@ -744,7 +744,7 @@ func TestWaitJob(t *testing.T) { } result, err := c.Create( resources, - ClientCreateOptionServerSideApply(false)) + ClientCreateOptionServerSideApply(false, false)) if err != nil { t.Fatal(err) @@ -806,7 +806,7 @@ func TestWaitDelete(t *testing.T) { } result, err := c.Create( resources, - ClientCreateOptionServerSideApply(false)) + ClientCreateOptionServerSideApply(false, false)) if err != nil { t.Fatal(err) } @@ -1225,7 +1225,6 @@ func TestCreatePatchCustomResourceSpec(t *testing.T) { t.Run(testCase.name, testCase.run) } -<<<<<<< HEAD type errorFactory struct { *cmdtesting.TestFactory err error @@ -1326,8 +1325,8 @@ func TestIsReachable(t *testing.T) { } }) } -||||||| parent of 36a476ff4 (Kube client support server-side apply) -======= +} + func TestIsIncompatibleServerError(t *testing.T) { testCases := map[string]struct { Err error @@ -1749,5 +1748,4 @@ func TestDetermineFieldValidationDirective(t *testing.T) { assert.Equal(t, FieldValidationDirectiveIgnore, determineFieldValidationDirective(false)) assert.Equal(t, FieldValidationDirectiveStrict, determineFieldValidationDirective(true)) ->>>>>>> 36a476ff4 (Kube client support server-side apply) }