diff --git a/_proto/hapi/release/watch_feed.proto b/_proto/hapi/release/watch_feed.proto index 183724dba..2285058ea 100644 --- a/_proto/hapi/release/watch_feed.proto +++ b/_proto/hapi/release/watch_feed.proto @@ -16,8 +16,16 @@ message JobLogChunk { repeated LogLine log_lines = 4; } +message JobPodError { + string job_name = 1; + string pod_name = 2; + string container_name = 3; + string message = 4; +} + message WatchFeed { JobLogChunk job_log_chunk = 1; + JobPodError job_pod_error = 2; /* * WatchFeed could contain one or multiple events from tiller at the same time. diff --git a/pkg/helm/client.go b/pkg/helm/client.go index 3df0c8717..7214be2d4 100644 --- a/pkg/helm/client.go +++ b/pkg/helm/client.go @@ -388,8 +388,9 @@ func (h *Client) install(ctx context.Context, req *rls.InstallReleaseRequest) (* } if resp.WatchFeed.GetJobLogChunk() != nil { - for _, line := range resp.WatchFeed.GetJobLogChunk().LogLines { - fmt.Printf("%s %s\n", line.Timestamp, line.Data) // TODO: make normal formatting as follows. + chunk := resp.WatchFeed.GetJobLogChunk() + for _, line := range chunk.LogLines { + fmt.Printf("{job %s / pod %s / container %s} %s %s\n", chunk.JobName, chunk.PodName, chunk.ContainerName, line.Timestamp, line.Data) // TODO: make normal formatting as follows. // TODO The client could work like state machine: // TODO when receiving job-pod-container log chunk print header "==> job X pod X container Y logs <==\n", // TODO just like `tail -f *` works on multiple files at the same time. @@ -399,6 +400,8 @@ func (h *Client) install(ctx context.Context, req *rls.InstallReleaseRequest) (* // TODO The main reason to stream userspace-events like ImagePullBackOff or CrashLoopBackOff is // TODO to give user enough info so that user can debug templates without accessing cluster using kubectl. 
} + } else if resp.WatchFeed.GetJobPodError() != nil { + fmt.Printf("ERROR: %v", resp.WatchFeed.GetJobPodError()) // TODO: normal formatting } else { finalResp = resp // TODO verify/debug this code } diff --git a/pkg/kube/job.go b/pkg/kube/job.go index 9fd71cc27..419912a04 100644 --- a/pkg/kube/job.go +++ b/pkg/kube/job.go @@ -2,21 +2,44 @@ package kube import ( "bytes" - _ "fmt" + "fmt" "io" - "sort" + _ "sort" "strings" "time" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/fields" + _ "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/watch" + "k8s.io/kubernetes/pkg/apis/batch" "k8s.io/kubernetes/pkg/apis/core" "k8s.io/kubernetes/pkg/kubectl/resource" ) +var ( + WatchFeedStub = &WatchFeedProto{ + WriteJobLogChunkFunc: func(JobLogChunk) error { return nil }, + WriteJobPodErrorFunc: func(JobPodError) error { return nil }, + } +) + type WatchFeed interface { - WriteJobLogChunk(*JobLogChunk) error + WriteJobLogChunk(JobLogChunk) error + WriteJobPodError(JobPodError) error +} + +// Prototype-struct helper to create feed with callbacks specified in-place of creation (such as WatchFeedStub var) +type WatchFeedProto struct { + WriteJobLogChunkFunc func(JobLogChunk) error + WriteJobPodErrorFunc func(JobPodError) error +} + +func (proto *WatchFeedProto) WriteJobLogChunk(arg JobLogChunk) error { + return proto.WriteJobLogChunkFunc(arg) +} +func (proto *WatchFeedProto) WriteJobPodError(arg JobPodError) error { + return proto.WriteJobPodErrorFunc(arg) } type LogLine struct { @@ -24,217 +47,325 @@ type LogLine struct { Data string } -type JobLogChunk struct { - JobName string +type PodLogChunk struct { PodName string ContainerName string LogLines []LogLine } -type WriteJobLogChunkFunc func(*JobLogChunk) error +type PodError struct { + Message string + PodName string + ContainerName string +} + +type JobLogChunk struct { + PodLogChunk + JobName string +} -func (f WriteJobLogChunkFunc) WriteJobLogChunk(chunk *JobLogChunk) error { - return f(chunk) +type JobPodError struct { + PodError + JobName string } type WatchMonitor struct { - kube *Client - timeout time.Duration - watchFeed WatchFeed + Kube *Client + Timeout time.Duration - Namespace string - ResourceName string - UID types.UID + Namespace string + ResourceName string + InitialResourceVersion string } type PodWatchMonitor struct { WatchMonitor - Manifest *core.Pod + PodLogChunk chan *PodLogChunk + PodError chan PodError + Error chan error - InitContainersNames []string - ProcessedInitContainersNames []string - ProcessedInitContainersIDs []string + ContainerMonitorStates map[string]string + ProcessedContainerLogTimestamps map[string]time.Time - ContainersNames []string - ProcessedContainersNames []string - ProcessedContainersIDs []string + InitContainersNames []string + ContainersNames []string } -func (pod *PodWatchMonitor) GetMonitoredContainersNames() []string { - res := make([]string, 0) +func (pod *PodWatchMonitor) FollowContainerLogs(containerName string) error { + client, err := pod.Kube.ClientSet() + if err != nil { + return err + } -FilterProcessedContainersNames: - for _, name := range pod.ContainersNames { - for _, processedContainerName := range pod.ProcessedContainersNames { - if processedContainerName == name { - continue FilterProcessedContainersNames - } - } - res = append(res, name) + // var sinceTime *metav1.Time + // if v, found := pod.ProcessedContainerLogTimestamps[containerName]; found { + // sinceTime = &metav1.Time{v} + // } + + req := client.Core(). 
+ Pods(pod.Namespace). + GetLogs(pod.ResourceName, &core.PodLogOptions{ + Container: containerName, + Timestamps: true, + Follow: true, + }) + + readCloser, err := req.Stream() + if err != nil { + return err } + defer readCloser.Close() - return res -} + lineBuf := bytes.Buffer{} + rawBuf := make([]byte, 4096) -func (pod *PodWatchMonitor) SetContainerProcessed(containerName string, containerID string) { - pod.ProcessedContainersNames = append(pod.ProcessedContainersNames, containerName) - pod.ProcessedContainersIDs = append(pod.ProcessedContainersIDs, containerID) -} + for { + n, err := readCloser.Read(rawBuf) + if err != nil && err == io.EOF { + break + } else if err != nil { + return err + } -func (pod *PodWatchMonitor) GetMonitoredInitContainersNames() []string { - res := make([]string, 0) + chunkLines := make([]LogLine, 0) + for i := 0; i < n; i++ { + if rawBuf[i] == '\n' { + lineParts := strings.SplitN(lineBuf.String(), " ", 2) + if len(lineParts) == 2 { + chunkLines = append(chunkLines, LogLine{Timestamp: lineParts[0], Data: lineParts[1]}) + } -FilterProcessedInitContainersNames: - for _, name := range pod.InitContainersNames { - for _, processedInitContainerName := range pod.ProcessedInitContainersNames { - if processedInitContainerName == name { - continue FilterProcessedInitContainersNames + lineBuf.Reset() + continue } + + lineBuf.WriteByte(rawBuf[i]) + } + + pod.PodLogChunk <- &PodLogChunk{ + PodName: pod.ResourceName, + ContainerName: containerName, + LogLines: chunkLines, } - res = append(res, name) } - return res -} + return nil -func (pod *PodWatchMonitor) SetInitContainerProcessed(containerName string, containerID string) { - pod.ProcessedInitContainersNames = append(pod.ProcessedInitContainersNames, containerName) - pod.ProcessedInitContainersIDs = append(pod.ProcessedInitContainersIDs, containerID) -} + // buf := bytes.Buffer{} + // _, err = io.Copy(&buf, readCloser) -func (pod *PodWatchMonitor) RefreshManifest() error { - client, err := pod.kube.ClientSet() - if err != nil { - return err - } + // lines := strings.Split(strings.TrimSuffix(buf.String(), "\n"), "\n") - manifest, err := client.Core(). - Pods(pod.Namespace). 
- Get(pod.ResourceName, metav1.GetOptions{}) - if err != nil { - return err - } - pod.Manifest = manifest + // res := make([]LogLine, 0) + // for _, line := range lines { + // lineParts := strings.SplitN(line, " ", 2) + // if len(lineParts) == 2 { + // ll := LogLine{ + // Timestamp: lineParts[0], + // Data: lineParts[1], + // } + // res = append(res, ll) + // pod.Kube.Log(">>> %s", ll) + // } + // } - return nil -} + // if len(res) > 0 { + // t, err := time.Parse(time.RFC3339, res[len(res)-1].Timestamp) + // if err != nil { + // return nil, err + // } + // pod.ProcessedContainerLogTimestamps[containerName] = t + // } -func (pod *PodWatchMonitor) GetReadyCondition() (res *core.PodCondition) { - for i, _ := range pod.Manifest.Status.Conditions { - if pod.Manifest.Status.Conditions[i].Type == "Ready" { - res = &pod.Manifest.Status.Conditions[i] - break - } - } - return + // logLines, err := pod.GetContainerLogs(containerName) + // if err != nil { + // return err + // } + + // pod.PodLogChunk <- &PodLogChunk{ + // PodName: pod.ResourceName, + // ContainerName: containerName, + // LogLines: logLines, + // } + + // return res, nil } -func (pod *PodWatchMonitor) GetInitContainerStatus(containerName string) (res *core.ContainerStatus) { - for i, _ := range pod.Manifest.Status.InitContainerStatuses { - if pod.Manifest.Status.InitContainerStatuses[i].Name == containerName { - res = &pod.Manifest.Status.InitContainerStatuses[i] - break +func (pod *PodWatchMonitor) WatchContainerLogs(containerName string) error { + for { + for _, containerName := range pod.ContainersNames { + switch pod.ContainerMonitorStates[containerName] { + case "Running", "Terminated": + return pod.FollowContainerLogs(containerName) + case "Waiting": + default: + } } + time.Sleep(time.Duration(200) * time.Millisecond) } - return + + return nil } -func (pod *PodWatchMonitor) GetContainerStatus(containerName string) (res *core.ContainerStatus) { - for i, _ := range pod.Manifest.Status.ContainerStatuses { - if pod.Manifest.Status.ContainerStatuses[i].Name == containerName { - res = &pod.Manifest.Status.ContainerStatuses[i] - break - } +func (pod *PodWatchMonitor) Watch() error { + for _, containerName := range pod.ContainersNames { + go func() { + err := pod.WatchContainerLogs(containerName) + if err != nil { + pod.Error <- err + } + }() } - return -} -func (pod *PodWatchMonitor) GetContainerLogs(containerName string) ([]LogLine, error) { - client, err := pod.kube.ClientSet() + client, err := pod.Kube.ClientSet() if err != nil { - return nil, err + return err } - req := client.Core(). - Pods(pod.Namespace). - GetLogs(pod.ResourceName, &core.PodLogOptions{ - Container: containerName, - Timestamps: true, + watcher, err := client.Core().Pods(pod.Namespace). 
+ Watch(metav1.ListOptions{ + ResourceVersion: pod.InitialResourceVersion, + Watch: true, + FieldSelector: fields.OneTermEqualSelector("metadata.name", pod.ResourceName).String(), }) - - readCloser, err := req.Stream() if err != nil { - return nil, err + return err } - defer readCloser.Close() - buf := bytes.Buffer{} - _, err = io.Copy(&buf, readCloser) - - res := make([]LogLine, 0) - for _, line := range strings.Split(strings.TrimSuffix(buf.String(), "\n"), "\n") { - lineParts := strings.SplitN(line, " ", 2) - // TODO: receive only new log lines, save state into PodWatchMonitor - if len(lineParts) == 2 { - ll := LogLine{ - Timestamp: lineParts[0], - Data: lineParts[1], + _, err = watch.Until(pod.Timeout, watcher, func(e watch.Event) (bool, error) { + pod.Kube.Log("[DEBUG] Pod %s event: %+v", pod.ResourceName, e) + + object, ok := e.Object.(*core.Pod) + if !ok { + return true, fmt.Errorf("Expected %s to be a *core.Pod, got %T", pod.ResourceName, e.Object) + } + + for _, cs := range object.Status.ContainerStatuses { + oldState := pod.ContainerMonitorStates[cs.Name] + + if cs.State.Waiting != nil { + pod.ContainerMonitorStates[cs.Name] = "Waiting" + + switch cs.State.Waiting.Reason { + case "ImagePullBackOff", "ErrImagePull", "CrashLoopBackOff": + pod.PodError <- PodError{ + ContainerName: cs.Name, + PodName: pod.ResourceName, + Message: fmt.Sprintf("%s: %s", cs.State.Waiting.Reason, cs.State.Waiting.Message), + } + } + } + if cs.State.Running != nil { + pod.ContainerMonitorStates[cs.Name] = "Running" + } + if cs.State.Terminated != nil { + pod.ContainerMonitorStates[cs.Name] = "Running" + } + + if oldState != pod.ContainerMonitorStates[cs.Name] { + pod.Kube.Log("Pod %s container %s state changed %v -> %v", pod.ResourceName, cs.Name, oldState, pod.ContainerMonitorStates[cs.Name]) } - res = append(res, ll) } - } - return res, nil + return false, nil + }) + + return nil } type JobWatchMonitor struct { WatchMonitor - MonitoredPodsQueue []*PodWatchMonitor - ProcessedPodsUIDs []types.UID + State string + + Started chan bool + Succeeded chan bool + Error chan error + AddedPod chan *PodWatchMonitor + PodLogChunk chan *PodLogChunk + PodError chan PodError + + MonitoredPods []*PodWatchMonitor } -func (job *JobWatchMonitor) WaitTillResourceVersionAdded(resourceVersion string, jobInfo *resource.Info) error { - w, err := resource. - NewHelper(jobInfo.Client, jobInfo.Mapping). - WatchSingle(job.Namespace, job.ResourceName, resourceVersion) +func (job *JobWatchMonitor) Watch() error { + client, err := job.Kube.ClientSet() + if err != nil { + return err + } + + watcher, err := client.Batch().Jobs(job.Namespace). 
+ Watch(metav1.ListOptions{ + ResourceVersion: job.InitialResourceVersion, + Watch: true, + FieldSelector: fields.OneTermEqualSelector("metadata.name", job.ResourceName).String(), + }) if err != nil { return err } - _, err = watch.Until(job.timeout, w, func(e watch.Event) (bool, error) { - if e.Type == watch.Added { - return true, nil + _, err = watch.Until(job.Timeout, watcher, func(e watch.Event) (bool, error) { + job.Kube.Log("[DEBUG] Job %s event: %+v", job.ResourceName, e) + + switch job.State { + case "": + if e.Type == watch.Added { + job.Started <- true + + oldState := job.State + job.State = "Started" + job.Kube.Log("[DEBUG] Job %s watcher state changed %v -> %v", job.ResourceName, oldState, job.State) + + job.Kube.Log("[DEBUG] Starting job %s pods watcher", job.ResourceName) + go func() { + err := job.WatchPods() + if err != nil { + job.Error <- err + } + }() + } + + case "Started": + object, ok := e.Object.(*batch.Job) + if !ok { + return true, fmt.Errorf("Expected %s to be a *batch.Job, got %T", job.ResourceName, e.Object) + } + + for _, c := range object.Status.Conditions { + if c.Type == batch.JobComplete && c.Status == core.ConditionTrue { + oldState := job.State + job.State = "Succeeded" + job.Kube.Log("[DEBUG] Job %s watcher state changed %v -> %v", job.ResourceName, oldState, job.State) + + job.Kube.Log("%s: Jobs active: %d, jobs failed: %d, jobs succeeded: %d", job.ResourceName, object.Status.Active, object.Status.Failed, object.Status.Succeeded) + + return true, nil + } else if c.Type == batch.JobFailed && c.Status == core.ConditionTrue { + oldState := job.State + job.State = "Failed" + job.Kube.Log("[DEBUG] Job %s watcher state changed %v -> %v", job.ResourceName, oldState, job.State) + + return true, fmt.Errorf("Job failed: %s", c.Reason) + } + } + + default: + return true, fmt.Errorf("Unknown job %s watcher state: %s", job.ResourceName, job.State) } + return false, nil }) - return err -} - -func (job *JobWatchMonitor) TakeNextMonitoredPod() *PodWatchMonitor { - if len(job.MonitoredPodsQueue) == 0 { - return nil + if err != nil { + return err } - var res *PodWatchMonitor - res, job.MonitoredPodsQueue = job.MonitoredPodsQueue[0], job.MonitoredPodsQueue[1:] - return res -} - -func (job *JobWatchMonitor) SetPodProcessed(uid types.UID) { - job.ProcessedPodsUIDs = append(job.ProcessedPodsUIDs, uid) -} - -func (job *JobWatchMonitor) SchedulePodMonitoring(pod *PodWatchMonitor) { - job.MonitoredPodsQueue = append(job.MonitoredPodsQueue, pod) + return nil } -func (job *JobWatchMonitor) RefreshMonitoredPods() error { - job.kube.Log("RefreshMonitoredPods") // TODO: remove - - client, err := job.kube.ClientSet() +func (job *JobWatchMonitor) WatchPods() error { + client, err := job.Kube.ClientSet() if err != nil { return err } @@ -245,242 +376,146 @@ func (job *JobWatchMonitor) RefreshMonitoredPods() error { if err != nil { return err } - job.kube.Log("jobManifest: %+v", jobManifest) // TODO: remove selector, err := metav1.LabelSelectorAsSelector(jobManifest.Spec.Selector) if err != nil { return err } - podList, err := client.Core(). + podListWatcher, err := client.Core(). Pods(job.Namespace). 
- List(metav1.ListOptions{LabelSelector: selector.String()}) + Watch(metav1.ListOptions{ + Watch: true, + LabelSelector: selector.String(), + }) if err != nil { return err } - job.kube.Log("podList: %+v", podList) // TODO: remove - // TODO filter out pods that does not belong to controller-uid=job-uid + // TODO calculate timeout since job-watch started + _, err = watch.Until(job.Timeout, podListWatcher, func(e watch.Event) (bool, error) { + job.Kube.Log("[DEBUG] Job %s pods list event: %+v", job.ResourceName, e) - // Add new pods to monitor queue in chronological order by creation timestamp - podItems := make([]core.Pod, 0) - for _, item := range podList.Items { - podItems = append(podItems, item) - } - sort.Slice(podItems, func(i, j int) bool { - return podItems[i].CreationTimestamp.Time.Before(podItems[j].CreationTimestamp.Time) - }) - -searchNewPods: - for _, item := range podItems { - // filter out under processing - for _, monitoredPod := range job.MonitoredPodsQueue { - // TODO is there a need to check resource-version change? - if monitoredPod.UID == item.UID { - continue searchNewPods - } + podObject, ok := e.Object.(*core.Pod) + if !ok { + return true, fmt.Errorf("Expected %s to be a *core.Pod, got %T", job.ResourceName, e.Object) } - // filter out already processed - for _, processedPodUID := range job.ProcessedPodsUIDs { - if processedPodUID == item.UID { - continue searchNewPods + + for _, pod := range job.MonitoredPods { + if pod.ResourceName == podObject.Name { + // Already under monitoring + return false, nil } } + // TODO constructor from job & podObject pod := &PodWatchMonitor{ WatchMonitor: WatchMonitor{ - kube: job.kube, - timeout: job.timeout, - watchFeed: job.watchFeed, + Kube: job.Kube, + Timeout: job.Timeout, - Namespace: job.Namespace, - ResourceName: item.Name, - UID: item.UID, + Namespace: job.Namespace, + ResourceName: podObject.Name, + InitialResourceVersion: "", }, + + PodLogChunk: job.PodLogChunk, + PodError: job.PodError, + Error: job.Error, + + ContainerMonitorStates: make(map[string]string), + ProcessedContainerLogTimestamps: make(map[string]time.Time), } - if err = pod.RefreshManifest(); err != nil { - return err - } - for _, containerConf := range pod.Manifest.Spec.InitContainers { + + for _, containerConf := range podObject.Spec.InitContainers { pod.InitContainersNames = append(pod.InitContainersNames, containerConf.Name) } - for _, containerConf := range pod.Manifest.Spec.Containers { + for _, containerConf := range podObject.Spec.Containers { pod.ContainersNames = append(pod.ContainersNames, containerConf.Name) } - job.MonitoredPodsQueue = append(job.MonitoredPodsQueue, pod) - } + job.MonitoredPods = append(job.MonitoredPods, pod) - job.kube.Log("RefreshMonitoredPods done: MonitoredPodsQueue: %+v", job.MonitoredPodsQueue) // TODO: remove + go func() { + err := pod.Watch() + if err != nil { + job.Error <- err + } + }() + + job.AddedPod <- pod + + return false, nil + }) return nil } +func (c *Client) WatchJobsTillDone(namespace string, reader io.Reader, watchFeed WatchFeed, timeout time.Duration) error { + infos, err := c.Build(namespace, reader) + if err != nil { + return err + } + + return perform(infos, func(info *resource.Info) error { + return c.watchJobTillDone(info, watchFeed, timeout) + }) +} + func (c *Client) watchJobTillDone(jobInfo *resource.Info, watchFeed WatchFeed, timeout time.Duration) error { if jobInfo.Mapping.GroupVersionKind.Kind != "Job" { return nil } - uid, err := jobInfo.Mapping.UID(jobInfo.Object) - if err != nil { - return err 
- } - + // TODO: constructor job := &JobWatchMonitor{ WatchMonitor: WatchMonitor{ - kube: c, - timeout: timeout, - watchFeed: watchFeed, + Kube: c, + Timeout: timeout, - Namespace: jobInfo.Namespace, - ResourceName: jobInfo.Name, - UID: uid, + Namespace: jobInfo.Namespace, + ResourceName: jobInfo.Name, + InitialResourceVersion: jobInfo.ResourceVersion, }, - } - if err := job.WaitTillResourceVersionAdded(jobInfo.ResourceVersion, jobInfo); err != nil { - return err // TODO - } + Started: make(chan bool, 0), + Succeeded: make(chan bool, 0), + AddedPod: make(chan *PodWatchMonitor, 10), + PodLogChunk: make(chan *PodLogChunk, 1000), - if err = job.RefreshMonitoredPods(); err != nil { - return err // TODO + PodError: make(chan PodError, 0), + Error: make(chan error, 0), } - var processPod *PodWatchMonitor - - // TODO: split method into corresponding functions - -TakeNextMonitoredPod: - for { - if processPod = job.TakeNextMonitoredPod(); processPod == nil { - break - } - - if err := processPod.RefreshManifest(); err != nil { - c.Log("Pod %s refresh manifest failed: %s", processPod.ResourceName, err) - // TODO stream system-error to feed - job.SetPodProcessed(processPod.UID) - continue TakeNextMonitoredPod - } - - c.Log("Pod manifest refreshed, ResourceVersion: %s", processPod.Manifest.ResourceVersion) - - if processPod.Manifest.Status.Phase == core.PodPending { - c.Log("Pod %s is in PENDING state", processPod.ResourceName) - - if podReadyCondition := processPod.GetReadyCondition(); podReadyCondition != nil { - c.Log("Pod %s ready condition: %+v", processPod.ResourceName, podReadyCondition) - if podReadyCondition.Status != core.ConditionTrue { - // TODO: init-containers-statuses - for _, containerStatus := range processPod.Manifest.Status.ContainerStatuses { - if containerStatus.Ready { - continue - } - if containerStatus.State.Waiting != nil { - c.Log("Pod %s container %s is in waiting state: %s: %s", processPod.ResourceName, containerStatus.Name, containerStatus.State.Waiting.Reason, containerStatus.State.Waiting.Message) - - switch containerStatus.State.Waiting.Reason { - case "ImagePullBackOff", "ErrImagePull": - // TODO stream bad_image user-error - processPod.SetContainerProcessed(containerStatus.Name, containerStatus.ContainerID) - case "CrashLoopBackOff": - // TODO stream container_crash user-error - processPod.SetContainerProcessed(containerStatus.Name, containerStatus.ContainerID) - } - } - if containerStatus.State.Terminated != nil { - // TODO dig more, think more. - // TODO not necessary because in that container state we still able to reach containers logs - // TODO what about failed state? 
we should stream user-error about incorrectly terminated container - // TODO init-container should be finally terminated in normal situation - // TODO that error should follow logs and not preceede them - // TODO so it is needed to move that if into after send-logs-for-container section - - c.Log("Pod %s container %s (%s) is in terminated state: %s: %s", processPod.ResourceName, containerStatus.Name, containerStatus.State.Terminated.ContainerID, containerStatus.State.Terminated.Reason, containerStatus.State.Terminated.Message) - processPod.SetContainerProcessed(containerStatus.Name, containerStatus.ContainerID) - } - } - - job.SchedulePodMonitoring(processPod) - time.Sleep(time.Duration(1) * time.Second) // TODO remove this - continue TakeNextMonitoredPod - // TODO: fetch state and stream to feed userspace-error - } - } - } - - // TODO: init-containers - - ProcessContainers: - for _, containerName := range processPod.GetMonitoredContainersNames() { - c.Log("Process pod %s container %s", processPod.ResourceName, containerName) - if containerStatus := processPod.GetContainerStatus(containerName); containerStatus != nil { - c.Log("Process pod %s container %s status: %+v", processPod.ResourceName, containerName, containerStatus) - if containerStatus.State.Waiting != nil { - if containerStatus.State.Waiting.Reason == "RunContainerError" { - // TODO: stream userspace-error container_stuck to watch feed - processPod.SetContainerProcessed(containerName, containerStatus.ContainerID) - continue ProcessContainers - } - } - } else { - c.Log("Process pod %s container %s status not available", processPod.ResourceName, containerName) - } - - logLines, err := processPod.GetContainerLogs(containerName) - if err != nil { - c.Log("Error getting job %s pod %s container %s log chunk: %s", job.ResourceName, processPod.ResourceName, containerName, err) - } - - chunk := &JobLogChunk{ - JobName: job.ResourceName, - PodName: processPod.ResourceName, - ContainerName: containerName, - LogLines: logLines, - } - if err = job.watchFeed.WriteJobLogChunk(chunk); err != nil { - c.Log("Error writing job %s pod %s container %s log chunk to watch feed: %s", chunk.JobName, chunk.PodName, chunk.ContainerName, err) - } + c.Log("[DEBUG] Starting job %s watcher", job.ResourceName) + go func() { + err := job.Watch() + if err != nil { + job.Error <- err } + }() - if len(processPod.GetMonitoredContainersNames()) > 0 { - job.SchedulePodMonitoring(processPod) + for { + select { + case <-job.Started: + c.Log("Job %s started", job.ResourceName) + // TODO watchFeed + case <-job.Succeeded: + return nil + case err := <-job.Error: + return err + case pod := <-job.AddedPod: + c.Log("Job %s pod %s added", job.ResourceName, pod.ResourceName) + case podLogChunk := <-job.PodLogChunk: + watchFeed.WriteJobLogChunk(JobLogChunk{ + PodLogChunk: *podLogChunk, + JobName: job.ResourceName, + }) + case podError := <-job.PodError: + watchFeed.WriteJobPodError(JobPodError{ + JobName: job.ResourceName, + PodError: podError, + }) } - - time.Sleep(time.Duration(1) * time.Second) // TODO: show logs flawlessly without any suspension if there is something to show, also use ticker - } - - // TODO: wait till job done event - // TODO: make refresh for pods while waiting: job.RefreshMonitoredPods() - // TODO: it is not necessary to refresh list on every tick - - // TODO: add following event watch before ending this function - // switch e.Type { - // case watch.Added, watch.Modified: - // c.Log("o = %v", o) - // c.Log("o.Status = %v", o.Status) - // 
c.Log("o.Status.Conditions = %v", o.Status.Conditions) - // for _, c := range o.Status.Conditions { - // if c.Type == batchinternal.JobComplete && c.Status == core.ConditionTrue { - // return true, nil - // } else if c.Type == batchinternal.JobFailed && c.Status == core.ConditionTrue { - // return true, fmt.Errorf("Job failed: %s", c.Reason) - // } - // } - // c.Log("%s: Jobs active: %d, jobs failed: %d, jobs succeeded: %d", jobInfo.Name, o.Status.Active, o.Status.Failed, o.Status.Succeeded) - // return false, nil - // } - - return nil -} - -func (c *Client) WatchJobsTillDone(namespace string, reader io.Reader, watchFeed WatchFeed, timeout time.Duration) error { - infos, err := c.Build(namespace, reader) - if err != nil { - return err } - - return perform(infos, func(info *resource.Info) error { - return c.watchJobTillDone(info, watchFeed, timeout) - }) } diff --git a/pkg/proto/hapi/release/hook.pb.go b/pkg/proto/hapi/release/hook.pb.go index a12279b7d..f7946e185 100644 --- a/pkg/proto/hapi/release/hook.pb.go +++ b/pkg/proto/hapi/release/hook.pb.go @@ -22,6 +22,7 @@ It has these top-level messages: TestSuite LogLine JobLogChunk + JobPodError WatchFeed */ package release diff --git a/pkg/proto/hapi/release/watch_feed.pb.go b/pkg/proto/hapi/release/watch_feed.pb.go index 6c34b03a2..e73e78d42 100644 --- a/pkg/proto/hapi/release/watch_feed.pb.go +++ b/pkg/proto/hapi/release/watch_feed.pb.go @@ -76,14 +76,55 @@ func (m *JobLogChunk) GetLogLines() []*LogLine { return nil } +type JobPodError struct { + JobName string `protobuf:"bytes,1,opt,name=job_name,json=jobName" json:"job_name,omitempty"` + PodName string `protobuf:"bytes,2,opt,name=pod_name,json=podName" json:"pod_name,omitempty"` + ContainerName string `protobuf:"bytes,3,opt,name=container_name,json=containerName" json:"container_name,omitempty"` + Message string `protobuf:"bytes,4,opt,name=message" json:"message,omitempty"` +} + +func (m *JobPodError) Reset() { *m = JobPodError{} } +func (m *JobPodError) String() string { return proto.CompactTextString(m) } +func (*JobPodError) ProtoMessage() {} +func (*JobPodError) Descriptor() ([]byte, []int) { return fileDescriptor6, []int{2} } + +func (m *JobPodError) GetJobName() string { + if m != nil { + return m.JobName + } + return "" +} + +func (m *JobPodError) GetPodName() string { + if m != nil { + return m.PodName + } + return "" +} + +func (m *JobPodError) GetContainerName() string { + if m != nil { + return m.ContainerName + } + return "" +} + +func (m *JobPodError) GetMessage() string { + if m != nil { + return m.Message + } + return "" +} + type WatchFeed struct { JobLogChunk *JobLogChunk `protobuf:"bytes,1,opt,name=job_log_chunk,json=jobLogChunk" json:"job_log_chunk,omitempty"` + JobPodError *JobPodError `protobuf:"bytes,2,opt,name=job_pod_error,json=jobPodError" json:"job_pod_error,omitempty"` } func (m *WatchFeed) Reset() { *m = WatchFeed{} } func (m *WatchFeed) String() string { return proto.CompactTextString(m) } func (*WatchFeed) ProtoMessage() {} -func (*WatchFeed) Descriptor() ([]byte, []int) { return fileDescriptor6, []int{2} } +func (*WatchFeed) Descriptor() ([]byte, []int) { return fileDescriptor6, []int{3} } func (m *WatchFeed) GetJobLogChunk() *JobLogChunk { if m != nil { @@ -92,31 +133,42 @@ func (m *WatchFeed) GetJobLogChunk() *JobLogChunk { return nil } +func (m *WatchFeed) GetJobPodError() *JobPodError { + if m != nil { + return m.JobPodError + } + return nil +} + func init() { proto.RegisterType((*LogLine)(nil), "hapi.release.LogLine") 
proto.RegisterType((*JobLogChunk)(nil), "hapi.release.JobLogChunk") + proto.RegisterType((*JobPodError)(nil), "hapi.release.JobPodError") proto.RegisterType((*WatchFeed)(nil), "hapi.release.WatchFeed") } func init() { proto.RegisterFile("hapi/release/watch_feed.proto", fileDescriptor6) } var fileDescriptor6 = []byte{ - // 258 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x54, 0x90, 0x41, 0x4b, 0xc3, 0x40, - 0x14, 0x84, 0x89, 0x2d, 0xa6, 0x79, 0xb1, 0x1e, 0x16, 0x84, 0x14, 0x14, 0x4a, 0x40, 0xe8, 0x29, - 0x85, 0x78, 0x14, 0x2f, 0x0a, 0x1e, 0x4a, 0xf0, 0x90, 0x8b, 0xe0, 0x25, 0x6c, 0xb2, 0xcf, 0x64, - 0x63, 0xb2, 0x6f, 0x49, 0x56, 0xfc, 0x37, 0xfe, 0x56, 0xd9, 0x4d, 0xb0, 0xed, 0xed, 0xed, 0x7c, - 0xc3, 0x30, 0xb3, 0x70, 0xd7, 0x70, 0x2d, 0xf7, 0x03, 0x76, 0xc8, 0x47, 0xdc, 0xff, 0x70, 0x53, - 0x35, 0xc5, 0x27, 0xa2, 0x48, 0xf4, 0x40, 0x86, 0xd8, 0x95, 0xc5, 0xc9, 0x8c, 0xe3, 0x47, 0xf0, - 0x33, 0xaa, 0x33, 0xa9, 0x90, 0xdd, 0x42, 0x60, 0x64, 0x8f, 0xa3, 0xe1, 0xbd, 0x8e, 0xbc, 0xad, - 0xb7, 0x0b, 0xf2, 0xa3, 0xc0, 0x18, 0x2c, 0x05, 0x37, 0x3c, 0xba, 0x70, 0xc0, 0xdd, 0xf1, 0xaf, - 0x07, 0xe1, 0x81, 0xca, 0x8c, 0xea, 0x97, 0xe6, 0x5b, 0x7d, 0xb1, 0x0d, 0xac, 0x5a, 0x2a, 0x0b, - 0xc5, 0x7b, 0x9c, 0x03, 0xfc, 0x96, 0xca, 0x37, 0xde, 0xa3, 0x45, 0x9a, 0xc4, 0x84, 0xa6, 0x08, - 0x5f, 0x93, 0x70, 0xe8, 0x1e, 0xae, 0x2b, 0x52, 0x86, 0x4b, 0x85, 0xc3, 0x64, 0x58, 0x38, 0xc3, - 0xfa, 0x5f, 0x75, 0xb6, 0x14, 0x82, 0x8e, 0xea, 0xa2, 0x93, 0x0a, 0xc7, 0x68, 0xb9, 0x5d, 0xec, - 0xc2, 0xf4, 0x26, 0x39, 0xdd, 0x92, 0xcc, 0x43, 0xf2, 0x55, 0x37, 0x1d, 0x63, 0x7c, 0x80, 0xe0, - 0xdd, 0xee, 0x7f, 0x45, 0x14, 0xec, 0x09, 0xd6, 0xb6, 0x9d, 0x0d, 0xa9, 0x6c, 0x5d, 0x57, 0x31, - 0x4c, 0x37, 0xe7, 0x21, 0x27, 0x7b, 0xf2, 0xb0, 0x3d, 0x3e, 0x9e, 0x83, 0x0f, 0x7f, 0xf6, 0x94, - 0x97, 0xee, 0x27, 0x1f, 0xfe, 0x02, 0x00, 0x00, 0xff, 0xff, 0x6f, 0x51, 0x56, 0xb2, 0x6a, 0x01, - 0x00, 0x00, + // 307 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x52, 0x4f, 0x4b, 0xfb, 0x40, + 0x10, 0x25, 0xbf, 0x96, 0x5f, 0x9a, 0x8d, 0xf5, 0xb0, 0x20, 0xa4, 0xa0, 0x50, 0x02, 0x42, 0x4f, + 0x29, 0xd4, 0xa3, 0x78, 0x51, 0xf4, 0x20, 0x45, 0xa4, 0x17, 0xc1, 0x4b, 0xd8, 0x64, 0xc7, 0xfc, + 0x31, 0xd9, 0x09, 0xbb, 0x2b, 0x7e, 0x01, 0x2f, 0x7e, 0x09, 0x3f, 0xab, 0xec, 0x9f, 0x6a, 0x04, + 0xaf, 0xde, 0x66, 0xe6, 0xbd, 0x19, 0xde, 0x7b, 0xbb, 0xe4, 0xa4, 0x66, 0x43, 0xb3, 0x96, 0xd0, + 0x01, 0x53, 0xb0, 0x7e, 0x65, 0xba, 0xac, 0xf3, 0x27, 0x00, 0x9e, 0x0d, 0x12, 0x35, 0xd2, 0x03, + 0x03, 0x67, 0x1e, 0x4e, 0xcf, 0x49, 0xb8, 0xc5, 0x6a, 0xdb, 0x08, 0xa0, 0xc7, 0x24, 0xd2, 0x4d, + 0x0f, 0x4a, 0xb3, 0x7e, 0x48, 0x82, 0x65, 0xb0, 0x8a, 0x76, 0xdf, 0x03, 0x4a, 0xc9, 0x94, 0x33, + 0xcd, 0x92, 0x7f, 0x16, 0xb0, 0x75, 0xfa, 0x11, 0x90, 0xf8, 0x16, 0x8b, 0x2d, 0x56, 0x57, 0xf5, + 0x8b, 0x78, 0xa6, 0x0b, 0x32, 0x6b, 0xb1, 0xc8, 0x05, 0xeb, 0xc1, 0x1f, 0x08, 0x5b, 0x2c, 0xee, + 0x58, 0x0f, 0x06, 0x1a, 0x90, 0x3b, 0xc8, 0x9d, 0x08, 0x07, 0xe4, 0x16, 0x3a, 0x25, 0x87, 0x25, + 0x0a, 0xcd, 0x1a, 0x01, 0xd2, 0x11, 0x26, 0x96, 0x30, 0xff, 0x9a, 0x5a, 0xda, 0x86, 0x44, 0x1d, + 0x56, 0x79, 0xd7, 0x08, 0x50, 0xc9, 0x74, 0x39, 0x59, 0xc5, 0x9b, 0xa3, 0x6c, 0xec, 0x25, 0xf3, + 0x46, 0x76, 0xb3, 0xce, 0x15, 0x2a, 0x7d, 0x73, 0x02, 0xef, 0x91, 0x5f, 0x4b, 0x89, 0xf2, 0x6f, + 0x05, 0x26, 0x24, 0xec, 0x41, 0x29, 0x56, 0x41, 0x32, 0x75, 0x07, 0x7c, 0x9b, 0xbe, 0x07, 0x24, + 0x7a, 0x30, 0xef, 0x70, 0x03, 0xc0, 0xe9, 0x05, 0x99, 0x1b, 0x11, 0xc6, 0x4c, 0x69, 
0x62, 0xb3, + 0x4a, 0xe2, 0xcd, 0xe2, 0xa7, 0x99, 0x51, 0xae, 0xbb, 0xb8, 0x1d, 0x85, 0xec, 0xd7, 0x8d, 0x58, + 0x30, 0xa6, 0xac, 0xda, 0xdf, 0xd6, 0xf7, 0xae, 0xed, 0xfa, 0xbe, 0xb9, 0x8c, 0x1e, 0x43, 0xcf, + 0x29, 0xfe, 0xdb, 0x0f, 0x71, 0xf6, 0x19, 0x00, 0x00, 0xff, 0xff, 0xf8, 0x73, 0x41, 0xa7, 0x31, + 0x02, 0x00, 0x00, } diff --git a/pkg/tiller/release_install.go b/pkg/tiller/release_install.go index 17daf79e7..9fbed98c6 100644 --- a/pkg/tiller/release_install.go +++ b/pkg/tiller/release_install.go @@ -191,28 +191,47 @@ func (s *ReleaseServer) performRelease(r *release.Release, req *services.Install // pre-install hooks if !req.DisableHooks { - handleLogChunk := kube.WriteJobLogChunkFunc(func(chunk *kube.JobLogChunk) error { - chunkResp := &services.InstallReleaseResponse{ - WatchFeed: &release.WatchFeed{ - JobLogChunk: &release.JobLogChunk{}, - }, - } - chunkResp.WatchFeed.JobLogChunk.LogLines = make([]*release.LogLine, 0) + watchFeed := &kube.WatchFeedProto{ + WriteJobLogChunkFunc: func(chunk kube.JobLogChunk) error { + chunkResp := &services.InstallReleaseResponse{ + WatchFeed: &release.WatchFeed{ + JobLogChunk: &release.JobLogChunk{ + JobName: chunk.JobName, + PodName: chunk.PodName, + ContainerName: chunk.ContainerName, + LogLines: make([]*release.LogLine, 0), + }, + }, + } - for _, line := range chunk.LogLines { - ll := &release.LogLine{ - Timestamp: line.Timestamp, - Data: line.Data, + for _, line := range chunk.LogLines { + ll := &release.LogLine{ + Timestamp: line.Timestamp, + Data: line.Data, + } + chunkResp.WatchFeed.JobLogChunk.LogLines = append(chunkResp.WatchFeed.JobLogChunk.LogLines, ll) } - chunkResp.WatchFeed.JobLogChunk.LogLines = append(chunkResp.WatchFeed.JobLogChunk.LogLines, ll) - } - return stream.Send(chunkResp) - }) + return stream.Send(chunkResp) + }, + WriteJobPodErrorFunc: func(obj kube.JobPodError) error { + chunkResp := &services.InstallReleaseResponse{ + WatchFeed: &release.WatchFeed{ + JobPodError: &release.JobPodError{ + JobName: obj.JobName, + PodName: obj.PodName, + ContainerName: obj.ContainerName, + Message: obj.Message, + }, + }, + } + return stream.Send(chunkResp) + }, + } // TODO watch job with feed only if job have annotation "helm/watch-logs": "true" // TODO otherwise watch as ordinary hook just like before, using WatchUntilReady - if err := s.execHookWithWatchFeed(r.Hooks, r.Name, r.Namespace, hooks.PreInstall, req.Timeout, handleLogChunk); err != nil { + if err := s.execHookWithWatchFeed(r.Hooks, r.Name, r.Namespace, hooks.PreInstall, req.Timeout, watchFeed); err != nil { return res, err } } else { diff --git a/pkg/tiller/release_server.go b/pkg/tiller/release_server.go index 19c021c32..1fb9486e3 100644 --- a/pkg/tiller/release_server.go +++ b/pkg/tiller/release_server.go @@ -365,7 +365,7 @@ func (s *ReleaseServer) recordRelease(r *release.Release, reuse bool) { } func (s *ReleaseServer) execHook(hs []*release.Hook, name, namespace, hook string, timeout int64) error { - return s.execHookWithWatchFeed(hs, name, namespace, hook, timeout, kube.WriteJobLogChunkFunc(func(*kube.JobLogChunk) error { return nil })) + return s.execHookWithWatchFeed(hs, name, namespace, hook, timeout, kube.WatchFeedStub) } func (s *ReleaseServer) execHookWithWatchFeed(hs []*release.Hook, name, namespace, hook string, timeout int64, watchFeed kube.WatchFeed) error {
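Not part of the patch itself — below is a minimal sketch of the `tail -f`-style client formatting that the TODO comments in pkg/helm/client.go describe: print a `==> job X pod Y container Z <==` header only when the log source changes, and render `JobPodError` messages inline. It uses only the `WatchFeed`, `JobLogChunk`, `JobPodError` and `LogLine` types introduced by this patch; the `logsFormatter` name and the `k8s.io/helm/pkg/proto/hapi/release` import path are assumptions, not code from the patch.

```go
package main

import (
	"fmt"

	"k8s.io/helm/pkg/proto/hapi/release" // assumed import path for the generated watch_feed types
)

// logsFormatter renders WatchFeed messages tail -f style: a header line is
// printed only when the (job, pod, container) source differs from the
// previous chunk, so interleaved logs from several containers stay readable.
type logsFormatter struct {
	lastHeader string
}

func (f *logsFormatter) Write(feed *release.WatchFeed) {
	if chunk := feed.GetJobLogChunk(); chunk != nil {
		header := fmt.Sprintf("==> job %s pod %s container %s <==",
			chunk.JobName, chunk.PodName, chunk.ContainerName)
		if header != f.lastHeader {
			fmt.Println(header)
			f.lastHeader = header
		}
		for _, line := range chunk.LogLines {
			fmt.Printf("%s %s\n", line.Timestamp, line.Data)
		}
		return
	}
	if podErr := feed.GetJobPodError(); podErr != nil {
		fmt.Printf("ERROR: job %s pod %s container %s: %s\n",
			podErr.JobName, podErr.PodName, podErr.ContainerName, podErr.Message)
		// reset the header so the next log chunk re-announces its source
		f.lastHeader = ""
	}
}

func main() {
	// Usage example with a hand-built WatchFeed message.
	f := &logsFormatter{}
	f.Write(&release.WatchFeed{
		JobLogChunk: &release.JobLogChunk{
			JobName: "db-migrate", PodName: "db-migrate-x1z9", ContainerName: "migrate",
			LogLines: []*release.LogLine{
				{Timestamp: "2018-05-01T10:00:00Z", Data: "applying migration 42"},
			},
		},
	})
}
```

This keeps the streaming loop in `Client.install` free of formatting state: the loop would simply pass each received `resp.WatchFeed` to a single `logsFormatter` instance instead of the current per-line `fmt.Printf` calls.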