Sending k8s events to helm-cli

pull/2386/head
theairkit 9 years ago
parent 12300745d3
commit eb46bb14c9

@ -75,7 +75,7 @@ type ResourceActorFunc func(*resource.Info) error
// Create creates kubernetes resources from an io.reader // Create creates kubernetes resources from an io.reader
// //
// Namespace will set the namespace // Namespace will set the namespace
func (c *Client) Create(namespace string, reader io.Reader, timeout int64, shouldWait bool) error { func (c *Client) Create(namespace string, reader io.Reader, writer io.Writer, timeout int64, shouldWait bool) error {
client, err := c.ClientSet() client, err := c.ClientSet()
if err != nil { if err != nil {
return err return err
@ -91,7 +91,7 @@ func (c *Client) Create(namespace string, reader io.Reader, timeout int64, shoul
return err return err
} }
if shouldWait { if shouldWait {
return c.waitForResources(time.Duration(timeout)*time.Second, infos) return c.waitForResources(time.Duration(timeout)*time.Second, infos, writer)
} }
return nil return nil
} }
@ -208,7 +208,7 @@ func (c *Client) Get(namespace string, reader io.Reader) (string, error) {
// not present in the target configuration // not present in the target configuration
// //
// Namespace will set the namespaces // Namespace will set the namespaces
func (c *Client) Update(namespace string, originalReader, targetReader io.Reader, recreate bool, timeout int64, shouldWait bool) error { func (c *Client) Update(namespace string, originalReader, targetReader io.Reader, writer io.Writer, recreate bool, timeout int64, shouldWait bool) error {
original, err := c.BuildUnstructured(namespace, originalReader) original, err := c.BuildUnstructured(namespace, originalReader)
if err != nil { if err != nil {
return fmt.Errorf("failed decoding reader into objects: %s", err) return fmt.Errorf("failed decoding reader into objects: %s", err)
@ -269,7 +269,7 @@ func (c *Client) Update(namespace string, originalReader, targetReader io.Reader
} }
} }
if shouldWait { if shouldWait {
return c.waitForResources(time.Duration(timeout)*time.Second, target) return c.waitForResources(time.Duration(timeout)*time.Second, target, writer)
} }
return nil return nil
} }

@ -17,6 +17,8 @@ limitations under the License.
package kube // import "k8s.io/helm/pkg/kube" package kube // import "k8s.io/helm/pkg/kube"
import ( import (
"fmt"
"io"
"log" "log"
"time" "time"
@ -25,6 +27,7 @@ import (
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/pkg/api"
"k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/api/v1"
apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1" apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1"
extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1" extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
@ -43,9 +46,8 @@ type deployment struct {
// waitForResources polls to get the current status of all pods, PVCs, and Services // waitForResources polls to get the current status of all pods, PVCs, and Services
// until all are ready or a timeout is reached // until all are ready or a timeout is reached
func (c *Client) waitForResources(timeout time.Duration, created Result) error { func (c *Client) waitForResources(timeout time.Duration, created Result, writer io.Writer) error {
log.Printf("beginning wait for resources with timeout of %v", timeout) log.Printf("beginning wait for resources with timeout of %v", timeout)
cs, _ := c.ClientSet() cs, _ := c.ClientSet()
client := versionedClientsetForDeployment(cs) client := versionedClientsetForDeployment(cs)
return wait.Poll(2*time.Second, timeout, func() (bool, error) { return wait.Poll(2*time.Second, timeout, func() (bool, error) {
@ -132,6 +134,7 @@ func (c *Client) waitForResources(timeout time.Duration, created Result) error {
services = append(services, *svc) services = append(services, *svc)
} }
} }
writeEvents(client, writer, pods, services, pvc, deployments)
return podsReady(pods) && servicesReady(services) && volumesReady(pvc) && deploymentsReady(deployments), nil return podsReady(pods) && servicesReady(services) && volumesReady(pvc) && deploymentsReady(deployments), nil
}) })
} }
@ -194,3 +197,48 @@ func versionedClientsetForDeployment(internalClient internalclientset.Interface)
ExtensionsV1beta1Client: extensionsclient.New(internalClient.Extensions().RESTClient()), ExtensionsV1beta1Client: extensionsclient.New(internalClient.Extensions().RESTClient()),
} }
} }
func writeEvents(
client clientset.Interface,
writer io.Writer,
pods []v1.Pod,
services []v1.Service,
pvcs []v1.PersistentVolumeClaim,
deployments []deployment,
) {
// Aggregate all events from k8s on each poll:
// Pods:
for _, pod := range pods {
events, _ := client.Core().Events(pod.Namespace).Search(api.Scheme, runtime.Object(&pod))
write(writer, events, "Pod")
}
// Services:
for _, service := range services {
events, _ := client.Core().Events(service.Namespace).Search(api.Scheme, runtime.Object(&service))
write(writer, events, "Service")
}
// PVCs:
for _, pvc := range pvcs {
events, _ := client.Core().Events(pvc.Namespace).Search(api.Scheme, runtime.Object(&pvc))
write(writer, events, "PVC")
}
// Deployments:
for _, deployment := range deployments {
events, _ := client.Core().Events(deployment.deployment.Namespace).Search(api.Scheme, runtime.Object(deployment.deployment))
write(writer, events, "Deployment")
}
}
func write(writer io.Writer, events *v1.EventList, resource string) {
for _, event := range events.Items {
fmt.Fprintf(
writer,
"[%s] [%s] [Type]: %s [Reason]: %s [Message]: %s\n",
time.Now().Format("2006/01/02 15:04:05"),
resource,
event.Type,
event.Reason,
event.Message,
)
}
}

