diff --git a/pkg/kube/client.go b/pkg/kube/client.go index 6509a1583..198f20f3c 100644 --- a/pkg/kube/client.go +++ b/pkg/kube/client.go @@ -37,18 +37,17 @@ import ( apiequality "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/strategicpatch" "k8s.io/apimachinery/pkg/watch" "k8s.io/cli-runtime/pkg/genericclioptions" "k8s.io/cli-runtime/pkg/genericclioptions/resource" + "k8s.io/client-go/kubernetes/scheme" watchtools "k8s.io/client-go/tools/watch" "k8s.io/kubernetes/pkg/api/legacyscheme" - batchinternal "k8s.io/kubernetes/pkg/apis/batch" - "k8s.io/kubernetes/pkg/apis/core" "k8s.io/kubernetes/pkg/kubectl/cmd/get" cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util" "k8s.io/kubernetes/pkg/kubectl/validation" @@ -111,7 +110,7 @@ func (c *Client) Create(namespace string, reader io.Reader, timeout int64, shoul func (c *Client) newBuilder(namespace string, reader io.Reader) *resource.Result { return c.NewBuilder(). ContinueOnError(). - WithScheme(legacyscheme.Scheme). + WithScheme(scheme.Scheme, scheme.Scheme.PrioritizedVersionsAllGroups()...). Schema(c.validator()). NamespaceParam(namespace). DefaultNamespace(). @@ -163,7 +162,7 @@ func (c *Client) Get(namespace string, reader io.Reader) (string, error) { return "", err } - var objPods = make(map[string][]core.Pod) + var objPods = make(map[string][]v1.Pod) missing := []string{} err = perform(infos, func(info *resource.Info) error { @@ -493,7 +492,6 @@ func updateResource(c *Client, target *resource.Info, currentObj runtime.Object, } pods, err := client.CoreV1().Pods(target.Namespace).List(metav1.ListOptions{ - FieldSelector: fields.Everything().String(), LabelSelector: labels.Set(selector).AsSelector().String(), }) if err != nil { @@ -601,15 +599,15 @@ func (c *Client) watchUntilReady(timeout time.Duration, info *resource.Info) err // // This operates on an event returned from a watcher. func (c *Client) waitForJob(e watch.Event, name string) (bool, error) { - o, ok := e.Object.(*batchinternal.Job) + o, ok := e.Object.(*batch.Job) if !ok { return true, fmt.Errorf("Expected %s to be a *batch.Job, got %T", name, e.Object) } for _, c := range o.Status.Conditions { - if c.Type == batchinternal.JobComplete && c.Status == core.ConditionTrue { + if c.Type == batch.JobComplete && c.Status == v1.ConditionTrue { return true, nil - } else if c.Type == batchinternal.JobFailed && c.Status == core.ConditionTrue { + } else if c.Type == batch.JobFailed && c.Status == v1.ConditionTrue { return true, fmt.Errorf("Job failed: %s", c.Reason) } } @@ -690,7 +688,7 @@ func isPodComplete(event watch.Event) (bool, error) { //get a kubernetes resources' relation pods // kubernetes resource used select labels to relate pods -func (c *Client) getSelectRelationPod(info *resource.Info, objPods map[string][]core.Pod) (map[string][]core.Pod, error) { +func (c *Client) getSelectRelationPod(info *resource.Info, objPods map[string][]v1.Pod) (map[string][]v1.Pod, error) { if info == nil { return objPods, nil } @@ -706,25 +704,14 @@ func (c *Client) getSelectRelationPod(info *resource.Info, objPods map[string][] client, _ := c.KubernetesClientSet() pods, err := client.CoreV1().Pods(info.Namespace).List(metav1.ListOptions{ - FieldSelector: fields.Everything().String(), LabelSelector: labels.Set(selector).AsSelector().String(), }) if err != nil { return objPods, err } - for _, externalPod := range pods.Items { - pod := core.Pod{} - legacyscheme.Scheme.Convert(&externalPod, &pod, nil) - if pod.APIVersion == "" { - pod.APIVersion = "v1" - } - - if pod.Kind == "" { - pod.Kind = "Pod" - } - vk := pod.GroupVersionKind().Version + "/" + pod.GroupVersionKind().Kind - + for _, pod := range pods.Items { + vk := "v1/Pod" if !isFoundPod(objPods[vk], pod) { objPods[vk] = append(objPods[vk], pod) } @@ -732,7 +719,7 @@ func (c *Client) getSelectRelationPod(info *resource.Info, objPods map[string][] return objPods, nil } -func isFoundPod(podItem []core.Pod, pod core.Pod) bool { +func isFoundPod(podItem []v1.Pod, pod v1.Pod) bool { for _, value := range podItem { if (value.Namespace == pod.Namespace) && (value.Name == pod.Name) { return true @@ -742,7 +729,16 @@ func isFoundPod(podItem []core.Pod, pod core.Pod) bool { } func asVersioned(info *resource.Info) runtime.Object { - return cmdutil.AsDefaultVersionedOrOriginal(info.Object, info.Mapping) + converter := runtime.ObjectConvertor(scheme.Scheme) + groupVersioner := runtime.GroupVersioner(schema.GroupVersions(scheme.Scheme.PrioritizedVersionsAllGroups())) + if info.Mapping != nil { + groupVersioner = info.Mapping.GroupVersionKind.GroupVersion() + } + + if obj, err := converter.ConvertToVersion(info.Object, groupVersioner); err == nil { + return obj + } + return info.Object } func asInternal(info *resource.Info) (runtime.Object, error) { diff --git a/pkg/kube/client_test.go b/pkg/kube/client_test.go index e505e8f37..de33881c8 100644 --- a/pkg/kube/client_test.go +++ b/pkg/kube/client_test.go @@ -29,16 +29,17 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/cli-runtime/pkg/genericclioptions/resource" + "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest/fake" - "k8s.io/kubernetes/pkg/api/legacyscheme" - "k8s.io/kubernetes/pkg/api/testapi" cmdtesting "k8s.io/kubernetes/pkg/kubectl/cmd/testing" - "k8s.io/kubernetes/pkg/kubectl/scheme" ) -var unstructuredSerializer = resource.UnstructuredPlusDefaultContentConfig().NegotiatedSerializer +var ( + codec = scheme.Codecs.LegacyCodec(scheme.Scheme.PrioritizedVersionsAllGroups()...) + unstructuredSerializer = resource.UnstructuredPlusDefaultContentConfig().NegotiatedSerializer +) -func objBody(codec runtime.Codec, obj runtime.Object) io.ReadCloser { +func objBody(obj runtime.Object) io.ReadCloser { return ioutil.NopCloser(bytes.NewReader([]byte(runtime.EncodeOrDie(codec, obj)))) } @@ -89,7 +90,7 @@ func notFoundBody() *metav1.Status { func newResponse(code int, obj runtime.Object) (*http.Response, error) { header := http.Header{} header.Set("Content-Type", runtime.ContentTypeJSON) - body := ioutil.NopCloser(bytes.NewReader([]byte(runtime.EncodeOrDie(testapi.Default.Codec(), obj)))) + body := ioutil.NopCloser(bytes.NewReader([]byte(runtime.EncodeOrDie(codec, obj)))) return &http.Response{StatusCode: code, Header: header, Body: body}, nil } @@ -161,19 +162,18 @@ func TestUpdate(t *testing.T) { Factory: tf, Log: nopLogger, } - codec := legacyscheme.Codecs.LegacyCodec(scheme.Scheme.PrioritizedVersionsAllGroups()...) - if err := c.Update(v1.NamespaceDefault, objBody(codec, &listA), objBody(codec, &listB), false, false, 0, false); err != nil { + if err := c.Update(v1.NamespaceDefault, objBody(&listA), objBody(&listB), false, false, 0, false); err != nil { t.Fatal(err) } // TODO: Find a way to test methods that use Client Set // Test with a wait - // if err := c.Update("test", objBody(codec, &listB), objBody(codec, &listC), false, 300, true); err != nil { + // if err := c.Update("test", objBody(&listB), objBody(&listC), false, 300, true); err != nil { // t.Fatal(err) // } // Test with a wait should fail // TODO: A way to make this not based off of an extremely short timeout? - // if err := c.Update("test", objBody(codec, &listC), objBody(codec, &listA), false, 2, true); err != nil { + // if err := c.Update("test", objBody(&listC), objBody(&listA), false, 2, true); err != nil { // t.Fatal(err) // } expectedActions := []string{