Stream Job output

pull/4523/head
Nick Schuch 7 years ago
parent c6df39597c
commit e4c44d0226

@@ -17,15 +17,18 @@ limitations under the License.
 package kube // import "k8s.io/helm/pkg/kube"
 
 import (
+	"bufio"
 	"bytes"
 	"encoding/json"
 	"fmt"
 	"io"
 	"log"
 	"strings"
+	"sync"
 	"time"
 
 	jsonpatch "github.com/evanphx/json-patch"
+	"github.com/heptio/workgroup"
 	goerrors "github.com/pkg/errors"
 	appsv1 "k8s.io/api/apps/v1"
 	appsv1beta1 "k8s.io/api/apps/v1beta1"
@@ -329,6 +332,12 @@ func (c *Client) skipIfNotFound(err error) error {
 func (c *Client) watchTimeout(t time.Duration) ResourceActorFunc {
 	return func(info *resource.Info) error {
+		kind := info.Mapping.GroupVersionKind.Kind
+		if kind == "Job" {
+			return c.streamJobLogsUntilReady(t, info)
+		}
 		return c.watchUntilReady(t, info)
 	}
 }
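For context, the actor returned by watchTimeout is applied to every manifest Helm builds for the release, so only resources whose mapped Kind is "Job" take the new log-streaming path; everything else keeps the generic readiness watch. A minimal, dependency-free sketch of that dispatch pattern (names and signatures are simplified for illustration; the real ResourceActorFunc receives a *resource.Info rather than a plain kind string):

package main

import "fmt"

// actorFunc stands in for pkg/kube's ResourceActorFunc, which takes a
// *resource.Info; a plain kind/name pair keeps this sketch dependency-free.
type actorFunc func(kind, name string) error

// makeActor mirrors watchTimeout's shape: Jobs get a dedicated
// log-streaming path, every other kind falls through to the generic
// readiness watch.
func makeActor(streamJob, watchReady actorFunc) actorFunc {
	return func(kind, name string) error {
		if kind == "Job" {
			return streamJob(kind, name)
		}
		return watchReady(kind, name)
	}
}

func main() {
	act := makeActor(
		func(kind, name string) error { fmt.Println("streaming logs for", name); return nil },
		func(kind, name string) error { fmt.Println("watching readiness of", name); return nil },
	)
	act("Job", "db-migrate")        // -> streaming logs for db-migrate
	act("Deployment", "db-migrate") // -> watching readiness of db-migrate
}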
@@ -560,6 +569,150 @@ func getSelectorFromObject(obj runtime.Object) (map[string]string, error) {
 	}
 }
+
+func (c *Client) streamJobLogsUntilReady(timeout time.Duration, info *resource.Info) error {
+	job, ok := info.Object.(*batchinternal.Job)
+	if !ok {
+		return goerrors.Errorf("expected %s to be a *batch.Job, got %T", info.Name, info.Object)
+	}
+
+	client, err := c.ClientSet()
+	if err != nil {
+		return err
+	}
+
+	// Refresh the Job object to pick up the controller-generated labels used
+	// to query its Pods, e.g.:
+	//
+	//   Pod Template:
+	//     Labels:  controller-uid=xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxx
+	//              job-name=example
+	job, err = client.Batch().Jobs(job.ObjectMeta.Namespace).Get(job.ObjectMeta.Name, metav1.GetOptions{})
+	if err != nil {
+		return goerrors.Wrap(err, "cannot load Job")
+	}
+
+	var wg workgroup.Group
+
+	// Wait for the timeout.
+	wg.Add(func(stop <-chan struct{}) error {
+		timer := time.NewTimer(timeout)
+		select {
+		case <-timer.C:
+			return nil
+		case <-stop:
+			return nil
+		}
+	})
+
+	// Wait for Job to Complete or Fail.
+	wg.Add(func(stop <-chan struct{}) error {
+		watcher, err := client.Batch().Jobs(job.ObjectMeta.Namespace).Watch(metav1.ListOptions{
+			FieldSelector: fields.SelectorFromSet(map[string]string{
+				"metadata.name": job.ObjectMeta.Name,
+			}).String(),
+		})
+		if err != nil {
+			return goerrors.Wrap(err, "cannot create Job event watcher")
+		}
+
+		for {
+			select {
+			case e := <-watcher.ResultChan():
+				if e.Object == nil {
+					continue
+				}
+
+				job, ok := e.Object.(*batchinternal.Job)
+				if !ok {
+					continue
+				}
+
+				if e.Type != watch.Modified {
+					continue
+				}
+
+				for _, condition := range job.Status.Conditions {
+					if condition.Type == batchinternal.JobComplete && condition.Status == core.ConditionTrue {
+						return nil
+					} else if condition.Type == batchinternal.JobFailed && condition.Status == core.ConditionTrue {
+						return goerrors.Errorf("job failed: %s", condition.Reason)
+					}
+				}
+			case <-stop:
+				watcher.Stop()
+				return nil
+			}
+		}
+	})
+
+	// Stream all the Pod containers that run during the Job's lifetime.
+	wg.Add(func(stop <-chan struct{}) error {
+		watcher, err := client.Core().Pods(job.ObjectMeta.Namespace).Watch(metav1.ListOptions{
+			LabelSelector: labels.SelectorFromSet(job.Spec.Selector.MatchLabels).String(),
+			FieldSelector: fields.SelectorFromSet(map[string]string{
+				"status.phase": "Running",
+			}).String(),
+		})
+		if err != nil {
+			return goerrors.Wrap(err, "cannot create Pod event watcher")
+		}
+
+		// Tracks the per-container log streaming goroutines below; named to
+		// avoid shadowing the workgroup.Group above.
+		var streams sync.WaitGroup
+
+		for {
+			select {
+			case e := <-watcher.ResultChan():
+				if e.Object == nil {
+					continue
+				}
+
+				pod, ok := e.Object.(*core.Pod)
+				if !ok {
+					continue
+				}
+
+				if e.Type != watch.Added {
+					continue
+				}
+
+				for _, container := range pod.Spec.Containers {
+					streams.Add(1)
+
+					go func(pod *core.Pod, container core.Container) {
+						defer streams.Done()
+
+						req := client.Core().Pods(pod.ObjectMeta.Namespace).GetLogs(pod.ObjectMeta.Name, &core.PodLogOptions{
+							Container: container.Name,
+							Follow:    true,
+						})
+
+						rc, err := req.Stream()
+						if err != nil {
+							// Log and skip this container rather than
+							// panicking inside the goroutine.
+							c.Log("cannot stream logs for %s/%s: %v", pod.ObjectMeta.Name, container.Name, err)
+							return
+						}
+						defer rc.Close()
+
+						scanner := bufio.NewScanner(rc)
+						for scanner.Scan() {
+							// @todo, Use a writer, not straight to stdout.
+							fmt.Printf("%s | %s | %s\n", pod.ObjectMeta.Name, container.Name, scanner.Text())
+						}
+					}(pod, container)
+				}
+			case <-stop:
+				watcher.Stop()
+
+				// Wait for all the container streams to complete.
+				streams.Wait()
+
+				return nil
+			}
+		}
+	})
+
+	return wg.Run()
+}
 
 func (c *Client) watchUntilReady(timeout time.Duration, info *resource.Info) error {
 	w, err := resource.NewHelper(info.Client, info.Mapping).WatchSingle(info.Namespace, info.Name, info.ResourceVersion)
 	if err != nil {
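The three wg.Add calls above lean on heptio/workgroup's coordination model: Run starts every registered function in its own goroutine, the first one to return closes the stop channel handed to all of them, and Run returns that first function's result once everything has exited. That is what lets the timeout, the Job watcher, and the Pod log streamer tear one another down. A minimal sketch of those semantics as documented for the library (the duration and messages here are arbitrary):

package main

import (
	"fmt"
	"time"

	"github.com/heptio/workgroup"
)

func main() {
	var g workgroup.Group

	// Mirrors the timeout goroutine above: when the timer fires, this
	// function returns, which closes stop for every other member.
	g.Add(func(stop <-chan struct{}) error {
		select {
		case <-time.After(100 * time.Millisecond):
			return fmt.Errorf("timed out")
		case <-stop:
			return nil
		}
	})

	// Mirrors a watcher goroutine: it runs until stop is closed.
	g.Add(func(stop <-chan struct{}) error {
		<-stop
		fmt.Println("watcher: stop closed, shutting down")
		return nil
	})

	// Run blocks until all members return and yields the first member's
	// result -- here, the timeout error.
	fmt.Println("Run returned:", g.Run())
}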
@@ -583,9 +736,6 @@ func (c *Client) watchUntilReady(timeout time.Duration, info *resource.Info) error {
 		// the status go into a good state. For other types, like ReplicaSet
 		// we don't really do anything to support these as hooks.
 		c.Log("Add/Modify event for %s: %v", info.Name, e.Type)
-		if kind == "Job" {
-			return c.waitForJob(e, info.Name)
-		}
 		return true, nil
 	case watch.Deleted:
 		c.Log("Deleted event for %s", info.Name)
