diff --git a/pkg/kube/client.go b/pkg/kube/client.go index 17a4a1759..7c77a4d6b 100644 --- a/pkg/kube/client.go +++ b/pkg/kube/client.go @@ -93,6 +93,15 @@ func (c *Client) Create(namespace string, reader io.Reader) error { return perform(c, namespace, reader, createResource) } +func (c *Client) newBuilder(namespace string, reader io.Reader) *resource.Builder { + return c.NewBuilder(c.IncludeThirdPartyAPIs). + ContinueOnError(). + NamespaceParam(namespace). + DefaultNamespace(). + Stream(reader, ""). + Flatten() +} + // Get gets kubernetes resources as pretty printed string // // Namespace will set the namespace @@ -153,32 +162,20 @@ func (c *Client) Get(namespace string, reader io.Reader) (string, error) { // // Namespace will set the namespaces func (c *Client) Update(namespace string, currentReader, targetReader io.Reader) error { - current := c.NewBuilder(c.IncludeThirdPartyAPIs). - ContinueOnError(). - NamespaceParam(namespace). - DefaultNamespace(). - Stream(currentReader, ""). - Flatten(). - Do() - - target := c.NewBuilder(c.IncludeThirdPartyAPIs). - ContinueOnError(). - NamespaceParam(namespace). - DefaultNamespace(). - Stream(targetReader, ""). - Flatten(). - Do() - - currentInfos, err := current.Infos() + currentInfos, err := c.newBuilder(namespace, currentReader).Do().Infos() if err != nil { - return err + return fmt.Errorf("failed decoding reader into objects: %s", err) + } + + target := c.newBuilder(namespace, targetReader).Do() + if target.Err() != nil { + return fmt.Errorf("failed decoding reader into objects: %s", target.Err()) } targetInfos := []*resource.Info{} updateErrors := []string{} err = target.Visit(func(info *resource.Info, err error) error { - targetInfos = append(targetInfos, info) if err != nil { return err @@ -194,7 +191,7 @@ func (c *Client) Update(namespace string, currentReader, targetReader io.Reader) // Since the resource does not exist, create it. if err := createResource(info); err != nil { - return err + return fmt.Errorf("failed to create resource: %s", err) } kind := info.Mapping.GroupVersionKind.Kind @@ -219,17 +216,14 @@ func (c *Client) Update(namespace string, currentReader, targetReader io.Reader) return nil }) - deleteUnwantedResources(currentInfos, targetInfos) - if err != nil { return err } else if len(updateErrors) != 0 { return fmt.Errorf(strings.Join(updateErrors, " && ")) } - + deleteUnwantedResources(currentInfos, targetInfos) return nil - } // Delete deletes kubernetes resources from an io.reader @@ -261,7 +255,6 @@ func skipIfNotFound(err error) error { log.Printf("%v", err) return nil } - return err } @@ -284,36 +277,17 @@ func (c *Client) WatchUntilReady(namespace string, reader io.Reader) error { } func perform(c *Client, namespace string, reader io.Reader, fn ResourceActorFunc) error { - r := c.NewBuilder(c.IncludeThirdPartyAPIs). - ContinueOnError(). - NamespaceParam(namespace). - DefaultNamespace(). - Stream(reader, ""). - Flatten(). - Do() - - if r.Err() != nil { - return r.Err() + infos, err := c.newBuilder(namespace, reader).Do().Infos() + switch { + case err != nil: + return err + case len(infos) == 0: + return fmt.Errorf("no objects visited") } - - count := 0 - err := r.Visit(func(info *resource.Info, err error) error { - if err != nil { + for _, info := range infos { + if err := fn(info); err != nil { return err } - err = fn(info) - - if err == nil { - count++ - } - return err - }) - - if err != nil { - return err - } - if count == 0 { - return fmt.Errorf("no objects visited") } return nil }