diff --git a/pkg/kube/client.go b/pkg/kube/client.go index 9df833a43..368a11b54 100644 --- a/pkg/kube/client.go +++ b/pkg/kube/client.go @@ -55,6 +55,7 @@ import ( "k8s.io/client-go/rest" cachetools "k8s.io/client-go/tools/cache" watchtools "k8s.io/client-go/tools/watch" + "k8s.io/client-go/util/retry" cmdutil "k8s.io/kubectl/pkg/cmd/util" ) @@ -596,17 +597,25 @@ func batchPerform(infos ResourceList, fn func(*resource.Info) error, errs chan<- } func createResource(info *resource.Info) error { - obj, err := resource.NewHelper(info.Client, info.Mapping).WithFieldManager(getManagedFieldsManager()).Create(info.Namespace, true, info.Object) - if err != nil { - return err - } - return info.Refresh(obj, true) + return retry.RetryOnConflict( + retry.DefaultRetry, + func() error { + obj, err := resource.NewHelper(info.Client, info.Mapping).WithFieldManager(getManagedFieldsManager()).Create(info.Namespace, true, info.Object) + if err != nil { + return err + } + return info.Refresh(obj, true) + }) } func deleteResource(info *resource.Info, policy metav1.DeletionPropagation) error { - opts := &metav1.DeleteOptions{PropagationPolicy: &policy} - _, err := resource.NewHelper(info.Client, info.Mapping).WithFieldManager(getManagedFieldsManager()).DeleteWithOptions(info.Namespace, info.Name, opts) - return err + return retry.RetryOnConflict( + retry.DefaultRetry, + func() error { + opts := &metav1.DeleteOptions{PropagationPolicy: &policy} + _, err := resource.NewHelper(info.Client, info.Mapping).WithFieldManager(getManagedFieldsManager()).DeleteWithOptions(info.Namespace, info.Name, opts) + return err + }) } func createPatch(target *resource.Info, current runtime.Object) ([]byte, types.PatchType, error) { diff --git a/pkg/kube/client_test.go b/pkg/kube/client_test.go index 55aa5d8ed..4ceb5f4b3 100644 --- a/pkg/kube/client_test.go +++ b/pkg/kube/client_test.go @@ -90,6 +90,13 @@ func newResponse(code int, obj runtime.Object) (*http.Response, error) { return &http.Response{StatusCode: code, Header: header, Body: body}, nil } +func newResponseJSON(code int, json []byte) (*http.Response, error) { + header := http.Header{} + header.Set("Content-Type", runtime.ContentTypeJSON) + body := io.NopCloser(bytes.NewReader(json)) + return &http.Response{StatusCode: code, Header: header, Body: body}, nil +} + func newTestClient(t *testing.T) *Client { testFactory := cmdtesting.NewTestFactory() t.Cleanup(testFactory.Cleanup) @@ -100,6 +107,103 @@ func newTestClient(t *testing.T) *Client { } } +func TestCreate(t *testing.T) { + // Note: c.Create with the fake client can currently only test creation of a single pod in the same list. When testing + // with more than one pod, c.Create will run into a data race as it calls perform->batchPerform which performs creation + // in batches. The first data race is on accessing var actions and can be fixed easily with a mutex lock in the Client + // function. The second data race though is something in the fake client itself in func (c *RESTClient) do(...) + // when it stores the req: c.Req = req and cannot (?) be fixed easily. + listA := newPodList("starfish") + listB := newPodList("dolphin") + + var actions []string + var iterationCounter int + + c := newTestClient(t) + c.Factory.(*cmdtesting.TestFactory).UnstructuredClient = &fake.RESTClient{ + NegotiatedSerializer: unstructuredSerializer, + Client: fake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) { + path, method := req.URL.Path, req.Method + bodyReader := new(strings.Builder) + _, _ = io.Copy(bodyReader, req.Body) + body := bodyReader.String() + actions = append(actions, path+":"+method) + t.Logf("got request %s %s", path, method) + switch { + case path == "/namespaces/default/pods" && method == "POST": + if strings.Contains(body, "starfish") { + if iterationCounter < 2 { + iterationCounter++ + return newResponseJSON(409, resourceQuotaConflict) + } + return newResponse(200, &listA.Items[0]) + } + return newResponseJSON(409, resourceQuotaConflict) + default: + t.Fatalf("unexpected request: %s %s", method, path) + return nil, nil + } + }), + } + + t.Run("Create success", func(t *testing.T) { + list, err := c.Build(objBody(&listA), false) + if err != nil { + t.Fatal(err) + } + + result, err := c.Create(list) + if err != nil { + t.Fatal(err) + } + + if len(result.Created) != 1 { + t.Errorf("expected 1 resource created, got %d", len(result.Created)) + } + + expectedActions := []string{ + "/namespaces/default/pods:POST", + "/namespaces/default/pods:POST", + "/namespaces/default/pods:POST", + } + if len(expectedActions) != len(actions) { + t.Fatalf("unexpected number of requests, expected %d, got %d", len(expectedActions), len(actions)) + } + for k, v := range expectedActions { + if actions[k] != v { + t.Errorf("expected %s request got %s", v, actions[k]) + } + } + }) + + t.Run("Create failure", func(t *testing.T) { + list, err := c.Build(objBody(&listB), false) + if err != nil { + t.Fatal(err) + } + + _, err = c.Create(list) + if err == nil { + t.Errorf("expected error") + } + + expectedString := "Operation cannot be fulfilled on resourcequotas \"quota\": the object has been modified; " + + "please apply your changes to the latest version and try again" + if !strings.Contains(err.Error(), expectedString) { + t.Errorf("Unexpected error message: %q", err) + } + + expectedActions := []string{ + "/namespaces/default/pods:POST", + } + for k, v := range actions { + if expectedActions[0] != v { + t.Errorf("expected %s request got %s", v, actions[k]) + } + } + }) +} + func TestUpdate(t *testing.T) { listA := newPodList("starfish", "otter", "squid") listB := newPodList("starfish", "otter", "dolphin") @@ -108,6 +212,7 @@ func TestUpdate(t *testing.T) { listC.Items[0].Spec.Containers[0].Ports = []v1.ContainerPort{{Name: "https", ContainerPort: 443}} var actions []string + var iterationCounter int c := newTestClient(t) c.Factory.(*cmdtesting.TestFactory).UnstructuredClient = &fake.RESTClient{ @@ -146,6 +251,10 @@ func TestUpdate(t *testing.T) { } return newResponse(200, &listB.Items[0]) case p == "/namespaces/default/pods" && m == "POST": + if iterationCounter < 2 { + iterationCounter++ + return newResponseJSON(409, resourceQuotaConflict) + } return newResponse(200, &listB.Items[1]) case p == "/namespaces/default/pods/squid" && m == "DELETE": return newResponse(200, &listB.Items[1]) @@ -199,7 +308,9 @@ func TestUpdate(t *testing.T) { "/namespaces/default/pods/otter:GET", "/namespaces/default/pods/otter:GET", "/namespaces/default/pods/dolphin:GET", - "/namespaces/default/pods:POST", + "/namespaces/default/pods:POST", // create dolphin + "/namespaces/default/pods:POST", // retry due to 409 + "/namespaces/default/pods:POST", // retry due to 409 "/namespaces/default/pods/squid:GET", "/namespaces/default/pods/squid:DELETE", } @@ -558,3 +669,6 @@ spec: ports: - containerPort: 80 ` + +var resourceQuotaConflict = []byte(` +{"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"Operation cannot be fulfilled on resourcequotas \"quota\": the object has been modified; please apply your changes to the latest version and try again","reason":"Conflict","details":{"name":"quota","kind":"resourcequotas"},"code":409}`)