From 603e88af6ff7f3c6f08a30a8ef62a736e5db2623 Mon Sep 17 00:00:00 2001 From: Sidharth Menon Date: Fri, 20 Dec 2024 02:07:25 +0000 Subject: [PATCH] (2/n): patch managed fields --- pkg/kube/client.go | 36 +++++- pkg/kube/migrate_to_ssa.go | 245 +++++++++++++++++++++++++++++++++++++ 2 files changed, 280 insertions(+), 1 deletion(-) create mode 100644 pkg/kube/migrate_to_ssa.go diff --git a/pkg/kube/client.go b/pkg/kube/client.go index be9469ce9..203c70198 100644 --- a/pkg/kube/client.go +++ b/pkg/kube/client.go @@ -59,6 +59,12 @@ import ( cmdutil "k8s.io/kubectl/pkg/cmd/util" ) +var FieldManagersToAdopt = []string{ + "kubectl", + "kubectl-client-side-apply", + "kubectl-edit", +} + // ErrNoObjectsVisited indicates that during a visit operation, no matching objects were found. var ErrNoObjectsVisited = errors.New("no objects visited") @@ -425,6 +431,9 @@ func (c *Client) Update(original, target ResourceList, force bool) (*Result, err updateErrors = append(updateErrors, err.Error()) } + // afterwards, we will reconcile managed fields on these objects and re-apply if necessary. + // the reason we do this is to avoid 3 network requests in the "happy/normal" case with SSA. + // we want to first (1) apply, (2) try to check if a migration is needed, and (3) run the migration if we must. return nil }) @@ -673,12 +682,37 @@ func applyResource(target *resource.Info, force bool) error { if err != nil { return errors.Wrap(err, "failed to encode target object") } - _, err = helper.Patch(target.Namespace, target.Name, types.ApplyPatchType, data, &metav1.PatchOptions{ + obj, err := helper.Patch(target.Namespace, target.Name, types.ApplyPatchType, data, &metav1.PatchOptions{ Force: &force, }) if err != nil { return errors.Wrapf(err, "cannot patch %q with kind %s", target.Name, target.Mapping.GroupVersionKind.Kind) } + target.Refresh(obj, true) + + // now, we will try to migrate managed fields of the object, if necessary. + didMigrate, err := migrateManagedFields( + helper, + target, + FieldManagersToAdopt, + getManagedFieldsManager(), + ) + if err != nil { + return errors.Wrap(err, "failed to upgrade managed fields for helm ssa migration") + } else if !didMigrate { + // in the average case, there's no work to do - the object is already up to date. + return nil + } + + // now we know that there were some extra managed fields lying around. Re-send original SSA to the api-server + // to clear the old managed fields. + obj, err = helper.Patch(target.Namespace, target.Name, types.ApplyPatchType, data, &metav1.PatchOptions{ + Force: &force, + }) + if err != nil { + return errors.Wrapf(err, "cannot patch %q with kind %s", target.Name, target.Mapping.GroupVersionKind.Kind) + } + target.Refresh(obj, true) return nil } diff --git a/pkg/kube/migrate_to_ssa.go b/pkg/kube/migrate_to_ssa.go new file mode 100644 index 000000000..f28bab828 --- /dev/null +++ b/pkg/kube/migrate_to_ssa.go @@ -0,0 +1,245 @@ +package kube + +import ( + "bytes" + "encoding/json" + "fmt" + "reflect" + + "github.com/pkg/errors" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/cli-runtime/pkg/resource" + "sigs.k8s.io/structured-merge-diff/v4/fieldpath" +) + +func migrateManagedFields( + helper *resource.Helper, + info *resource.Info, + managersToAdopt []string, + helmManager string, +) (didMigrate bool, err error) { + // retry a few times on conflict errors. + for i := 0; i < 5; i++ { + var patchData []byte + var obj runtime.Object + + patchData, err := createMigrateManagedFieldsPatch(info.Object, managersToAdopt, helmManager) + if err != nil { + return false, errors.Wrap(err, "failed to generate patch for upgrading managed fields") + } else if patchData == nil { + // no work to do. + return false, nil + } + + obj, err = helper.Patch(info.Namespace, info.Name, types.JSONPatchType, patchData, nil) + if err != nil { + if !apierrors.IsConflict(err) { + return false, errors.Wrap(err, "unexpected error patching managed fields on object") + } + // retry on conflicts, but refresh object first + if err = info.Get(); err != nil { + return false, errors.Wrap(err, "unexpected error refreshing object") + } + continue + } + info.Refresh(obj, true) + return true, nil + } + return false, nil +} + +// createMigrateManagedFieldsPatch Calculates a minimal JSON Patch to send to upgrade managed fields +func createMigrateManagedFieldsPatch( + obj runtime.Object, + managersToAdopt []string, + helmManager string, +) ([]byte, error) { + accessor, err := meta.Accessor(obj) + if err != nil { + return nil, err + } + + managedFields := accessor.GetManagedFields() + filteredManagers := accessor.GetManagedFields() + // note that we also adopt previous non-apply operations from the helm manager. + for managerName := range sets.New(managersToAdopt...).Insert(helmManager) { + filteredManagers, err = upgradedManagedFields( + filteredManagers, + managerName, + helmManager, + ) + if err != nil { + return nil, err + } + } + + if reflect.DeepEqual(managedFields, filteredManagers) { + // If the managed fields have not changed from the transformed version, + // there is no patch to perform + return nil, nil + } + + // Create a patch with a diff between old and new objects. + // Just include all managed fields since that is only thing that will change + // + // Also include test for RV to avoid race condition + jsonPatch := []map[string]interface{}{ + { + "op": "replace", + "path": "/metadata/managedFields", + "value": filteredManagers, + }, + { + // Use "replace" instead of "test" operation so that etcd rejects with + // 409 conflict instead of apiserver with an invalid request + "op": "replace", + "path": "/metadata/resourceVersion", + "value": accessor.GetResourceVersion(), + }, + } + + return json.Marshal(jsonPatch) +} + +// Returns a copy of the provided managed fields that has been migrated from +// client-side-apply to server-side-apply, or an error if there was an issue +func upgradedManagedFields( + managedFields []metav1.ManagedFieldsEntry, + oldManager, + newManager string, +) ([]metav1.ManagedFieldsEntry, error) { + if managedFields == nil { + return nil, nil + } + + // Create managed fields clone since we modify the values + managedFieldsCopy := make([]metav1.ManagedFieldsEntry, len(managedFields)) + if copy(managedFieldsCopy, managedFields) != len(managedFields) { + return nil, errors.New("failed to copy managed fields") + } + managedFields = managedFieldsCopy + + // Locate new manager + replaceIndex, ok := findFirstIndex(managedFields, + func(entry metav1.ManagedFieldsEntry) bool { + return entry.Manager == newManager && + entry.Operation == metav1.ManagedFieldsOperationApply + }) + + if !ok { + return nil, errors.New("apply: unexpected error - no manager found") + } + err := unionManagerIntoIndex(managedFields, replaceIndex, oldManager) + if err != nil { + return nil, err + } + + // Create version of managed fields without the old field manager. + filteredManagers := filter(managedFields, func(entry metav1.ManagedFieldsEntry) bool { + if entry.Manager != oldManager { + // keep unaffected entries + return true + } else if oldManager != newManager { + // remove if a different field manager entirely. + return false + } + // special-case: if migrating the same field manager, only remove the old non-Apply entries. + return (entry.Manager == newManager && entry.Operation == metav1.ManagedFieldsOperationApply) + }) + + return filteredManagers, nil +} + +func unionManagerIntoIndex( + entries []metav1.ManagedFieldsEntry, + targetIndex int, + oldManager string, +) error { + ssaManager := entries[targetIndex] + + // find any other manager of same APIVersion, union ssa fields with it. + oldManagerIndex, ok := findFirstIndex(entries, + func(entry metav1.ManagedFieldsEntry) bool { + return entry.Manager == oldManager && + entry.Operation == metav1.ManagedFieldsOperationUpdate && + entry.APIVersion == ssaManager.APIVersion + }) + + targetFieldSet, err := decodeManagedFieldsEntrySet(ssaManager) + if err != nil { + return fmt.Errorf("failed to convert fields to set: %w", err) + } + + combinedFieldSet := &targetFieldSet + + // Union the old manager with the new manager. Do nothing if + // there was no good candidate found + if ok { + csaManager := entries[oldManagerIndex] + + csaFieldSet, err := decodeManagedFieldsEntrySet(csaManager) + if err != nil { + return fmt.Errorf("failed to convert fields to set: %w", err) + } + + combinedFieldSet = combinedFieldSet.Union(&csaFieldSet) + } + + // Encode the fields back to the serialized format + err = encodeManagedFieldsEntrySet(&entries[targetIndex], *combinedFieldSet) + if err != nil { + return fmt.Errorf("failed to encode field set: %w", err) + } + + return nil +} + +func findFirstIndex[T any]( + collection []T, + predicate func(T) bool, +) (int, bool) { + for idx, entry := range collection { + if predicate(entry) { + return idx, true + } + } + + return -1, false +} + +func filter[T any]( + collection []T, + predicate func(T) bool, +) []T { + result := make([]T, 0, len(collection)) + + for _, value := range collection { + if predicate(value) { + result = append(result, value) + } + } + + if len(result) == 0 { + return nil + } + + return result +} + +// Included from fieldmanager.internal to avoid dependency cycle +// FieldsToSet creates a set paths from an input trie of fields +func decodeManagedFieldsEntrySet(f metav1.ManagedFieldsEntry) (s fieldpath.Set, err error) { + err = s.FromJSON(bytes.NewReader(f.FieldsV1.Raw)) + return s, err +} + +// SetToFields creates a trie of fields from an input set of paths +func encodeManagedFieldsEntrySet(f *metav1.ManagedFieldsEntry, s fieldpath.Set) (err error) { + f.FieldsV1.Raw, err = s.ToJSON() + return err +}