From e4c44d0226bf25ffd4a537ad8b8a75d137f17dda Mon Sep 17 00:00:00 2001
From: Nick Schuch
Date: Fri, 24 Aug 2018 23:39:17 +1000
Subject: [PATCH] Stream Job output

---
 pkg/kube/client.go | 156 ++++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 153 insertions(+), 3 deletions(-)

diff --git a/pkg/kube/client.go b/pkg/kube/client.go
index bf530408e..8b92fb70d 100644
--- a/pkg/kube/client.go
+++ b/pkg/kube/client.go
@@ -17,15 +17,18 @@ limitations under the License.
 package kube // import "k8s.io/helm/pkg/kube"
 
 import (
+	"bufio"
 	"bytes"
 	"encoding/json"
 	"fmt"
 	"io"
 	"log"
 	"strings"
+	"sync"
 	"time"
 
 	jsonpatch "github.com/evanphx/json-patch"
+	"github.com/heptio/workgroup"
 	goerrors "github.com/pkg/errors"
 	appsv1 "k8s.io/api/apps/v1"
 	appsv1beta1 "k8s.io/api/apps/v1beta1"
@@ -329,6 +332,12 @@ func (c *Client) skipIfNotFound(err error) error {
 
 func (c *Client) watchTimeout(t time.Duration) ResourceActorFunc {
 	return func(info *resource.Info) error {
+		kind := info.Mapping.GroupVersionKind.Kind
+
+		if kind == "Job" {
+			return c.streamJobLogsUntilReady(t, info)
+		}
+
 		return c.watchUntilReady(t, info)
 	}
 }
@@ -560,6 +569,150 @@ func getSelectorFromObject(obj runtime.Object) (map[string]string, error) {
 	}
 }
 
+func (c *Client) streamJobLogsUntilReady(timeout time.Duration, info *resource.Info) error {
+	job, ok := info.Object.(*batchinternal.Job)
+	if !ok {
+		return goerrors.Errorf("expected %s to be a *batch.Job, got %T", info.Name, info.Object)
+	}
+
+	client, err := c.ClientSet()
+	if err != nil {
+		return err
+	}
+
+	// Refresh the Job object to get the controller generated labels for querying pods, e.g.
+	//
+	//   Pod Template:
+	//     Labels:  controller-uid=xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxx
+	//              job-name=example
+	job, err = client.Batch().Jobs(job.ObjectMeta.Namespace).Get(job.ObjectMeta.Name, metav1.GetOptions{})
+	if err != nil {
+		return goerrors.Wrap(err, "cannot load Job")
+	}
+
+	var wg workgroup.Group
+
+	// Wait for the timeout.
+	wg.Add(func(stop <-chan struct{}) error {
+		timer := time.NewTimer(timeout)
+		select {
+		case <-timer.C:
+			return nil
+		case <-stop:
+			return nil
+		}
+	})
+
+	// Wait for the Job to Complete or Fail.
+	wg.Add(func(stop <-chan struct{}) error {
+		watcher, err := client.Batch().Jobs(job.ObjectMeta.Namespace).Watch(metav1.ListOptions{
+			FieldSelector: fields.SelectorFromSet(map[string]string{
+				"metadata.name": job.ObjectMeta.Name,
+			}).String(),
+		})
+		if err != nil {
+			return goerrors.Wrap(err, "cannot create Job event watcher")
+		}
+
+		for {
+			select {
+			case e := <-watcher.ResultChan():
+				if e.Object == nil {
+					continue
+				}
+
+				job, ok := e.Object.(*batchinternal.Job)
+				if !ok {
+					continue
+				}
+
+				if e.Type != watch.Modified {
+					continue
+				}
+
+				for _, condition := range job.Status.Conditions {
+					if condition.Type == batchinternal.JobComplete && condition.Status == core.ConditionTrue {
+						return nil
+					} else if condition.Type == batchinternal.JobFailed && condition.Status == core.ConditionTrue {
+						return goerrors.Errorf("job failed: %s", condition.Reason)
+					}
+				}
+			case <-stop:
+				watcher.Stop()
+				return nil
+			}
+		}
+	})
+
+	// Stream all the Pod containers which are running during the Job's lifetime.
+	wg.Add(func(stop <-chan struct{}) error {
+		watcher, err := client.Core().Pods(job.ObjectMeta.Namespace).Watch(metav1.ListOptions{
+			LabelSelector: labels.SelectorFromSet(job.Spec.Selector.MatchLabels).String(),
+			FieldSelector: fields.SelectorFromSet(map[string]string{
+				"status.phase": "Running",
+			}).String(),
+		})
+		if err != nil {
+			return goerrors.Wrap(err, "cannot create Pod event watcher")
+		}
+
+		var wg sync.WaitGroup
+
+		for {
+			select {
+			case e := <-watcher.ResultChan():
+				if e.Object == nil {
+					continue
+				}
+
+				pod, ok := e.Object.(*core.Pod)
+				if !ok {
+					continue
+				}
+
+				if e.Type != watch.Added {
+					continue
+				}
+
+				for _, container := range pod.Spec.Containers {
+					wg.Add(1)
+
+					go func(pod *core.Pod, container core.Container) {
+						defer wg.Done()
+
+						req := client.Core().Pods(pod.ObjectMeta.Namespace).GetLogs(pod.ObjectMeta.Name, &core.PodLogOptions{
+							Container: container.Name,
+							Follow:    true,
+						})
+
+						rc, err := req.Stream()
+						if err != nil {
+							// @todo. panic??
+							panic(err)
+						}
+						defer rc.Close()
+
+						scanner := bufio.NewScanner(rc)
+						for scanner.Scan() {
+							// @todo, Use a writer, not straight to stdout.
+							fmt.Printf("%s | %s | %s\n", pod.ObjectMeta.Name, container.Name, scanner.Text())
+						}
+					}(pod, container)
+				}
+			case <-stop:
+				watcher.Stop()
+
+				// Wait for all the container streams to complete.
+				wg.Wait()
+
+				return nil
+			}
+		}
+	})
+
+	return wg.Run()
+}
+
 func (c *Client) watchUntilReady(timeout time.Duration, info *resource.Info) error {
 	w, err := resource.NewHelper(info.Client, info.Mapping).WatchSingle(info.Namespace, info.Name, info.ResourceVersion)
 	if err != nil {
@@ -583,9 +736,6 @@ func (c *Client) watchUntilReady(timeout time.Duration, info *resource.Info) err
 			// the status go into a good state. For other types, like ReplicaSet
 			// we don't really do anything to support these as hooks.
 			c.Log("Add/Modify event for %s: %v", info.Name, e.Type)
-			if kind == "Job" {
-				return c.waitForJob(e, info.Name)
-			}
 			return true, nil
 		case watch.Deleted:
 			c.Log("Deleted event for %s", info.Name)
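
Note (not part of the patch): streamJobLogsUntilReady coordinates its three watchers through github.com/heptio/workgroup, whose Add/Run signatures are visible in the diff above. The standalone sketch below illustrates that pattern in isolation, assuming workgroup's documented semantics: each function passed to Add runs in its own goroutine, the stop channel is closed as soon as any one of them returns, and Run reports that first function's error. The member bodies and durations are invented purely for illustration.

package main

import (
	"fmt"
	"time"

	"github.com/heptio/workgroup"
)

func main() {
	var g workgroup.Group

	// Plays the role of the patch's timeout member: give up after a while
	// unless another member finishes first and stop is closed.
	g.Add(func(stop <-chan struct{}) error {
		select {
		case <-time.After(2 * time.Second):
			return fmt.Errorf("timed out waiting for the job")
		case <-stop:
			return nil
		}
	})

	// Plays the role of the Job-completion watcher: it finishes first here,
	// which closes stop for the timeout member above.
	g.Add(func(stop <-chan struct{}) error {
		select {
		case <-time.After(500 * time.Millisecond):
			return nil // pretend the Job reached the Complete condition
		case <-stop:
			return nil
		}
	})

	// Run starts every member and returns the error of the first member to
	// finish; here that member returned nil, so no error is printed.
	if err := g.Run(); err != nil {
		fmt.Println("error:", err)
	}
}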