Merge pull request #31030 from gjenkins8/gjenkins/kubeclient_ssa

HIP-0023: Kube client support server-side apply
pull/31140/head
George Jenkins 3 weeks ago committed by GitHub
commit 36e52c828d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -73,7 +73,9 @@ func (cfg *Configuration) execHook(rl *release.Release, hook release.HookEvent,
h.LastRun.Phase = release.HookPhaseUnknown h.LastRun.Phase = release.HookPhaseUnknown
// Create hook resources // Create hook resources
if _, err := cfg.KubeClient.Create(resources); err != nil { if _, err := cfg.KubeClient.Create(
resources,
kube.ClientCreateOptionServerSideApply(false, false)); err != nil {
h.LastRun.CompletedAt = helmtime.Now() h.LastRun.CompletedAt = helmtime.Now()
h.LastRun.Phase = release.HookPhaseFailed h.LastRun.Phase = release.HookPhaseFailed
return fmt.Errorf("warning: Hook %s %s failed: %w", hook, h.Path, err) return fmt.Errorf("warning: Hook %s %s failed: %w", hook, h.Path, err)

@ -173,7 +173,9 @@ func (i *Install) installCRDs(crds []chart.CRD) error {
} }
// Send them to Kube // Send them to Kube
if _, err := i.cfg.KubeClient.Create(res); err != nil { if _, err := i.cfg.KubeClient.Create(
res,
kube.ClientCreateOptionServerSideApply(false, false)); err != nil {
// If the error is CRD already exists, continue. // If the error is CRD already exists, continue.
if apierrors.IsAlreadyExists(err) { if apierrors.IsAlreadyExists(err) {
crdName := res[0].Name crdName := res[0].Name
@ -399,7 +401,9 @@ func (i *Install) RunWithContext(ctx context.Context, chrt *chart.Chart, vals ma
if err != nil { if err != nil {
return nil, err return nil, err
} }
if _, err := i.cfg.KubeClient.Create(resourceList); err != nil && !apierrors.IsAlreadyExists(err) { if _, err := i.cfg.KubeClient.Create(
resourceList,
kube.ClientCreateOptionServerSideApply(false, false)); err != nil && !apierrors.IsAlreadyExists(err) {
return nil, err return nil, err
} }
} }
@ -468,13 +472,17 @@ func (i *Install) performInstall(rel *release.Release, toBeAdopted kube.Resource
// do an update, but it's not clear whether we WANT to do an update if the reuse is set // do an update, but it's not clear whether we WANT to do an update if the reuse is set
// to true, since that is basically an upgrade operation. // to true, since that is basically an upgrade operation.
if len(toBeAdopted) == 0 && len(resources) > 0 { if len(toBeAdopted) == 0 && len(resources) > 0 {
_, err = i.cfg.KubeClient.Create(resources) _, err = i.cfg.KubeClient.Create(
resources,
kube.ClientCreateOptionServerSideApply(false, false))
} else if len(resources) > 0 { } else if len(resources) > 0 {
if i.TakeOwnership { updateThreeWayMergeForUnstructured := i.TakeOwnership
_, err = i.cfg.KubeClient.(kube.InterfaceThreeWayMerge).UpdateThreeWayMerge(toBeAdopted, resources, i.ForceReplace) _, err = i.cfg.KubeClient.Update(
} else { toBeAdopted,
_, err = i.cfg.KubeClient.Update(toBeAdopted, resources, i.ForceReplace) resources,
} kube.ClientUpdateOptionServerSideApply(false, false),
kube.ClientUpdateOptionThreeWayMergeForUnstructured(updateThreeWayMergeForUnstructured),
kube.ClientUpdateOptionForceReplace(i.ForceReplace))
} }
if err != nil { if err != nil {
return rel, err return rel, err

@ -190,7 +190,11 @@ func (r *Rollback) performRollback(currentRelease, targetRelease *release.Releas
if err != nil { if err != nil {
return targetRelease, fmt.Errorf("unable to set metadata visitor from target release: %w", err) return targetRelease, fmt.Errorf("unable to set metadata visitor from target release: %w", err)
} }
results, err := r.cfg.KubeClient.Update(current, target, r.ForceReplace) results, err := r.cfg.KubeClient.Update(
current,
target,
kube.ClientUpdateOptionServerSideApply(false, false),
kube.ClientUpdateOptionForceReplace(r.ForceReplace))
if err != nil { if err != nil {
msg := fmt.Sprintf("Rollback %q failed: %s", targetRelease.Name, err) msg := fmt.Sprintf("Rollback %q failed: %s", targetRelease.Name, err)

@ -426,7 +426,11 @@ func (u *Upgrade) releasingUpgrade(c chan<- resultMessage, upgradedRelease *rele
slog.Debug("upgrade hooks disabled", "name", upgradedRelease.Name) slog.Debug("upgrade hooks disabled", "name", upgradedRelease.Name)
} }
results, err := u.cfg.KubeClient.Update(current, target, u.ForceReplace) results, err := u.cfg.KubeClient.Update(
current,
target,
kube.ClientUpdateOptionServerSideApply(false, false),
kube.ClientUpdateOptionForceReplace(u.ForceReplace))
if err != nil { if err != nil {
u.cfg.recordRelease(originalRelease) u.cfg.recordRelease(originalRelease)
u.reportToPerformUpgrade(c, upgradedRelease, results.Created, err) u.reportToPerformUpgrade(c, upgradedRelease, results.Created, err)

@ -24,6 +24,7 @@ import (
"fmt" "fmt"
"io" "io"
"log/slog" "log/slog"
"net/http"
"os" "os"
"path/filepath" "path/filepath"
"reflect" "reflect"
@ -91,6 +92,14 @@ const (
HookOnlyStrategy WaitStrategy = "hookOnly" HookOnlyStrategy WaitStrategy = "hookOnly"
) )
type FieldValidationDirective string
const (
FieldValidationDirectiveIgnore FieldValidationDirective = "Ignore"
FieldValidationDirectiveWarn FieldValidationDirective = "Warn"
FieldValidationDirectiveStrict FieldValidationDirective = "Strict"
)
func init() { func init() {
// Add CRDs to the scheme. They are missing by default. // Add CRDs to the scheme. They are missing by default.
if err := apiextv1.AddToScheme(scheme.Scheme); err != nil { if err := apiextv1.AddToScheme(scheme.Scheme); err != nil {
@ -194,10 +203,102 @@ func (c *Client) IsReachable() error {
return nil return nil
} }
type clientCreateOptions struct {
serverSideApply bool
forceConflicts bool
dryRun bool
fieldValidationDirective FieldValidationDirective
}
type ClientCreateOption func(*clientCreateOptions) error
// ClientUpdateOptionServerSideApply enables performing object apply server-side
// see: https://kubernetes.io/docs/reference/using-api/server-side-apply/
//
// `forceConflicts` forces conflicts to be resolved (may be when serverSideApply enabled only)
// see: https://kubernetes.io/docs/reference/using-api/server-side-apply/#conflicts
func ClientCreateOptionServerSideApply(serverSideApply, forceConflicts bool) ClientCreateOption {
return func(o *clientCreateOptions) error {
if !serverSideApply && forceConflicts {
return fmt.Errorf("forceConflicts enabled when serverSideApply disabled")
}
o.serverSideApply = serverSideApply
o.forceConflicts = forceConflicts
return nil
}
}
// ClientCreateOptionDryRun requests the server to perform non-mutating operations only
func ClientCreateOptionDryRun(dryRun bool) ClientCreateOption {
return func(o *clientCreateOptions) error {
o.dryRun = dryRun
return nil
}
}
// ClientCreateOptionFieldValidationDirective specifies show API operations validate object's schema
// - For client-side apply: this is ignored
// - For server-side apply: the directive is sent to the server to perform the validation
//
// Defaults to `FieldValidationDirectiveStrict`
func ClientCreateOptionFieldValidationDirective(fieldValidationDirective FieldValidationDirective) ClientCreateOption {
return func(o *clientCreateOptions) error {
o.fieldValidationDirective = fieldValidationDirective
return nil
}
}
// Create creates Kubernetes resources specified in the resource list. // Create creates Kubernetes resources specified in the resource list.
func (c *Client) Create(resources ResourceList) (*Result, error) { func (c *Client) Create(resources ResourceList, options ...ClientCreateOption) (*Result, error) {
slog.Debug("creating resource(s)", "resources", len(resources)) slog.Debug("creating resource(s)", "resources", len(resources))
if err := perform(resources, createResource); err != nil {
createOptions := clientCreateOptions{
serverSideApply: true, // Default to server-side apply
fieldValidationDirective: FieldValidationDirectiveStrict,
}
errs := make([]error, 0, len(options))
for _, o := range options {
errs = append(errs, o(&createOptions))
}
if err := errors.Join(errs...); err != nil {
return nil, fmt.Errorf("invalid client create option(s): %w", err)
}
if createOptions.forceConflicts && !createOptions.serverSideApply {
return nil, fmt.Errorf("invalid operation: force conflicts can only be used with server-side apply")
}
makeCreateApplyFunc := func() func(target *resource.Info) error {
if createOptions.serverSideApply {
slog.Debug("using server-side apply for resource creation", slog.Bool("forceConflicts", createOptions.forceConflicts), slog.Bool("dryRun", createOptions.dryRun), slog.String("fieldValidationDirective", string(createOptions.fieldValidationDirective)))
return func(target *resource.Info) error {
err := patchResourceServerSide(target, createOptions.dryRun, createOptions.forceConflicts, createOptions.fieldValidationDirective)
logger := slog.With(
slog.String("namespace", target.Namespace),
slog.String("name", target.Name),
slog.String("gvk", target.Mapping.GroupVersionKind.String()))
if err != nil {
logger.Debug("Error patching resource", slog.Any("error", err))
return err
}
logger.Debug("Patched resource")
return nil
}
}
slog.Debug("using client-side apply for resource creation")
return createResource
}
if err := perform(resources, makeCreateApplyFunc()); err != nil {
return nil, err return nil, err
} }
return &Result{Created: resources}, nil return &Result{Created: resources}, nil
@ -348,96 +449,98 @@ func (c *Client) namespace() string {
return v1.NamespaceDefault return v1.NamespaceDefault
} }
// newBuilder returns a new resource builder for structured api objects. func determineFieldValidationDirective(validate bool) FieldValidationDirective {
func (c *Client) newBuilder() *resource.Builder {
return c.Factory.NewBuilder().
ContinueOnError().
NamespaceParam(c.namespace()).
DefaultNamespace().
Flatten()
}
// Build validates for Kubernetes objects and returns unstructured infos.
func (c *Client) Build(reader io.Reader, validate bool) (ResourceList, error) {
validationDirective := metav1.FieldValidationIgnore
if validate { if validate {
validationDirective = metav1.FieldValidationStrict return FieldValidationDirectiveStrict
} }
schema, err := c.Factory.Validator(validationDirective) return FieldValidationDirectiveIgnore
}
func buildResourceList(f Factory, namespace string, validationDirective FieldValidationDirective, reader io.Reader, transformRequest resource.RequestTransform) (ResourceList, error) {
schema, err := f.Validator(string(validationDirective))
if err != nil { if err != nil {
return nil, err return nil, err
} }
result, err := c.newBuilder().
builder := f.NewBuilder().
ContinueOnError().
NamespaceParam(namespace).
DefaultNamespace().
Flatten().
Unstructured(). Unstructured().
Schema(schema). Schema(schema).
Stream(reader, ""). Stream(reader, "")
Do().Infos() if transformRequest != nil {
builder.TransformRequests(transformRequest)
}
result, err := builder.Do().Infos()
return result, scrubValidationError(err) return result, scrubValidationError(err)
} }
// Build validates for Kubernetes objects and returns unstructured infos.
func (c *Client) Build(reader io.Reader, validate bool) (ResourceList, error) {
return buildResourceList(
c.Factory,
c.namespace(),
determineFieldValidationDirective(validate),
reader,
nil)
}
// BuildTable validates for Kubernetes objects and returns unstructured infos. // BuildTable validates for Kubernetes objects and returns unstructured infos.
// The returned kind is a Table. // The returned kind is a Table.
func (c *Client) BuildTable(reader io.Reader, validate bool) (ResourceList, error) { func (c *Client) BuildTable(reader io.Reader, validate bool) (ResourceList, error) {
validationDirective := metav1.FieldValidationIgnore return buildResourceList(
if validate { c.Factory,
validationDirective = metav1.FieldValidationStrict c.namespace(),
} determineFieldValidationDirective(validate),
reader,
schema, err := c.Factory.Validator(validationDirective) transformRequests)
if err != nil {
return nil, err
}
result, err := c.newBuilder().
Unstructured().
Schema(schema).
Stream(reader, "").
TransformRequests(transformRequests).
Do().Infos()
return result, scrubValidationError(err)
} }
func (c *Client) update(original, target ResourceList, force, threeWayMerge bool) (*Result, error) { func (c *Client) update(originals, targets ResourceList, updateApplyFunc UpdateApplyFunc) (*Result, error) {
updateErrors := []error{} updateErrors := []error{}
res := &Result{} res := &Result{}
slog.Debug("checking resources for changes", "resources", len(target)) slog.Debug("checking resources for changes", "resources", len(targets))
err := target.Visit(func(info *resource.Info, err error) error { err := targets.Visit(func(target *resource.Info, err error) error {
if err != nil { if err != nil {
return err return err
} }
helper := resource.NewHelper(info.Client, info.Mapping).WithFieldManager(getManagedFieldsManager()) helper := resource.NewHelper(target.Client, target.Mapping).WithFieldManager(getManagedFieldsManager())
if _, err := helper.Get(info.Namespace, info.Name); err != nil { if _, err := helper.Get(target.Namespace, target.Name); err != nil {
if !apierrors.IsNotFound(err) { if !apierrors.IsNotFound(err) {
return fmt.Errorf("could not get information about the resource: %w", err) return fmt.Errorf("could not get information about the resource: %w", err)
} }
// Append the created resource to the results, even if something fails // Append the created resource to the results, even if something fails
res.Created = append(res.Created, info) res.Created = append(res.Created, target)
// Since the resource does not exist, create it. // Since the resource does not exist, create it.
if err := createResource(info); err != nil { if err := createResource(target); err != nil {
return fmt.Errorf("failed to create resource: %w", err) return fmt.Errorf("failed to create resource: %w", err)
} }
kind := info.Mapping.GroupVersionKind.Kind kind := target.Mapping.GroupVersionKind.Kind
slog.Debug("created a new resource", "namespace", info.Namespace, "name", info.Name, "kind", kind) slog.Debug("created a new resource", "namespace", target.Namespace, "name", target.Name, "kind", kind)
return nil return nil
} }
originalInfo := original.Get(info) original := originals.Get(target)
if originalInfo == nil { if original == nil {
kind := info.Mapping.GroupVersionKind.Kind kind := target.Mapping.GroupVersionKind.Kind
return fmt.Errorf("no %s with the name %q found", kind, info.Name) return fmt.Errorf("original object %s with the name %q not found", kind, target.Name)
} }
if err := updateResource(c, info, originalInfo.Object, force, threeWayMerge); err != nil { if err := updateApplyFunc(original, target); err != nil {
slog.Debug("error updating the resource", "namespace", info.Namespace, "name", info.Name, "kind", info.Mapping.GroupVersionKind.Kind, slog.Any("error", err))
updateErrors = append(updateErrors, err) updateErrors = append(updateErrors, err)
} }
// Because we check for errors later, append the info regardless // Because we check for errors later, append the info regardless
res.Updated = append(res.Updated, info) res.Updated = append(res.Updated, target)
return nil return nil
}) })
@ -449,7 +552,7 @@ func (c *Client) update(original, target ResourceList, force, threeWayMerge bool
return res, joinErrors(updateErrors, " && ") return res, joinErrors(updateErrors, " && ")
} }
for _, info := range original.Difference(target) { for _, info := range originals.Difference(targets) {
slog.Debug("deleting resource", "namespace", info.Namespace, "name", info.Name, "kind", info.Mapping.GroupVersionKind.Kind) slog.Debug("deleting resource", "namespace", info.Namespace, "name", info.Name, "kind", info.Mapping.GroupVersionKind.Kind)
if err := info.Get(); err != nil { if err := info.Get(); err != nil {
@ -473,20 +576,80 @@ func (c *Client) update(original, target ResourceList, force, threeWayMerge bool
return res, nil return res, nil
} }
// Update takes the current list of objects and target list of objects and type clientUpdateOptions struct {
// creates resources that don't already exist, updates resources that have been threeWayMergeForUnstructured bool
// modified in the target configuration, and deletes resources from the current serverSideApply bool
// configuration that are not present in the target configuration. If an error forceReplace bool
// occurs, a Result will still be returned with the error, containing all forceConflicts bool
// resource updates, creations, and deletions that were attempted. These can be dryRun bool
// used for cleanup or other logging purposes. fieldValidationDirective FieldValidationDirective
}
type ClientUpdateOption func(*clientUpdateOptions) error
// ClientUpdateOptionThreeWayMergeForUnstructured enables performing three-way merge for unstructured objects
// Must not be enabled when ClientUpdateOptionServerSideApply is enabled
func ClientUpdateOptionThreeWayMergeForUnstructured(threeWayMergeForUnstructured bool) ClientUpdateOption {
return func(o *clientUpdateOptions) error {
o.threeWayMergeForUnstructured = threeWayMergeForUnstructured
return nil
}
}
// ClientUpdateOptionServerSideApply enables performing object apply server-side (default)
// see: https://kubernetes.io/docs/reference/using-api/server-side-apply/
// Must not be enabled when ClientUpdateOptionThreeWayMerge is enabled
// //
// The difference to Update is that UpdateThreeWayMerge does a three-way-merge // `forceConflicts` forces conflicts to be resolved (may be enabled when serverSideApply enabled only)
// for unstructured objects. // see: https://kubernetes.io/docs/reference/using-api/server-side-apply/#conflicts
func (c *Client) UpdateThreeWayMerge(original, target ResourceList, force bool) (*Result, error) { func ClientUpdateOptionServerSideApply(serverSideApply, forceConflicts bool) ClientUpdateOption {
return c.update(original, target, force, true) return func(o *clientUpdateOptions) error {
if !serverSideApply && forceConflicts {
return fmt.Errorf("forceConflicts enabled when serverSideApply disabled")
}
o.serverSideApply = serverSideApply
o.forceConflicts = forceConflicts
return nil
}
}
// ClientUpdateOptionForceReplace forces objects to be replaced rather than updated via patch
// Must not be enabled when ClientUpdateOptionForceConflicts is enabled
func ClientUpdateOptionForceReplace(forceReplace bool) ClientUpdateOption {
return func(o *clientUpdateOptions) error {
o.forceReplace = forceReplace
return nil
}
}
// ClientUpdateOptionDryRun requests the server to perform non-mutating operations only
func ClientUpdateOptionDryRun(dryRun bool) ClientUpdateOption {
return func(o *clientUpdateOptions) error {
o.dryRun = dryRun
return nil
}
}
// ClientUpdateOptionFieldValidationDirective specifies show API operations validate object's schema
// - For client-side apply: this is ignored
// - For server-side apply: the directive is sent to the server to perform the validation
//
// Defaults to `FieldValidationDirectiveStrict`
func ClientUpdateOptionFieldValidationDirective(fieldValidationDirective FieldValidationDirective) ClientCreateOption {
return func(o *clientCreateOptions) error {
o.fieldValidationDirective = fieldValidationDirective
return nil
}
} }
type UpdateApplyFunc func(original, target *resource.Info) error
// Update takes the current list of objects and target list of objects and // Update takes the current list of objects and target list of objects and
// creates resources that don't already exist, updates resources that have been // creates resources that don't already exist, updates resources that have been
// modified in the target configuration, and deletes resources from the current // modified in the target configuration, and deletes resources from the current
@ -494,8 +657,82 @@ func (c *Client) UpdateThreeWayMerge(original, target ResourceList, force bool)
// occurs, a Result will still be returned with the error, containing all // occurs, a Result will still be returned with the error, containing all
// resource updates, creations, and deletions that were attempted. These can be // resource updates, creations, and deletions that were attempted. These can be
// used for cleanup or other logging purposes. // used for cleanup or other logging purposes.
func (c *Client) Update(original, target ResourceList, force bool) (*Result, error) { //
return c.update(original, target, force, false) // The default is to use server-side apply, equivalent to: `ClientUpdateOptionServerSideApply(true)`
func (c *Client) Update(originals, targets ResourceList, options ...ClientUpdateOption) (*Result, error) {
updateOptions := clientUpdateOptions{
serverSideApply: true, // Default to server-side apply
fieldValidationDirective: FieldValidationDirectiveStrict,
}
errs := make([]error, 0, len(options))
for _, o := range options {
errs = append(errs, o(&updateOptions))
}
if err := errors.Join(errs...); err != nil {
return nil, fmt.Errorf("invalid client update option(s): %w", err)
}
if updateOptions.threeWayMergeForUnstructured && updateOptions.serverSideApply {
return nil, fmt.Errorf("invalid operation: cannot use three-way merge for unstructured and server-side apply together")
}
if updateOptions.forceConflicts && updateOptions.forceReplace {
return nil, fmt.Errorf("invalid operation: cannot use force conflicts and force replace together")
}
if updateOptions.serverSideApply && updateOptions.forceReplace {
return nil, fmt.Errorf("invalid operation: cannot use server-side apply and force replace together")
}
makeUpdateApplyFunc := func() UpdateApplyFunc {
if updateOptions.forceReplace {
slog.Debug(
"using resource replace update strategy",
slog.String("fieldValidationDirective", string(updateOptions.fieldValidationDirective)))
return func(original, target *resource.Info) error {
if err := replaceResource(target, updateOptions.fieldValidationDirective); err != nil {
slog.Debug("error replacing the resource", "namespace", target.Namespace, "name", target.Name, "kind", target.Mapping.GroupVersionKind.Kind, slog.Any("error", err))
return err
}
originalObject := original.Object
kind := target.Mapping.GroupVersionKind.Kind
slog.Debug("replace succeeded", "name", original.Name, "initialKind", originalObject.GetObjectKind().GroupVersionKind().Kind, "kind", kind)
return nil
}
} else if updateOptions.serverSideApply {
slog.Debug(
"using server-side apply for resource update",
slog.Bool("forceConflicts", updateOptions.forceConflicts),
slog.Bool("dryRun", updateOptions.dryRun),
slog.String("fieldValidationDirective", string(updateOptions.fieldValidationDirective)))
return func(_, target *resource.Info) error {
err := patchResourceServerSide(target, updateOptions.dryRun, updateOptions.forceConflicts, updateOptions.fieldValidationDirective)
logger := slog.With(
slog.String("namespace", target.Namespace),
slog.String("name", target.Name),
slog.String("gvk", target.Mapping.GroupVersionKind.String()))
if err != nil {
logger.Debug("Error patching resource", slog.Any("error", err))
return err
}
logger.Debug("Patched resource")
return nil
}
}
slog.Debug("using client-side apply for resource update", slog.Bool("threeWayMergeForUnstructured", updateOptions.threeWayMergeForUnstructured))
return func(original, target *resource.Info) error {
return patchResourceClientSide(original.Object, target, updateOptions.threeWayMergeForUnstructured)
}
}
return c.update(originals, targets, makeUpdateApplyFunc())
} }
// Delete deletes Kubernetes resources specified in the resources list with // Delete deletes Kubernetes resources specified in the resources list with
@ -503,7 +740,7 @@ func (c *Client) Update(original, target ResourceList, force bool) (*Result, err
// if one or more fail and collect any errors. All successfully deleted items // if one or more fail and collect any errors. All successfully deleted items
// will be returned in the `Deleted` ResourceList that is part of the result. // will be returned in the `Deleted` ResourceList that is part of the result.
func (c *Client) Delete(resources ResourceList) (*Result, []error) { func (c *Client) Delete(resources ResourceList) (*Result, []error) {
return rdelete(c, resources, metav1.DeletePropagationBackground) return deleteResources(resources, metav1.DeletePropagationBackground)
} }
// Delete deletes Kubernetes resources specified in the resources list with // Delete deletes Kubernetes resources specified in the resources list with
@ -511,23 +748,23 @@ func (c *Client) Delete(resources ResourceList) (*Result, []error) {
// if one or more fail and collect any errors. All successfully deleted items // if one or more fail and collect any errors. All successfully deleted items
// will be returned in the `Deleted` ResourceList that is part of the result. // will be returned in the `Deleted` ResourceList that is part of the result.
func (c *Client) DeleteWithPropagationPolicy(resources ResourceList, policy metav1.DeletionPropagation) (*Result, []error) { func (c *Client) DeleteWithPropagationPolicy(resources ResourceList, policy metav1.DeletionPropagation) (*Result, []error) {
return rdelete(c, resources, policy) return deleteResources(resources, policy)
} }
func rdelete(_ *Client, resources ResourceList, propagation metav1.DeletionPropagation) (*Result, []error) { func deleteResources(resources ResourceList, propagation metav1.DeletionPropagation) (*Result, []error) {
var errs []error var errs []error
res := &Result{} res := &Result{}
mtx := sync.Mutex{} mtx := sync.Mutex{}
err := perform(resources, func(info *resource.Info) error { err := perform(resources, func(target *resource.Info) error {
slog.Debug("starting delete resource", "namespace", info.Namespace, "name", info.Name, "kind", info.Mapping.GroupVersionKind.Kind) slog.Debug("starting delete resource", "namespace", target.Namespace, "name", target.Name, "kind", target.Mapping.GroupVersionKind.Kind)
err := deleteResource(info, propagation) err := deleteResource(target, propagation)
if err == nil || apierrors.IsNotFound(err) { if err == nil || apierrors.IsNotFound(err) {
if err != nil { if err != nil {
slog.Debug("ignoring delete failure", "namespace", info.Namespace, "name", info.Name, "kind", info.Mapping.GroupVersionKind.Kind, slog.Any("error", err)) slog.Debug("ignoring delete failure", "namespace", target.Namespace, "name", target.Name, "kind", target.Mapping.GroupVersionKind.Kind, slog.Any("error", err))
} }
mtx.Lock() mtx.Lock()
defer mtx.Unlock() defer mtx.Unlock()
res.Deleted = append(res.Deleted, info) res.Deleted = append(res.Deleted, target)
return nil return nil
} }
mtx.Lock() mtx.Lock()
@ -548,6 +785,17 @@ func rdelete(_ *Client, resources ResourceList, propagation metav1.DeletionPropa
return res, nil return res, nil
} }
// https://github.com/kubernetes/kubectl/blob/197123726db24c61aa0f78d1f0ba6e91a2ec2f35/pkg/cmd/apply/apply.go#L439
func isIncompatibleServerError(err error) bool {
// 415: Unsupported media type means we're talking to a server which doesn't
// support server-side apply.
if _, ok := err.(*apierrors.StatusError); !ok {
// Non-StatusError means the error isn't because the server is incompatible.
return false
}
return err.(*apierrors.StatusError).Status().Code == http.StatusUnsupportedMediaType
}
// getManagedFieldsManager returns the manager string. If one was set it will be returned. // getManagedFieldsManager returns the manager string. If one was set it will be returned.
// Otherwise, one is calculated based on the name of the binary. // Otherwise, one is calculated based on the name of the binary.
func getManagedFieldsManager() string { func getManagedFieldsManager() string {
@ -568,18 +816,41 @@ func getManagedFieldsManager() string {
return filepath.Base(os.Args[0]) return filepath.Base(os.Args[0])
} }
func perform(infos ResourceList, fn func(*resource.Info) error) error {
var result error
if len(infos) == 0 {
return ErrNoObjectsVisited
}
errs := make(chan error)
go batchPerform(infos, fn, errs)
for range infos {
err := <-errs
if err != nil {
result = errors.Join(result, err)
}
}
return result
}
func batchPerform(infos ResourceList, fn func(*resource.Info) error, errs chan<- error) { func batchPerform(infos ResourceList, fn func(*resource.Info) error, errs chan<- error) {
var kind string var kind string
var wg sync.WaitGroup var wg sync.WaitGroup
defer wg.Wait()
for _, info := range infos { for _, info := range infos {
currentKind := info.Object.GetObjectKind().GroupVersionKind().Kind currentKind := info.Object.GetObjectKind().GroupVersionKind().Kind
if kind != currentKind { if kind != currentKind {
wg.Wait() wg.Wait()
kind = currentKind kind = currentKind
} }
wg.Add(1) wg.Add(1)
go func(i *resource.Info) { go func(info *resource.Info) {
errs <- fn(i) errs <- fn(info)
wg.Done() wg.Done()
}(info) }(info)
} }
@ -597,6 +868,7 @@ func createResource(info *resource.Info) error {
if err != nil { if err != nil {
return err return err
} }
return info.Refresh(obj, true) return info.Refresh(obj, true)
}) })
} }
@ -611,8 +883,8 @@ func deleteResource(info *resource.Info, policy metav1.DeletionPropagation) erro
}) })
} }
func createPatch(target *resource.Info, current runtime.Object, threeWayMergeForUnstructured bool) ([]byte, types.PatchType, error) { func createPatch(original runtime.Object, target *resource.Info, threeWayMergeForUnstructured bool) ([]byte, types.PatchType, error) {
oldData, err := json.Marshal(current) oldData, err := json.Marshal(original)
if err != nil { if err != nil {
return nil, types.StrategicMergePatchType, fmt.Errorf("serializing current configuration: %w", err) return nil, types.StrategicMergePatchType, fmt.Errorf("serializing current configuration: %w", err)
} }
@ -674,27 +946,33 @@ func createPatch(target *resource.Info, current runtime.Object, threeWayMergeFor
return patch, types.StrategicMergePatchType, err return patch, types.StrategicMergePatchType, err
} }
func updateResource(_ *Client, target *resource.Info, currentObj runtime.Object, force, threeWayMergeForUnstructured bool) error { func replaceResource(target *resource.Info, fieldValidationDirective FieldValidationDirective) error {
var (
obj runtime.Object
helper = resource.NewHelper(target.Client, target.Mapping).WithFieldManager(getManagedFieldsManager())
kind = target.Mapping.GroupVersionKind.Kind
)
// if --force is applied, attempt to replace the existing resource with the new object. helper := resource.NewHelper(target.Client, target.Mapping).
if force { WithFieldValidation(string(fieldValidationDirective)).
var err error WithFieldManager(getManagedFieldsManager())
obj, err = helper.Replace(target.Namespace, target.Name, true, target.Object)
obj, err := helper.Replace(target.Namespace, target.Name, true, target.Object)
if err != nil { if err != nil {
return fmt.Errorf("failed to replace object: %w", err) return fmt.Errorf("failed to replace object: %w", err)
} }
slog.Debug("replace succeeded", "name", target.Name, "initialKind", currentObj.GetObjectKind().GroupVersionKind().Kind, "kind", kind)
} else { if err := target.Refresh(obj, true); err != nil {
patch, patchType, err := createPatch(target, currentObj, threeWayMergeForUnstructured) return fmt.Errorf("failed to refresh object after replace: %w", err)
}
return nil
}
func patchResourceClientSide(original runtime.Object, target *resource.Info, threeWayMergeForUnstructured bool) error {
patch, patchType, err := createPatch(original, target, threeWayMergeForUnstructured)
if err != nil { if err != nil {
return fmt.Errorf("failed to create patch: %w", err) return fmt.Errorf("failed to create patch: %w", err)
} }
kind := target.Mapping.GroupVersionKind.Kind
if patch == nil || string(patch) == "{}" { if patch == nil || string(patch) == "{}" {
slog.Debug("no changes detected", "kind", kind, "name", target.Name) slog.Debug("no changes detected", "kind", kind, "name", target.Name)
// This needs to happen to make sure that Helm has the latest info from the API // This needs to happen to make sure that Helm has the latest info from the API
@ -704,18 +982,59 @@ func updateResource(_ *Client, target *resource.Info, currentObj runtime.Object,
} }
return nil return nil
} }
// send patch to server // send patch to server
slog.Debug("patching resource", "kind", kind, "name", target.Name, "namespace", target.Namespace) slog.Debug("patching resource", "kind", kind, "name", target.Name, "namespace", target.Namespace)
obj, err = helper.Patch(target.Namespace, target.Name, patchType, patch, nil) helper := resource.NewHelper(target.Client, target.Mapping).WithFieldManager(getManagedFieldsManager())
obj, err := helper.Patch(target.Namespace, target.Name, patchType, patch, nil)
if err != nil { if err != nil {
return fmt.Errorf("cannot patch %q with kind %s: %w", target.Name, kind, err) return fmt.Errorf("cannot patch %q with kind %s: %w", target.Name, kind, err)
} }
}
target.Refresh(obj, true) target.Refresh(obj, true)
return nil return nil
} }
// Patch reource using server-side apply
func patchResourceServerSide(target *resource.Info, dryRun bool, forceConflicts bool, fieldValidationDirective FieldValidationDirective) error {
helper := resource.NewHelper(
target.Client,
target.Mapping).
DryRun(dryRun).
WithFieldManager(ManagedFieldsManager).
WithFieldValidation(string(fieldValidationDirective))
// Send the full object to be applied on the server side.
data, err := runtime.Encode(unstructured.UnstructuredJSONScheme, target.Object)
if err != nil {
return fmt.Errorf("failed to encode object %s/%s with kind %s: %w", target.Namespace, target.Name, target.Mapping.GroupVersionKind.Kind, err)
}
options := metav1.PatchOptions{
Force: &forceConflicts,
}
obj, err := helper.Patch(
target.Namespace,
target.Name,
types.ApplyPatchType,
data,
&options,
)
if err != nil {
if isIncompatibleServerError(err) {
return fmt.Errorf("server-side apply not available on the server: %v", err)
}
if apierrors.IsConflict(err) {
return fmt.Errorf("conflict occurred while applying %s/%s with kind %s: %w", target.Namespace, target.Name, target.Mapping.GroupVersionKind.Kind, err)
}
return err
}
return target.Refresh(obj, true)
}
// GetPodList uses the kubernetes interface to get the list of pods filtered by listOptions // GetPodList uses the kubernetes interface to get the list of pods filtered by listOptions
func (c *Client) GetPodList(namespace string, listOptions metav1.ListOptions) (*v1.PodList, error) { func (c *Client) GetPodList(namespace string, listOptions metav1.ListOptions) (*v1.PodList, error) {
podList, err := c.kubeClient.CoreV1().Pods(namespace).List(context.Background(), listOptions) podList, err := c.kubeClient.CoreV1().Pods(namespace).List(context.Background(), listOptions)

File diff suppressed because it is too large Load Diff

@ -60,11 +60,11 @@ type FailingKubeWaiter struct {
} }
// Create returns the configured error if set or prints // Create returns the configured error if set or prints
func (f *FailingKubeClient) Create(resources kube.ResourceList) (*kube.Result, error) { func (f *FailingKubeClient) Create(resources kube.ResourceList, options ...kube.ClientCreateOption) (*kube.Result, error) {
if f.CreateError != nil { if f.CreateError != nil {
return nil, f.CreateError return nil, f.CreateError
} }
return f.PrintingKubeClient.Create(resources) return f.PrintingKubeClient.Create(resources, options...)
} }
// Get returns the configured error if set or prints // Get returns the configured error if set or prints
@ -117,19 +117,11 @@ func (f *FailingKubeWaiter) WatchUntilReady(resources kube.ResourceList, d time.
} }
// Update returns the configured error if set or prints // Update returns the configured error if set or prints
func (f *FailingKubeClient) Update(r, modified kube.ResourceList, ignoreMe bool) (*kube.Result, error) { func (f *FailingKubeClient) Update(r, modified kube.ResourceList, options ...kube.ClientUpdateOption) (*kube.Result, error) {
if f.UpdateError != nil { if f.UpdateError != nil {
return &kube.Result{}, f.UpdateError return &kube.Result{}, f.UpdateError
} }
return f.PrintingKubeClient.Update(r, modified, ignoreMe) return f.PrintingKubeClient.Update(r, modified, options...)
}
// Update returns the configured error if set or prints
func (f *FailingKubeClient) UpdateThreeWayMerge(r, modified kube.ResourceList, ignoreMe bool) (*kube.Result, error) {
if f.UpdateError != nil {
return &kube.Result{}, f.UpdateError
}
return f.PrintingKubeClient.Update(r, modified, ignoreMe)
} }
// Build returns the configured error if set or prints // Build returns the configured error if set or prints

@ -49,7 +49,7 @@ func (p *PrintingKubeClient) IsReachable() error {
} }
// Create prints the values of what would be created with a real KubeClient. // Create prints the values of what would be created with a real KubeClient.
func (p *PrintingKubeClient) Create(resources kube.ResourceList) (*kube.Result, error) { func (p *PrintingKubeClient) Create(resources kube.ResourceList, _ ...kube.ClientCreateOption) (*kube.Result, error) {
_, err := io.Copy(p.Out, bufferize(resources)) _, err := io.Copy(p.Out, bufferize(resources))
if err != nil { if err != nil {
return nil, err return nil, err
@ -98,7 +98,7 @@ func (p *PrintingKubeClient) Delete(resources kube.ResourceList) (*kube.Result,
} }
// Update implements KubeClient Update. // Update implements KubeClient Update.
func (p *PrintingKubeClient) Update(_, modified kube.ResourceList, _ bool) (*kube.Result, error) { func (p *PrintingKubeClient) Update(_, modified kube.ResourceList, _ ...kube.ClientUpdateOption) (*kube.Result, error) {
_, err := io.Copy(p.Out, bufferize(modified)) _, err := io.Copy(p.Out, bufferize(modified))
if err != nil { if err != nil {
return nil, err return nil, err

@ -30,14 +30,14 @@ import (
// A KubernetesClient must be concurrency safe. // A KubernetesClient must be concurrency safe.
type Interface interface { type Interface interface {
// Create creates one or more resources. // Create creates one or more resources.
Create(resources ResourceList) (*Result, error) Create(resources ResourceList, options ...ClientCreateOption) (*Result, error)
// Delete destroys one or more resources. // Delete destroys one or more resources.
Delete(resources ResourceList) (*Result, []error) Delete(resources ResourceList) (*Result, []error)
// Update updates one or more resources or creates the resource // Update updates one or more resources or creates the resource
// if it doesn't exist. // if it doesn't exist.
Update(original, target ResourceList, force bool) (*Result, error) Update(original, target ResourceList, options ...ClientUpdateOption) (*Result, error)
// Build creates a resource list from a Reader. // Build creates a resource list from a Reader.
// //
@ -53,13 +53,6 @@ type Interface interface {
GetWaiter(ws WaitStrategy) (Waiter, error) GetWaiter(ws WaitStrategy) (Waiter, error)
} }
// InterfaceThreeWayMerge was introduced to avoid breaking backwards compatibility for Interface implementers.
//
// TODO Helm 4: Remove InterfaceThreeWayMerge and integrate its method(s) into the Interface.
type InterfaceThreeWayMerge interface {
UpdateThreeWayMerge(original, target ResourceList, force bool) (*Result, error)
}
// Waiter defines methods related to waiting for resource states. // Waiter defines methods related to waiting for resource states.
type Waiter interface { type Waiter interface {
// Wait waits up to the given timeout for the specified resources to be ready. // Wait waits up to the given timeout for the specified resources to be ready.
@ -125,7 +118,6 @@ type InterfaceResources interface {
} }
var _ Interface = (*Client)(nil) var _ Interface = (*Client)(nil)
var _ InterfaceThreeWayMerge = (*Client)(nil)
var _ InterfaceLogs = (*Client)(nil) var _ InterfaceLogs = (*Client)(nil)
var _ InterfaceDeletionPropagation = (*Client)(nil) var _ InterfaceDeletionPropagation = (*Client)(nil)
var _ InterfaceResources = (*Client)(nil) var _ InterfaceResources = (*Client)(nil)

@ -18,7 +18,6 @@ package kube // import "helm.sh/helm/v4/pkg/kube"
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"log/slog" "log/slog"
"net/http" "net/http"
@ -223,26 +222,6 @@ func (hw *legacyWaiter) WatchUntilReady(resources ResourceList, timeout time.Dur
return perform(resources, hw.watchTimeout(timeout)) return perform(resources, hw.watchTimeout(timeout))
} }
func perform(infos ResourceList, fn func(*resource.Info) error) error {
var result error
if len(infos) == 0 {
return ErrNoObjectsVisited
}
errs := make(chan error)
go batchPerform(infos, fn, errs)
for range infos {
err := <-errs
if err != nil {
result = errors.Join(result, err)
}
}
return result
}
func (hw *legacyWaiter) watchUntilReady(timeout time.Duration, info *resource.Info) error { func (hw *legacyWaiter) watchUntilReady(timeout time.Duration, info *resource.Info) error {
kind := info.Mapping.GroupVersionKind.Kind kind := info.Mapping.GroupVersionKind.Kind
switch kind { switch kind {

Loading…
Cancel
Save