diff --git a/pkg/kube/client.go b/pkg/kube/client.go index 333e3ab37..8415cdff4 100644 --- a/pkg/kube/client.go +++ b/pkg/kube/client.go @@ -75,7 +75,7 @@ type ResourceActorFunc func(*resource.Info) error // Create creates kubernetes resources from an io.reader // // Namespace will set the namespace -func (c *Client) Create(namespace string, reader io.Reader, timeout int64, shouldWait bool) error { +func (c *Client) Create(namespace string, reader io.Reader, writer io.Writer, timeout int64, shouldWait bool) error { client, err := c.ClientSet() if err != nil { return err @@ -91,7 +91,7 @@ func (c *Client) Create(namespace string, reader io.Reader, timeout int64, shoul return err } if shouldWait { - return c.waitForResources(time.Duration(timeout)*time.Second, infos) + return c.waitForResources(time.Duration(timeout)*time.Second, infos, writer) } return nil } @@ -208,7 +208,7 @@ func (c *Client) Get(namespace string, reader io.Reader) (string, error) { // not present in the target configuration // // Namespace will set the namespaces -func (c *Client) Update(namespace string, originalReader, targetReader io.Reader, recreate bool, timeout int64, shouldWait bool) error { +func (c *Client) Update(namespace string, originalReader, targetReader io.Reader, writer io.Writer, recreate bool, timeout int64, shouldWait bool) error { original, err := c.BuildUnstructured(namespace, originalReader) if err != nil { return fmt.Errorf("failed decoding reader into objects: %s", err) @@ -269,7 +269,7 @@ func (c *Client) Update(namespace string, originalReader, targetReader io.Reader } } if shouldWait { - return c.waitForResources(time.Duration(timeout)*time.Second, target) + return c.waitForResources(time.Duration(timeout)*time.Second, target, writer) } return nil } diff --git a/pkg/kube/wait.go b/pkg/kube/wait.go index bef366b4f..97de2c3f8 100644 --- a/pkg/kube/wait.go +++ b/pkg/kube/wait.go @@ -17,6 +17,8 @@ limitations under the License. package kube // import "k8s.io/helm/pkg/kube" import ( + "fmt" + "io" "log" "time" @@ -25,6 +27,7 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/pkg/api" "k8s.io/kubernetes/pkg/api/v1" apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1" extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1" @@ -43,9 +46,8 @@ type deployment struct { // waitForResources polls to get the current status of all pods, PVCs, and Services // until all are ready or a timeout is reached -func (c *Client) waitForResources(timeout time.Duration, created Result) error { +func (c *Client) waitForResources(timeout time.Duration, created Result, writer io.Writer) error { log.Printf("beginning wait for resources with timeout of %v", timeout) - cs, _ := c.ClientSet() client := versionedClientsetForDeployment(cs) return wait.Poll(2*time.Second, timeout, func() (bool, error) { @@ -132,6 +134,7 @@ func (c *Client) waitForResources(timeout time.Duration, created Result) error { services = append(services, *svc) } } + writeEvents(client, writer, pods, services, pvc, deployments) return podsReady(pods) && servicesReady(services) && volumesReady(pvc) && deploymentsReady(deployments), nil }) } @@ -194,3 +197,48 @@ func versionedClientsetForDeployment(internalClient internalclientset.Interface) ExtensionsV1beta1Client: extensionsclient.New(internalClient.Extensions().RESTClient()), } } + +func writeEvents( + client clientset.Interface, + writer io.Writer, + pods []v1.Pod, + services []v1.Service, + pvcs []v1.PersistentVolumeClaim, + deployments []deployment, +) { + // Aggregate all events from k8s on each poll: + // Pods: + for _, pod := range pods { + events, _ := client.Core().Events(pod.Namespace).Search(api.Scheme, runtime.Object(&pod)) + write(writer, events, "Pod") + } + // Services: + for _, service := range services { + events, _ := client.Core().Events(service.Namespace).Search(api.Scheme, runtime.Object(&service)) + write(writer, events, "Service") + } + // PVCs: + for _, pvc := range pvcs { + events, _ := client.Core().Events(pvc.Namespace).Search(api.Scheme, runtime.Object(&pvc)) + write(writer, events, "PVC") + } + // Deployments: + for _, deployment := range deployments { + events, _ := client.Core().Events(deployment.deployment.Namespace).Search(api.Scheme, runtime.Object(deployment.deployment)) + write(writer, events, "Deployment") + } +} + +func write(writer io.Writer, events *v1.EventList, resource string) { + for _, event := range events.Items { + fmt.Fprintf( + writer, + "[%s] [%s] [Type]: %s [Reason]: %s [Message]: %s\n", + time.Now().Format("2006/01/02 15:04:05"), + resource, + event.Type, + event.Reason, + event.Message, + ) + } +} diff --git a/pkg/releasetesting/environment.go b/pkg/releasetesting/environment.go index a56721333..4a2deb5d4 100644 --- a/pkg/releasetesting/environment.go +++ b/pkg/releasetesting/environment.go @@ -39,7 +39,8 @@ type Environment struct { func (env *Environment) createTestPod(test *test) error { b := bytes.NewBufferString(test.manifest) - if err := env.KubeClient.Create(env.Namespace, b, env.Timeout, false); err != nil { + eventsLog := bytes.NewBuffer(nil) + if err := env.KubeClient.Create(env.Namespace, b, eventsLog, env.Timeout, false); err != nil { log.Printf(err.Error()) test.result.Info = err.Error() test.result.Status = release.TestRun_FAILURE diff --git a/pkg/tiller/environment/environment.go b/pkg/tiller/environment/environment.go index 26516474b..508f8f704 100644 --- a/pkg/tiller/environment/environment.go +++ b/pkg/tiller/environment/environment.go @@ -114,7 +114,7 @@ type KubeClient interface { // // reader must contain a YAML stream (one or more YAML documents separated // by "\n---\n"). - Create(namespace string, reader io.Reader, timeout int64, shouldWait bool) error + Create(namespace string, reader io.Reader, writer io.Writer, timeout int64, shouldWait bool) error // Get gets one or more resources. Returned string hsa the format like kubectl // provides with the column headers separating the resource types. @@ -147,7 +147,7 @@ type KubeClient interface { // // reader must contain a YAML stream (one or more YAML documents separated // by "\n---\n"). - Update(namespace string, originalReader, modifiedReader io.Reader, recreate bool, timeout int64, shouldWait bool) error + Update(namespace string, originalReader, modifiedReader io.Reader, writer io.Writer, recreate bool, timeout int64, shouldWait bool) error Build(namespace string, reader io.Reader) (kube.Result, error) BuildUnstructured(namespace string, reader io.Reader) (kube.Result, error) diff --git a/pkg/tiller/release_server.go b/pkg/tiller/release_server.go index 5bd0a48de..af04a4c2e 100644 --- a/pkg/tiller/release_server.go +++ b/pkg/tiller/release_server.go @@ -20,6 +20,7 @@ import ( "bytes" "errors" "fmt" + "io" "log" "path" "regexp" @@ -318,7 +319,8 @@ func (s *ReleaseServer) performUpdate(originalRelease, updatedRelease *release.R } } - if err := s.performKubeUpdate(originalRelease, updatedRelease, req.Recreate, req.Timeout, req.Wait); err != nil { + eventsLog := bytes.NewBuffer(nil) + if err := s.performKubeUpdate(originalRelease, updatedRelease, eventsLog, req.Recreate, req.Timeout, req.Wait); err != nil { msg := fmt.Sprintf("Upgrade %q failed: %s", updatedRelease.Name, err) log.Printf("warning: %s", msg) originalRelease.Info.Status.Code = release.Status_SUPERSEDED @@ -326,7 +328,12 @@ func (s *ReleaseServer) performUpdate(originalRelease, updatedRelease *release.R updatedRelease.Info.Description = msg s.recordRelease(originalRelease, true) s.recordRelease(updatedRelease, false) - return res, err + return res, fmt.Errorf( + "\nTiller log:\n%s\nupdate %s failed: %s", + eventsLog, + originalRelease.Name, + err, + ) } // post-upgrade hooks @@ -499,7 +506,8 @@ func (s *ReleaseServer) performRollback(currentRelease, targetRelease *release.R } } - if err := s.performKubeUpdate(currentRelease, targetRelease, req.Recreate, req.Timeout, req.Wait); err != nil { + eventsLog := bytes.NewBuffer(nil) + if err := s.performKubeUpdate(currentRelease, targetRelease, eventsLog, req.Recreate, req.Timeout, req.Wait); err != nil { msg := fmt.Sprintf("Rollback %q failed: %s", targetRelease.Name, err) log.Printf("warning: %s", msg) currentRelease.Info.Status.Code = release.Status_SUPERSEDED @@ -507,7 +515,12 @@ func (s *ReleaseServer) performRollback(currentRelease, targetRelease *release.R targetRelease.Info.Description = msg s.recordRelease(currentRelease, true) s.recordRelease(targetRelease, false) - return res, err + return res, fmt.Errorf( + "\nTiller log:\n%s\nrollback %s failed: %s", + eventsLog, + currentRelease.Name, + err, + ) } // post-rollback hooks @@ -525,11 +538,11 @@ func (s *ReleaseServer) performRollback(currentRelease, targetRelease *release.R return res, nil } -func (s *ReleaseServer) performKubeUpdate(currentRelease, targetRelease *release.Release, recreate bool, timeout int64, shouldWait bool) error { +func (s *ReleaseServer) performKubeUpdate(currentRelease, targetRelease *release.Release, writer io.Writer, recreate bool, timeout int64, shouldWait bool) error { kubeCli := s.env.KubeClient current := bytes.NewBufferString(currentRelease.Manifest) target := bytes.NewBufferString(targetRelease.Manifest) - return kubeCli.Update(targetRelease.Namespace, current, target, recreate, timeout, shouldWait) + return kubeCli.Update(targetRelease.Namespace, current, target, writer, recreate, timeout, shouldWait) } // prepareRollback finds the previous release and prepares a new release object with @@ -864,6 +877,7 @@ func (s *ReleaseServer) performRelease(r *release.Release, req *services.Install } } + eventsLog := bytes.NewBuffer(nil) switch h, err := s.env.Releases.History(req.Name); { // if this is a replace operation, append to the release history case req.ReuseName && err == nil && len(h) >= 1: @@ -881,7 +895,7 @@ func (s *ReleaseServer) performRelease(r *release.Release, req *services.Install // so as to append to the old release's history r.Version = old.Version + 1 - if err := s.performKubeUpdate(old, r, false, req.Timeout, req.Wait); err != nil { + if err := s.performKubeUpdate(old, r, eventsLog, false, req.Timeout, req.Wait); err != nil { msg := fmt.Sprintf("Release replace %q failed: %s", r.Name, err) log.Printf("warning: %s", msg) old.Info.Status.Code = release.Status_SUPERSEDED @@ -889,20 +903,30 @@ func (s *ReleaseServer) performRelease(r *release.Release, req *services.Install r.Info.Description = msg s.recordRelease(old, true) s.recordRelease(r, false) - return res, err + return res, fmt.Errorf( + "\nTiller log:\n%s\nrelease %s failed: %s", + eventsLog, + r.Name, + err, + ) } default: // nothing to replace, create as normal // regular manifests b := bytes.NewBufferString(r.Manifest) - if err := s.env.KubeClient.Create(r.Namespace, b, req.Timeout, req.Wait); err != nil { + if err := s.env.KubeClient.Create(r.Namespace, b, eventsLog, req.Timeout, req.Wait); err != nil { msg := fmt.Sprintf("Release %q failed: %s", r.Name, err) log.Printf("warning: %s", msg) r.Info.Status.Code = release.Status_FAILED r.Info.Description = msg s.recordRelease(r, false) - return res, fmt.Errorf("release %s failed: %s", r.Name, err) + return res, fmt.Errorf( + "\nTiller log:\n%s\nrelease %s failed: %s", + eventsLog, + r.Name, + err, + ) // add resourse statuses here } } @@ -954,7 +978,7 @@ func (s *ReleaseServer) execHook(hs []*release.Hook, name, namespace, hook strin for _, h := range executingHooks { b := bytes.NewBufferString(h.Manifest) - if err := kubeCli.Create(namespace, b, timeout, false); err != nil { + if err := kubeCli.Create(namespace, b, nil, timeout, false); err != nil { log.Printf("warning: Release %q %s %s failed: %s", name, hook, h.Path, err) return err }