(2/n): patch managed fields

pull/13547/head
Sidharth Menon 9 months ago
parent bf104d1af0
commit 603e88af6f

@ -59,6 +59,12 @@ import (
cmdutil "k8s.io/kubectl/pkg/cmd/util" cmdutil "k8s.io/kubectl/pkg/cmd/util"
) )
var FieldManagersToAdopt = []string{
"kubectl",
"kubectl-client-side-apply",
"kubectl-edit",
}
// ErrNoObjectsVisited indicates that during a visit operation, no matching objects were found. // ErrNoObjectsVisited indicates that during a visit operation, no matching objects were found.
var ErrNoObjectsVisited = errors.New("no objects visited") var ErrNoObjectsVisited = errors.New("no objects visited")
@ -425,6 +431,9 @@ func (c *Client) Update(original, target ResourceList, force bool) (*Result, err
updateErrors = append(updateErrors, err.Error()) updateErrors = append(updateErrors, err.Error())
} }
// afterwards, we will reconcile managed fields on these objects and re-apply if necessary.
// the reason we do this is to avoid 3 network requests in the "happy/normal" case with SSA.
// we want to first (1) apply, (2) try to check if a migration is needed, and (3) run the migration if we must.
return nil return nil
}) })
@ -673,12 +682,37 @@ func applyResource(target *resource.Info, force bool) error {
if err != nil { if err != nil {
return errors.Wrap(err, "failed to encode target object") return errors.Wrap(err, "failed to encode target object")
} }
_, err = helper.Patch(target.Namespace, target.Name, types.ApplyPatchType, data, &metav1.PatchOptions{ obj, err := helper.Patch(target.Namespace, target.Name, types.ApplyPatchType, data, &metav1.PatchOptions{
Force: &force, Force: &force,
}) })
if err != nil { if err != nil {
return errors.Wrapf(err, "cannot patch %q with kind %s", target.Name, target.Mapping.GroupVersionKind.Kind) return errors.Wrapf(err, "cannot patch %q with kind %s", target.Name, target.Mapping.GroupVersionKind.Kind)
} }
target.Refresh(obj, true)
// now, we will try to migrate managed fields of the object, if necessary.
didMigrate, err := migrateManagedFields(
helper,
target,
FieldManagersToAdopt,
getManagedFieldsManager(),
)
if err != nil {
return errors.Wrap(err, "failed to upgrade managed fields for helm ssa migration")
} else if !didMigrate {
// in the average case, there's no work to do - the object is already up to date.
return nil
}
// now we know that there were some extra managed fields lying around. Re-send original SSA to the api-server
// to clear the old managed fields.
obj, err = helper.Patch(target.Namespace, target.Name, types.ApplyPatchType, data, &metav1.PatchOptions{
Force: &force,
})
if err != nil {
return errors.Wrapf(err, "cannot patch %q with kind %s", target.Name, target.Mapping.GroupVersionKind.Kind)
}
target.Refresh(obj, true)
return nil return nil
} }

