fix(kube): fix wait and recreate

fixes #2006
pull/2016/head
Adam Reese 7 years ago
parent 2806c0d59c
commit 12db1f945f

@ -34,7 +34,6 @@ import (
apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1"
batchinternal "k8s.io/kubernetes/pkg/apis/batch"
batch "k8s.io/kubernetes/pkg/apis/batch/v1"
extensionsinternal "k8s.io/kubernetes/pkg/apis/extensions"
extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
conditions "k8s.io/kubernetes/pkg/client/unversioned"
@ -408,7 +407,15 @@ func updateResource(c *Client, target *resource.Info, currentObj runtime.Object,
return nil
}
selector, err := getSelectorFromObject(currentObj)
versioned, err := c.AsVersionedObject(target.Object)
if runtime.IsNotRegisteredError(err) {
return nil
}
if err != nil {
return err
}
selector, err := getSelectorFromObject(versioned)
if err != nil {
return nil
}
@ -536,6 +543,17 @@ func getPods(client *internalclientset.Clientset, namespace string, selector map
return list.Items, err
}
// AsVersionedObject converts a runtime.object to a versioned object.
func (c *Client) AsVersionedObject(obj runtime.Object) (runtime.Object, error) {
json, err := runtime.Encode(runtime.UnstructuredJSONScheme, obj)
if err != nil {
return nil, err
}
versions := &runtime.VersionedObjects{}
err = runtime.DecodeInto(c.Decoder(true), json, versions)
return versions.First(), err
}
// waitForResources polls to get the current status of all pods, PVCs, and Services
// until all are ready or a timeout is reached
func (c *Client) waitForResources(timeout time.Duration, created Result) error {
@ -546,20 +564,24 @@ func (c *Client) waitForResources(timeout time.Duration, created Result) error {
services := []api.Service{}
pvc := []api.PersistentVolumeClaim{}
for _, v := range created {
switch value := v.Object.(type) {
case (*api.ReplicationController):
obj, err := c.AsVersionedObject(v.Object)
if err != nil && !runtime.IsNotRegisteredError(err) {
return false, err
}
switch value := obj.(type) {
case (*v1.ReplicationController):
list, err := getPods(client, value.Namespace, value.Spec.Selector)
if err != nil {
return false, err
}
pods = append(pods, list...)
case (*api.Pod):
case (*v1.Pod):
pod, err := client.Pods(value.Namespace).Get(value.Name)
if err != nil {
return false, err
}
pods = append(pods, *pod)
case (*extensionsinternal.Deployment):
case (*extensions.Deployment):
// Get the RS children first
rs, err := client.ReplicaSets(value.Namespace).List(api.ListOptions{
FieldSelector: fields.Everything(),
@ -575,7 +597,7 @@ func (c *Client) waitForResources(timeout time.Duration, created Result) error {
}
pods = append(pods, list...)
}
case (*extensionsinternal.DaemonSet):
case (*extensions.DaemonSet):
list, err := getPods(client, value.Namespace, value.Spec.Selector.MatchLabels)
if err != nil {
return false, err
@ -587,19 +609,19 @@ func (c *Client) waitForResources(timeout time.Duration, created Result) error {
return false, err
}
pods = append(pods, list...)
case (*extensionsinternal.ReplicaSet):
case (*extensions.ReplicaSet):
list, err := getPods(client, value.Namespace, value.Spec.Selector.MatchLabels)
if err != nil {
return false, err
}
pods = append(pods, list...)
case (*api.PersistentVolumeClaim):
case (*v1.PersistentVolumeClaim):
claim, err := client.PersistentVolumeClaims(value.Namespace).Get(value.Name)
if err != nil {
return false, err
}
pvc = append(pvc, *claim)
case (*api.Service):
case (*v1.Service):
svc, err := client.Services(value.Namespace).Get(value.Name)
if err != nil {
return false, err

Loading…
Cancel
Save