fix client-side to server-side field manager migration

Signed-off-by: George Jenkins <gvjenkins@gmail.com>
pull/30812/head
George Jenkins 4 weeks ago
parent b4b2392f7e
commit ebc874ef84

@ -488,7 +488,8 @@ func (i *Install) performInstall(rel *release.Release, toBeAdopted kube.Resource
resources, resources,
kube.ClientUpdateOptionForceReplace(i.ForceReplace), kube.ClientUpdateOptionForceReplace(i.ForceReplace),
kube.ClientUpdateOptionServerSideApply(i.ServerSideApply, i.ForceConflicts), kube.ClientUpdateOptionServerSideApply(i.ServerSideApply, i.ForceConflicts),
kube.ClientUpdateOptionThreeWayMergeForUnstructured(updateThreeWayMergeForUnstructured)) kube.ClientUpdateOptionThreeWayMergeForUnstructured(updateThreeWayMergeForUnstructured),
kube.ClientUpdateOptionUpgradeClientSideFieldManager(true))
} }
if err != nil { if err != nil {
return rel, err return rel, err

@ -210,7 +210,8 @@ func (r *Rollback) performRollback(currentRelease, targetRelease *release.Releas
target, target,
kube.ClientUpdateOptionForceReplace(r.ForceReplace), kube.ClientUpdateOptionForceReplace(r.ForceReplace),
kube.ClientUpdateOptionServerSideApply(serverSideApply, r.ForceConflicts), kube.ClientUpdateOptionServerSideApply(serverSideApply, r.ForceConflicts),
kube.ClientUpdateOptionThreeWayMergeForUnstructured(false)) kube.ClientUpdateOptionThreeWayMergeForUnstructured(false),
kube.ClientUpdateOptionUpgradeClientSideFieldManager(true))
if err != nil { if err != nil {
msg := fmt.Sprintf("Rollback %q failed: %s", targetRelease.Name, err) msg := fmt.Sprintf("Rollback %q failed: %s", targetRelease.Name, err)

@ -399,6 +399,7 @@ func (u *Upgrade) performUpgrade(ctx context.Context, originalRelease, upgradedR
defer close(doneChan) defer close(doneChan)
go u.releasingUpgrade(rChan, upgradedRelease, current, target, originalRelease, serverSideApply) go u.releasingUpgrade(rChan, upgradedRelease, current, target, originalRelease, serverSideApply)
go u.handleContext(ctx, doneChan, ctxChan, upgradedRelease) go u.handleContext(ctx, doneChan, ctxChan, upgradedRelease)
select { select {
case result := <-rChan: case result := <-rChan:
return result.r, result.e return result.r, result.e
@ -431,6 +432,11 @@ func (u *Upgrade) handleContext(ctx context.Context, done chan interface{}, c ch
return return
} }
} }
func isReleaseApplyMethodClientSideApply(applyMethod string) bool {
return applyMethod == "" || applyMethod == string(release.ApplyMethodClientSideApply)
}
func (u *Upgrade) releasingUpgrade(c chan<- resultMessage, upgradedRelease *release.Release, current kube.ResourceList, target kube.ResourceList, originalRelease *release.Release, serverSideApply bool) { func (u *Upgrade) releasingUpgrade(c chan<- resultMessage, upgradedRelease *release.Release, current kube.ResourceList, target kube.ResourceList, originalRelease *release.Release, serverSideApply bool) {
// pre-upgrade hooks // pre-upgrade hooks
@ -443,11 +449,13 @@ func (u *Upgrade) releasingUpgrade(c chan<- resultMessage, upgradedRelease *rele
slog.Debug("upgrade hooks disabled", "name", upgradedRelease.Name) slog.Debug("upgrade hooks disabled", "name", upgradedRelease.Name)
} }
upgradeClientSideFieldManager := isReleaseApplyMethodClientSideApply(originalRelease.ApplyMethod) && serverSideApply // Update client-side field manager if transitioning from client-side to server-side apply
results, err := u.cfg.KubeClient.Update( results, err := u.cfg.KubeClient.Update(
current, current,
target, target,
kube.ClientUpdateOptionForceReplace(u.ForceReplace), kube.ClientUpdateOptionForceReplace(u.ForceReplace),
kube.ClientUpdateOptionServerSideApply(serverSideApply, u.ForceConflicts)) kube.ClientUpdateOptionServerSideApply(serverSideApply, u.ForceConflicts),
kube.ClientUpdateOptionUpgradeClientSideFieldManager(upgradeClientSideFieldManager))
if err != nil { if err != nil {
u.cfg.recordRelease(originalRelease) u.cfg.recordRelease(originalRelease)
u.reportToPerformUpgrade(c, upgradedRelease, results.Created, err) u.reportToPerformUpgrade(c, upgradedRelease, results.Created, err)

@ -47,12 +47,14 @@ import (
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/jsonmergepatch" "k8s.io/apimachinery/pkg/util/jsonmergepatch"
"k8s.io/apimachinery/pkg/util/mergepatch" "k8s.io/apimachinery/pkg/util/mergepatch"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/strategicpatch" "k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/cli-runtime/pkg/genericclioptions" "k8s.io/cli-runtime/pkg/genericclioptions"
"k8s.io/cli-runtime/pkg/resource" "k8s.io/cli-runtime/pkg/resource"
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest" "k8s.io/client-go/rest"
"k8s.io/client-go/util/csaupgrade"
"k8s.io/client-go/util/retry" "k8s.io/client-go/util/retry"
cmdutil "k8s.io/kubectl/pkg/cmd/util" cmdutil "k8s.io/kubectl/pkg/cmd/util"
) )
@ -577,12 +579,13 @@ func (c *Client) update(originals, targets ResourceList, updateApplyFunc UpdateA
} }
type clientUpdateOptions struct { type clientUpdateOptions struct {
threeWayMergeForUnstructured bool threeWayMergeForUnstructured bool
serverSideApply bool serverSideApply bool
forceReplace bool forceReplace bool
forceConflicts bool forceConflicts bool
dryRun bool dryRun bool
fieldValidationDirective FieldValidationDirective fieldValidationDirective FieldValidationDirective
upgradeClientSideFieldManager bool
} }
type ClientUpdateOption func(*clientUpdateOptions) error type ClientUpdateOption func(*clientUpdateOptions) error
@ -640,14 +643,32 @@ func ClientUpdateOptionDryRun(dryRun bool) ClientUpdateOption {
// - For server-side apply: the directive is sent to the server to perform the validation // - For server-side apply: the directive is sent to the server to perform the validation
// //
// Defaults to `FieldValidationDirectiveStrict` // Defaults to `FieldValidationDirectiveStrict`
func ClientUpdateOptionFieldValidationDirective(fieldValidationDirective FieldValidationDirective) ClientCreateOption { func ClientUpdateOptionFieldValidationDirective(fieldValidationDirective FieldValidationDirective) ClientUpdateOption {
return func(o *clientCreateOptions) error { return func(o *clientUpdateOptions) error {
o.fieldValidationDirective = fieldValidationDirective o.fieldValidationDirective = fieldValidationDirective
return nil return nil
} }
} }
// ClientUpdateOptionUpgradeClientSideFieldManager specifies that resources client-side field manager should be upgraded to server-side apply
// (before applying the object server-side)
// This is required when upgrading a chart from client-side to server-side apply, otherwise the client-side field management remains. Conflicting with server-side applied updates.
//
// Note:
// if this option is specified, but the object is not managed by client-side field manager, it will be a no-op. However, the cost of fetching the objects will be incurred.
//
// see:
// - https://github.com/kubernetes/kubernetes/pull/112905
// - `UpgradeManagedFields` / https://github.com/kubernetes/kubernetes/blob/f47e9696d7237f1011d23c9b55f6947e60526179/staging/src/k8s.io/client-go/util/csaupgrade/upgrade.go#L81
func ClientUpdateOptionUpgradeClientSideFieldManager(upgradeClientSideFieldManager bool) ClientUpdateOption {
return func(o *clientUpdateOptions) error {
o.upgradeClientSideFieldManager = upgradeClientSideFieldManager
return nil
}
}
type UpdateApplyFunc func(original, target *resource.Info) error type UpdateApplyFunc func(original, target *resource.Info) error
// Update takes the current list of objects and target list of objects and // Update takes the current list of objects and target list of objects and
@ -707,15 +728,28 @@ func (c *Client) Update(originals, targets ResourceList, options ...ClientUpdate
"using server-side apply for resource update", "using server-side apply for resource update",
slog.Bool("forceConflicts", updateOptions.forceConflicts), slog.Bool("forceConflicts", updateOptions.forceConflicts),
slog.Bool("dryRun", updateOptions.dryRun), slog.Bool("dryRun", updateOptions.dryRun),
slog.String("fieldValidationDirective", string(updateOptions.fieldValidationDirective))) slog.String("fieldValidationDirective", string(updateOptions.fieldValidationDirective)),
return func(_, target *resource.Info) error { slog.Bool("upgradeClientSideFieldManager", updateOptions.upgradeClientSideFieldManager))
err := patchResourceServerSide(target, updateOptions.dryRun, updateOptions.forceConflicts, updateOptions.fieldValidationDirective) return func(original, target *resource.Info) error {
logger := slog.With( logger := slog.With(
slog.String("namespace", target.Namespace), slog.String("namespace", target.Namespace),
slog.String("name", target.Name), slog.String("name", target.Name),
slog.String("gvk", target.Mapping.GroupVersionKind.String())) slog.String("gvk", target.Mapping.GroupVersionKind.String()))
if err != nil {
if updateOptions.upgradeClientSideFieldManager {
patched, err := upgradeClientSideFieldManager(original, updateOptions.dryRun, updateOptions.fieldValidationDirective)
if err != nil {
slog.Debug("Error patching resource to replace CSA field management", slog.Any("error", err))
return err
}
if patched {
logger.Debug("Upgraded object client-side field management with server-side apply field management")
}
}
if err := patchResourceServerSide(target, updateOptions.dryRun, updateOptions.forceConflicts, updateOptions.fieldValidationDirective); err != nil {
logger.Debug("Error patching resource", slog.Any("error", err)) logger.Debug("Error patching resource", slog.Any("error", err))
return err return err
} }
@ -996,19 +1030,76 @@ func patchResourceClientSide(original runtime.Object, target *resource.Info, thr
return nil return nil
} }
// upgradeClientSideFieldManager is simply a wrapper around csaupgrade.UpgradeManagedFields
// that ugrade CSA managed fields to SSA apply
// see: https://github.com/kubernetes/kubernetes/pull/112905
func upgradeClientSideFieldManager(info *resource.Info, dryRun bool, fieldValidationDirective FieldValidationDirective) (bool, error) {
fieldManagerName := getManagedFieldsManager()
patched := false
err := retry.RetryOnConflict(
retry.DefaultRetry,
func() error {
if err := info.Get(); err != nil {
return fmt.Errorf("failed to get object %s/%s %s: %w", info.Namespace, info.Name, info.Mapping.GroupVersionKind.String(), err)
}
helper := resource.NewHelper(
info.Client,
info.Mapping).
DryRun(dryRun).
WithFieldManager(fieldManagerName).
WithFieldValidation(string(fieldValidationDirective))
patchData, err := csaupgrade.UpgradeManagedFieldsPatch(
info.Object,
sets.New(fieldManagerName),
fieldManagerName)
if err != nil {
return fmt.Errorf("failed to upgrade managed fields for object %s/%s %s: %w", info.Namespace, info.Name, info.Mapping.GroupVersionKind.String(), err)
}
if len(patchData) == 0 {
return nil
}
obj, err := helper.Patch(
info.Namespace,
info.Name,
types.JSONPatchType,
patchData,
nil)
if err == nil {
patched = true
return info.Refresh(obj, true)
}
if !apierrors.IsConflict(err) {
return fmt.Errorf("failed to patch object to upgrade CSA field manager %s/%s %s: %w", info.Namespace, info.Name, info.Mapping.GroupVersionKind.String(), err)
}
return err
})
return patched, err
}
// Patch reource using server-side apply // Patch reource using server-side apply
func patchResourceServerSide(target *resource.Info, dryRun bool, forceConflicts bool, fieldValidationDirective FieldValidationDirective) error { func patchResourceServerSide(target *resource.Info, dryRun bool, forceConflicts bool, fieldValidationDirective FieldValidationDirective) error {
helper := resource.NewHelper( helper := resource.NewHelper(
target.Client, target.Client,
target.Mapping). target.Mapping).
DryRun(dryRun). DryRun(dryRun).
WithFieldManager(ManagedFieldsManager). WithFieldManager(getManagedFieldsManager()).
WithFieldValidation(string(fieldValidationDirective)) WithFieldValidation(string(fieldValidationDirective))
// Send the full object to be applied on the server side. // Send the full object to be applied on the server side.
data, err := runtime.Encode(unstructured.UnstructuredJSONScheme, target.Object) data, err := runtime.Encode(unstructured.UnstructuredJSONScheme, target.Object)
if err != nil { if err != nil {
return fmt.Errorf("failed to encode object %s/%s with kind %s: %w", target.Namespace, target.Name, target.Mapping.GroupVersionKind.Kind, err) return fmt.Errorf("failed to encode object %s/%s %s: %w", target.Namespace, target.Name, target.Mapping.GroupVersionKind.String(), err)
} }
options := metav1.PatchOptions{ options := metav1.PatchOptions{
Force: &forceConflicts, Force: &forceConflicts,
@ -1026,7 +1117,7 @@ func patchResourceServerSide(target *resource.Info, dryRun bool, forceConflicts
} }
if apierrors.IsConflict(err) { if apierrors.IsConflict(err) {
return fmt.Errorf("conflict occurred while applying %s/%s with kind %s: %w", target.Namespace, target.Name, target.Mapping.GroupVersionKind.Kind, err) return fmt.Errorf("conflict occurred while applying object %s/%s %s: %w", target.Namespace, target.Name, target.Mapping.GroupVersionKind.String(), err)
} }
return err return err

@ -339,9 +339,11 @@ func TestUpdate(t *testing.T) {
} }
expectedActionsServerSideApply := []string{ expectedActionsServerSideApply := []string{
"/namespaces/default/pods/starfish:GET",
"/namespaces/default/pods/starfish:GET", "/namespaces/default/pods/starfish:GET",
"/namespaces/default/pods/starfish:PATCH", "/namespaces/default/pods/starfish:PATCH",
"/namespaces/default/pods/otter:GET", "/namespaces/default/pods/otter:GET",
"/namespaces/default/pods/otter:GET",
"/namespaces/default/pods/otter:PATCH", "/namespaces/default/pods/otter:PATCH",
"/namespaces/default/pods/dolphin:GET", "/namespaces/default/pods/dolphin:GET",
"/namespaces/default/pods:POST", // create dolphin "/namespaces/default/pods:POST", // create dolphin
@ -467,7 +469,8 @@ func TestUpdate(t *testing.T) {
second, second,
ClientUpdateOptionThreeWayMergeForUnstructured(tc.ThreeWayMergeForUnstructured), ClientUpdateOptionThreeWayMergeForUnstructured(tc.ThreeWayMergeForUnstructured),
ClientUpdateOptionForceReplace(false), ClientUpdateOptionForceReplace(false),
ClientUpdateOptionServerSideApply(tc.ServerSideApply, false)) ClientUpdateOptionServerSideApply(tc.ServerSideApply, false),
ClientUpdateOptionUpgradeClientSideFieldManager(true))
require.NoError(t, err) require.NoError(t, err)
assert.Len(t, result.Created, 1, "expected 1 resource created, got %d", len(result.Created)) assert.Len(t, result.Created, 1, "expected 1 resource created, got %d", len(result.Created))

Loading…
Cancel
Save