@ -0,0 +1,245 @@
package kube
import (
"bytes"
"encoding/json"
"fmt"
"reflect"
"github.com/pkg/errors"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/cli-runtime/pkg/resource"
"sigs.k8s.io/structured-merge-diff/v4/fieldpath"
)
func migrateManagedFields(
helper *resource.Helper,
info *resource.Info,
managersToAdopt []string,
helmManager string,
) (didMigrate bool, err error) {
// retry a few times on conflict errors.
for i := 0; i < 5; i++ {
var patchData []byte
var obj runtime.Object
patchData, err := createMigrateManagedFieldsPatch(info.Object, managersToAdopt, helmManager)
if err != nil {
return false, errors.Wrap(err, "failed to generate patch for upgrading managed fields")
} else if patchData == nil {
// no work to do.
return false, nil
}
obj, err = helper.Patch(info.Namespace, info.Name, types.JSONPatchType, patchData, nil)
if err != nil {
if !apierrors.IsConflict(err) {
return false, errors.Wrap(err, "unexpected error patching managed fields on object")
}
// retry on conflicts, but refresh object first
if err = info.Get(); err != nil {
return false, errors.Wrap(err, "unexpected error refreshing object")
}
continue
}
info.Refresh(obj, true)
return true, nil
}
return false, nil
}
// createMigrateManagedFieldsPatch Calculates a minimal JSON Patch to send to upgrade managed fields
func createMigrateManagedFieldsPatch(
obj runtime.Object,
managersToAdopt []string,
helmManager string,
) ([]byte, error) {
accessor, err := meta.Accessor(obj)
if err != nil {
return nil, err
}
managedFields := accessor.GetManagedFields()
filteredManagers := accessor.GetManagedFields()
// note that we also adopt previous non-apply operations from the helm manager.
for managerName := range sets.New(managersToAdopt...).Insert(helmManager) {
filteredManagers, err = upgradedManagedFields(
filteredManagers,
managerName,
helmManager,
)
if err != nil {
return nil, err
}
}
if reflect.DeepEqual(managedFields, filteredManagers) {
// If the managed fields have not changed from the transformed version,
// there is no patch to perform
return nil, nil
}
// Create a patch with a diff between old and new objects.
// Just include all managed fields since that is only thing that will change
//
// Also include test for RV to avoid race condition
jsonPatch := []map[string]interface{}{
{
"op": "replace",
"path": "/metadata/managedFields",
"value": filteredManagers,
},
{
// Use "replace" instead of "test" operation so that etcd rejects with
// 409 conflict instead of apiserver with an invalid request
"op": "replace",
"path": "/metadata/resourceVersion",
"value": accessor.GetResourceVersion(),
},
}
return json.Marshal(jsonPatch)
}
// Returns a copy of the provided managed fields that has been migrated from
// client-side-apply to server-side-apply, or an error if there was an issue
func upgradedManagedFields(
managedFields []metav1.ManagedFieldsEntry,
oldManager,
newManager string,
) ([]metav1.ManagedFieldsEntry, error) {
if managedFields == nil {
return nil, nil
}
// Create managed fields clone since we modify the values
managedFieldsCopy := make([]metav1.ManagedFieldsEntry, len(managedFields))
if copy(managedFieldsCopy, managedFields) != len(managedFields) {
return nil, errors.New("failed to copy managed fields")
}
managedFields = managedFieldsCopy
// Locate new manager
replaceIndex, ok := findFirstIndex(managedFields,
func(entry metav1.ManagedFieldsEntry) bool {
return entry.Manager == newManager &&
entry.Operation == metav1.ManagedFieldsOperationApply
})
if !ok {
return nil, errors.New("apply: unexpected error - no manager found")
}
err := unionManagerIntoIndex(managedFields, replaceIndex, oldManager)
if err != nil {
return nil, err
}
// Create version of managed fields without the old field manager.
filteredManagers := filter(managedFields, func(entry metav1.ManagedFieldsEntry) bool {
if entry.Manager != oldManager {
// keep unaffected entries
return true
} else if oldManager != newManager {
// remove if a different field manager entirely.
return false
}
// special-case: if migrating the same field manager, only remove the old non-Apply entries.
return (entry.Manager == newManager && entry.Operation == metav1.ManagedFieldsOperationApply)
})
return filteredManagers, nil
}
func unionManagerIntoIndex(
entries []metav1.ManagedFieldsEntry,
targetIndex int,
oldManager string,
) error {
ssaManager := entries[targetIndex]
// find any other manager of same APIVersion, union ssa fields with it.
oldManagerIndex, ok := findFirstIndex(entries,
func(entry metav1.ManagedFieldsEntry) bool {
return entry.Manager == oldManager &&
entry.Operation == metav1.ManagedFieldsOperationUpdate &&
entry.APIVersion == ssaManager.APIVersion
})
targetFieldSet, err := decodeManagedFieldsEntrySet(ssaManager)
if err != nil {
return fmt.Errorf("failed to convert fields to set: %w", err)
}
combinedFieldSet := &targetFieldSet
// Union the old manager with the new manager. Do nothing if
// there was no good candidate found
if ok {
csaManager := entries[oldManagerIndex]
csaFieldSet, err := decodeManagedFieldsEntrySet(csaManager)
if err != nil {
return fmt.Errorf("failed to convert fields to set: %w", err)
}
combinedFieldSet = combinedFieldSet.Union(&csaFieldSet)
}
// Encode the fields back to the serialized format
err = encodeManagedFieldsEntrySet(&entries[targetIndex], *combinedFieldSet)
if err != nil {
return fmt.Errorf("failed to encode field set: %w", err)
}
return nil
}
func findFirstIndex[T any](
collection []T,
predicate func(T) bool,
) (int, bool) {
for idx, entry := range collection {
if predicate(entry) {
return idx, true
}
}
return -1, false
}
func filter[T any](
collection []T,
predicate func(T) bool,
) []T {
result := make([]T, 0, len(collection))
for _, value := range collection {
if predicate(value) {
result = append(result, value)
}
}
if len(result) == 0 {
return nil
}
return result
}
// Included from fieldmanager.internal to avoid dependency cycle
// FieldsToSet creates a set paths from an input trie of fields
func decodeManagedFieldsEntrySet(f metav1.ManagedFieldsEntry) (s fieldpath.Set, err error) {
err = s.FromJSON(bytes.NewReader(f.FieldsV1.Raw))
return s, err
}
// SetToFields creates a trie of fields from an input set of paths
func encodeManagedFieldsEntrySet(f *metav1.ManagedFieldsEntry, s fieldpath.Set) (err error) {
f.FieldsV1.Raw, err = s.ToJSON()
return err
}
Loading…
Cancel
Save