@ -39,7 +39,8 @@ type Environment struct {
func (env *Environment) createTestPod(test *test) error { func (env *Environment) createTestPod(test *test) error {
b := bytes.NewBufferString(test.manifest) b := bytes.NewBufferString(test.manifest)
if err := env.KubeClient.Create(env.Namespace, b, env.Timeout, false); err != nil { eventsLog := bytes.NewBuffer(nil)
if err := env.KubeClient.Create(env.Namespace, b, eventsLog, env.Timeout, false); err != nil {
log.Printf(err.Error()) log.Printf(err.Error())
test.result.Info = err.Error() test.result.Info = err.Error()
test.result.Status = release.TestRun_FAILURE test.result.Status = release.TestRun_FAILURE

@ -114,7 +114,7 @@ type KubeClient interface {
// //
// reader must contain a YAML stream (one or more YAML documents separated // reader must contain a YAML stream (one or more YAML documents separated
// by "\n---\n"). // by "\n---\n").
Create(namespace string, reader io.Reader, timeout int64, shouldWait bool) error Create(namespace string, reader io.Reader, writer io.Writer, timeout int64, shouldWait bool) error
// Get gets one or more resources. Returned string hsa the format like kubectl // Get gets one or more resources. Returned string hsa the format like kubectl
// provides with the column headers separating the resource types. // provides with the column headers separating the resource types.
@ -147,7 +147,7 @@ type KubeClient interface {
// //
// reader must contain a YAML stream (one or more YAML documents separated // reader must contain a YAML stream (one or more YAML documents separated
// by "\n---\n"). // by "\n---\n").
Update(namespace string, originalReader, modifiedReader io.Reader, recreate bool, timeout int64, shouldWait bool) error Update(namespace string, originalReader, modifiedReader io.Reader, writer io.Writer, recreate bool, timeout int64, shouldWait bool) error
Build(namespace string, reader io.Reader) (kube.Result, error) Build(namespace string, reader io.Reader) (kube.Result, error)
BuildUnstructured(namespace string, reader io.Reader) (kube.Result, error) BuildUnstructured(namespace string, reader io.Reader) (kube.Result, error)

@ -20,6 +20,7 @@ import (
"bytes" "bytes"
"errors" "errors"
"fmt" "fmt"
"io"
"log" "log"
"path" "path"
"regexp" "regexp"
@ -318,7 +319,8 @@ func (s *ReleaseServer) performUpdate(originalRelease, updatedRelease *release.R
} }
} }
if err := s.performKubeUpdate(originalRelease, updatedRelease, req.Recreate, req.Timeout, req.Wait); err != nil { eventsLog := bytes.NewBuffer(nil)
if err := s.performKubeUpdate(originalRelease, updatedRelease, eventsLog, req.Recreate, req.Timeout, req.Wait); err != nil {
msg := fmt.Sprintf("Upgrade %q failed: %s", updatedRelease.Name, err) msg := fmt.Sprintf("Upgrade %q failed: %s", updatedRelease.Name, err)
log.Printf("warning: %s", msg) log.Printf("warning: %s", msg)
originalRelease.Info.Status.Code = release.Status_SUPERSEDED originalRelease.Info.Status.Code = release.Status_SUPERSEDED
@ -326,7 +328,12 @@ func (s *ReleaseServer) performUpdate(originalRelease, updatedRelease *release.R
updatedRelease.Info.Description = msg updatedRelease.Info.Description = msg
s.recordRelease(originalRelease, true) s.recordRelease(originalRelease, true)
s.recordRelease(updatedRelease, false) s.recordRelease(updatedRelease, false)
return res, err return res, fmt.Errorf(
"\nTiller log:\n%s\nupdate %s failed: %s",
eventsLog,
originalRelease.Name,
err,
)
} }
// post-upgrade hooks // post-upgrade hooks
@ -499,7 +506,8 @@ func (s *ReleaseServer) performRollback(currentRelease, targetRelease *release.R
} }
} }
if err := s.performKubeUpdate(currentRelease, targetRelease, req.Recreate, req.Timeout, req.Wait); err != nil { eventsLog := bytes.NewBuffer(nil)
if err := s.performKubeUpdate(currentRelease, targetRelease, eventsLog, req.Recreate, req.Timeout, req.Wait); err != nil {
msg := fmt.Sprintf("Rollback %q failed: %s", targetRelease.Name, err) msg := fmt.Sprintf("Rollback %q failed: %s", targetRelease.Name, err)
log.Printf("warning: %s", msg) log.Printf("warning: %s", msg)
currentRelease.Info.Status.Code = release.Status_SUPERSEDED currentRelease.Info.Status.Code = release.Status_SUPERSEDED
@ -507,7 +515,12 @@ func (s *ReleaseServer) performRollback(currentRelease, targetRelease *release.R
targetRelease.Info.Description = msg targetRelease.Info.Description = msg
s.recordRelease(currentRelease, true) s.recordRelease(currentRelease, true)
s.recordRelease(targetRelease, false) s.recordRelease(targetRelease, false)
return res, err return res, fmt.Errorf(
"\nTiller log:\n%s\nrollback %s failed: %s",
eventsLog,
currentRelease.Name,
err,
)
} }
// post-rollback hooks // post-rollback hooks
@ -525,11 +538,11 @@ func (s *ReleaseServer) performRollback(currentRelease, targetRelease *release.R
return res, nil return res, nil
} }
func (s *ReleaseServer) performKubeUpdate(currentRelease, targetRelease *release.Release, recreate bool, timeout int64, shouldWait bool) error { func (s *ReleaseServer) performKubeUpdate(currentRelease, targetRelease *release.Release, writer io.Writer, recreate bool, timeout int64, shouldWait bool) error {
kubeCli := s.env.KubeClient kubeCli := s.env.KubeClient
current := bytes.NewBufferString(currentRelease.Manifest) current := bytes.NewBufferString(currentRelease.Manifest)
target := bytes.NewBufferString(targetRelease.Manifest) target := bytes.NewBufferString(targetRelease.Manifest)
return kubeCli.Update(targetRelease.Namespace, current, target, recreate, timeout, shouldWait) return kubeCli.Update(targetRelease.Namespace, current, target, writer, recreate, timeout, shouldWait)
} }
// prepareRollback finds the previous release and prepares a new release object with // prepareRollback finds the previous release and prepares a new release object with
@ -864,6 +877,7 @@ func (s *ReleaseServer) performRelease(r *release.Release, req *services.Install
} }
} }
eventsLog := bytes.NewBuffer(nil)
switch h, err := s.env.Releases.History(req.Name); { switch h, err := s.env.Releases.History(req.Name); {
// if this is a replace operation, append to the release history // if this is a replace operation, append to the release history
case req.ReuseName && err == nil && len(h) >= 1: case req.ReuseName && err == nil && len(h) >= 1:
@ -881,7 +895,7 @@ func (s *ReleaseServer) performRelease(r *release.Release, req *services.Install
// so as to append to the old release's history // so as to append to the old release's history
r.Version = old.Version + 1 r.Version = old.Version + 1
if err := s.performKubeUpdate(old, r, false, req.Timeout, req.Wait); err != nil { if err := s.performKubeUpdate(old, r, eventsLog, false, req.Timeout, req.Wait); err != nil {
msg := fmt.Sprintf("Release replace %q failed: %s", r.Name, err) msg := fmt.Sprintf("Release replace %q failed: %s", r.Name, err)
log.Printf("warning: %s", msg) log.Printf("warning: %s", msg)
old.Info.Status.Code = release.Status_SUPERSEDED old.Info.Status.Code = release.Status_SUPERSEDED
@ -889,20 +903,30 @@ func (s *ReleaseServer) performRelease(r *release.Release, req *services.Install
r.Info.Description = msg r.Info.Description = msg
s.recordRelease(old, true) s.recordRelease(old, true)
s.recordRelease(r, false) s.recordRelease(r, false)
return res, err return res, fmt.Errorf(
"\nTiller log:\n%s\nrelease %s failed: %s",
eventsLog,
r.Name,
err,
)
} }
default: default:
// nothing to replace, create as normal // nothing to replace, create as normal
// regular manifests // regular manifests
b := bytes.NewBufferString(r.Manifest) b := bytes.NewBufferString(r.Manifest)
if err := s.env.KubeClient.Create(r.Namespace, b, req.Timeout, req.Wait); err != nil { if err := s.env.KubeClient.Create(r.Namespace, b, eventsLog, req.Timeout, req.Wait); err != nil {
msg := fmt.Sprintf("Release %q failed: %s", r.Name, err) msg := fmt.Sprintf("Release %q failed: %s", r.Name, err)
log.Printf("warning: %s", msg) log.Printf("warning: %s", msg)
r.Info.Status.Code = release.Status_FAILED r.Info.Status.Code = release.Status_FAILED
r.Info.Description = msg r.Info.Description = msg
s.recordRelease(r, false) s.recordRelease(r, false)
return res, fmt.Errorf("release %s failed: %s", r.Name, err) return res, fmt.Errorf(
"\nTiller log:\n%s\nrelease %s failed: %s",
eventsLog,
r.Name,
err,
) // add resourse statuses here
} }
} }
@ -954,7 +978,7 @@ func (s *ReleaseServer) execHook(hs []*release.Hook, name, namespace, hook strin
for _, h := range executingHooks { for _, h := range executingHooks {
b := bytes.NewBufferString(h.Manifest) b := bytes.NewBufferString(h.Manifest)
if err := kubeCli.Create(namespace, b, timeout, false); err != nil { if err := kubeCli.Create(namespace, b, nil, timeout, false); err != nil {
log.Printf("warning: Release %q %s %s failed: %s", name, hook, h.Path, err) log.Printf("warning: Release %q %s %s failed: %s", name, hook, h.Path, err)
return err return err
} }

Loading…
Cancel
Save