Kube client support server-side apply

Signed-off-by: George Jenkins <gvjenkins@gmail.com>
pull/31030/head
George Jenkins 2 months ago
parent 8affd6178f
commit 45141451b4

@ -73,7 +73,9 @@ func (cfg *Configuration) execHook(rl *release.Release, hook release.HookEvent,
h.LastRun.Phase = release.HookPhaseUnknown
// Create hook resources
if _, err := cfg.KubeClient.Create(resources); err != nil {
if _, err := cfg.KubeClient.Create(
resources,
kube.ClientCreateOptionServerSideApply(false)); err != nil {
h.LastRun.CompletedAt = helmtime.Now()
h.LastRun.Phase = release.HookPhaseFailed
return fmt.Errorf("warning: Hook %s %s failed: %w", hook, h.Path, err)

@ -173,7 +173,9 @@ func (i *Install) installCRDs(crds []chart.CRD) error {
}
// Send them to Kube
if _, err := i.cfg.KubeClient.Create(res); err != nil {
if _, err := i.cfg.KubeClient.Create(
res,
kube.ClientCreateOptionServerSideApply(false)); err != nil {
// If the error is CRD already exists, continue.
if apierrors.IsAlreadyExists(err) {
crdName := res[0].Name
@ -399,7 +401,9 @@ func (i *Install) RunWithContext(ctx context.Context, chrt *chart.Chart, vals ma
if err != nil {
return nil, err
}
if _, err := i.cfg.KubeClient.Create(resourceList); err != nil && !apierrors.IsAlreadyExists(err) {
if _, err := i.cfg.KubeClient.Create(
resourceList,
kube.ClientCreateOptionServerSideApply(false)); err != nil && !apierrors.IsAlreadyExists(err) {
return nil, err
}
}
@ -468,13 +472,17 @@ func (i *Install) performInstall(rel *release.Release, toBeAdopted kube.Resource
// do an update, but it's not clear whether we WANT to do an update if the reuse is set
// to true, since that is basically an upgrade operation.
if len(toBeAdopted) == 0 && len(resources) > 0 {
_, err = i.cfg.KubeClient.Create(resources)
_, err = i.cfg.KubeClient.Create(
resources,
kube.ClientCreateOptionServerSideApply(false))
} else if len(resources) > 0 {
if i.TakeOwnership {
_, err = i.cfg.KubeClient.(kube.InterfaceThreeWayMerge).UpdateThreeWayMerge(toBeAdopted, resources, i.ForceReplace)
} else {
_, err = i.cfg.KubeClient.Update(toBeAdopted, resources, i.ForceReplace)
}
updateThreeWayMergeForUnstructured := i.TakeOwnership
_, err = i.cfg.KubeClient.Update(
toBeAdopted,
resources,
kube.ClientUpdateOptionServerSideApply(false),
kube.ClientUpdateOptionThreeWayMergeForUnstructured(updateThreeWayMergeForUnstructured),
kube.ClientUpdateOptionForceReplace(i.ForceReplace))
}
if err != nil {
return rel, err

@ -190,7 +190,11 @@ func (r *Rollback) performRollback(currentRelease, targetRelease *release.Releas
if err != nil {
return targetRelease, fmt.Errorf("unable to set metadata visitor from target release: %w", err)
}
results, err := r.cfg.KubeClient.Update(current, target, r.ForceReplace)
results, err := r.cfg.KubeClient.Update(
current,
target,
kube.ClientUpdateOptionServerSideApply(false),
kube.ClientUpdateOptionForceReplace(r.ForceReplace))
if err != nil {
msg := fmt.Sprintf("Rollback %q failed: %s", targetRelease.Name, err)

@ -426,7 +426,11 @@ func (u *Upgrade) releasingUpgrade(c chan<- resultMessage, upgradedRelease *rele
slog.Debug("upgrade hooks disabled", "name", upgradedRelease.Name)
}
results, err := u.cfg.KubeClient.Update(current, target, u.ForceReplace)
results, err := u.cfg.KubeClient.Update(
current,
target,
kube.ClientUpdateOptionServerSideApply(false),
kube.ClientUpdateOptionForceReplace(u.ForceReplace))
if err != nil {
u.cfg.recordRelease(originalRelease)
u.reportToPerformUpgrade(c, upgradedRelease, results.Created, err)

@ -24,6 +24,7 @@ import (
"fmt"
"io"
"log/slog"
"net/http"
"os"
"path/filepath"
"reflect"
@ -91,6 +92,14 @@ const (
HookOnlyStrategy WaitStrategy = "hookOnly"
)
type FieldValidationDirective string
const (
FieldValidationDirectiveIgnore FieldValidationDirective = "Ignore"
FieldValidationDirectiveWarn FieldValidationDirective = "Warn"
FieldValidationDirectiveStrict FieldValidationDirective = "Strict"
)
func init() {
// Add CRDs to the scheme. They are missing by default.
if err := apiextv1.AddToScheme(scheme.Scheme); err != nil {
@ -194,10 +203,101 @@ func (c *Client) IsReachable() error {
return nil
}
type clientCreateOptions struct {
serverSideApply bool
forceConflicts bool
dryRun bool
fieldValidationDirective FieldValidationDirective
}
type ClientCreateOption func(*clientCreateOptions) error
// ClientUpdateOptionServerSideApply enables performing object apply server-side
// see: https://kubernetes.io/docs/reference/using-api/server-side-apply/
func ClientCreateOptionServerSideApply(serverSideApply bool) ClientCreateOption {
return func(o *clientCreateOptions) error {
o.serverSideApply = serverSideApply
return nil
}
}
// ClientCreateOptionForceConflicts forces field conflicts to be resolved
// see: https://kubernetes.io/docs/reference/using-api/server-side-apply/#conflicts
// Only valid when ClientUpdateOptionServerSideApply enabled
func ClientCreateOptionForceConflicts(forceConflicts bool) ClientCreateOption {
return func(o *clientCreateOptions) error {
o.forceConflicts = forceConflicts
return nil
}
}
// ClientCreateOptionDryRun performs non-mutating operations only
func ClientCreateOptionDryRun(dryRun bool) ClientCreateOption {
return func(o *clientCreateOptions) error {
o.dryRun = dryRun
return nil
}
}
// ClientCreateOptionFieldValidationDirective specifies show API operations validate object's schema
// - For client-side apply: this is ignored
// - For server-side apply: the directive is sent to the server to perform the validation
//
// Defaults to `FieldValidationDirectiveStrict`
func ClientCreateOptionFieldValidationDirective(fieldValidationDirective FieldValidationDirective) ClientCreateOption {
return func(o *clientCreateOptions) error {
o.fieldValidationDirective = fieldValidationDirective
return nil
}
}
// Create creates Kubernetes resources specified in the resource list.
func (c *Client) Create(resources ResourceList) (*Result, error) {
func (c *Client) Create(resources ResourceList, options ...ClientCreateOption) (*Result, error) {
slog.Debug("creating resource(s)", "resources", len(resources))
if err := perform(resources, createResource); err != nil {
createOptions := clientCreateOptions{
serverSideApply: true, // Default to server-side apply
fieldValidationDirective: FieldValidationDirectiveStrict,
}
for _, o := range options {
o(&createOptions)
}
if createOptions.forceConflicts && !createOptions.serverSideApply {
return nil, fmt.Errorf("invalid operation: force conflicts can only be used with server-side apply")
}
makeCreateApplyFunc := func() func(target *resource.Info) error {
if createOptions.serverSideApply {
slog.Debug("using server-side apply for resource creation", slog.Bool("forceConflicts", createOptions.forceConflicts), slog.Bool("dryRun", createOptions.dryRun), slog.String("fieldValidationDirective", string(createOptions.fieldValidationDirective)))
return func(target *resource.Info) error {
err := patchResourceServerSide(target, createOptions.dryRun, createOptions.forceConflicts, createOptions.fieldValidationDirective)
logger := slog.With(
slog.String("namespace", target.Namespace),
slog.String("name", target.Name),
slog.String("gvk", target.Mapping.GroupVersionKind.String()))
if err != nil {
logger.Debug("Error patching resource", slog.Any("error", err))
return err
}
logger.Debug("Patched resource")
return nil
}
}
slog.Debug("using client-side apply for resource creation")
return createResource
}
if err := perform(resources, makeCreateApplyFunc()); err != nil {
return nil, err
}
return &Result{Created: resources}, nil
@ -348,96 +448,98 @@ func (c *Client) namespace() string {
return v1.NamespaceDefault
}
// newBuilder returns a new resource builder for structured api objects.
func (c *Client) newBuilder() *resource.Builder {
return c.Factory.NewBuilder().
ContinueOnError().
NamespaceParam(c.namespace()).
DefaultNamespace().
Flatten()
}
// Build validates for Kubernetes objects and returns unstructured infos.
func (c *Client) Build(reader io.Reader, validate bool) (ResourceList, error) {
validationDirective := metav1.FieldValidationIgnore
func determineFieldValidationDirective(validate bool) FieldValidationDirective {
if validate {
validationDirective = metav1.FieldValidationStrict
return FieldValidationDirectiveStrict
}
schema, err := c.Factory.Validator(validationDirective)
return FieldValidationDirectiveIgnore
}
func buildResourceList(f Factory, namespace string, validationDirective FieldValidationDirective, reader io.Reader, transformRequest resource.RequestTransform) (ResourceList, error) {
schema, err := f.Validator(string(validationDirective))
if err != nil {
return nil, err
}
result, err := c.newBuilder().
builder := f.NewBuilder().
ContinueOnError().
NamespaceParam(namespace).
DefaultNamespace().
Flatten().
Unstructured().
Schema(schema).
Stream(reader, "").
Do().Infos()
Stream(reader, "")
if transformRequest != nil {
builder.TransformRequests(transformRequest)
}
result, err := builder.Do().Infos()
return result, scrubValidationError(err)
}
// Build validates for Kubernetes objects and returns unstructured infos.
func (c *Client) Build(reader io.Reader, validate bool) (ResourceList, error) {
return buildResourceList(
c.Factory,
c.namespace(),
determineFieldValidationDirective(validate),
reader,
nil)
}
// BuildTable validates for Kubernetes objects and returns unstructured infos.
// The returned kind is a Table.
func (c *Client) BuildTable(reader io.Reader, validate bool) (ResourceList, error) {
validationDirective := metav1.FieldValidationIgnore
if validate {
validationDirective = metav1.FieldValidationStrict
}
schema, err := c.Factory.Validator(validationDirective)
if err != nil {
return nil, err
}
result, err := c.newBuilder().
Unstructured().
Schema(schema).
Stream(reader, "").
TransformRequests(transformRequests).
Do().Infos()
return result, scrubValidationError(err)
return buildResourceList(
c.Factory,
c.namespace(),
determineFieldValidationDirective(validate),
reader,
transformRequests)
}
func (c *Client) update(original, target ResourceList, force, threeWayMerge bool) (*Result, error) {
func (c *Client) update(target, original ResourceList, updateApplyFunc func(target, original *resource.Info) error) (*Result, error) {
updateErrors := []error{}
res := &Result{}
slog.Debug("checking resources for changes", "resources", len(target))
err := target.Visit(func(info *resource.Info, err error) error {
err := target.Visit(func(target *resource.Info, err error) error {
if err != nil {
return err
}
helper := resource.NewHelper(info.Client, info.Mapping).WithFieldManager(getManagedFieldsManager())
if _, err := helper.Get(info.Namespace, info.Name); err != nil {
helper := resource.NewHelper(target.Client, target.Mapping).WithFieldManager(getManagedFieldsManager())
if _, err := helper.Get(target.Namespace, target.Name); err != nil {
if !apierrors.IsNotFound(err) {
return fmt.Errorf("could not get information about the resource: %w", err)
}
// Append the created resource to the results, even if something fails
res.Created = append(res.Created, info)
res.Created = append(res.Created, target)
// Since the resource does not exist, create it.
if err := createResource(info); err != nil {
if err := createResource(target); err != nil {
return fmt.Errorf("failed to create resource: %w", err)
}
kind := info.Mapping.GroupVersionKind.Kind
slog.Debug("created a new resource", "namespace", info.Namespace, "name", info.Name, "kind", kind)
kind := target.Mapping.GroupVersionKind.Kind
slog.Debug("created a new resource", "namespace", target.Namespace, "name", target.Name, "kind", kind)
return nil
}
originalInfo := original.Get(info)
if originalInfo == nil {
kind := info.Mapping.GroupVersionKind.Kind
return fmt.Errorf("no %s with the name %q found", kind, info.Name)
original := original.Get(target)
if original == nil {
kind := target.Mapping.GroupVersionKind.Kind
return fmt.Errorf("original object %s with the name %q not found", kind, target.Name)
}
if err := updateResource(c, info, originalInfo.Object, force, threeWayMerge); err != nil {
slog.Debug("error updating the resource", "namespace", info.Namespace, "name", info.Name, "kind", info.Mapping.GroupVersionKind.Kind, slog.Any("error", err))
if err := updateApplyFunc(target, original); err != nil {
updateErrors = append(updateErrors, err)
}
// Because we check for errors later, append the info regardless
res.Updated = append(res.Updated, info)
res.Updated = append(res.Updated, target)
return nil
})
@ -473,18 +575,81 @@ func (c *Client) update(original, target ResourceList, force, threeWayMerge bool
return res, nil
}
// Update takes the current list of objects and target list of objects and
// creates resources that don't already exist, updates resources that have been
// modified in the target configuration, and deletes resources from the current
// configuration that are not present in the target configuration. If an error
// occurs, a Result will still be returned with the error, containing all
// resource updates, creations, and deletions that were attempted. These can be
// used for cleanup or other logging purposes.
type clientUpdateOptions struct {
threeWayMergeForUnstructured bool
serverSideApply bool
forceReplace bool
forceConflicts bool
dryRun bool
fieldValidationDirective FieldValidationDirective
}
type ClientUpdateOption func(*clientUpdateOptions) error
// ClientUpdateOptionThreeWayMergeForUnstructured enables performing three-way merge for unstructured objects
// Must not be enabled when ClientUpdateOptionServerSideApply is enabled
func ClientUpdateOptionThreeWayMergeForUnstructured(threeWayMergeForUnstructured bool) ClientUpdateOption {
return func(o *clientUpdateOptions) error {
o.threeWayMergeForUnstructured = threeWayMergeForUnstructured
return nil
}
}
// ClientUpdateOptionServerSideApply enables performing object apply server-side (default)
// see: https://kubernetes.io/docs/reference/using-api/server-side-apply/
// Must not be enabled when ClientUpdateOptionThreeWayMerge is enabled
func ClientUpdateOptionServerSideApply(serverSideApply bool) ClientUpdateOption {
return func(o *clientUpdateOptions) error {
o.serverSideApply = serverSideApply
return nil
}
}
// ClientUpdateOptionForceReplace forces objects to be replaced rather than updated via patch
// Must not be enabled when ClientUpdateOptionForceConflicts is enabled
func ClientUpdateOptionForceReplace(forceReplace bool) ClientUpdateOption {
return func(o *clientUpdateOptions) error {
o.forceReplace = forceReplace
return nil
}
}
// ClientUpdateOptionForceConflicts forces field conflicts to be resolved
// see: https://kubernetes.io/docs/reference/using-api/server-side-apply/#conflicts
// Must not be enabled when ClientUpdateOptionForceReplace is enabled
func ClientUpdateOptionForceConflicts(forceConflicts bool) ClientUpdateOption {
return func(o *clientUpdateOptions) error {
o.forceConflicts = forceConflicts
return nil
}
}
// ClientUpdateOptionForceConflicts forces field conflicts to be resolved
// see: https://kubernetes.io/docs/reference/using-api/server-side-apply/#conflicts
// Must not be enabled when ClientUpdateOptionForceReplace is enabled
func ClientUpdateOptionDryRun(dryRun bool) ClientUpdateOption {
return func(o *clientUpdateOptions) error {
o.dryRun = dryRun
return nil
}
}
// ClientUpdateOptionFieldValidationDirective specifies show API operations validate object's schema
// - For client-side apply: this is ignored
// - For server-side apply: the directive is sent to the server to perform the validation
//
// The difference to Update is that UpdateThreeWayMerge does a three-way-merge
// for unstructured objects.
func (c *Client) UpdateThreeWayMerge(original, target ResourceList, force bool) (*Result, error) {
return c.update(original, target, force, true)
// Defaults to `FieldValidationDirectiveStrict`
func ClientUpdateOptionFieldValidationDirective(fieldValidationDirective FieldValidationDirective) ClientCreateOption {
return func(o *clientCreateOptions) error {
o.fieldValidationDirective = fieldValidationDirective
return nil
}
}
// Update takes the current list of objects and target list of objects and
@ -494,8 +659,78 @@ func (c *Client) UpdateThreeWayMerge(original, target ResourceList, force bool)
// occurs, a Result will still be returned with the error, containing all
// resource updates, creations, and deletions that were attempted. These can be
// used for cleanup or other logging purposes.
func (c *Client) Update(original, target ResourceList, force bool) (*Result, error) {
return c.update(original, target, force, false)
//
// The default is to use server-side apply, equivalent to: `ClientUpdateOptionServerSideApply(true)`
func (c *Client) Update(original, target ResourceList, options ...ClientUpdateOption) (*Result, error) {
updateOptions := clientUpdateOptions{
serverSideApply: true, // Default to server-side apply
fieldValidationDirective: FieldValidationDirectiveStrict,
}
for _, o := range options {
o(&updateOptions)
}
if updateOptions.threeWayMergeForUnstructured && updateOptions.serverSideApply {
return nil, fmt.Errorf("invalid operation: cannot use three-way merge for unstructured and server-side apply together")
}
if updateOptions.forceConflicts && updateOptions.forceReplace {
return nil, fmt.Errorf("invalid operation: cannot use force conflicts and force replace together")
}
if updateOptions.serverSideApply && updateOptions.forceReplace {
return nil, fmt.Errorf("invalid operation: cannot use server-side apply and force replace together")
}
makeUpdateApplyFunc := func() func(target, original *resource.Info) error {
if updateOptions.forceReplace {
slog.Debug(
"using resource replace update strategy",
slog.String("fieldValidationDirective", string(updateOptions.fieldValidationDirective)))
return func(target, original *resource.Info) error {
if err := replaceResource(target, updateOptions.fieldValidationDirective); err != nil {
slog.Debug("error replacing the resource", "namespace", target.Namespace, "name", target.Name, "kind", target.Mapping.GroupVersionKind.Kind, slog.Any("error", err))
return err
}
originalObject := original.Object
kind := target.Mapping.GroupVersionKind.Kind
slog.Debug("replace succeeded", "name", original.Name, "initialKind", originalObject.GetObjectKind().GroupVersionKind().Kind, "kind", kind)
return nil
}
} else if updateOptions.serverSideApply {
slog.Debug(
"using server-side apply for resource update",
slog.Bool("forceConflicts", updateOptions.forceConflicts),
slog.Bool("dryRun", updateOptions.dryRun),
slog.String("fieldValidationDirective", string(updateOptions.fieldValidationDirective)))
return func(target, _ *resource.Info) error {
err := patchResourceServerSide(target, updateOptions.dryRun, updateOptions.forceConflicts, updateOptions.fieldValidationDirective)
logger := slog.With(
slog.String("namespace", target.Namespace),
slog.String("name", target.Name),
slog.String("gvk", target.Mapping.GroupVersionKind.String()))
if err != nil {
logger.Debug("Error patching resource", slog.Any("error", err))
return err
}
logger.Debug("Patched resource")
return nil
}
}
slog.Debug("using client-side apply for resource update", slog.Bool("threeWayMergeForUnstructured", updateOptions.threeWayMergeForUnstructured))
return func(target, original *resource.Info) error {
return patchResourceClientSide(target, original.Object, updateOptions.threeWayMergeForUnstructured)
}
}
return c.update(target, original, makeUpdateApplyFunc())
}
// Delete deletes Kubernetes resources specified in the resources list with
@ -503,7 +738,7 @@ func (c *Client) Update(original, target ResourceList, force bool) (*Result, err
// if one or more fail and collect any errors. All successfully deleted items
// will be returned in the `Deleted` ResourceList that is part of the result.
func (c *Client) Delete(resources ResourceList) (*Result, []error) {
return rdelete(c, resources, metav1.DeletePropagationBackground)
return deleteResources(resources, metav1.DeletePropagationBackground)
}
// Delete deletes Kubernetes resources specified in the resources list with
@ -511,10 +746,10 @@ func (c *Client) Delete(resources ResourceList) (*Result, []error) {
// if one or more fail and collect any errors. All successfully deleted items
// will be returned in the `Deleted` ResourceList that is part of the result.
func (c *Client) DeleteWithPropagationPolicy(resources ResourceList, policy metav1.DeletionPropagation) (*Result, []error) {
return rdelete(c, resources, policy)
return deleteResources(resources, policy)
}
func rdelete(_ *Client, resources ResourceList, propagation metav1.DeletionPropagation) (*Result, []error) {
func deleteResources(resources ResourceList, propagation metav1.DeletionPropagation) (*Result, []error) {
var errs []error
res := &Result{}
mtx := sync.Mutex{}
@ -548,6 +783,17 @@ func rdelete(_ *Client, resources ResourceList, propagation metav1.DeletionPropa
return res, nil
}
// https://github.com/kubernetes/kubectl/blob/197123726db24c61aa0f78d1f0ba6e91a2ec2f35/pkg/cmd/apply/apply.go#L439
func isIncompatibleServerError(err error) bool {
// 415: Unsupported media type means we're talking to a server which doesn't
// support server-side apply.
if _, ok := err.(*apierrors.StatusError); !ok {
// Non-StatusError means the error isn't because the server is incompatible.
return false
}
return err.(*apierrors.StatusError).Status().Code == http.StatusUnsupportedMediaType
}
// getManagedFieldsManager returns the manager string. If one was set it will be returned.
// Otherwise, one is calculated based on the name of the binary.
func getManagedFieldsManager() string {
@ -568,18 +814,41 @@ func getManagedFieldsManager() string {
return filepath.Base(os.Args[0])
}
func perform(infos ResourceList, fn func(*resource.Info) error) error {
var result error
if len(infos) == 0 {
return ErrNoObjectsVisited
}
errs := make(chan error)
go batchPerform(infos, fn, errs)
for range infos {
err := <-errs
if err != nil {
result = errors.Join(result, err)
}
}
return result
}
func batchPerform(infos ResourceList, fn func(*resource.Info) error, errs chan<- error) {
var kind string
var wg sync.WaitGroup
defer wg.Wait()
for _, info := range infos {
currentKind := info.Object.GetObjectKind().GroupVersionKind().Kind
if kind != currentKind {
wg.Wait()
kind = currentKind
}
wg.Add(1)
go func(i *resource.Info) {
errs <- fn(i)
go func(info *resource.Info) {
errs <- fn(info)
wg.Done()
}(info)
}
@ -597,6 +866,7 @@ func createResource(info *resource.Info) error {
if err != nil {
return err
}
return info.Refresh(obj, true)
})
}
@ -674,48 +944,95 @@ func createPatch(target *resource.Info, current runtime.Object, threeWayMergeFor
return patch, types.StrategicMergePatchType, err
}
func updateResource(_ *Client, target *resource.Info, currentObj runtime.Object, force, threeWayMergeForUnstructured bool) error {
var (
obj runtime.Object
helper = resource.NewHelper(target.Client, target.Mapping).WithFieldManager(getManagedFieldsManager())
kind = target.Mapping.GroupVersionKind.Kind
)
func replaceResource(target *resource.Info, fieldValidationDirective FieldValidationDirective) error {
// if --force is applied, attempt to replace the existing resource with the new object.
if force {
var err error
obj, err = helper.Replace(target.Namespace, target.Name, true, target.Object)
if err != nil {
return fmt.Errorf("failed to replace object: %w", err)
}
slog.Debug("replace succeeded", "name", target.Name, "initialKind", currentObj.GetObjectKind().GroupVersionKind().Kind, "kind", kind)
} else {
patch, patchType, err := createPatch(target, currentObj, threeWayMergeForUnstructured)
if err != nil {
return fmt.Errorf("failed to create patch: %w", err)
}
helper := resource.NewHelper(target.Client, target.Mapping).
WithFieldValidation(string(fieldValidationDirective)).
WithFieldManager(getManagedFieldsManager())
if patch == nil || string(patch) == "{}" {
slog.Debug("no changes detected", "kind", kind, "name", target.Name)
// This needs to happen to make sure that Helm has the latest info from the API
// Otherwise there will be no labels and other functions that use labels will panic
if err := target.Get(); err != nil {
return fmt.Errorf("failed to refresh resource information: %w", err)
}
return nil
}
// send patch to server
slog.Debug("patching resource", "kind", kind, "name", target.Name, "namespace", target.Namespace)
obj, err = helper.Patch(target.Namespace, target.Name, patchType, patch, nil)
if err != nil {
return fmt.Errorf("cannot patch %q with kind %s: %w", target.Name, kind, err)
obj, err := helper.Replace(target.Namespace, target.Name, true, target.Object)
if err != nil {
return fmt.Errorf("failed to replace object: %w", err)
}
if err := target.Refresh(obj, true); err != nil {
return fmt.Errorf("failed to refresh object after replace: %w", err)
}
return nil
}
func patchResourceClientSide(target *resource.Info, original runtime.Object, threeWayMergeForUnstructured bool) error {
patch, patchType, err := createPatch(target, original, threeWayMergeForUnstructured)
if err != nil {
return fmt.Errorf("failed to create patch: %w", err)
}
kind := target.Mapping.GroupVersionKind.Kind
if patch == nil || string(patch) == "{}" {
slog.Debug("no changes detected", "kind", kind, "name", target.Name)
// This needs to happen to make sure that Helm has the latest info from the API
// Otherwise there will be no labels and other functions that use labels will panic
if err := target.Get(); err != nil {
return fmt.Errorf("failed to refresh resource information: %w", err)
}
return nil
}
// send patch to server
slog.Debug("patching resource", "kind", kind, "name", target.Name, "namespace", target.Namespace)
helper := resource.NewHelper(target.Client, target.Mapping).WithFieldManager(getManagedFieldsManager())
obj, err := helper.Patch(target.Namespace, target.Name, patchType, patch, nil)
if err != nil {
return fmt.Errorf("cannot patch %q with kind %s: %w", target.Name, kind, err)
}
target.Refresh(obj, true)
return nil
}
// Patch reource using server-side apply
func patchResourceServerSide(info *resource.Info, dryRun bool, forceConflicts bool, fieldValidationDirective FieldValidationDirective) error {
helper := resource.NewHelper(
info.Client,
info.Mapping).
DryRun(dryRun).
WithFieldManager(ManagedFieldsManager).
WithFieldValidation(string(fieldValidationDirective))
// Send the full object to be applied on the server side.
data, err := runtime.Encode(unstructured.UnstructuredJSONScheme, info.Object)
if err != nil {
return fmt.Errorf("failed to encode object %s/%s with kind %s: %w", info.Namespace, info.Name, info.Mapping.GroupVersionKind.Kind, err)
}
options := metav1.PatchOptions{
Force: &forceConflicts,
}
obj, err := helper.Patch(
info.Namespace,
info.Name,
types.ApplyPatchType,
data,
&options,
)
if err != nil {
if isIncompatibleServerError(err) {
return fmt.Errorf("server-side apply not available on the server: %v", err)
}
if apierrors.IsConflict(err) {
return fmt.Errorf("conflict occurred while applying %s/%s with kind %s: %w", info.Namespace, info.Name, info.Mapping.GroupVersionKind.Kind, err)
}
return err
}
return info.Refresh(obj, true)
}
// GetPodList uses the kubernetes interface to get the list of pods filtered by listOptions
func (c *Client) GetPodList(namespace string, listOptions metav1.ListOptions) (*v1.PodList, error) {
podList, err := c.kubeClient.CoreV1().Pods(namespace).List(context.Background(), listOptions)

File diff suppressed because it is too large Load Diff

@ -60,11 +60,11 @@ type FailingKubeWaiter struct {
}
// Create returns the configured error if set or prints
func (f *FailingKubeClient) Create(resources kube.ResourceList) (*kube.Result, error) {
func (f *FailingKubeClient) Create(resources kube.ResourceList, options ...kube.ClientCreateOption) (*kube.Result, error) {
if f.CreateError != nil {
return nil, f.CreateError
}
return f.PrintingKubeClient.Create(resources)
return f.PrintingKubeClient.Create(resources, options...)
}
// Get returns the configured error if set or prints
@ -117,19 +117,11 @@ func (f *FailingKubeWaiter) WatchUntilReady(resources kube.ResourceList, d time.
}
// Update returns the configured error if set or prints
func (f *FailingKubeClient) Update(r, modified kube.ResourceList, ignoreMe bool) (*kube.Result, error) {
func (f *FailingKubeClient) Update(r, modified kube.ResourceList, options ...kube.ClientUpdateOption) (*kube.Result, error) {
if f.UpdateError != nil {
return &kube.Result{}, f.UpdateError
}
return f.PrintingKubeClient.Update(r, modified, ignoreMe)
}
// Update returns the configured error if set or prints
func (f *FailingKubeClient) UpdateThreeWayMerge(r, modified kube.ResourceList, ignoreMe bool) (*kube.Result, error) {
if f.UpdateError != nil {
return &kube.Result{}, f.UpdateError
}
return f.PrintingKubeClient.Update(r, modified, ignoreMe)
return f.PrintingKubeClient.Update(r, modified, options...)
}
// Build returns the configured error if set or prints

@ -49,7 +49,7 @@ func (p *PrintingKubeClient) IsReachable() error {
}
// Create prints the values of what would be created with a real KubeClient.
func (p *PrintingKubeClient) Create(resources kube.ResourceList) (*kube.Result, error) {
func (p *PrintingKubeClient) Create(resources kube.ResourceList, _ ...kube.ClientCreateOption) (*kube.Result, error) {
_, err := io.Copy(p.Out, bufferize(resources))
if err != nil {
return nil, err
@ -98,7 +98,7 @@ func (p *PrintingKubeClient) Delete(resources kube.ResourceList) (*kube.Result,
}
// Update implements KubeClient Update.
func (p *PrintingKubeClient) Update(_, modified kube.ResourceList, _ bool) (*kube.Result, error) {
func (p *PrintingKubeClient) Update(_, modified kube.ResourceList, _ ...kube.ClientUpdateOption) (*kube.Result, error) {
_, err := io.Copy(p.Out, bufferize(modified))
if err != nil {
return nil, err

@ -30,14 +30,14 @@ import (
// A KubernetesClient must be concurrency safe.
type Interface interface {
// Create creates one or more resources.
Create(resources ResourceList) (*Result, error)
Create(resources ResourceList, options ...ClientCreateOption) (*Result, error)
// Delete destroys one or more resources.
Delete(resources ResourceList) (*Result, []error)
// Update updates one or more resources or creates the resource
// if it doesn't exist.
Update(original, target ResourceList, force bool) (*Result, error)
Update(original, target ResourceList, options ...ClientUpdateOption) (*Result, error)
// Build creates a resource list from a Reader.
//
@ -53,13 +53,6 @@ type Interface interface {
GetWaiter(ws WaitStrategy) (Waiter, error)
}
// InterfaceThreeWayMerge was introduced to avoid breaking backwards compatibility for Interface implementers.
//
// TODO Helm 4: Remove InterfaceThreeWayMerge and integrate its method(s) into the Interface.
type InterfaceThreeWayMerge interface {
UpdateThreeWayMerge(original, target ResourceList, force bool) (*Result, error)
}
// Waiter defines methods related to waiting for resource states.
type Waiter interface {
// Wait waits up to the given timeout for the specified resources to be ready.
@ -125,7 +118,6 @@ type InterfaceResources interface {
}
var _ Interface = (*Client)(nil)
var _ InterfaceThreeWayMerge = (*Client)(nil)
var _ InterfaceLogs = (*Client)(nil)
var _ InterfaceDeletionPropagation = (*Client)(nil)
var _ InterfaceResources = (*Client)(nil)

@ -18,7 +18,6 @@ package kube // import "helm.sh/helm/v4/pkg/kube"
import (
"context"
"errors"
"fmt"
"log/slog"
"net/http"
@ -223,26 +222,6 @@ func (hw *legacyWaiter) WatchUntilReady(resources ResourceList, timeout time.Dur
return perform(resources, hw.watchTimeout(timeout))
}
func perform(infos ResourceList, fn func(*resource.Info) error) error {
var result error
if len(infos) == 0 {
return ErrNoObjectsVisited
}
errs := make(chan error)
go batchPerform(infos, fn, errs)
for range infos {
err := <-errs
if err != nil {
result = errors.Join(result, err)
}
}
return result
}
func (hw *legacyWaiter) watchUntilReady(timeout time.Duration, info *resource.Info) error {
kind := info.Mapping.GroupVersionKind.Kind
switch kind {

Loading…
Cancel
Save