WIP: reworked to event-based monitoring with goroutines

pull/3479/head
Timofey Kirillov 8 years ago
parent f9ff43ad5b
commit 5f652cdbe5

@@ -16,8 +16,16 @@ message JobLogChunk {
repeated LogLine log_lines = 4;
}
message JobPodError {
string job_name = 1;
string pod_name = 2;
string container_name = 3;
string message = 4;
}
message WatchFeed {
JobLogChunk job_log_chunk = 1;
JobPodError job_pod_error = 2;
/*
* WatchFeed may contain one or more events from tiller at the same time.

@@ -388,8 +388,9 @@ func (h *Client) install(ctx context.Context, req *rls.InstallReleaseRequest) (*
}
if resp.WatchFeed.GetJobLogChunk() != nil {
for _, line := range resp.WatchFeed.GetJobLogChunk().LogLines {
fmt.Printf("%s %s\n", line.Timestamp, line.Data) // TODO: make normal formatting as follows.
chunk := resp.WatchFeed.GetJobLogChunk()
for _, line := range chunk.LogLines {
fmt.Printf("{job %s / pod %s / container %s} %s %s\n", chunk.JobName, chunk.PodName, chunk.ContainerName, line.Timestamp, line.Data) // TODO: make normal formatting as follows.
// TODO The client could work like state machine:
// TODO when receiving job-pod-container log chunk print header "==> job X pod X container Y logs <==\n",
// TODO just like `tail -f *` works on multiple files at the same time.
@@ -399,6 +400,8 @@ func (h *Client) install(ctx context.Context, req *rls.InstallReleaseRequest) (*
// TODO The main reason to stream userspace-events like ImagePullBackOff or CrashLoopBackOff is
// TODO to give the user enough info to debug templates without accessing the cluster with kubectl.
}
} else if resp.WatchFeed.GetJobPodError() != nil {
fmt.Printf("ERROR: %v", resp.WatchFeed.GetJobPodError()) // TODO: normal formatting
} else {
finalResp = resp // TODO verify/debug this code
}
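Since a WatchFeed message may carry more than one event at the same time (per the proto comment above), an if/else dispatch handles only the first field that is set. A minimal sketch of a client-side handler that checks each field independently, assuming a hypothetical printWatchFeedEvents helper and the generated release types:

func printWatchFeedEvents(feed *release.WatchFeed) {
    if chunk := feed.GetJobLogChunk(); chunk != nil {
        for _, line := range chunk.LogLines {
            fmt.Printf("{job %s / pod %s / container %s} %s %s\n",
                chunk.JobName, chunk.PodName, chunk.ContainerName, line.Timestamp, line.Data)
        }
    }
    if podError := feed.GetJobPodError(); podError != nil {
        fmt.Printf("ERROR: job %s pod %s container %s: %s\n",
            podError.JobName, podError.PodName, podError.ContainerName, podError.Message)
    }
}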

@@ -2,21 +2,44 @@ package kube
import (
"bytes"
_ "fmt"
"fmt"
"io"
"sort"
_ "sort"
"strings"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/fields"
_ "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/kubernetes/pkg/apis/batch"
"k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/kubectl/resource"
)
var (
WatchFeedStub = &WatchFeedProto{
WriteJobLogChunkFunc: func(JobLogChunk) error { return nil },
WriteJobPodErrorFunc: func(JobPodError) error { return nil },
}
)
type WatchFeed interface {
WriteJobLogChunk(*JobLogChunk) error
WriteJobLogChunk(JobLogChunk) error
WriteJobPodError(JobPodError) error
}
// Prototype-struct helper to create a feed with callbacks specified at the point of creation (such as the WatchFeedStub var)
type WatchFeedProto struct {
WriteJobLogChunkFunc func(JobLogChunk) error
WriteJobPodErrorFunc func(JobPodError) error
}
func (proto *WatchFeedProto) WriteJobLogChunk(arg JobLogChunk) error {
return proto.WriteJobLogChunkFunc(arg)
}
func (proto *WatchFeedProto) WriteJobPodError(arg JobPodError) error {
return proto.WriteJobPodErrorFunc(arg)
}
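// Hypothetical usage sketch for the prototype struct above: a feed whose callbacks
// are defined in place at creation (identifier names are illustrative only):
//
//     exampleFeed := &WatchFeedProto{
//         WriteJobLogChunkFunc: func(chunk JobLogChunk) error {
//             for _, line := range chunk.LogLines {
//                 fmt.Printf("job/%s pod/%s container/%s %s %s\n",
//                     chunk.JobName, chunk.PodName, chunk.ContainerName, line.Timestamp, line.Data)
//             }
//             return nil
//         },
//         WriteJobPodErrorFunc: func(podError JobPodError) error {
//             fmt.Printf("job/%s pod/%s container/%s error: %s\n",
//                 podError.JobName, podError.PodName, podError.ContainerName, podError.Message)
//             return nil
//         },
//     }
//
// Anything that accepts the WatchFeed interface can then be handed exampleFeed directly.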
type LogLine struct {
@@ -24,217 +47,325 @@ type LogLine struct {
Data string
}
type JobLogChunk struct {
JobName string
type PodLogChunk struct {
PodName string
ContainerName string
LogLines []LogLine
}
type WriteJobLogChunkFunc func(*JobLogChunk) error
type PodError struct {
Message string
PodName string
ContainerName string
}
type JobLogChunk struct {
PodLogChunk
JobName string
}
func (f WriteJobLogChunkFunc) WriteJobLogChunk(chunk *JobLogChunk) error {
return f(chunk)
type JobPodError struct {
PodError
JobName string
}
type WatchMonitor struct {
kube *Client
timeout time.Duration
watchFeed WatchFeed
Kube *Client
Timeout time.Duration
Namespace string
ResourceName string
UID types.UID
Namespace string
ResourceName string
InitialResourceVersion string
}
type PodWatchMonitor struct {
WatchMonitor
Manifest *core.Pod
PodLogChunk chan *PodLogChunk
PodError chan PodError
Error chan error
InitContainersNames []string
ProcessedInitContainersNames []string
ProcessedInitContainersIDs []string
ContainerMonitorStates map[string]string
ProcessedContainerLogTimestamps map[string]time.Time
ContainersNames []string
ProcessedContainersNames []string
ProcessedContainersIDs []string
InitContainersNames []string
ContainersNames []string
}
func (pod *PodWatchMonitor) GetMonitoredContainersNames() []string {
res := make([]string, 0)
func (pod *PodWatchMonitor) FollowContainerLogs(containerName string) error {
client, err := pod.Kube.ClientSet()
if err != nil {
return err
}
FilterProcessedContainersNames:
for _, name := range pod.ContainersNames {
for _, processedContainerName := range pod.ProcessedContainersNames {
if processedContainerName == name {
continue FilterProcessedContainersNames
}
}
res = append(res, name)
// var sinceTime *metav1.Time
// if v, found := pod.ProcessedContainerLogTimestamps[containerName]; found {
// sinceTime = &metav1.Time{v}
// }
req := client.Core().
Pods(pod.Namespace).
GetLogs(pod.ResourceName, &core.PodLogOptions{
Container: containerName,
Timestamps: true,
Follow: true,
})
readCloser, err := req.Stream()
if err != nil {
return err
}
defer readCloser.Close()
return res
}
lineBuf := bytes.Buffer{}
rawBuf := make([]byte, 4096)
func (pod *PodWatchMonitor) SetContainerProcessed(containerName string, containerID string) {
pod.ProcessedContainersNames = append(pod.ProcessedContainersNames, containerName)
pod.ProcessedContainersIDs = append(pod.ProcessedContainersIDs, containerID)
}
for {
n, err := readCloser.Read(rawBuf)
if err == io.EOF {
break
} else if err != nil {
return err
}
func (pod *PodWatchMonitor) GetMonitoredInitContainersNames() []string {
res := make([]string, 0)
chunkLines := make([]LogLine, 0)
for i := 0; i < n; i++ {
if rawBuf[i] == '\n' {
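// With PodLogOptions.Timestamps enabled the API server prefixes every log line with
// an RFC3339 timestamp followed by a single space, hence the SplitN on the first space.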
lineParts := strings.SplitN(lineBuf.String(), " ", 2)
if len(lineParts) == 2 {
chunkLines = append(chunkLines, LogLine{Timestamp: lineParts[0], Data: lineParts[1]})
}
FilterProcessedInitContainersNames:
for _, name := range pod.InitContainersNames {
for _, processedInitContainerName := range pod.ProcessedInitContainersNames {
if processedInitContainerName == name {
continue FilterProcessedInitContainersNames
lineBuf.Reset()
continue
}
lineBuf.WriteByte(rawBuf[i])
}
pod.PodLogChunk <- &PodLogChunk{
PodName: pod.ResourceName,
ContainerName: containerName,
LogLines: chunkLines,
}
res = append(res, name)
}
return res
}
return nil
func (pod *PodWatchMonitor) SetInitContainerProcessed(containerName string, containerID string) {
pod.ProcessedInitContainersNames = append(pod.ProcessedInitContainersNames, containerName)
pod.ProcessedInitContainersIDs = append(pod.ProcessedInitContainersIDs, containerID)
}
// buf := bytes.Buffer{}
// _, err = io.Copy(&buf, readCloser)
func (pod *PodWatchMonitor) RefreshManifest() error {
client, err := pod.kube.ClientSet()
if err != nil {
return err
}
// lines := strings.Split(strings.TrimSuffix(buf.String(), "\n"), "\n")
manifest, err := client.Core().
Pods(pod.Namespace).
Get(pod.ResourceName, metav1.GetOptions{})
if err != nil {
return err
}
pod.Manifest = manifest
// res := make([]LogLine, 0)
// for _, line := range lines {
// lineParts := strings.SplitN(line, " ", 2)
// if len(lineParts) == 2 {
// ll := LogLine{
// Timestamp: lineParts[0],
// Data: lineParts[1],
// }
// res = append(res, ll)
// pod.Kube.Log(">>> %s", ll)
// }
// }
return nil
}
// if len(res) > 0 {
// t, err := time.Parse(time.RFC3339, res[len(res)-1].Timestamp)
// if err != nil {
// return nil, err
// }
// pod.ProcessedContainerLogTimestamps[containerName] = t
// }
func (pod *PodWatchMonitor) GetReadyCondition() (res *core.PodCondition) {
for i, _ := range pod.Manifest.Status.Conditions {
if pod.Manifest.Status.Conditions[i].Type == "Ready" {
res = &pod.Manifest.Status.Conditions[i]
break
}
}
return
// logLines, err := pod.GetContainerLogs(containerName)
// if err != nil {
// return err
// }
// pod.PodLogChunk <- &PodLogChunk{
// PodName: pod.ResourceName,
// ContainerName: containerName,
// LogLines: logLines,
// }
// return res, nil
}
func (pod *PodWatchMonitor) GetInitContainerStatus(containerName string) (res *core.ContainerStatus) {
for i, _ := range pod.Manifest.Status.InitContainerStatuses {
if pod.Manifest.Status.InitContainerStatuses[i].Name == containerName {
res = &pod.Manifest.Status.InitContainerStatuses[i]
break
func (pod *PodWatchMonitor) WatchContainerLogs(containerName string) error {
for {
for _, containerName := range pod.ContainersNames {
switch pod.ContainerMonitorStates[containerName] {
case "Running", "Terminated":
return pod.FollowContainerLogs(containerName)
case "Waiting":
default:
}
}
time.Sleep(time.Duration(200) * time.Millisecond)
}
return
return nil
}
func (pod *PodWatchMonitor) GetContainerStatus(containerName string) (res *core.ContainerStatus) {
for i, _ := range pod.Manifest.Status.ContainerStatuses {
if pod.Manifest.Status.ContainerStatuses[i].Name == containerName {
res = &pod.Manifest.Status.ContainerStatuses[i]
break
}
func (pod *PodWatchMonitor) Watch() error {
for _, containerName := range pod.ContainersNames {
go func(containerName string) {
// Pass the loop variable as an argument: a closure over it would make every goroutine see the last container.
err := pod.WatchContainerLogs(containerName)
if err != nil {
pod.Error <- err
}
}(containerName)
}
return
}
func (pod *PodWatchMonitor) GetContainerLogs(containerName string) ([]LogLine, error) {
client, err := pod.kube.ClientSet()
client, err := pod.Kube.ClientSet()
if err != nil {
return nil, err
return err
}
req := client.Core().
Pods(pod.Namespace).
GetLogs(pod.ResourceName, &core.PodLogOptions{
Container: containerName,
Timestamps: true,
watcher, err := client.Core().Pods(pod.Namespace).
Watch(metav1.ListOptions{
ResourceVersion: pod.InitialResourceVersion,
Watch: true,
FieldSelector: fields.OneTermEqualSelector("metadata.name", pod.ResourceName).String(),
})
readCloser, err := req.Stream()
if err != nil {
return nil, err
return err
}
defer readCloser.Close()
buf := bytes.Buffer{}
_, err = io.Copy(&buf, readCloser)
res := make([]LogLine, 0)
for _, line := range strings.Split(strings.TrimSuffix(buf.String(), "\n"), "\n") {
lineParts := strings.SplitN(line, " ", 2)
// TODO: receive only new log lines, save state into PodWatchMonitor
if len(lineParts) == 2 {
ll := LogLine{
Timestamp: lineParts[0],
Data: lineParts[1],
_, err = watch.Until(pod.Timeout, watcher, func(e watch.Event) (bool, error) {
pod.Kube.Log("[DEBUG] Pod %s event: %+v", pod.ResourceName, e)
object, ok := e.Object.(*core.Pod)
if !ok {
return true, fmt.Errorf("Expected %s to be a *core.Pod, got %T", pod.ResourceName, e.Object)
}
for _, cs := range object.Status.ContainerStatuses {
oldState := pod.ContainerMonitorStates[cs.Name]
if cs.State.Waiting != nil {
pod.ContainerMonitorStates[cs.Name] = "Waiting"
switch cs.State.Waiting.Reason {
case "ImagePullBackOff", "ErrImagePull", "CrashLoopBackOff":
pod.PodError <- PodError{
ContainerName: cs.Name,
PodName: pod.ResourceName,
Message: fmt.Sprintf("%s: %s", cs.State.Waiting.Reason, cs.State.Waiting.Message),
}
}
}
if cs.State.Running != nil {
pod.ContainerMonitorStates[cs.Name] = "Running"
}
if cs.State.Terminated != nil {
pod.ContainerMonitorStates[cs.Name] = "Terminated"
}
if oldState != pod.ContainerMonitorStates[cs.Name] {
pod.Kube.Log("Pod %s container %s state changed %v -> %v", pod.ResourceName, cs.Name, oldState, pod.ContainerMonitorStates[cs.Name])
}
res = append(res, ll)
}
}
return res, nil
return false, nil
})
return nil
}
type JobWatchMonitor struct {
WatchMonitor
MonitoredPodsQueue []*PodWatchMonitor
ProcessedPodsUIDs []types.UID
State string
Started chan bool
Succeeded chan bool
Error chan error
AddedPod chan *PodWatchMonitor
PodLogChunk chan *PodLogChunk
PodError chan PodError
MonitoredPods []*PodWatchMonitor
}
func (job *JobWatchMonitor) WaitTillResourceVersionAdded(resourceVersion string, jobInfo *resource.Info) error {
w, err := resource.
NewHelper(jobInfo.Client, jobInfo.Mapping).
WatchSingle(job.Namespace, job.ResourceName, resourceVersion)
func (job *JobWatchMonitor) Watch() error {
client, err := job.Kube.ClientSet()
if err != nil {
return err
}
watcher, err := client.Batch().Jobs(job.Namespace).
Watch(metav1.ListOptions{
ResourceVersion: job.InitialResourceVersion,
Watch: true,
FieldSelector: fields.OneTermEqualSelector("metadata.name", job.ResourceName).String(),
})
if err != nil {
return err
}
_, err = watch.Until(job.timeout, w, func(e watch.Event) (bool, error) {
if e.Type == watch.Added {
return true, nil
_, err = watch.Until(job.Timeout, watcher, func(e watch.Event) (bool, error) {
job.Kube.Log("[DEBUG] Job %s event: %+v", job.ResourceName, e)
switch job.State {
case "":
if e.Type == watch.Added {
job.Started <- true
oldState := job.State
job.State = "Started"
job.Kube.Log("[DEBUG] Job %s watcher state changed %v -> %v", job.ResourceName, oldState, job.State)
job.Kube.Log("[DEBUG] Starting job %s pods watcher", job.ResourceName)
go func() {
err := job.WatchPods()
if err != nil {
job.Error <- err
}
}()
}
case "Started":
object, ok := e.Object.(*batch.Job)
if !ok {
return true, fmt.Errorf("Expected %s to be a *batch.Job, got %T", job.ResourceName, e.Object)
}
for _, c := range object.Status.Conditions {
if c.Type == batch.JobComplete && c.Status == core.ConditionTrue {
oldState := job.State
job.State = "Succeeded"
job.Kube.Log("[DEBUG] Job %s watcher state changed %v -> %v", job.ResourceName, oldState, job.State)
job.Kube.Log("%s: Jobs active: %d, jobs failed: %d, jobs succeeded: %d", job.ResourceName, object.Status.Active, object.Status.Failed, object.Status.Succeeded)
return true, nil
} else if c.Type == batch.JobFailed && c.Status == core.ConditionTrue {
oldState := job.State
job.State = "Failed"
job.Kube.Log("[DEBUG] Job %s watcher state changed %v -> %v", job.ResourceName, oldState, job.State)
return true, fmt.Errorf("Job failed: %s", c.Reason)
}
}
default:
return true, fmt.Errorf("Unknown job %s watcher state: %s", job.ResourceName, job.State)
}
return false, nil
})
return err
}
func (job *JobWatchMonitor) TakeNextMonitoredPod() *PodWatchMonitor {
if len(job.MonitoredPodsQueue) == 0 {
return nil
if err != nil {
return err
}
var res *PodWatchMonitor
res, job.MonitoredPodsQueue = job.MonitoredPodsQueue[0], job.MonitoredPodsQueue[1:]
return res
}
func (job *JobWatchMonitor) SetPodProcessed(uid types.UID) {
job.ProcessedPodsUIDs = append(job.ProcessedPodsUIDs, uid)
}
func (job *JobWatchMonitor) SchedulePodMonitoring(pod *PodWatchMonitor) {
job.MonitoredPodsQueue = append(job.MonitoredPodsQueue, pod)
return nil
}
func (job *JobWatchMonitor) RefreshMonitoredPods() error {
job.kube.Log("RefreshMonitoredPods") // TODO: remove
client, err := job.kube.ClientSet()
func (job *JobWatchMonitor) WatchPods() error {
client, err := job.Kube.ClientSet()
if err != nil {
return err
}
@@ -245,242 +376,146 @@ func (job *JobWatchMonitor) RefreshMonitoredPods() error {
if err != nil {
return err
}
job.kube.Log("jobManifest: %+v", jobManifest) // TODO: remove
selector, err := metav1.LabelSelectorAsSelector(jobManifest.Spec.Selector)
if err != nil {
return err
}
podList, err := client.Core().
podListWatcher, err := client.Core().
Pods(job.Namespace).
List(metav1.ListOptions{LabelSelector: selector.String()})
Watch(metav1.ListOptions{
Watch: true,
LabelSelector: selector.String(),
})
if err != nil {
return err
}
job.kube.Log("podList: %+v", podList) // TODO: remove
// TODO filter out pods that do not belong to controller-uid=job-uid
// TODO calculate timeout since job-watch started
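// Hypothetical sketch for the filtering TODO above, to be applied inside the watch
// handler below once podObject is available ("controller-uid" is the label the Job
// controller puts on the pods it creates):
//
//     if podObject.Labels["controller-uid"] != string(jobManifest.UID) {
//         return false, nil
//     }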
_, err = watch.Until(job.Timeout, podListWatcher, func(e watch.Event) (bool, error) {
job.Kube.Log("[DEBUG] Job %s pods list event: %+v", job.ResourceName, e)
// Add new pods to monitor queue in chronological order by creation timestamp
podItems := make([]core.Pod, 0)
for _, item := range podList.Items {
podItems = append(podItems, item)
}
sort.Slice(podItems, func(i, j int) bool {
return podItems[i].CreationTimestamp.Time.Before(podItems[j].CreationTimestamp.Time)
})
searchNewPods:
for _, item := range podItems {
// filter out under processing
for _, monitoredPod := range job.MonitoredPodsQueue {
// TODO is there a need to check resource-version change?
if monitoredPod.UID == item.UID {
continue searchNewPods
}
podObject, ok := e.Object.(*core.Pod)
if !ok {
return true, fmt.Errorf("Expected %s to be a *core.Pod, got %T", job.ResourceName, e.Object)
}
// filter out already processed
for _, processedPodUID := range job.ProcessedPodsUIDs {
if processedPodUID == item.UID {
continue searchNewPods
for _, pod := range job.MonitoredPods {
if pod.ResourceName == podObject.Name {
// Already under monitoring
return false, nil
}
}
// TODO constructor from job & podObject
pod := &PodWatchMonitor{
WatchMonitor: WatchMonitor{
kube: job.kube,
timeout: job.timeout,
watchFeed: job.watchFeed,
Kube: job.Kube,
Timeout: job.Timeout,
Namespace: job.Namespace,
ResourceName: item.Name,
UID: item.UID,
Namespace: job.Namespace,
ResourceName: podObject.Name,
InitialResourceVersion: "",
},
PodLogChunk: job.PodLogChunk,
PodError: job.PodError,
Error: job.Error,
ContainerMonitorStates: make(map[string]string),
ProcessedContainerLogTimestamps: make(map[string]time.Time),
}
if err = pod.RefreshManifest(); err != nil {
return err
}
for _, containerConf := range pod.Manifest.Spec.InitContainers {
for _, containerConf := range podObject.Spec.InitContainers {
pod.InitContainersNames = append(pod.InitContainersNames, containerConf.Name)
}
for _, containerConf := range pod.Manifest.Spec.Containers {
for _, containerConf := range podObject.Spec.Containers {
pod.ContainersNames = append(pod.ContainersNames, containerConf.Name)
}
job.MonitoredPodsQueue = append(job.MonitoredPodsQueue, pod)
}
job.MonitoredPods = append(job.MonitoredPods, pod)
job.kube.Log("RefreshMonitoredPods done: MonitoredPodsQueue: %+v", job.MonitoredPodsQueue) // TODO: remove
go func() {
err := pod.Watch()
if err != nil {
job.Error <- err
}
}()
job.AddedPod <- pod
return false, nil
})
return nil
}
func (c *Client) WatchJobsTillDone(namespace string, reader io.Reader, watchFeed WatchFeed, timeout time.Duration) error {
infos, err := c.Build(namespace, reader)
if err != nil {
return err
}
return perform(infos, func(info *resource.Info) error {
return c.watchJobTillDone(info, watchFeed, timeout)
})
}
func (c *Client) watchJobTillDone(jobInfo *resource.Info, watchFeed WatchFeed, timeout time.Duration) error {
if jobInfo.Mapping.GroupVersionKind.Kind != "Job" {
return nil
}
uid, err := jobInfo.Mapping.UID(jobInfo.Object)
if err != nil {
return err
}
// TODO: constructor
job := &JobWatchMonitor{
WatchMonitor: WatchMonitor{
kube: c,
timeout: timeout,
watchFeed: watchFeed,
Kube: c,
Timeout: timeout,
Namespace: jobInfo.Namespace,
ResourceName: jobInfo.Name,
UID: uid,
Namespace: jobInfo.Namespace,
ResourceName: jobInfo.Name,
InitialResourceVersion: jobInfo.ResourceVersion,
},
}
if err := job.WaitTillResourceVersionAdded(jobInfo.ResourceVersion, jobInfo); err != nil {
return err // TODO
}
Started: make(chan bool, 0),
Succeeded: make(chan bool, 0),
AddedPod: make(chan *PodWatchMonitor, 10),
PodLogChunk: make(chan *PodLogChunk, 1000),
if err = job.RefreshMonitoredPods(); err != nil {
return err // TODO
PodError: make(chan PodError, 0),
Error: make(chan error, 0),
}
var processPod *PodWatchMonitor
// TODO: split method into corresponding functions
TakeNextMonitoredPod:
for {
if processPod = job.TakeNextMonitoredPod(); processPod == nil {
break
}
if err := processPod.RefreshManifest(); err != nil {
c.Log("Pod %s refresh manifest failed: %s", processPod.ResourceName, err)
// TODO stream system-error to feed
job.SetPodProcessed(processPod.UID)
continue TakeNextMonitoredPod
}
c.Log("Pod manifest refreshed, ResourceVersion: %s", processPod.Manifest.ResourceVersion)
if processPod.Manifest.Status.Phase == core.PodPending {
c.Log("Pod %s is in PENDING state", processPod.ResourceName)
if podReadyCondition := processPod.GetReadyCondition(); podReadyCondition != nil {
c.Log("Pod %s ready condition: %+v", processPod.ResourceName, podReadyCondition)
if podReadyCondition.Status != core.ConditionTrue {
// TODO: init-containers-statuses
for _, containerStatus := range processPod.Manifest.Status.ContainerStatuses {
if containerStatus.Ready {
continue
}
if containerStatus.State.Waiting != nil {
c.Log("Pod %s container %s is in waiting state: %s: %s", processPod.ResourceName, containerStatus.Name, containerStatus.State.Waiting.Reason, containerStatus.State.Waiting.Message)
switch containerStatus.State.Waiting.Reason {
case "ImagePullBackOff", "ErrImagePull":
// TODO stream bad_image user-error
processPod.SetContainerProcessed(containerStatus.Name, containerStatus.ContainerID)
case "CrashLoopBackOff":
// TODO stream container_crash user-error
processPod.SetContainerProcessed(containerStatus.Name, containerStatus.ContainerID)
}
}
if containerStatus.State.Terminated != nil {
// TODO dig more, think more.
// TODO not necessary because in that container state we are still able to reach container logs
// TODO what about the failed state? we should stream a user-error about an incorrectly terminated container
// TODO an init-container should finally be terminated in the normal situation
// TODO that error should follow the logs and not precede them
// TODO so this if should be moved after the send-logs-for-container section
c.Log("Pod %s container %s (%s) is in terminated state: %s: %s", processPod.ResourceName, containerStatus.Name, containerStatus.State.Terminated.ContainerID, containerStatus.State.Terminated.Reason, containerStatus.State.Terminated.Message)
processPod.SetContainerProcessed(containerStatus.Name, containerStatus.ContainerID)
}
}
job.SchedulePodMonitoring(processPod)
time.Sleep(time.Duration(1) * time.Second) // TODO remove this
continue TakeNextMonitoredPod
// TODO: fetch state and stream to feed userspace-error
}
}
}
// TODO: init-containers
ProcessContainers:
for _, containerName := range processPod.GetMonitoredContainersNames() {
c.Log("Process pod %s container %s", processPod.ResourceName, containerName)
if containerStatus := processPod.GetContainerStatus(containerName); containerStatus != nil {
c.Log("Process pod %s container %s status: %+v", processPod.ResourceName, containerName, containerStatus)
if containerStatus.State.Waiting != nil {
if containerStatus.State.Waiting.Reason == "RunContainerError" {
// TODO: stream userspace-error container_stuck to watch feed
processPod.SetContainerProcessed(containerName, containerStatus.ContainerID)
continue ProcessContainers
}
}
} else {
c.Log("Process pod %s container %s status not available", processPod.ResourceName, containerName)
}
logLines, err := processPod.GetContainerLogs(containerName)
if err != nil {
c.Log("Error getting job %s pod %s container %s log chunk: %s", job.ResourceName, processPod.ResourceName, containerName, err)
}
chunk := &JobLogChunk{
JobName: job.ResourceName,
PodName: processPod.ResourceName,
ContainerName: containerName,
LogLines: logLines,
}
if err = job.watchFeed.WriteJobLogChunk(chunk); err != nil {
c.Log("Error writing job %s pod %s container %s log chunk to watch feed: %s", chunk.JobName, chunk.PodName, chunk.ContainerName, err)
}
c.Log("[DEBUG] Starting job %s watcher", job.ResourceName)
go func() {
err := job.Watch()
if err != nil {
job.Error <- err
}
}()
if len(processPod.GetMonitoredContainersNames()) > 0 {
job.SchedulePodMonitoring(processPod)
for {
select {
case <-job.Started:
c.Log("Job %s started", job.ResourceName)
// TODO watchFeed
case <-job.Succeeded:
return nil
case err := <-job.Error:
return err
case pod := <-job.AddedPod:
c.Log("Job %s pod %s added", job.ResourceName, pod.ResourceName)
case podLogChunk := <-job.PodLogChunk:
watchFeed.WriteJobLogChunk(JobLogChunk{
PodLogChunk: *podLogChunk,
JobName: job.ResourceName,
})
case podError := <-job.PodError:
watchFeed.WriteJobPodError(JobPodError{
JobName: job.ResourceName,
PodError: podError,
})
}
time.Sleep(time.Duration(1) * time.Second) // TODO: stream logs without delay when there is something to show; also use a ticker
}
// TODO: wait till the job-done event
// TODO: refresh the pods while waiting: job.RefreshMonitoredPods()
// TODO: it is not necessary to refresh the list on every tick
// TODO: add the following event watch before ending this function
// switch e.Type {
// case watch.Added, watch.Modified:
// c.Log("o = %v", o)
// c.Log("o.Status = %v", o.Status)
// c.Log("o.Status.Conditions = %v", o.Status.Conditions)
// for _, c := range o.Status.Conditions {
// if c.Type == batchinternal.JobComplete && c.Status == core.ConditionTrue {
// return true, nil
// } else if c.Type == batchinternal.JobFailed && c.Status == core.ConditionTrue {
// return true, fmt.Errorf("Job failed: %s", c.Reason)
// }
// }
// c.Log("%s: Jobs active: %d, jobs failed: %d, jobs succeeded: %d", jobInfo.Name, o.Status.Active, o.Status.Failed, o.Status.Succeeded)
// return false, nil
// }
return nil
}
func (c *Client) WatchJobsTillDone(namespace string, reader io.Reader, watchFeed WatchFeed, timeout time.Duration) error {
infos, err := c.Build(namespace, reader)
if err != nil {
return err
}
return perform(infos, func(info *resource.Info) error {
return c.watchJobTillDone(info, watchFeed, timeout)
})
}

@@ -22,6 +22,7 @@ It has these top-level messages:
TestSuite
LogLine
JobLogChunk
JobPodError
WatchFeed
*/
package release

@@ -76,14 +76,55 @@ func (m *JobLogChunk) GetLogLines() []*LogLine {
return nil
}
type JobPodError struct {
JobName string `protobuf:"bytes,1,opt,name=job_name,json=jobName" json:"job_name,omitempty"`
PodName string `protobuf:"bytes,2,opt,name=pod_name,json=podName" json:"pod_name,omitempty"`
ContainerName string `protobuf:"bytes,3,opt,name=container_name,json=containerName" json:"container_name,omitempty"`
Message string `protobuf:"bytes,4,opt,name=message" json:"message,omitempty"`
}
func (m *JobPodError) Reset() { *m = JobPodError{} }
func (m *JobPodError) String() string { return proto.CompactTextString(m) }
func (*JobPodError) ProtoMessage() {}
func (*JobPodError) Descriptor() ([]byte, []int) { return fileDescriptor6, []int{2} }
func (m *JobPodError) GetJobName() string {
if m != nil {
return m.JobName
}
return ""
}
func (m *JobPodError) GetPodName() string {
if m != nil {
return m.PodName
}
return ""
}
func (m *JobPodError) GetContainerName() string {
if m != nil {
return m.ContainerName
}
return ""
}
func (m *JobPodError) GetMessage() string {
if m != nil {
return m.Message
}
return ""
}
type WatchFeed struct {
JobLogChunk *JobLogChunk `protobuf:"bytes,1,opt,name=job_log_chunk,json=jobLogChunk" json:"job_log_chunk,omitempty"`
JobPodError *JobPodError `protobuf:"bytes,2,opt,name=job_pod_error,json=jobPodError" json:"job_pod_error,omitempty"`
}
func (m *WatchFeed) Reset() { *m = WatchFeed{} }
func (m *WatchFeed) String() string { return proto.CompactTextString(m) }
func (*WatchFeed) ProtoMessage() {}
func (*WatchFeed) Descriptor() ([]byte, []int) { return fileDescriptor6, []int{2} }
func (*WatchFeed) Descriptor() ([]byte, []int) { return fileDescriptor6, []int{3} }
func (m *WatchFeed) GetJobLogChunk() *JobLogChunk {
if m != nil {
@@ -92,31 +133,42 @@ func (m *WatchFeed) GetJobLogChunk() *JobLogChunk {
return nil
}
func (m *WatchFeed) GetJobPodError() *JobPodError {
if m != nil {
return m.JobPodError
}
return nil
}
func init() {
proto.RegisterType((*LogLine)(nil), "hapi.release.LogLine")
proto.RegisterType((*JobLogChunk)(nil), "hapi.release.JobLogChunk")
proto.RegisterType((*JobPodError)(nil), "hapi.release.JobPodError")
proto.RegisterType((*WatchFeed)(nil), "hapi.release.WatchFeed")
}
func init() { proto.RegisterFile("hapi/release/watch_feed.proto", fileDescriptor6) }
var fileDescriptor6 = []byte{
// 258 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x54, 0x90, 0x41, 0x4b, 0xc3, 0x40,
0x14, 0x84, 0x89, 0x2d, 0xa6, 0x79, 0xb1, 0x1e, 0x16, 0x84, 0x14, 0x14, 0x4a, 0x40, 0xe8, 0x29,
0x85, 0x78, 0x14, 0x2f, 0x0a, 0x1e, 0x4a, 0xf0, 0x90, 0x8b, 0xe0, 0x25, 0x6c, 0xb2, 0xcf, 0x64,
0x63, 0xb2, 0x6f, 0x49, 0x56, 0xfc, 0x37, 0xfe, 0x56, 0xd9, 0x4d, 0xb0, 0xed, 0xed, 0xed, 0x7c,
0xc3, 0x30, 0xb3, 0x70, 0xd7, 0x70, 0x2d, 0xf7, 0x03, 0x76, 0xc8, 0x47, 0xdc, 0xff, 0x70, 0x53,
0x35, 0xc5, 0x27, 0xa2, 0x48, 0xf4, 0x40, 0x86, 0xd8, 0x95, 0xc5, 0xc9, 0x8c, 0xe3, 0x47, 0xf0,
0x33, 0xaa, 0x33, 0xa9, 0x90, 0xdd, 0x42, 0x60, 0x64, 0x8f, 0xa3, 0xe1, 0xbd, 0x8e, 0xbc, 0xad,
0xb7, 0x0b, 0xf2, 0xa3, 0xc0, 0x18, 0x2c, 0x05, 0x37, 0x3c, 0xba, 0x70, 0xc0, 0xdd, 0xf1, 0xaf,
0x07, 0xe1, 0x81, 0xca, 0x8c, 0xea, 0x97, 0xe6, 0x5b, 0x7d, 0xb1, 0x0d, 0xac, 0x5a, 0x2a, 0x0b,
0xc5, 0x7b, 0x9c, 0x03, 0xfc, 0x96, 0xca, 0x37, 0xde, 0xa3, 0x45, 0x9a, 0xc4, 0x84, 0xa6, 0x08,
0x5f, 0x93, 0x70, 0xe8, 0x1e, 0xae, 0x2b, 0x52, 0x86, 0x4b, 0x85, 0xc3, 0x64, 0x58, 0x38, 0xc3,
0xfa, 0x5f, 0x75, 0xb6, 0x14, 0x82, 0x8e, 0xea, 0xa2, 0x93, 0x0a, 0xc7, 0x68, 0xb9, 0x5d, 0xec,
0xc2, 0xf4, 0x26, 0x39, 0xdd, 0x92, 0xcc, 0x43, 0xf2, 0x55, 0x37, 0x1d, 0x63, 0x7c, 0x80, 0xe0,
0xdd, 0xee, 0x7f, 0x45, 0x14, 0xec, 0x09, 0xd6, 0xb6, 0x9d, 0x0d, 0xa9, 0x6c, 0x5d, 0x57, 0x31,
0x4c, 0x37, 0xe7, 0x21, 0x27, 0x7b, 0xf2, 0xb0, 0x3d, 0x3e, 0x9e, 0x83, 0x0f, 0x7f, 0xf6, 0x94,
0x97, 0xee, 0x27, 0x1f, 0xfe, 0x02, 0x00, 0x00, 0xff, 0xff, 0x6f, 0x51, 0x56, 0xb2, 0x6a, 0x01,
0x00, 0x00,
// 307 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x52, 0x4f, 0x4b, 0xfb, 0x40,
0x10, 0x25, 0xbf, 0x96, 0x5f, 0x9a, 0x8d, 0xf5, 0xb0, 0x20, 0xa4, 0xa0, 0x50, 0x02, 0x42, 0x4f,
0x29, 0xd4, 0xa3, 0x78, 0x51, 0xf4, 0x20, 0x45, 0xa4, 0x17, 0xc1, 0x4b, 0xd8, 0x64, 0xc7, 0xfc,
0x31, 0xd9, 0x09, 0xbb, 0x2b, 0x7e, 0x01, 0x2f, 0x7e, 0x09, 0x3f, 0xab, 0xec, 0x9f, 0x6a, 0x04,
0xaf, 0xde, 0x66, 0xe6, 0xbd, 0x19, 0xde, 0x7b, 0xbb, 0xe4, 0xa4, 0x66, 0x43, 0xb3, 0x96, 0xd0,
0x01, 0x53, 0xb0, 0x7e, 0x65, 0xba, 0xac, 0xf3, 0x27, 0x00, 0x9e, 0x0d, 0x12, 0x35, 0xd2, 0x03,
0x03, 0x67, 0x1e, 0x4e, 0xcf, 0x49, 0xb8, 0xc5, 0x6a, 0xdb, 0x08, 0xa0, 0xc7, 0x24, 0xd2, 0x4d,
0x0f, 0x4a, 0xb3, 0x7e, 0x48, 0x82, 0x65, 0xb0, 0x8a, 0x76, 0xdf, 0x03, 0x4a, 0xc9, 0x94, 0x33,
0xcd, 0x92, 0x7f, 0x16, 0xb0, 0x75, 0xfa, 0x11, 0x90, 0xf8, 0x16, 0x8b, 0x2d, 0x56, 0x57, 0xf5,
0x8b, 0x78, 0xa6, 0x0b, 0x32, 0x6b, 0xb1, 0xc8, 0x05, 0xeb, 0xc1, 0x1f, 0x08, 0x5b, 0x2c, 0xee,
0x58, 0x0f, 0x06, 0x1a, 0x90, 0x3b, 0xc8, 0x9d, 0x08, 0x07, 0xe4, 0x16, 0x3a, 0x25, 0x87, 0x25,
0x0a, 0xcd, 0x1a, 0x01, 0xd2, 0x11, 0x26, 0x96, 0x30, 0xff, 0x9a, 0x5a, 0xda, 0x86, 0x44, 0x1d,
0x56, 0x79, 0xd7, 0x08, 0x50, 0xc9, 0x74, 0x39, 0x59, 0xc5, 0x9b, 0xa3, 0x6c, 0xec, 0x25, 0xf3,
0x46, 0x76, 0xb3, 0xce, 0x15, 0x2a, 0x7d, 0x73, 0x02, 0xef, 0x91, 0x5f, 0x4b, 0x89, 0xf2, 0x6f,
0x05, 0x26, 0x24, 0xec, 0x41, 0x29, 0x56, 0x41, 0x32, 0x75, 0x07, 0x7c, 0x9b, 0xbe, 0x07, 0x24,
0x7a, 0x30, 0xef, 0x70, 0x03, 0xc0, 0xe9, 0x05, 0x99, 0x1b, 0x11, 0xc6, 0x4c, 0x69, 0x62, 0xb3,
0x4a, 0xe2, 0xcd, 0xe2, 0xa7, 0x99, 0x51, 0xae, 0xbb, 0xb8, 0x1d, 0x85, 0xec, 0xd7, 0x8d, 0x58,
0x30, 0xa6, 0xac, 0xda, 0xdf, 0xd6, 0xf7, 0xae, 0xed, 0xfa, 0xbe, 0xb9, 0x8c, 0x1e, 0x43, 0xcf,
0x29, 0xfe, 0xdb, 0x0f, 0x71, 0xf6, 0x19, 0x00, 0x00, 0xff, 0xff, 0xf8, 0x73, 0x41, 0xa7, 0x31,
0x02, 0x00, 0x00,
}

@@ -191,28 +191,47 @@ func (s *ReleaseServer) performRelease(r *release.Release, req *services.Install
// pre-install hooks
if !req.DisableHooks {
handleLogChunk := kube.WriteJobLogChunkFunc(func(chunk *kube.JobLogChunk) error {
chunkResp := &services.InstallReleaseResponse{
WatchFeed: &release.WatchFeed{
JobLogChunk: &release.JobLogChunk{},
},
}
chunkResp.WatchFeed.JobLogChunk.LogLines = make([]*release.LogLine, 0)
watchFeed := &kube.WatchFeedProto{
WriteJobLogChunkFunc: func(chunk kube.JobLogChunk) error {
chunkResp := &services.InstallReleaseResponse{
WatchFeed: &release.WatchFeed{
JobLogChunk: &release.JobLogChunk{
JobName: chunk.JobName,
PodName: chunk.PodName,
ContainerName: chunk.ContainerName,
LogLines: make([]*release.LogLine, 0),
},
},
}
for _, line := range chunk.LogLines {
ll := &release.LogLine{
Timestamp: line.Timestamp,
Data: line.Data,
for _, line := range chunk.LogLines {
ll := &release.LogLine{
Timestamp: line.Timestamp,
Data: line.Data,
}
chunkResp.WatchFeed.JobLogChunk.LogLines = append(chunkResp.WatchFeed.JobLogChunk.LogLines, ll)
}
chunkResp.WatchFeed.JobLogChunk.LogLines = append(chunkResp.WatchFeed.JobLogChunk.LogLines, ll)
}
return stream.Send(chunkResp)
})
return stream.Send(chunkResp)
},
WriteJobPodErrorFunc: func(obj kube.JobPodError) error {
chunkResp := &services.InstallReleaseResponse{
WatchFeed: &release.WatchFeed{
JobPodError: &release.JobPodError{
JobName: obj.JobName,
PodName: obj.PodName,
ContainerName: obj.ContainerName,
Message: obj.Message,
},
},
}
return stream.Send(chunkResp)
},
}
// TODO watch the job with a feed only if the job has the annotation "helm/watch-logs": "true"
// TODO otherwise watch it as an ordinary hook, just like before, using WatchUntilReady
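// Hypothetical sketch of that gate (hookHasAnnotation and hooksForEvent are assumed
// names; the annotation would come from the hook's parsed manifest):
//
//     for _, h := range hooksForEvent {
//         if hookHasAnnotation(h, "helm/watch-logs", "true") {
//             // watch the hook job and stream its logs and pod errors into watchFeed
//         } else {
//             // watch it as an ordinary hook, as WatchUntilReady did before
//         }
//     }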
if err := s.execHookWithWatchFeed(r.Hooks, r.Name, r.Namespace, hooks.PreInstall, req.Timeout, handleLogChunk); err != nil {
if err := s.execHookWithWatchFeed(r.Hooks, r.Name, r.Namespace, hooks.PreInstall, req.Timeout, watchFeed); err != nil {
return res, err
}
} else {

@@ -365,7 +365,7 @@ func (s *ReleaseServer) recordRelease(r *release.Release, reuse bool) {
}
func (s *ReleaseServer) execHook(hs []*release.Hook, name, namespace, hook string, timeout int64) error {
return s.execHookWithWatchFeed(hs, name, namespace, hook, timeout, kube.WriteJobLogChunkFunc(func(*kube.JobLogChunk) error { return nil }))
return s.execHookWithWatchFeed(hs, name, namespace, hook, timeout, kube.WatchFeedStub)
}
func (s *ReleaseServer) execHookWithWatchFeed(hs []*release.Hook, name, namespace, hook string, timeout int64, watchFeed kube.WatchFeed) error {
