mirror of https://github.com/helm/helm
Data flow:
* Kube client returns logs to ReleaseServer using provided callbacks.
* ReleaseServer returns logs to ReleaseClient using gRPC server-side streaming.

TODO: backward-compatible gRPC calls
parent 2ef61fdf5c
commit 910b80fd1e
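A minimal sketch of the server-side half of that flow may help. The watchFeedStream interface and its Send method below are assumptions, not part of this commit (the gRPC call itself is still marked TODO above); kube.WriteJobLogChunkFunc and the release message types are the ones added in the diffs that follow.

package tiller

import (
	"k8s.io/helm/pkg/kube"
	"k8s.io/helm/pkg/proto/hapi/release"
)

// watchFeedStream stands in for the generated server-side streaming
// interface that a future gRPC call would provide (hypothetical).
type watchFeedStream interface {
	Send(*release.WatchFeed) error
}

// streamingWatchFeed bridges the kube client's callback-based feed into
// a gRPC server-side stream: each chunk becomes one streamed message.
func streamingWatchFeed(stream watchFeedStream) kube.WatchFeed {
	return kube.WriteJobLogChunkFunc(func(chunk *kube.JobLogChunk) error {
		pbChunk := &release.JobLogChunk{
			JobName:       chunk.JobName,
			PodName:       chunk.PodName,
			ContainerName: chunk.ContainerName,
		}
		for _, line := range chunk.LogLines {
			pbChunk.LogLines = append(pbChunk.LogLines, &release.LogLine{
				Timestamp: line.Timestamp,
				Data:      line.Data,
			})
		}
		return stream.Send(&release.WatchFeed{JobLogChunk: pbChunk})
	})
}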
@ -0,0 +1,34 @@
syntax = "proto3";

package hapi.release;

option go_package = "release";

message LogLine {
  string timestamp = 1;
  string data = 2;
}

message JobLogChunk {
  string job_name = 1;
  string pod_name = 2;
  string container_name = 3;
  repeated LogLine log_lines = 4;
}

message WatchFeed {
  JobLogChunk job_log_chunk = 1;

  /*
   * A WatchFeed can carry one or more events from Tiller at the same time.
   * All events relate to the process of making a release and are needed by the Helm client.
   * For now there is a single event, JobLogChunk, which the Helm client needs in order to show hook logs during a release.
   *
   * Suggestions for future events:
   * JobStarted job_started = n;
   * JobFinished job_finished = n;
   * DeploymentStatus deployment_status = n;
   * SomethingUseful something_useful = n;
   * SomethingUseless something_useless = n;
   */
}
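As a consumer-side illustration (not part of this commit; the handleWatchFeed helper is hypothetical), the Helm client could dispatch on whichever event a received WatchFeed carries — only JobLogChunk for now, using the getters from the generated code shown further below:

package feedconsumer

import (
	"fmt"

	"k8s.io/helm/pkg/proto/hapi/release"
)

// handleWatchFeed reacts to a single WatchFeed message received over the
// stream, printing hook-job log lines as they arrive.
func handleWatchFeed(feed *release.WatchFeed) {
	if chunk := feed.GetJobLogChunk(); chunk != nil {
		for _, line := range chunk.GetLogLines() {
			fmt.Printf("job/%s pod/%s %s: %s %s\n",
				chunk.GetJobName(), chunk.GetPodName(), chunk.GetContainerName(),
				line.GetTimestamp(), line.GetData())
		}
	}
}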
@ -0,0 +1,486 @@
package kube

import (
	"bytes"
	_ "fmt" // kept for the commented-out job-status watch at the bottom of this file
	"io"
	"sort"
	"strings"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/kubernetes/pkg/apis/core"
	"k8s.io/kubernetes/pkg/kubectl/resource"
)

// WatchFeed receives events collected while watching release resources.
type WatchFeed interface {
	WriteJobLogChunk(*JobLogChunk) error
}

// LogLine is a single timestamped line of container output.
type LogLine struct {
	Timestamp string
	Data      string
}

// JobLogChunk is a batch of log lines from one container of one pod of a job.
type JobLogChunk struct {
	JobName       string
	PodName       string
	ContainerName string
	LogLines      []LogLine
}

// WriteJobLogChunkFunc adapts a plain function to the WatchFeed interface.
type WriteJobLogChunkFunc func(*JobLogChunk) error

func (f WriteJobLogChunkFunc) WriteJobLogChunk(chunk *JobLogChunk) error {
	return f(chunk)
}

// WatchMonitor holds the state common to watching any single resource.
type WatchMonitor struct {
	kube      *Client
	timeout   time.Duration
	watchFeed WatchFeed

	Namespace    string
	ResourceName string
	UID          types.UID
}

type PodWatchMonitor struct {
	WatchMonitor

	Manifest *core.Pod

	InitContainersNames          []string
	ProcessedInitContainersNames []string
	ProcessedInitContainersIDs   []string

	ContainersNames          []string
	ProcessedContainersNames []string
	ProcessedContainersIDs   []string
}

// GetMonitoredContainersNames returns the pod's containers that have not been processed yet.
func (pod *PodWatchMonitor) GetMonitoredContainersNames() []string {
	res := make([]string, 0)

FilterProcessedContainersNames:
	for _, name := range pod.ContainersNames {
		for _, processedContainerName := range pod.ProcessedContainersNames {
			if processedContainerName == name {
				continue FilterProcessedContainersNames
			}
		}
		res = append(res, name)
	}

	return res
}

func (pod *PodWatchMonitor) SetContainerProcessed(containerName string, containerID string) {
	pod.ProcessedContainersNames = append(pod.ProcessedContainersNames, containerName)
	pod.ProcessedContainersIDs = append(pod.ProcessedContainersIDs, containerID)
}

// GetMonitoredInitContainersNames returns the pod's init containers that have not been processed yet.
func (pod *PodWatchMonitor) GetMonitoredInitContainersNames() []string {
	res := make([]string, 0)

FilterProcessedInitContainersNames:
	for _, name := range pod.InitContainersNames {
		for _, processedInitContainerName := range pod.ProcessedInitContainersNames {
			if processedInitContainerName == name {
				continue FilterProcessedInitContainersNames
			}
		}
		res = append(res, name)
	}

	return res
}

func (pod *PodWatchMonitor) SetInitContainerProcessed(containerName string, containerID string) {
	pod.ProcessedInitContainersNames = append(pod.ProcessedInitContainersNames, containerName)
	pod.ProcessedInitContainersIDs = append(pod.ProcessedInitContainersIDs, containerID)
}

// RefreshManifest re-fetches the pod manifest from the API server.
func (pod *PodWatchMonitor) RefreshManifest() error {
	client, err := pod.kube.ClientSet()
	if err != nil {
		return err
	}

	manifest, err := client.Core().
		Pods(pod.Namespace).
		Get(pod.ResourceName, metav1.GetOptions{})
	if err != nil {
		return err
	}
	pod.Manifest = manifest

	return nil
}

// GetReadyCondition returns the pod's Ready condition, or nil if it is not present.
func (pod *PodWatchMonitor) GetReadyCondition() *core.PodCondition {
	conditions := pod.Manifest.Status.Conditions
	for i := range conditions {
		if conditions[i].Type == "Ready" {
			return &conditions[i]
		}
	}
	return nil
}

// GetInitContainerStatus returns the status of the named init container, or nil if it is not present.
// Note: init container statuses live in InitContainerStatuses, not ContainerStatuses.
func (pod *PodWatchMonitor) GetInitContainerStatus(containerName string) *core.ContainerStatus {
	statuses := pod.Manifest.Status.InitContainerStatuses
	for i := range statuses {
		if statuses[i].Name == containerName {
			return &statuses[i]
		}
	}
	return nil
}

// GetContainerStatus returns the status of the named container, or nil if it is not present.
func (pod *PodWatchMonitor) GetContainerStatus(containerName string) *core.ContainerStatus {
	statuses := pod.Manifest.Status.ContainerStatuses
	for i := range statuses {
		if statuses[i].Name == containerName {
			return &statuses[i]
		}
	}
	return nil
}

// GetContainerLogs fetches the container's log and splits it into timestamped
// lines ("<timestamp> <data>", as produced by the Timestamps option).
func (pod *PodWatchMonitor) GetContainerLogs(containerName string) ([]LogLine, error) {
	client, err := pod.kube.ClientSet()
	if err != nil {
		return nil, err
	}

	req := client.Core().
		Pods(pod.Namespace).
		GetLogs(pod.ResourceName, &core.PodLogOptions{
			Container:  containerName,
			Timestamps: true,
		})

	readCloser, err := req.Stream()
	if err != nil {
		return nil, err
	}
	defer readCloser.Close()

	buf := bytes.Buffer{}
	if _, err = io.Copy(&buf, readCloser); err != nil {
		return nil, err
	}

	res := make([]LogLine, 0)
	for _, line := range strings.Split(strings.TrimSuffix(buf.String(), "\n"), "\n") {
		lineParts := strings.SplitN(line, " ", 2)
		// TODO: receive only new log lines, save state into PodWatchMonitor
		if len(lineParts) == 2 {
			res = append(res, LogLine{
				Timestamp: lineParts[0],
				Data:      lineParts[1],
			})
		}
	}

	return res, nil
}

type JobWatchMonitor struct {
	WatchMonitor

	MonitoredPodsQueue []*PodWatchMonitor
	ProcessedPodsUIDs  []types.UID
}

// WaitTillResourceVersionAdded blocks until the job at the given resource
// version is reported as Added by the watch API, or the timeout expires.
func (job *JobWatchMonitor) WaitTillResourceVersionAdded(resourceVersion string, jobInfo *resource.Info) error {
	w, err := resource.
		NewHelper(jobInfo.Client, jobInfo.Mapping).
		WatchSingle(job.Namespace, job.ResourceName, resourceVersion)
	if err != nil {
		return err
	}

	_, err = watch.Until(job.timeout, w, func(e watch.Event) (bool, error) {
		return e.Type == watch.Added, nil
	})

	return err
}

// TakeNextMonitoredPod pops the next pod from the monitoring queue, or returns nil if the queue is empty.
func (job *JobWatchMonitor) TakeNextMonitoredPod() *PodWatchMonitor {
	if len(job.MonitoredPodsQueue) == 0 {
		return nil
	}

	var res *PodWatchMonitor
	res, job.MonitoredPodsQueue = job.MonitoredPodsQueue[0], job.MonitoredPodsQueue[1:]
	return res
}

func (job *JobWatchMonitor) SetPodProcessed(uid types.UID) {
	job.ProcessedPodsUIDs = append(job.ProcessedPodsUIDs, uid)
}

func (job *JobWatchMonitor) SchedulePodMonitoring(pod *PodWatchMonitor) {
	job.MonitoredPodsQueue = append(job.MonitoredPodsQueue, pod)
}

// RefreshMonitoredPods lists the job's pods and appends any new ones to the
// monitoring queue in chronological order by creation timestamp.
func (job *JobWatchMonitor) RefreshMonitoredPods() error {
	job.kube.Log("RefreshMonitoredPods") // TODO: remove

	client, err := job.kube.ClientSet()
	if err != nil {
		return err
	}

	jobManifest, err := client.Batch().
		Jobs(job.Namespace).
		Get(job.ResourceName, metav1.GetOptions{})
	if err != nil {
		return err
	}
	job.kube.Log("jobManifest: %+v", jobManifest) // TODO: remove

	selector, err := metav1.LabelSelectorAsSelector(jobManifest.Spec.Selector)
	if err != nil {
		return err
	}

	podList, err := client.Core().
		Pods(job.Namespace).
		List(metav1.ListOptions{LabelSelector: selector.String()})
	if err != nil {
		return err
	}
	job.kube.Log("podList: %+v", podList) // TODO: remove

	// TODO: filter out pods that do not belong to controller-uid=job-uid

	// Add new pods to the monitoring queue in chronological order by creation timestamp
	podItems := make([]core.Pod, 0, len(podList.Items))
	podItems = append(podItems, podList.Items...)
	sort.Slice(podItems, func(i, j int) bool {
		return podItems[i].CreationTimestamp.Time.Before(podItems[j].CreationTimestamp.Time)
	})

searchNewPods:
	for _, item := range podItems {
		// filter out pods already being processed
		for _, monitoredPod := range job.MonitoredPodsQueue {
			// TODO: is there a need to check for a resource-version change?
			if monitoredPod.UID == item.UID {
				continue searchNewPods
			}
		}
		// filter out already processed pods
		for _, processedPodUID := range job.ProcessedPodsUIDs {
			if processedPodUID == item.UID {
				continue searchNewPods
			}
		}

		pod := &PodWatchMonitor{
			WatchMonitor: WatchMonitor{
				kube:      job.kube,
				timeout:   job.timeout,
				watchFeed: job.watchFeed,

				Namespace:    job.Namespace,
				ResourceName: item.Name,
				UID:          item.UID,
			},
		}
		if err = pod.RefreshManifest(); err != nil {
			return err
		}
		for _, containerConf := range pod.Manifest.Spec.InitContainers {
			pod.InitContainersNames = append(pod.InitContainersNames, containerConf.Name)
		}
		for _, containerConf := range pod.Manifest.Spec.Containers {
			pod.ContainersNames = append(pod.ContainersNames, containerConf.Name)
		}

		job.MonitoredPodsQueue = append(job.MonitoredPodsQueue, pod)
	}

	job.kube.Log("RefreshMonitoredPods done: MonitoredPodsQueue: %+v", job.MonitoredPodsQueue) // TODO: remove

	return nil
}

func (c *Client) watchJobTillDone(jobInfo *resource.Info, watchFeed WatchFeed, timeout time.Duration) error {
	if jobInfo.Mapping.GroupVersionKind.Kind != "Job" {
		return nil
	}

	uid, err := jobInfo.Mapping.UID(jobInfo.Object)
	if err != nil {
		return err
	}

	job := &JobWatchMonitor{
		WatchMonitor: WatchMonitor{
			kube:      c,
			timeout:   timeout,
			watchFeed: watchFeed,

			Namespace:    jobInfo.Namespace,
			ResourceName: jobInfo.Name,
			UID:          uid,
		},
	}

	if err := job.WaitTillResourceVersionAdded(jobInfo.ResourceVersion, jobInfo); err != nil {
		return err // TODO
	}

	if err = job.RefreshMonitoredPods(); err != nil {
		return err // TODO
	}

	var processPod *PodWatchMonitor

	// TODO: split this method into dedicated functions

TakeNextMonitoredPod:
	for {
		if processPod = job.TakeNextMonitoredPod(); processPod == nil {
			break
		}

		if err := processPod.RefreshManifest(); err != nil {
			c.Log("Pod %s refresh manifest failed: %s", processPod.ResourceName, err)
			// TODO: stream a system-error to the feed
			job.SetPodProcessed(processPod.UID)
			continue TakeNextMonitoredPod
		}

		c.Log("Pod manifest refreshed, ResourceVersion: %s", processPod.Manifest.ResourceVersion)

		if processPod.Manifest.Status.Phase == core.PodPending {
			c.Log("Pod %s is in PENDING state", processPod.ResourceName)

			if podReadyCondition := processPod.GetReadyCondition(); podReadyCondition != nil {
				c.Log("Pod %s ready condition: %+v", processPod.ResourceName, podReadyCondition)
				if podReadyCondition.Status != core.ConditionTrue {
					// TODO: init-containers-statuses
					for _, containerStatus := range processPod.Manifest.Status.ContainerStatuses {
						if containerStatus.Ready {
							continue
						}
						if containerStatus.State.Waiting != nil {
							c.Log("Pod %s container %s is in waiting state: %s: %s", processPod.ResourceName, containerStatus.Name, containerStatus.State.Waiting.Reason, containerStatus.State.Waiting.Message)

							switch containerStatus.State.Waiting.Reason {
							case "ImagePullBackOff", "ErrImagePull":
								// TODO: stream a bad_image user-error
								processPod.SetContainerProcessed(containerStatus.Name, containerStatus.ContainerID)
							case "CrashLoopBackOff":
								// TODO: stream a container_crash user-error
								processPod.SetContainerProcessed(containerStatus.Name, containerStatus.ContainerID)
							}
						}
						if containerStatus.State.Terminated != nil {
							// TODO: dig more, think more.
							// TODO: not strictly necessary, because in this container state we can still reach the container logs
							// TODO: what about the failed state? we should stream a user-error about an incorrectly terminated container
							// TODO: an init container ends up terminated in the normal situation anyway
							// TODO: this error should follow the logs, not precede them,
							// TODO: so this check needs to move after the send-logs-for-container section

							c.Log("Pod %s container %s (%s) is in terminated state: %s: %s", processPod.ResourceName, containerStatus.Name, containerStatus.State.Terminated.ContainerID, containerStatus.State.Terminated.Reason, containerStatus.State.Terminated.Message)
							processPod.SetContainerProcessed(containerStatus.Name, containerStatus.ContainerID)
						}
					}

					job.SchedulePodMonitoring(processPod)
					time.Sleep(time.Duration(1) * time.Second) // TODO: remove this
					continue TakeNextMonitoredPod
					// TODO: fetch the state and stream a userspace-error to the feed
				}
			}
		}

		// TODO: init-containers

	ProcessContainers:
		for _, containerName := range processPod.GetMonitoredContainersNames() {
			c.Log("Process pod %s container %s", processPod.ResourceName, containerName)
			if containerStatus := processPod.GetContainerStatus(containerName); containerStatus != nil {
				c.Log("Process pod %s container %s status: %+v", processPod.ResourceName, containerName, containerStatus)
				if containerStatus.State.Waiting != nil {
					if containerStatus.State.Waiting.Reason == "RunContainerError" {
						// TODO: stream a container_stuck userspace-error to the watch feed
						processPod.SetContainerProcessed(containerName, containerStatus.ContainerID)
						continue ProcessContainers
					}
				}
			} else {
				c.Log("Process pod %s container %s status not available", processPod.ResourceName, containerName)
			}

			logLines, err := processPod.GetContainerLogs(containerName)
			if err != nil {
				c.Log("Error getting job %s pod %s container %s log chunk: %s", job.ResourceName, processPod.ResourceName, containerName, err)
			}

			chunk := &JobLogChunk{
				JobName:       job.ResourceName,
				PodName:       processPod.ResourceName,
				ContainerName: containerName,
				LogLines:      logLines,
			}
			if err = job.watchFeed.WriteJobLogChunk(chunk); err != nil {
				c.Log("Error writing job %s pod %s container %s log chunk to watch feed: %s", chunk.JobName, chunk.PodName, chunk.ContainerName, err)
			}
		}

		if len(processPod.GetMonitoredContainersNames()) > 0 {
			job.SchedulePodMonitoring(processPod)
		}

		time.Sleep(time.Duration(1) * time.Second) // TODO: show logs without delay when there is something to show; also use a ticker
	}

	// TODO: wait for the job-done event
	// TODO: refresh the pods while waiting: job.RefreshMonitoredPods()
	// TODO: it is not necessary to refresh the list on every tick

	// TODO: add the following event watch before returning from this function
	// switch e.Type {
	// case watch.Added, watch.Modified:
	// 	c.Log("o = %v", o)
	// 	c.Log("o.Status = %v", o.Status)
	// 	c.Log("o.Status.Conditions = %v", o.Status.Conditions)
	// 	for _, c := range o.Status.Conditions {
	// 		if c.Type == batchinternal.JobComplete && c.Status == core.ConditionTrue {
	// 			return true, nil
	// 		} else if c.Type == batchinternal.JobFailed && c.Status == core.ConditionTrue {
	// 			return true, fmt.Errorf("Job failed: %s", c.Reason)
	// 		}
	// 	}
	// 	c.Log("%s: Jobs active: %d, jobs failed: %d, jobs succeeded: %d", jobInfo.Name, o.Status.Active, o.Status.Failed, o.Status.Succeeded)
	// 	return false, nil
	// }

	return nil
}

// WatchJobsTillDone watches every Job found in the given manifest reader and
// streams its events into watchFeed until the job is done.
func (c *Client) WatchJobsTillDone(namespace string, reader io.Reader, watchFeed WatchFeed, timeout time.Duration) error {
	infos, err := c.Build(namespace, reader)
	if err != nil {
		return err
	}

	return perform(infos, func(info *resource.Info) error {
		return c.watchJobTillDone(info, watchFeed, timeout)
	})
}
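For reference, a hedged usage sketch of the new entry point; the watchHookJobs wrapper and the client, namespace, and hookManifest values are illustrative, not part of this commit:

package example

import (
	"bytes"
	"fmt"
	"time"

	"k8s.io/helm/pkg/kube"
)

// watchHookJobs streams hook-job logs to stdout via a callback feed built
// from kube.WriteJobLogChunkFunc.
func watchHookJobs(client *kube.Client, namespace, hookManifest string) error {
	feed := kube.WriteJobLogChunkFunc(func(chunk *kube.JobLogChunk) error {
		for _, line := range chunk.LogLines {
			fmt.Printf("[%s/%s/%s] %s %s\n",
				chunk.JobName, chunk.PodName, chunk.ContainerName,
				line.Timestamp, line.Data)
		}
		return nil
	})
	return client.WatchJobsTillDone(namespace, bytes.NewBufferString(hookManifest), feed, 5*time.Minute)
}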
@ -0,0 +1,122 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: hapi/release/watch_feed.proto

package release

import proto "github.com/golang/protobuf/proto"
import fmt "fmt"
import math "math"

// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf

type LogLine struct {
	Timestamp string `protobuf:"bytes,1,opt,name=timestamp" json:"timestamp,omitempty"`
	Data      string `protobuf:"bytes,2,opt,name=data" json:"data,omitempty"`
}

func (m *LogLine) Reset()                    { *m = LogLine{} }
func (m *LogLine) String() string            { return proto.CompactTextString(m) }
func (*LogLine) ProtoMessage()               {}
func (*LogLine) Descriptor() ([]byte, []int) { return fileDescriptor6, []int{0} }

func (m *LogLine) GetTimestamp() string {
	if m != nil {
		return m.Timestamp
	}
	return ""
}

func (m *LogLine) GetData() string {
	if m != nil {
		return m.Data
	}
	return ""
}

type JobLogChunk struct {
	JobName       string     `protobuf:"bytes,1,opt,name=job_name,json=jobName" json:"job_name,omitempty"`
	PodName       string     `protobuf:"bytes,2,opt,name=pod_name,json=podName" json:"pod_name,omitempty"`
	ContainerName string     `protobuf:"bytes,3,opt,name=container_name,json=containerName" json:"container_name,omitempty"`
	LogLines      []*LogLine `protobuf:"bytes,4,rep,name=log_lines,json=logLines" json:"log_lines,omitempty"`
}

func (m *JobLogChunk) Reset()                    { *m = JobLogChunk{} }
func (m *JobLogChunk) String() string            { return proto.CompactTextString(m) }
func (*JobLogChunk) ProtoMessage()               {}
func (*JobLogChunk) Descriptor() ([]byte, []int) { return fileDescriptor6, []int{1} }

func (m *JobLogChunk) GetJobName() string {
	if m != nil {
		return m.JobName
	}
	return ""
}

func (m *JobLogChunk) GetPodName() string {
	if m != nil {
		return m.PodName
	}
	return ""
}

func (m *JobLogChunk) GetContainerName() string {
	if m != nil {
		return m.ContainerName
	}
	return ""
}

func (m *JobLogChunk) GetLogLines() []*LogLine {
	if m != nil {
		return m.LogLines
	}
	return nil
}

type WatchFeed struct {
	JobLogChunk *JobLogChunk `protobuf:"bytes,1,opt,name=job_log_chunk,json=jobLogChunk" json:"job_log_chunk,omitempty"`
}

func (m *WatchFeed) Reset()                    { *m = WatchFeed{} }
func (m *WatchFeed) String() string            { return proto.CompactTextString(m) }
func (*WatchFeed) ProtoMessage()               {}
func (*WatchFeed) Descriptor() ([]byte, []int) { return fileDescriptor6, []int{2} }

func (m *WatchFeed) GetJobLogChunk() *JobLogChunk {
	if m != nil {
		return m.JobLogChunk
	}
	return nil
}

func init() {
	proto.RegisterType((*LogLine)(nil), "hapi.release.LogLine")
	proto.RegisterType((*JobLogChunk)(nil), "hapi.release.JobLogChunk")
	proto.RegisterType((*WatchFeed)(nil), "hapi.release.WatchFeed")
}

func init() { proto.RegisterFile("hapi/release/watch_feed.proto", fileDescriptor6) }

var fileDescriptor6 = []byte{
	// 258 bytes of a gzipped FileDescriptorProto
	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x54, 0x90, 0x41, 0x4b, 0xc3, 0x40,
	0x14, 0x84, 0x89, 0x2d, 0xa6, 0x79, 0xb1, 0x1e, 0x16, 0x84, 0x14, 0x14, 0x4a, 0x40, 0xe8, 0x29,
	0x85, 0x78, 0x14, 0x2f, 0x0a, 0x1e, 0x4a, 0xf0, 0x90, 0x8b, 0xe0, 0x25, 0x6c, 0xb2, 0xcf, 0x64,
	0x63, 0xb2, 0x6f, 0x49, 0x56, 0xfc, 0x37, 0xfe, 0x56, 0xd9, 0x4d, 0xb0, 0xed, 0xed, 0xed, 0x7c,
	0xc3, 0x30, 0xb3, 0x70, 0xd7, 0x70, 0x2d, 0xf7, 0x03, 0x76, 0xc8, 0x47, 0xdc, 0xff, 0x70, 0x53,
	0x35, 0xc5, 0x27, 0xa2, 0x48, 0xf4, 0x40, 0x86, 0xd8, 0x95, 0xc5, 0xc9, 0x8c, 0xe3, 0x47, 0xf0,
	0x33, 0xaa, 0x33, 0xa9, 0x90, 0xdd, 0x42, 0x60, 0x64, 0x8f, 0xa3, 0xe1, 0xbd, 0x8e, 0xbc, 0xad,
	0xb7, 0x0b, 0xf2, 0xa3, 0xc0, 0x18, 0x2c, 0x05, 0x37, 0x3c, 0xba, 0x70, 0xc0, 0xdd, 0xf1, 0xaf,
	0x07, 0xe1, 0x81, 0xca, 0x8c, 0xea, 0x97, 0xe6, 0x5b, 0x7d, 0xb1, 0x0d, 0xac, 0x5a, 0x2a, 0x0b,
	0xc5, 0x7b, 0x9c, 0x03, 0xfc, 0x96, 0xca, 0x37, 0xde, 0xa3, 0x45, 0x9a, 0xc4, 0x84, 0xa6, 0x08,
	0x5f, 0x93, 0x70, 0xe8, 0x1e, 0xae, 0x2b, 0x52, 0x86, 0x4b, 0x85, 0xc3, 0x64, 0x58, 0x38, 0xc3,
	0xfa, 0x5f, 0x75, 0xb6, 0x14, 0x82, 0x8e, 0xea, 0xa2, 0x93, 0x0a, 0xc7, 0x68, 0xb9, 0x5d, 0xec,
	0xc2, 0xf4, 0x26, 0x39, 0xdd, 0x92, 0xcc, 0x43, 0xf2, 0x55, 0x37, 0x1d, 0x63, 0x7c, 0x80, 0xe0,
	0xdd, 0xee, 0x7f, 0x45, 0x14, 0xec, 0x09, 0xd6, 0xb6, 0x9d, 0x0d, 0xa9, 0x6c, 0x5d, 0x57, 0x31,
	0x4c, 0x37, 0xe7, 0x21, 0x27, 0x7b, 0xf2, 0xb0, 0x3d, 0x3e, 0x9e, 0x83, 0x0f, 0x7f, 0xf6, 0x94,
	0x97, 0xee, 0x27, 0x1f, 0xfe, 0x02, 0x00, 0x00, 0xff, 0xff, 0x6f, 0x51, 0x56, 0xb2, 0x6a, 0x01,
	0x00, 0x00,
}