Use watcher with retries to wait for resources

When waiting for resources use the `ListWatchUntil` instead of
`UntilWithoutRetry` so that if the connection drops between tiller and
the API while waiting the operation can still succeed.

Signed-off-by: Richard Connon <richard.connon@oracle.com>
pull/6014/head
Richard Connon 5 years ago
parent 70af7ba4db
commit 55ccdd040d

@ -41,6 +41,7 @@ import (
apiequality "k8s.io/apimachinery/pkg/api/equality" apiequality "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
@ -51,6 +52,7 @@ import (
"k8s.io/cli-runtime/pkg/genericclioptions" "k8s.io/cli-runtime/pkg/genericclioptions"
"k8s.io/cli-runtime/pkg/resource" "k8s.io/cli-runtime/pkg/resource"
"k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/kubernetes/scheme"
cachetools "k8s.io/client-go/tools/cache"
watchtools "k8s.io/client-go/tools/watch" watchtools "k8s.io/client-go/tools/watch"
"k8s.io/kubernetes/pkg/api/legacyscheme" "k8s.io/kubernetes/pkg/api/legacyscheme"
"k8s.io/kubernetes/pkg/apis/core" "k8s.io/kubernetes/pkg/apis/core"
@ -810,10 +812,7 @@ func getSelectorFromObject(obj runtime.Object) (map[string]string, bool) {
} }
func (c *Client) watchUntilReady(timeout time.Duration, info *resource.Info) error { func (c *Client) watchUntilReady(timeout time.Duration, info *resource.Info) error {
w, err := resource.NewHelper(info.Client, info.Mapping).WatchSingle(info.Namespace, info.Name, info.ResourceVersion) lw := cachetools.NewListWatchFromClient(info.Client, info.Mapping.Resource.Resource, info.Namespace, fields.Everything())
if err != nil {
return err
}
kind := info.Mapping.GroupVersionKind.Kind kind := info.Mapping.GroupVersionKind.Kind
c.Log("Watching for changes to %s %s with timeout of %v", kind, info.Name, timeout) c.Log("Watching for changes to %s %s with timeout of %v", kind, info.Name, timeout)
@ -826,7 +825,7 @@ func (c *Client) watchUntilReady(timeout time.Duration, info *resource.Info) err
ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), timeout) ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), timeout)
defer cancel() defer cancel()
_, err = watchtools.UntilWithoutRetry(ctx, w, func(e watch.Event) (bool, error) { _, err := watchtools.ListWatchUntil(ctx, lw, func(e watch.Event) (bool, error) {
switch e.Type { switch e.Type {
case watch.Added, watch.Modified: case watch.Added, watch.Modified:
// For things like a secret or a config map, this is the best indicator // For things like a secret or a config map, this is the best indicator
@ -914,15 +913,12 @@ func (c *Client) WaitAndGetCompletedPodPhase(namespace string, reader io.Reader,
} }
func (c *Client) watchPodUntilComplete(timeout time.Duration, info *resource.Info) error { func (c *Client) watchPodUntilComplete(timeout time.Duration, info *resource.Info) error {
w, err := resource.NewHelper(info.Client, info.Mapping).WatchSingle(info.Namespace, info.Name, info.ResourceVersion) lw := cachetools.NewListWatchFromClient(info.Client, info.Mapping.Resource.Resource, info.Namespace, fields.Everything())
if err != nil {
return err
}
c.Log("Watching pod %s for completion with timeout of %v", info.Name, timeout) c.Log("Watching pod %s for completion with timeout of %v", info.Name, timeout)
ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), timeout) ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), timeout)
defer cancel() defer cancel()
_, err = watchtools.UntilWithoutRetry(ctx, w, func(e watch.Event) (bool, error) { _, err := watchtools.ListWatchUntil(ctx, lw, func(e watch.Event) (bool, error) {
return isPodComplete(e) return isPodComplete(e)
}) })

Loading…
Cancel
Save