diff --git a/_proto/hapi/release/watch_feed.proto b/_proto/hapi/release/watch_feed.proto new file mode 100644 index 000000000..183724dba --- /dev/null +++ b/_proto/hapi/release/watch_feed.proto @@ -0,0 +1,34 @@ +syntax = "proto3"; + +package hapi.release; + +option go_package = "release"; + +message LogLine { + string timestamp = 1; + string data = 2; +} + +message JobLogChunk { + string job_name = 1; + string pod_name = 2; + string container_name = 3; + repeated LogLine log_lines = 4; +} + +message WatchFeed { + JobLogChunk job_log_chunk = 1; + + /* + * WatchFeed could contain one or multiple events from tiller at the same time. + * All events are related to the process of making release and needed for the helm-client. + * For now there is only one event: JobLogChunk, needed for helm-client to show hooks logs during release. + * + * Suggestions for future events: + * JobStarted job_started = n; + * JobStarted job_finished = n; + * DeploymentStatus deployment_status = n; + * SomethingUseful something_useful = n; + * SomethingUseless something_useless = n; + */ +} diff --git a/_proto/hapi/services/tiller.proto b/_proto/hapi/services/tiller.proto index 6c44ce6e0..de706bcba 100644 --- a/_proto/hapi/services/tiller.proto +++ b/_proto/hapi/services/tiller.proto @@ -22,6 +22,7 @@ import "hapi/release/release.proto"; import "hapi/release/info.proto"; import "hapi/release/test_run.proto"; import "hapi/release/status.proto"; +import "hapi/release/watch_feed.proto"; import "hapi/version/version.proto"; option go_package = "services"; @@ -61,7 +62,7 @@ service ReleaseService { } // InstallRelease requests installation of a chart as a new release. - rpc InstallRelease(InstallReleaseRequest) returns (InstallReleaseResponse) { + rpc InstallRelease(InstallReleaseRequest) returns (stream InstallReleaseResponse) { // TODO: think about compatibility } // UninstallRelease requests deletion of a named release. @@ -286,6 +287,7 @@ message InstallReleaseRequest { // InstallReleaseResponse is the response from a release installation. message InstallReleaseResponse { hapi.release.Release release = 1; + hapi.release.WatchFeed watch_feed = 2; // TODO: think about compatibility } // UninstallReleaseRequest represents a request to uninstall a named release. diff --git a/pkg/helm/client.go b/pkg/helm/client.go index 771c7f3d1..3df0c8717 100644 --- a/pkg/helm/client.go +++ b/pkg/helm/client.go @@ -374,7 +374,35 @@ func (h *Client) install(ctx context.Context, req *rls.InstallReleaseRequest) (* defer c.Close() rlc := rls.NewReleaseServiceClient(c) - return rlc.InstallRelease(ctx, req) + + var finalResp *rls.InstallReleaseResponse + + stream, err := rlc.InstallRelease(ctx, req) + for { + resp, err := stream.Recv() + if err == io.EOF { + return finalResp, nil + } + if err != nil { + return resp, err + } + + if resp.WatchFeed.GetJobLogChunk() != nil { + for _, line := range resp.WatchFeed.GetJobLogChunk().LogLines { + fmt.Printf("%s %s\n", line.Timestamp, line.Data) // TODO: make normal formatting as follows. + // TODO The client could work like state machine: + // TODO when receiving job-pod-container log chunk print header "==> job X pod X container Y logs <==\n", + // TODO just like `tail -f *` works on multiple files at the same time. + // TODO When receiving job-pod-container log chunk for another pod or container or job, + // TODO client print new header and follow with log lines. + // TODO Also there will be other than log-chunk events: userspace-error or system-error. + // TODO The main reason to stream userspace-events like ImagePullBackOff or CrashLoopBackOff is + // TODO to give user enough info so that user can debug templates without accessing cluster using kubectl. + } + } else { + finalResp = resp // TODO verify/debug this code + } + } } // Executes tiller.UninstallRelease RPC. diff --git a/pkg/kube/job.go b/pkg/kube/job.go new file mode 100644 index 000000000..8538ffe1a --- /dev/null +++ b/pkg/kube/job.go @@ -0,0 +1,486 @@ +package kube + +import ( + "bytes" + _ "fmt" + "io" + "sort" + "strings" + "time" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/kubernetes/pkg/apis/core" + "k8s.io/kubernetes/pkg/kubectl/resource" +) + +type WatchFeed interface { + WriteJobLogChunk(*JobLogChunk) error +} + +type LogLine struct { + Timestamp string + Data string +} + +type JobLogChunk struct { + JobName string + PodName string + ContainerName string + LogLines []LogLine +} + +type WriteJobLogChunkFunc func(*JobLogChunk) error + +func (f WriteJobLogChunkFunc) WriteJobLogChunk(chunk *JobLogChunk) error { + return f(chunk) +} + +type WatchMonitor struct { + kube *Client + timeout time.Duration + watchFeed WatchFeed + + Namespace string + ResourceName string + UID types.UID +} + +type PodWatchMonitor struct { + WatchMonitor + + Manifest *core.Pod + + InitContainersNames []string + ProcessedInitContainersNames []string + ProcessedInitContainersIDs []string + + ContainersNames []string + ProcessedContainersNames []string + ProcessedContainersIDs []string +} + +func (pod *PodWatchMonitor) GetMonitoredContainersNames() []string { + res := make([]string, 0) + +FilterProcessedContainersNames: + for _, name := range pod.ContainersNames { + for _, processedContainerName := range pod.ProcessedContainersNames { + if processedContainerName == name { + continue FilterProcessedContainersNames + } + } + res = append(res, name) + } + + return res +} + +func (pod *PodWatchMonitor) SetContainerProcessed(containerName string, containerID string) { + pod.ProcessedContainersNames = append(pod.ProcessedContainersNames, containerName) + pod.ProcessedContainersIDs = append(pod.ProcessedContainersIDs, containerID) +} + +func (pod *PodWatchMonitor) GetMonitoredInitContainersNames() []string { + res := make([]string, 0) + +FilterProcessedInitContainersNames: + for _, name := range pod.InitContainersNames { + for _, processedInitContainerName := range pod.ProcessedInitContainersNames { + if processedInitContainerName == name { + continue FilterProcessedInitContainersNames + } + } + res = append(res, name) + } + + return res +} + +func (pod *PodWatchMonitor) SetInitContainerProcessed(containerName string, containerID string) { + pod.ProcessedInitContainersNames = append(pod.ProcessedInitContainersNames, containerName) + pod.ProcessedInitContainersIDs = append(pod.ProcessedInitContainersIDs, containerID) +} + +func (pod *PodWatchMonitor) RefreshManifest() error { + client, err := pod.kube.ClientSet() + if err != nil { + return err + } + + manifest, err := client.Core(). + Pods(pod.Namespace). + Get(pod.ResourceName, metav1.GetOptions{}) + if err != nil { + return err + } + pod.Manifest = manifest + + return nil +} + +func (pod *PodWatchMonitor) GetReadyCondition() (res *core.PodCondition) { + for _, podCondition := range pod.Manifest.Status.Conditions { + if podCondition.Type == "Ready" { + res = &podCondition + break + } + } + return +} + +func (pod *PodWatchMonitor) GetInitContainerStatus(containerName string) (res *core.ContainerStatus) { + for _, containerStatus := range pod.Manifest.Status.ContainerStatuses { + if containerStatus.Name == containerName { + res = &containerStatus + break + } + } + return +} + +func (pod *PodWatchMonitor) GetContainerStatus(containerName string) (res *core.ContainerStatus) { + for _, containerStatus := range pod.Manifest.Status.ContainerStatuses { + if containerStatus.Name == containerName { + res = &containerStatus + break + } + } + return +} + +func (pod *PodWatchMonitor) GetContainerLogs(containerName string) ([]LogLine, error) { + client, err := pod.kube.ClientSet() + if err != nil { + return nil, err + } + + req := client.Core(). + Pods(pod.Namespace). + GetLogs(pod.ResourceName, &core.PodLogOptions{ + Container: containerName, + Timestamps: true, + }) + + readCloser, err := req.Stream() + if err != nil { + return nil, err + } + defer readCloser.Close() + + buf := bytes.Buffer{} + _, err = io.Copy(&buf, readCloser) + + res := make([]LogLine, 0) + for _, line := range strings.Split(strings.TrimSuffix(buf.String(), "\n"), "\n") { + lineParts := strings.SplitN(line, " ", 2) + // TODO: receive only new log lines, save state into PodWatchMonitor + if len(lineParts) == 2 { + ll := LogLine{ + Timestamp: lineParts[0], + Data: lineParts[1], + } + res = append(res, ll) + } + } + + return res, nil +} + +type JobWatchMonitor struct { + WatchMonitor + + MonitoredPodsQueue []*PodWatchMonitor + ProcessedPodsUIDs []types.UID +} + +func (job *JobWatchMonitor) WaitTillResourceVersionAdded(resourceVersion string, jobInfo *resource.Info) error { + w, err := resource. + NewHelper(jobInfo.Client, jobInfo.Mapping). + WatchSingle(job.Namespace, job.ResourceName, resourceVersion) + if err != nil { + return err + } + + _, err = watch.Until(job.timeout, w, func(e watch.Event) (bool, error) { + if e.Type == watch.Added { + return true, nil + } + return false, nil + }) + + return err +} + +func (job *JobWatchMonitor) TakeNextMonitoredPod() *PodWatchMonitor { + if len(job.MonitoredPodsQueue) == 0 { + return nil + } + + var res *PodWatchMonitor + res, job.MonitoredPodsQueue = job.MonitoredPodsQueue[0], job.MonitoredPodsQueue[1:] + return res +} + +func (job *JobWatchMonitor) SetPodProcessed(uid types.UID) { + job.ProcessedPodsUIDs = append(job.ProcessedPodsUIDs, uid) +} + +func (job *JobWatchMonitor) SchedulePodMonitoring(pod *PodWatchMonitor) { + job.MonitoredPodsQueue = append(job.MonitoredPodsQueue, pod) +} + +func (job *JobWatchMonitor) RefreshMonitoredPods() error { + job.kube.Log("RefreshMonitoredPods") // TODO: remove + + client, err := job.kube.ClientSet() + if err != nil { + return err + } + + jobManifest, err := client.Batch(). + Jobs(job.Namespace). + Get(job.ResourceName, metav1.GetOptions{}) + if err != nil { + return err + } + job.kube.Log("jobManifest: %+v", jobManifest) // TODO: remove + + selector, err := metav1.LabelSelectorAsSelector(jobManifest.Spec.Selector) + if err != nil { + return err + } + + podList, err := client.Core(). + Pods(job.Namespace). + List(metav1.ListOptions{LabelSelector: selector.String()}) + if err != nil { + return err + } + job.kube.Log("podList: %+v", podList) // TODO: remove + + // TODO filter out pods that does not belong to controller-uid=job-uid + + // Add new pods to monitor queue in chronological order by creation timestamp + podItems := make([]core.Pod, 0) + for _, item := range podList.Items { + podItems = append(podItems, item) + } + sort.Slice(podItems, func(i, j int) bool { + return podItems[i].CreationTimestamp.Time.Before(podItems[j].CreationTimestamp.Time) + }) + +searchNewPods: + for _, item := range podItems { + // filter out under processing + for _, monitoredPod := range job.MonitoredPodsQueue { + // TODO is there a need to check resource-version change? + if monitoredPod.UID == item.UID { + continue searchNewPods + } + } + // filter out already processed + for _, processedPodUID := range job.ProcessedPodsUIDs { + if processedPodUID == item.UID { + continue searchNewPods + } + } + + pod := &PodWatchMonitor{ + WatchMonitor: WatchMonitor{ + kube: job.kube, + timeout: job.timeout, + watchFeed: job.watchFeed, + + Namespace: job.Namespace, + ResourceName: item.Name, + UID: item.UID, + }, + } + if err = pod.RefreshManifest(); err != nil { + return err + } + for _, containerConf := range pod.Manifest.Spec.InitContainers { + pod.InitContainersNames = append(pod.InitContainersNames, containerConf.Name) + } + for _, containerConf := range pod.Manifest.Spec.Containers { + pod.ContainersNames = append(pod.ContainersNames, containerConf.Name) + } + + job.MonitoredPodsQueue = append(job.MonitoredPodsQueue, pod) + } + + job.kube.Log("RefreshMonitoredPods done: MonitoredPodsQueue: %+v", job.MonitoredPodsQueue) // TODO: remove + + return nil +} + +func (c *Client) watchJobTillDone(jobInfo *resource.Info, watchFeed WatchFeed, timeout time.Duration) error { + if jobInfo.Mapping.GroupVersionKind.Kind != "Job" { + return nil + } + + uid, err := jobInfo.Mapping.UID(jobInfo.Object) + if err != nil { + return err + } + + job := &JobWatchMonitor{ + WatchMonitor: WatchMonitor{ + kube: c, + timeout: timeout, + watchFeed: watchFeed, + + Namespace: jobInfo.Namespace, + ResourceName: jobInfo.Name, + UID: uid, + }, + } + + if err := job.WaitTillResourceVersionAdded(jobInfo.ResourceVersion, jobInfo); err != nil { + return err // TODO + } + + if err = job.RefreshMonitoredPods(); err != nil { + return err // TODO + } + + var processPod *PodWatchMonitor + + // TODO: split method into corresponding functions + +TakeNextMonitoredPod: + for { + if processPod = job.TakeNextMonitoredPod(); processPod == nil { + break + } + + if err := processPod.RefreshManifest(); err != nil { + c.Log("Pod %s refresh manifest failed: %s", processPod.ResourceName, err) + // TODO stream system-error to feed + job.SetPodProcessed(processPod.UID) + continue TakeNextMonitoredPod + } + + c.Log("Pod manifest refreshed, ResourceVersion: %s", processPod.Manifest.ResourceVersion) + + if processPod.Manifest.Status.Phase == core.PodPending { + c.Log("Pod %s is in PENDING state", processPod.ResourceName) + + if podReadyCondition := processPod.GetReadyCondition(); podReadyCondition != nil { + c.Log("Pod %s ready condition: %+v", processPod.ResourceName, podReadyCondition) + if podReadyCondition.Status != core.ConditionTrue { + // TODO: init-containers-statuses + for _, containerStatus := range processPod.Manifest.Status.ContainerStatuses { + if containerStatus.Ready { + continue + } + if containerStatus.State.Waiting != nil { + c.Log("Pod %s container %s is in waiting state: %s: %s", processPod.ResourceName, containerStatus.Name, containerStatus.State.Waiting.Reason, containerStatus.State.Waiting.Message) + + switch containerStatus.State.Waiting.Reason { + case "ImagePullBackOff", "ErrImagePull": + // TODO stream bad_image user-error + processPod.SetContainerProcessed(containerStatus.Name, containerStatus.ContainerID) + case "CrashLoopBackOff": + // TODO stream container_crash user-error + processPod.SetContainerProcessed(containerStatus.Name, containerStatus.ContainerID) + } + } + if containerStatus.State.Terminated != nil { + // TODO dig more, think more. + // TODO not necessary because in that container state we still able to reach containers logs + // TODO what about failed state? we should stream user-error about incorrectly terminated container + // TODO init-container should be finally terminated in normal situation + // TODO that error should follow logs and not preceede them + // TODO so it is needed to move that if into after send-logs-for-container section + + c.Log("Pod %s container %s (%s) is in terminated state: %s: %s", processPod.ResourceName, containerStatus.Name, containerStatus.State.Terminated.ContainerID, containerStatus.State.Terminated.Reason, containerStatus.State.Terminated.Message) + processPod.SetContainerProcessed(containerStatus.Name, containerStatus.ContainerID) + } + } + + job.SchedulePodMonitoring(processPod) + time.Sleep(time.Duration(1) * time.Second) // TODO remove this + continue TakeNextMonitoredPod + // TODO: fetch state and stream to feed userspace-error + } + } + } + + // TODO: init-containers + + ProcessContainers: + for _, containerName := range processPod.GetMonitoredContainersNames() { + c.Log("Process pod %s container %s", processPod.ResourceName, containerName) + if containerStatus := processPod.GetContainerStatus(containerName); containerStatus != nil { + c.Log("Process pod %s container %s status: %+v", processPod.ResourceName, containerName, containerStatus) + if containerStatus.State.Waiting != nil { + if containerStatus.State.Waiting.Reason == "RunContainerError" { + // TODO: stream userspace-error container_stuck to watch feed + processPod.SetContainerProcessed(containerName, containerStatus.ContainerID) + continue ProcessContainers + } + } + } else { + c.Log("Process pod %s container %s status not available", processPod.ResourceName, containerName) + } + + logLines, err := processPod.GetContainerLogs(containerName) + if err != nil { + c.Log("Error getting job %s pod %s container %s log chunk: %s", job.ResourceName, processPod.ResourceName, containerName, err) + } + + chunk := &JobLogChunk{ + JobName: job.ResourceName, + PodName: processPod.ResourceName, + ContainerName: containerName, + LogLines: logLines, + } + if err = job.watchFeed.WriteJobLogChunk(chunk); err != nil { + c.Log("Error writing job %s pod %s container %s log chunk to watch feed: %s", chunk.JobName, chunk.PodName, chunk.ContainerName, err) + } + } + + if len(processPod.GetMonitoredContainersNames()) > 0 { + job.SchedulePodMonitoring(processPod) + } + + time.Sleep(time.Duration(1) * time.Second) // TODO: show logs flawlessly without any suspension if there is something to show, also use ticker + } + + // TODO: wait till job done event + // TODO: make refresh for pods while waiting: job.RefreshMonitoredPods() + // TODO: it is not necessary to refresh list on every tick + + // TODO: add following event watch before ending this function + // switch e.Type { + // case watch.Added, watch.Modified: + // c.Log("o = %v", o) + // c.Log("o.Status = %v", o.Status) + // c.Log("o.Status.Conditions = %v", o.Status.Conditions) + // for _, c := range o.Status.Conditions { + // if c.Type == batchinternal.JobComplete && c.Status == core.ConditionTrue { + // return true, nil + // } else if c.Type == batchinternal.JobFailed && c.Status == core.ConditionTrue { + // return true, fmt.Errorf("Job failed: %s", c.Reason) + // } + // } + // c.Log("%s: Jobs active: %d, jobs failed: %d, jobs succeeded: %d", jobInfo.Name, o.Status.Active, o.Status.Failed, o.Status.Succeeded) + // return false, nil + // } + + return nil +} + +func (c *Client) WatchJobsTillDone(namespace string, reader io.Reader, watchFeed WatchFeed, timeout time.Duration) error { + infos, err := c.Build(namespace, reader) + if err != nil { + return err + } + + return perform(infos, func(info *resource.Info) error { + return c.watchJobTillDone(info, watchFeed, timeout) + }) +} diff --git a/pkg/proto/hapi/release/hook.pb.go b/pkg/proto/hapi/release/hook.pb.go index 0a44165c8..a12279b7d 100644 --- a/pkg/proto/hapi/release/hook.pb.go +++ b/pkg/proto/hapi/release/hook.pb.go @@ -11,6 +11,7 @@ It is generated from these files: hapi/release/status.proto hapi/release/test_run.proto hapi/release/test_suite.proto + hapi/release/watch_feed.proto It has these top-level messages: Hook @@ -19,6 +20,9 @@ It has these top-level messages: Status TestRun TestSuite + LogLine + JobLogChunk + WatchFeed */ package release diff --git a/pkg/proto/hapi/release/watch_feed.pb.go b/pkg/proto/hapi/release/watch_feed.pb.go new file mode 100644 index 000000000..6c34b03a2 --- /dev/null +++ b/pkg/proto/hapi/release/watch_feed.pb.go @@ -0,0 +1,122 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// source: hapi/release/watch_feed.proto + +package release + +import proto "github.com/golang/protobuf/proto" +import fmt "fmt" +import math "math" + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +type LogLine struct { + Timestamp string `protobuf:"bytes,1,opt,name=timestamp" json:"timestamp,omitempty"` + Data string `protobuf:"bytes,2,opt,name=data" json:"data,omitempty"` +} + +func (m *LogLine) Reset() { *m = LogLine{} } +func (m *LogLine) String() string { return proto.CompactTextString(m) } +func (*LogLine) ProtoMessage() {} +func (*LogLine) Descriptor() ([]byte, []int) { return fileDescriptor6, []int{0} } + +func (m *LogLine) GetTimestamp() string { + if m != nil { + return m.Timestamp + } + return "" +} + +func (m *LogLine) GetData() string { + if m != nil { + return m.Data + } + return "" +} + +type JobLogChunk struct { + JobName string `protobuf:"bytes,1,opt,name=job_name,json=jobName" json:"job_name,omitempty"` + PodName string `protobuf:"bytes,2,opt,name=pod_name,json=podName" json:"pod_name,omitempty"` + ContainerName string `protobuf:"bytes,3,opt,name=container_name,json=containerName" json:"container_name,omitempty"` + LogLines []*LogLine `protobuf:"bytes,4,rep,name=log_lines,json=logLines" json:"log_lines,omitempty"` +} + +func (m *JobLogChunk) Reset() { *m = JobLogChunk{} } +func (m *JobLogChunk) String() string { return proto.CompactTextString(m) } +func (*JobLogChunk) ProtoMessage() {} +func (*JobLogChunk) Descriptor() ([]byte, []int) { return fileDescriptor6, []int{1} } + +func (m *JobLogChunk) GetJobName() string { + if m != nil { + return m.JobName + } + return "" +} + +func (m *JobLogChunk) GetPodName() string { + if m != nil { + return m.PodName + } + return "" +} + +func (m *JobLogChunk) GetContainerName() string { + if m != nil { + return m.ContainerName + } + return "" +} + +func (m *JobLogChunk) GetLogLines() []*LogLine { + if m != nil { + return m.LogLines + } + return nil +} + +type WatchFeed struct { + JobLogChunk *JobLogChunk `protobuf:"bytes,1,opt,name=job_log_chunk,json=jobLogChunk" json:"job_log_chunk,omitempty"` +} + +func (m *WatchFeed) Reset() { *m = WatchFeed{} } +func (m *WatchFeed) String() string { return proto.CompactTextString(m) } +func (*WatchFeed) ProtoMessage() {} +func (*WatchFeed) Descriptor() ([]byte, []int) { return fileDescriptor6, []int{2} } + +func (m *WatchFeed) GetJobLogChunk() *JobLogChunk { + if m != nil { + return m.JobLogChunk + } + return nil +} + +func init() { + proto.RegisterType((*LogLine)(nil), "hapi.release.LogLine") + proto.RegisterType((*JobLogChunk)(nil), "hapi.release.JobLogChunk") + proto.RegisterType((*WatchFeed)(nil), "hapi.release.WatchFeed") +} + +func init() { proto.RegisterFile("hapi/release/watch_feed.proto", fileDescriptor6) } + +var fileDescriptor6 = []byte{ + // 258 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x54, 0x90, 0x41, 0x4b, 0xc3, 0x40, + 0x14, 0x84, 0x89, 0x2d, 0xa6, 0x79, 0xb1, 0x1e, 0x16, 0x84, 0x14, 0x14, 0x4a, 0x40, 0xe8, 0x29, + 0x85, 0x78, 0x14, 0x2f, 0x0a, 0x1e, 0x4a, 0xf0, 0x90, 0x8b, 0xe0, 0x25, 0x6c, 0xb2, 0xcf, 0x64, + 0x63, 0xb2, 0x6f, 0x49, 0x56, 0xfc, 0x37, 0xfe, 0x56, 0xd9, 0x4d, 0xb0, 0xed, 0xed, 0xed, 0x7c, + 0xc3, 0x30, 0xb3, 0x70, 0xd7, 0x70, 0x2d, 0xf7, 0x03, 0x76, 0xc8, 0x47, 0xdc, 0xff, 0x70, 0x53, + 0x35, 0xc5, 0x27, 0xa2, 0x48, 0xf4, 0x40, 0x86, 0xd8, 0x95, 0xc5, 0xc9, 0x8c, 0xe3, 0x47, 0xf0, + 0x33, 0xaa, 0x33, 0xa9, 0x90, 0xdd, 0x42, 0x60, 0x64, 0x8f, 0xa3, 0xe1, 0xbd, 0x8e, 0xbc, 0xad, + 0xb7, 0x0b, 0xf2, 0xa3, 0xc0, 0x18, 0x2c, 0x05, 0x37, 0x3c, 0xba, 0x70, 0xc0, 0xdd, 0xf1, 0xaf, + 0x07, 0xe1, 0x81, 0xca, 0x8c, 0xea, 0x97, 0xe6, 0x5b, 0x7d, 0xb1, 0x0d, 0xac, 0x5a, 0x2a, 0x0b, + 0xc5, 0x7b, 0x9c, 0x03, 0xfc, 0x96, 0xca, 0x37, 0xde, 0xa3, 0x45, 0x9a, 0xc4, 0x84, 0xa6, 0x08, + 0x5f, 0x93, 0x70, 0xe8, 0x1e, 0xae, 0x2b, 0x52, 0x86, 0x4b, 0x85, 0xc3, 0x64, 0x58, 0x38, 0xc3, + 0xfa, 0x5f, 0x75, 0xb6, 0x14, 0x82, 0x8e, 0xea, 0xa2, 0x93, 0x0a, 0xc7, 0x68, 0xb9, 0x5d, 0xec, + 0xc2, 0xf4, 0x26, 0x39, 0xdd, 0x92, 0xcc, 0x43, 0xf2, 0x55, 0x37, 0x1d, 0x63, 0x7c, 0x80, 0xe0, + 0xdd, 0xee, 0x7f, 0x45, 0x14, 0xec, 0x09, 0xd6, 0xb6, 0x9d, 0x0d, 0xa9, 0x6c, 0x5d, 0x57, 0x31, + 0x4c, 0x37, 0xe7, 0x21, 0x27, 0x7b, 0xf2, 0xb0, 0x3d, 0x3e, 0x9e, 0x83, 0x0f, 0x7f, 0xf6, 0x94, + 0x97, 0xee, 0x27, 0x1f, 0xfe, 0x02, 0x00, 0x00, 0xff, 0xff, 0x6f, 0x51, 0x56, 0xb2, 0x6a, 0x01, + 0x00, 0x00, +} diff --git a/pkg/proto/hapi/services/tiller.pb.go b/pkg/proto/hapi/services/tiller.pb.go index 044d54e91..8168c29f8 100644 --- a/pkg/proto/hapi/services/tiller.pb.go +++ b/pkg/proto/hapi/services/tiller.pb.go @@ -41,6 +41,7 @@ import hapi_release5 "k8s.io/helm/pkg/proto/hapi/release" import hapi_release4 "k8s.io/helm/pkg/proto/hapi/release" import hapi_release1 "k8s.io/helm/pkg/proto/hapi/release" import hapi_release3 "k8s.io/helm/pkg/proto/hapi/release" +import hapi_release6 "k8s.io/helm/pkg/proto/hapi/release" import hapi_version "k8s.io/helm/pkg/proto/hapi/version" import ( @@ -710,7 +711,8 @@ func (m *InstallReleaseRequest) GetDescription() string { // InstallReleaseResponse is the response from a release installation. type InstallReleaseResponse struct { - Release *hapi_release5.Release `protobuf:"bytes,1,opt,name=release" json:"release,omitempty"` + Release *hapi_release5.Release `protobuf:"bytes,1,opt,name=release" json:"release,omitempty"` + WatchFeed *hapi_release6.WatchFeed `protobuf:"bytes,2,opt,name=watch_feed,json=watchFeed" json:"watch_feed,omitempty"` } func (m *InstallReleaseResponse) Reset() { *m = InstallReleaseResponse{} } @@ -725,6 +727,13 @@ func (m *InstallReleaseResponse) GetRelease() *hapi_release5.Release { return nil } +func (m *InstallReleaseResponse) GetWatchFeed() *hapi_release6.WatchFeed { + if m != nil { + return m.WatchFeed + } + return nil +} + // UninstallReleaseRequest represents a request to uninstall a named release. type UninstallReleaseRequest struct { // Name is the name of the release to delete. @@ -985,7 +994,7 @@ type ReleaseServiceClient interface { // UpdateRelease updates release content. UpdateRelease(ctx context.Context, in *UpdateReleaseRequest, opts ...grpc.CallOption) (*UpdateReleaseResponse, error) // InstallRelease requests installation of a chart as a new release. - InstallRelease(ctx context.Context, in *InstallReleaseRequest, opts ...grpc.CallOption) (*InstallReleaseResponse, error) + InstallRelease(ctx context.Context, in *InstallReleaseRequest, opts ...grpc.CallOption) (ReleaseService_InstallReleaseClient, error) // UninstallRelease requests deletion of a named release. UninstallRelease(ctx context.Context, in *UninstallReleaseRequest, opts ...grpc.CallOption) (*UninstallReleaseResponse, error) // GetVersion returns the current version of the server. @@ -1065,13 +1074,36 @@ func (c *releaseServiceClient) UpdateRelease(ctx context.Context, in *UpdateRele return out, nil } -func (c *releaseServiceClient) InstallRelease(ctx context.Context, in *InstallReleaseRequest, opts ...grpc.CallOption) (*InstallReleaseResponse, error) { - out := new(InstallReleaseResponse) - err := grpc.Invoke(ctx, "/hapi.services.tiller.ReleaseService/InstallRelease", in, out, c.cc, opts...) +func (c *releaseServiceClient) InstallRelease(ctx context.Context, in *InstallReleaseRequest, opts ...grpc.CallOption) (ReleaseService_InstallReleaseClient, error) { + stream, err := grpc.NewClientStream(ctx, &_ReleaseService_serviceDesc.Streams[1], c.cc, "/hapi.services.tiller.ReleaseService/InstallRelease", opts...) if err != nil { return nil, err } - return out, nil + x := &releaseServiceInstallReleaseClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type ReleaseService_InstallReleaseClient interface { + Recv() (*InstallReleaseResponse, error) + grpc.ClientStream +} + +type releaseServiceInstallReleaseClient struct { + grpc.ClientStream +} + +func (x *releaseServiceInstallReleaseClient) Recv() (*InstallReleaseResponse, error) { + m := new(InstallReleaseResponse) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil } func (c *releaseServiceClient) UninstallRelease(ctx context.Context, in *UninstallReleaseRequest, opts ...grpc.CallOption) (*UninstallReleaseResponse, error) { @@ -1111,7 +1143,7 @@ func (c *releaseServiceClient) GetHistory(ctx context.Context, in *GetHistoryReq } func (c *releaseServiceClient) RunReleaseTest(ctx context.Context, in *TestReleaseRequest, opts ...grpc.CallOption) (ReleaseService_RunReleaseTestClient, error) { - stream, err := grpc.NewClientStream(ctx, &_ReleaseService_serviceDesc.Streams[1], c.cc, "/hapi.services.tiller.ReleaseService/RunReleaseTest", opts...) + stream, err := grpc.NewClientStream(ctx, &_ReleaseService_serviceDesc.Streams[2], c.cc, "/hapi.services.tiller.ReleaseService/RunReleaseTest", opts...) if err != nil { return nil, err } @@ -1157,7 +1189,7 @@ type ReleaseServiceServer interface { // UpdateRelease updates release content. UpdateRelease(context.Context, *UpdateReleaseRequest) (*UpdateReleaseResponse, error) // InstallRelease requests installation of a chart as a new release. - InstallRelease(context.Context, *InstallReleaseRequest) (*InstallReleaseResponse, error) + InstallRelease(*InstallReleaseRequest, ReleaseService_InstallReleaseServer) error // UninstallRelease requests deletion of a named release. UninstallRelease(context.Context, *UninstallReleaseRequest) (*UninstallReleaseResponse, error) // GetVersion returns the current version of the server. @@ -1249,22 +1281,25 @@ func _ReleaseService_UpdateRelease_Handler(srv interface{}, ctx context.Context, return interceptor(ctx, in, info, handler) } -func _ReleaseService_InstallRelease_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(InstallReleaseRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(ReleaseServiceServer).InstallRelease(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/hapi.services.tiller.ReleaseService/InstallRelease", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(ReleaseServiceServer).InstallRelease(ctx, req.(*InstallReleaseRequest)) +func _ReleaseService_InstallRelease_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(InstallReleaseRequest) + if err := stream.RecvMsg(m); err != nil { + return err } - return interceptor(ctx, in, info, handler) + return srv.(ReleaseServiceServer).InstallRelease(m, &releaseServiceInstallReleaseServer{stream}) +} + +type ReleaseService_InstallReleaseServer interface { + Send(*InstallReleaseResponse) error + grpc.ServerStream +} + +type releaseServiceInstallReleaseServer struct { + grpc.ServerStream +} + +func (x *releaseServiceInstallReleaseServer) Send(m *InstallReleaseResponse) error { + return x.ServerStream.SendMsg(m) } func _ReleaseService_UninstallRelease_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { @@ -1376,10 +1411,6 @@ var _ReleaseService_serviceDesc = grpc.ServiceDesc{ MethodName: "UpdateRelease", Handler: _ReleaseService_UpdateRelease_Handler, }, - { - MethodName: "InstallRelease", - Handler: _ReleaseService_InstallRelease_Handler, - }, { MethodName: "UninstallRelease", Handler: _ReleaseService_UninstallRelease_Handler, @@ -1403,6 +1434,11 @@ var _ReleaseService_serviceDesc = grpc.ServiceDesc{ Handler: _ReleaseService_ListReleases_Handler, ServerStreams: true, }, + { + StreamName: "InstallRelease", + Handler: _ReleaseService_InstallRelease_Handler, + ServerStreams: true, + }, { StreamName: "RunReleaseTest", Handler: _ReleaseService_RunReleaseTest_Handler, @@ -1415,85 +1451,87 @@ var _ReleaseService_serviceDesc = grpc.ServiceDesc{ func init() { proto.RegisterFile("hapi/services/tiller.proto", fileDescriptor0) } var fileDescriptor0 = []byte{ - // 1276 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x57, 0xdd, 0x6e, 0xe3, 0x44, - 0x14, 0x6e, 0xe2, 0xfc, 0x9e, 0x74, 0x43, 0x76, 0x36, 0xdb, 0x7a, 0xcd, 0x82, 0x82, 0x11, 0x6c, - 0x76, 0x61, 0x53, 0x08, 0xdc, 0x20, 0x21, 0xa4, 0x6e, 0x36, 0x6a, 0x0b, 0xa5, 0x2b, 0x39, 0xed, - 0x22, 0x21, 0x50, 0xe4, 0x26, 0x93, 0xd6, 0xac, 0x63, 0x07, 0xcf, 0xb8, 0x6c, 0x1f, 0x00, 0x24, - 0xde, 0x83, 0x07, 0xe1, 0x3d, 0x78, 0x0e, 0xee, 0x91, 0xe7, 0xc7, 0xf5, 0x38, 0x76, 0x6a, 0x7a, - 0xd3, 0x78, 0xe6, 0x9c, 0x39, 0x3f, 0xdf, 0x37, 0xe7, 0xcc, 0x29, 0x18, 0x97, 0xf6, 0xca, 0xd9, - 0x23, 0x38, 0xb8, 0x72, 0x66, 0x98, 0xec, 0x51, 0xc7, 0x75, 0x71, 0x30, 0x58, 0x05, 0x3e, 0xf5, - 0x51, 0x37, 0x92, 0x0d, 0xa4, 0x6c, 0xc0, 0x65, 0xc6, 0x0e, 0x3b, 0x31, 0xbb, 0xb4, 0x03, 0xca, - 0xff, 0x72, 0x6d, 0x63, 0x37, 0xb9, 0xef, 0x7b, 0x0b, 0xe7, 0x42, 0x08, 0xb8, 0x8b, 0x00, 0xbb, - 0xd8, 0x26, 0x58, 0xfe, 0x2a, 0x87, 0xa4, 0xcc, 0xf1, 0x16, 0xbe, 0x10, 0xbc, 0xab, 0x08, 0x28, - 0x26, 0x74, 0x1a, 0x84, 0x9e, 0x10, 0x3e, 0x52, 0x84, 0x84, 0xda, 0x34, 0x24, 0x8a, 0xb3, 0x2b, - 0x1c, 0x10, 0xc7, 0xf7, 0xe4, 0x2f, 0x97, 0x99, 0x7f, 0x97, 0xe1, 0xc1, 0xb1, 0x43, 0xa8, 0xc5, - 0x0f, 0x12, 0x0b, 0xff, 0x1a, 0x62, 0x42, 0x51, 0x17, 0xaa, 0xae, 0xb3, 0x74, 0xa8, 0x5e, 0xea, - 0x95, 0xfa, 0x9a, 0xc5, 0x17, 0x68, 0x07, 0x6a, 0xfe, 0x62, 0x41, 0x30, 0xd5, 0xcb, 0xbd, 0x52, - 0xbf, 0x69, 0x89, 0x15, 0xfa, 0x06, 0xea, 0xc4, 0x0f, 0xe8, 0xf4, 0xfc, 0x5a, 0xd7, 0x7a, 0xa5, - 0x7e, 0x7b, 0xf8, 0xd1, 0x20, 0x0b, 0xa7, 0x41, 0xe4, 0x69, 0xe2, 0x07, 0x74, 0x10, 0xfd, 0x79, - 0x71, 0x6d, 0xd5, 0x08, 0xfb, 0x8d, 0xec, 0x2e, 0x1c, 0x97, 0xe2, 0x40, 0xaf, 0x70, 0xbb, 0x7c, - 0x85, 0x0e, 0x00, 0x98, 0x5d, 0x3f, 0x98, 0xe3, 0x40, 0xaf, 0x32, 0xd3, 0xfd, 0x02, 0xa6, 0x5f, - 0x45, 0xfa, 0x56, 0x93, 0xc8, 0x4f, 0xf4, 0x35, 0x6c, 0x73, 0x48, 0xa6, 0x33, 0x7f, 0x8e, 0x89, - 0x5e, 0xeb, 0x69, 0xfd, 0xf6, 0xf0, 0x11, 0x37, 0x25, 0xe1, 0x9f, 0x70, 0xd0, 0x46, 0xfe, 0x1c, - 0x5b, 0x2d, 0xae, 0x1e, 0x7d, 0x13, 0xf4, 0x18, 0x9a, 0x9e, 0xbd, 0xc4, 0x64, 0x65, 0xcf, 0xb0, - 0x5e, 0x67, 0x11, 0xde, 0x6c, 0x98, 0x1e, 0x34, 0xa4, 0x73, 0xf3, 0x05, 0xd4, 0x78, 0x6a, 0xa8, - 0x05, 0xf5, 0xb3, 0x93, 0xef, 0x4e, 0x5e, 0xfd, 0x70, 0xd2, 0xd9, 0x42, 0x0d, 0xa8, 0x9c, 0xec, - 0x7f, 0x3f, 0xee, 0x94, 0xd0, 0x7d, 0xb8, 0x77, 0xbc, 0x3f, 0x39, 0x9d, 0x5a, 0xe3, 0xe3, 0xf1, - 0xfe, 0x64, 0xfc, 0xb2, 0x53, 0x46, 0x6d, 0x80, 0xd1, 0xe1, 0xbe, 0x75, 0x3a, 0x65, 0x2a, 0x9a, - 0xf9, 0x3e, 0x34, 0xe3, 0x1c, 0x50, 0x1d, 0xb4, 0xfd, 0xc9, 0x88, 0x9b, 0x78, 0x39, 0x9e, 0x8c, - 0x3a, 0x25, 0xf3, 0xcf, 0x12, 0x74, 0x55, 0xca, 0xc8, 0xca, 0xf7, 0x08, 0x8e, 0x38, 0x9b, 0xf9, - 0xa1, 0x17, 0x73, 0xc6, 0x16, 0x08, 0x41, 0xc5, 0xc3, 0x6f, 0x25, 0x63, 0xec, 0x3b, 0xd2, 0xa4, - 0x3e, 0xb5, 0x5d, 0xc6, 0x96, 0x66, 0xf1, 0x05, 0xfa, 0x1c, 0x1a, 0x02, 0x0a, 0xa2, 0x57, 0x7a, - 0x5a, 0xbf, 0x35, 0x7c, 0xa8, 0x02, 0x24, 0x3c, 0x5a, 0xb1, 0x9a, 0x79, 0x00, 0xbb, 0x07, 0x58, - 0x46, 0xc2, 0xf1, 0x93, 0x37, 0x28, 0xf2, 0x6b, 0x2f, 0x31, 0x0b, 0x26, 0xf2, 0x6b, 0x2f, 0x31, - 0xd2, 0xa1, 0x2e, 0xae, 0x1f, 0x0b, 0xa7, 0x6a, 0xc9, 0xa5, 0x49, 0x41, 0x5f, 0x37, 0x24, 0xf2, - 0xca, 0xb2, 0xf4, 0x31, 0x54, 0xa2, 0xca, 0x60, 0x66, 0x5a, 0x43, 0xa4, 0xc6, 0x79, 0xe4, 0x2d, - 0x7c, 0x8b, 0xc9, 0x55, 0xea, 0xb4, 0x34, 0x75, 0x87, 0x49, 0xaf, 0x23, 0xdf, 0xa3, 0xd8, 0xa3, - 0x77, 0x8b, 0xff, 0x18, 0x1e, 0x65, 0x58, 0x12, 0x09, 0xec, 0x41, 0x5d, 0x84, 0xc6, 0xac, 0xe5, - 0xe2, 0x2a, 0xb5, 0xcc, 0xdf, 0x35, 0xe8, 0x9e, 0xad, 0xe6, 0x36, 0xc5, 0x52, 0xb4, 0x21, 0xa8, - 0x27, 0x50, 0x65, 0x1d, 0x46, 0x60, 0x71, 0x9f, 0xdb, 0xe6, 0x6d, 0x68, 0x14, 0xfd, 0xb5, 0xb8, - 0x1c, 0x3d, 0x83, 0xda, 0x95, 0xed, 0x86, 0x98, 0x30, 0x20, 0x62, 0xd4, 0x84, 0x26, 0x6b, 0x4f, - 0x96, 0xd0, 0x40, 0xbb, 0x50, 0x9f, 0x07, 0xd7, 0x51, 0x7f, 0x61, 0x25, 0xd9, 0xb0, 0x6a, 0xf3, - 0xe0, 0xda, 0x0a, 0x3d, 0xf4, 0x21, 0xdc, 0x9b, 0x3b, 0xc4, 0x3e, 0x77, 0xf1, 0xf4, 0xd2, 0xf7, - 0xdf, 0x10, 0x56, 0x95, 0x0d, 0x6b, 0x5b, 0x6c, 0x1e, 0x46, 0x7b, 0xc8, 0x88, 0x6e, 0xd2, 0x2c, - 0xc0, 0x36, 0xc5, 0x7a, 0x8d, 0xc9, 0xe3, 0x75, 0x84, 0x21, 0x75, 0x96, 0xd8, 0x0f, 0x29, 0x2b, - 0x25, 0xcd, 0x92, 0x4b, 0xf4, 0x01, 0x6c, 0x07, 0x98, 0x60, 0x3a, 0x15, 0x51, 0x36, 0xd8, 0xc9, - 0x16, 0xdb, 0x7b, 0xcd, 0xc3, 0x42, 0x50, 0xf9, 0xcd, 0x76, 0xa8, 0xde, 0x64, 0x22, 0xf6, 0xcd, - 0x8f, 0x85, 0x04, 0xcb, 0x63, 0x20, 0x8f, 0x85, 0x04, 0x8b, 0x63, 0x5d, 0xa8, 0x2e, 0xfc, 0x60, - 0x86, 0xf5, 0x16, 0x93, 0xf1, 0x05, 0xea, 0x41, 0x6b, 0x8e, 0xc9, 0x2c, 0x70, 0x56, 0x34, 0x62, - 0x74, 0x9b, 0x61, 0x9a, 0xdc, 0x32, 0x0f, 0xe1, 0x61, 0x8a, 0x86, 0xbb, 0x32, 0xfa, 0x47, 0x19, - 0x76, 0x2c, 0xdf, 0x75, 0xcf, 0xed, 0xd9, 0x9b, 0x02, 0x9c, 0x26, 0xe0, 0x2f, 0x6f, 0x86, 0x5f, - 0xcb, 0x80, 0x3f, 0x71, 0x4d, 0x2b, 0xca, 0x35, 0x55, 0x88, 0xa9, 0xe6, 0x13, 0x53, 0x53, 0x89, - 0x91, 0xa8, 0xd7, 0x13, 0xa8, 0xc7, 0x90, 0x36, 0x36, 0x40, 0xda, 0x5c, 0x87, 0xf4, 0x5b, 0xd8, - 0x5d, 0xc3, 0xe1, 0xae, 0xa0, 0xfe, 0x5b, 0x86, 0x87, 0x47, 0x1e, 0xa1, 0xb6, 0xeb, 0xa6, 0x30, - 0x8d, 0x6b, 0xa2, 0x54, 0xb8, 0x26, 0xca, 0xff, 0xa7, 0x26, 0x34, 0x85, 0x14, 0xc9, 0x60, 0x25, - 0xc1, 0x60, 0xa1, 0x3a, 0x51, 0xba, 0x53, 0x2d, 0xd5, 0x9d, 0xd0, 0x7b, 0x00, 0xfc, 0x62, 0x33, - 0xe3, 0x1c, 0xfc, 0x26, 0xdb, 0x39, 0x11, 0xcd, 0x48, 0xf2, 0xd5, 0xc8, 0xe6, 0x2b, 0x59, 0x25, - 0x7d, 0xe8, 0xc8, 0x78, 0x66, 0xc1, 0x9c, 0xc5, 0x24, 0x2a, 0xa5, 0x2d, 0xf6, 0x47, 0xc1, 0x3c, - 0x8a, 0x2a, 0xcd, 0x61, 0x6b, 0x9d, 0xc3, 0x23, 0xd8, 0x49, 0xc3, 0x7e, 0x57, 0x0a, 0xff, 0x2a, - 0xc1, 0xee, 0x99, 0xe7, 0x64, 0x92, 0x98, 0x55, 0x18, 0x6b, 0xb0, 0x96, 0x33, 0x60, 0xed, 0x42, - 0x75, 0x15, 0x06, 0x17, 0x58, 0xd0, 0xc4, 0x17, 0x49, 0xbc, 0x2a, 0x2a, 0x5e, 0xa9, 0x8c, 0xab, - 0xeb, 0x19, 0x4f, 0x41, 0x5f, 0x8f, 0xf2, 0x8e, 0x39, 0x47, 0x79, 0xc5, 0x6f, 0x57, 0x93, 0xbf, - 0x53, 0xe6, 0x03, 0xb8, 0x7f, 0x80, 0xe9, 0x6b, 0x5e, 0xa6, 0x02, 0x00, 0x73, 0x0c, 0x28, 0xb9, - 0x79, 0xe3, 0x4f, 0x6c, 0xa9, 0xfe, 0xe4, 0x60, 0x27, 0xf5, 0xa5, 0x96, 0xf9, 0x15, 0xb3, 0x7d, - 0xe8, 0x10, 0xea, 0x07, 0xd7, 0x9b, 0xc0, 0xed, 0x80, 0xb6, 0xb4, 0xdf, 0x8a, 0xa7, 0x2d, 0xfa, - 0x34, 0x0f, 0x58, 0x04, 0xf1, 0x51, 0x11, 0x41, 0x72, 0x50, 0x28, 0x15, 0x1b, 0x14, 0x7e, 0x02, - 0x74, 0x8a, 0xe3, 0x99, 0xe5, 0x96, 0x37, 0x56, 0xd2, 0x54, 0x56, 0x69, 0xd2, 0xa1, 0x3e, 0x73, - 0xb1, 0xed, 0x85, 0x2b, 0x41, 0xac, 0x5c, 0x9a, 0x3f, 0xc3, 0x03, 0xc5, 0xba, 0x88, 0x33, 0xca, - 0x87, 0x5c, 0x08, 0xeb, 0xd1, 0x27, 0xfa, 0x12, 0x6a, 0x7c, 0xb0, 0x63, 0xb6, 0xdb, 0xc3, 0xc7, - 0x6a, 0xdc, 0xcc, 0x48, 0xe8, 0x89, 0x49, 0xd0, 0x12, 0xba, 0xc3, 0x7f, 0x1a, 0xd0, 0x96, 0xa3, - 0x09, 0x1f, 0x3b, 0x91, 0x03, 0xdb, 0xc9, 0x19, 0x0c, 0x3d, 0xcd, 0x9f, 0x4a, 0x53, 0xa3, 0xb5, - 0xf1, 0xac, 0x88, 0x2a, 0xcf, 0xc0, 0xdc, 0xfa, 0xac, 0x84, 0x08, 0x74, 0xd2, 0xa3, 0x11, 0x7a, - 0x9e, 0x6d, 0x23, 0x67, 0x16, 0x33, 0x06, 0x45, 0xd5, 0xa5, 0x5b, 0x74, 0xc5, 0xee, 0x8c, 0x3a, - 0xcf, 0xa0, 0x5b, 0xcd, 0xa8, 0x23, 0x94, 0xb1, 0x57, 0x58, 0x3f, 0xf6, 0xfb, 0x0b, 0xdc, 0x53, - 0x5e, 0x5c, 0x94, 0x83, 0x56, 0xd6, 0x74, 0x64, 0x7c, 0x52, 0x48, 0x37, 0xf6, 0xb5, 0x84, 0xb6, - 0xda, 0xc6, 0x50, 0x8e, 0x81, 0xcc, 0x37, 0xc6, 0xf8, 0xb4, 0x98, 0x72, 0xec, 0x8e, 0x40, 0x27, - 0xdd, 0x43, 0xf2, 0x78, 0xcc, 0xe9, 0x88, 0x79, 0x3c, 0xe6, 0xb5, 0x26, 0x73, 0x0b, 0xd9, 0x00, - 0x37, 0x2d, 0x04, 0x3d, 0xc9, 0x25, 0x44, 0xed, 0x3c, 0x46, 0xff, 0x76, 0xc5, 0xd8, 0xc5, 0x0a, - 0xde, 0x49, 0xbd, 0xe8, 0x28, 0x07, 0x9a, 0xec, 0x01, 0xc8, 0x78, 0x5e, 0x50, 0x3b, 0x95, 0x94, - 0xe8, 0x4a, 0x1b, 0x92, 0x52, 0x5b, 0xde, 0x86, 0xa4, 0x52, 0x0d, 0xce, 0xdc, 0x42, 0x0e, 0xb4, - 0xad, 0xd0, 0x13, 0xae, 0xa3, 0xb6, 0x80, 0x72, 0x4e, 0xaf, 0x77, 0x35, 0xe3, 0x69, 0x01, 0xcd, - 0x9b, 0xfa, 0x7e, 0x01, 0x3f, 0x36, 0xa4, 0xea, 0x79, 0x8d, 0xfd, 0x57, 0xfe, 0xc5, 0x7f, 0x01, - 0x00, 0x00, 0xff, 0xff, 0x38, 0x07, 0x4c, 0x12, 0x83, 0x10, 0x00, 0x00, + // 1310 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x57, 0x6d, 0x6f, 0xdc, 0x44, + 0x10, 0x8e, 0xcf, 0xf7, 0x3a, 0x97, 0x1e, 0xd7, 0x6d, 0x9a, 0xb8, 0xa6, 0x45, 0xc1, 0x08, 0x7a, + 0x2d, 0xf4, 0x02, 0x01, 0x21, 0x21, 0x21, 0xa4, 0xf4, 0x1a, 0x92, 0x42, 0x48, 0x25, 0xa7, 0x2f, + 0x12, 0x02, 0x9d, 0x1c, 0x7b, 0xaf, 0x31, 0xf5, 0xd9, 0x87, 0x77, 0x9d, 0x36, 0x1f, 0xf9, 0x00, + 0x12, 0xff, 0x83, 0x1f, 0xc2, 0x1f, 0xe1, 0x67, 0xf0, 0x1d, 0x79, 0x5f, 0x1c, 0xaf, 0xcf, 0xbe, + 0x9a, 0x7c, 0x39, 0xef, 0xee, 0xcc, 0xce, 0xcc, 0x3e, 0xcf, 0xce, 0xec, 0x1c, 0x98, 0x67, 0xce, + 0xc2, 0xdf, 0x21, 0x38, 0x3e, 0xf7, 0x5d, 0x4c, 0x76, 0xa8, 0x1f, 0x04, 0x38, 0x1e, 0x2f, 0xe2, + 0x88, 0x46, 0x68, 0x23, 0x95, 0x8d, 0xa5, 0x6c, 0xcc, 0x65, 0xe6, 0x26, 0xdb, 0xe1, 0x9e, 0x39, + 0x31, 0xe5, 0xbf, 0x5c, 0xdb, 0xdc, 0xca, 0xaf, 0x47, 0xe1, 0xcc, 0x7f, 0x29, 0x04, 0xdc, 0x45, + 0x8c, 0x03, 0xec, 0x10, 0x2c, 0xbf, 0xca, 0x26, 0x29, 0xf3, 0xc3, 0x59, 0x24, 0x04, 0xef, 0x2a, + 0x02, 0x8a, 0x09, 0x9d, 0xc6, 0x49, 0x28, 0x84, 0xb7, 0x14, 0x21, 0xa1, 0x0e, 0x4d, 0x88, 0x10, + 0xdd, 0x51, 0x44, 0xaf, 0x1d, 0xea, 0x9e, 0x4d, 0x67, 0x18, 0x7b, 0x4a, 0x2c, 0xe7, 0x38, 0x26, + 0x7e, 0x14, 0xca, 0x2f, 0x97, 0x59, 0x7f, 0x37, 0xe0, 0xc6, 0x91, 0x4f, 0xa8, 0xcd, 0x37, 0x13, + 0x1b, 0xff, 0x9a, 0x60, 0x42, 0xd1, 0x06, 0xb4, 0x02, 0x7f, 0xee, 0x53, 0x43, 0xdb, 0xd6, 0x46, + 0xba, 0xcd, 0x27, 0x68, 0x13, 0xda, 0xd1, 0x6c, 0x46, 0x30, 0x35, 0x1a, 0xdb, 0xda, 0xa8, 0x67, + 0x8b, 0x19, 0xfa, 0x06, 0x3a, 0x24, 0x8a, 0xe9, 0xf4, 0xf4, 0xc2, 0xd0, 0xb7, 0xb5, 0xd1, 0x60, + 0xf7, 0xc3, 0x71, 0x19, 0x8c, 0xe3, 0xd4, 0xd3, 0x49, 0x14, 0xd3, 0x71, 0xfa, 0xf3, 0xf0, 0xc2, + 0x6e, 0x13, 0xf6, 0x4d, 0xed, 0xce, 0xfc, 0x80, 0xe2, 0xd8, 0x68, 0x72, 0xbb, 0x7c, 0x86, 0x0e, + 0x00, 0x98, 0xdd, 0x28, 0xf6, 0x70, 0x6c, 0xb4, 0x98, 0xe9, 0x51, 0x0d, 0xd3, 0x4f, 0x52, 0x7d, + 0xbb, 0x47, 0xe4, 0x10, 0x7d, 0x0d, 0xeb, 0x1c, 0xb1, 0xa9, 0x1b, 0x79, 0x98, 0x18, 0xed, 0x6d, + 0x7d, 0x34, 0xd8, 0xbd, 0xc5, 0x4d, 0x49, 0x76, 0x4e, 0x38, 0xa6, 0x93, 0xc8, 0xc3, 0x76, 0x9f, + 0xab, 0xa7, 0x63, 0x82, 0x6e, 0x43, 0x2f, 0x74, 0xe6, 0x98, 0x2c, 0x1c, 0x17, 0x1b, 0x1d, 0x16, + 0xe1, 0xe5, 0x82, 0x15, 0x42, 0x57, 0x3a, 0xb7, 0x1e, 0x42, 0x9b, 0x1f, 0x0d, 0xf5, 0xa1, 0xf3, + 0xec, 0xf8, 0xfb, 0xe3, 0x27, 0x2f, 0x8e, 0x87, 0x6b, 0xa8, 0x0b, 0xcd, 0xe3, 0xbd, 0x1f, 0xf6, + 0x87, 0x1a, 0xba, 0x0e, 0xd7, 0x8e, 0xf6, 0x4e, 0x9e, 0x4e, 0xed, 0xfd, 0xa3, 0xfd, 0xbd, 0x93, + 0xfd, 0x47, 0xc3, 0x06, 0x1a, 0x00, 0x4c, 0x0e, 0xf7, 0xec, 0xa7, 0x53, 0xa6, 0xa2, 0x5b, 0xef, + 0x41, 0x2f, 0x3b, 0x03, 0xea, 0x80, 0xbe, 0x77, 0x32, 0xe1, 0x26, 0x1e, 0xed, 0x9f, 0x4c, 0x86, + 0x9a, 0xf5, 0xa7, 0x06, 0x1b, 0x2a, 0x65, 0x64, 0x11, 0x85, 0x04, 0xa7, 0x9c, 0xb9, 0x51, 0x12, + 0x66, 0x9c, 0xb1, 0x09, 0x42, 0xd0, 0x0c, 0xf1, 0x1b, 0xc9, 0x18, 0x1b, 0xa7, 0x9a, 0x34, 0xa2, + 0x4e, 0xc0, 0xd8, 0xd2, 0x6d, 0x3e, 0x41, 0x9f, 0x41, 0x57, 0x40, 0x41, 0x8c, 0xe6, 0xb6, 0x3e, + 0xea, 0xef, 0xde, 0x54, 0x01, 0x12, 0x1e, 0xed, 0x4c, 0xcd, 0x3a, 0x80, 0xad, 0x03, 0x2c, 0x23, + 0xe1, 0xf8, 0xc9, 0x1b, 0x94, 0xfa, 0x75, 0xe6, 0x98, 0x05, 0x93, 0xfa, 0x75, 0xe6, 0x18, 0x19, + 0xd0, 0x11, 0xd7, 0x8f, 0x85, 0xd3, 0xb2, 0xe5, 0xd4, 0xa2, 0x60, 0x2c, 0x1b, 0x12, 0xe7, 0x2a, + 0xb3, 0xf4, 0x11, 0x34, 0xd3, 0xc4, 0x61, 0x66, 0xfa, 0xbb, 0x48, 0x8d, 0xf3, 0x71, 0x38, 0x8b, + 0x6c, 0x26, 0x57, 0xa9, 0xd3, 0x8b, 0xd4, 0x1d, 0xe6, 0xbd, 0x4e, 0xa2, 0x90, 0xe2, 0x90, 0x5e, + 0x2d, 0xfe, 0x23, 0xb8, 0x55, 0x62, 0x49, 0x1c, 0x60, 0x07, 0x3a, 0x22, 0x34, 0x66, 0xad, 0x12, + 0x57, 0xa9, 0x65, 0xfd, 0xae, 0xc3, 0xc6, 0xb3, 0x85, 0xe7, 0x50, 0x2c, 0x45, 0x2b, 0x82, 0xba, + 0x0b, 0x2d, 0x56, 0x80, 0x04, 0x16, 0xd7, 0xb9, 0x6d, 0x5e, 0xa5, 0x26, 0xe9, 0xaf, 0xcd, 0xe5, + 0xe8, 0x3e, 0xb4, 0xcf, 0x9d, 0x20, 0xc1, 0x84, 0x01, 0x91, 0xa1, 0x26, 0x34, 0x59, 0xf5, 0xb2, + 0x85, 0x06, 0xda, 0x82, 0x8e, 0x17, 0x5f, 0xa4, 0xe5, 0x87, 0xa5, 0x64, 0xd7, 0x6e, 0x7b, 0xf1, + 0x85, 0x9d, 0x84, 0xe8, 0x03, 0xb8, 0xe6, 0xf9, 0xc4, 0x39, 0x0d, 0xf0, 0xf4, 0x2c, 0x8a, 0x5e, + 0x11, 0x96, 0x95, 0x5d, 0x7b, 0x5d, 0x2c, 0x1e, 0xa6, 0x6b, 0xc8, 0x4c, 0x6f, 0x92, 0x1b, 0x63, + 0x87, 0x62, 0xa3, 0xcd, 0xe4, 0xd9, 0x3c, 0xc5, 0x90, 0xfa, 0x73, 0x1c, 0x25, 0x94, 0xa5, 0x92, + 0x6e, 0xcb, 0x29, 0x7a, 0x1f, 0xd6, 0x63, 0x4c, 0x30, 0x9d, 0x8a, 0x28, 0xbb, 0x6c, 0x67, 0x9f, + 0xad, 0x3d, 0xe7, 0x61, 0x21, 0x68, 0xbe, 0x76, 0x7c, 0x6a, 0xf4, 0x98, 0x88, 0x8d, 0xf9, 0xb6, + 0x84, 0x60, 0xb9, 0x0d, 0xe4, 0xb6, 0x84, 0x60, 0xb1, 0x6d, 0x03, 0x5a, 0xb3, 0x28, 0x76, 0xb1, + 0xd1, 0x67, 0x32, 0x3e, 0x41, 0xdb, 0xd0, 0xf7, 0x30, 0x71, 0x63, 0x7f, 0x41, 0x53, 0x46, 0xd7, + 0x19, 0xa6, 0xf9, 0x25, 0xeb, 0x10, 0x6e, 0x16, 0x68, 0xb8, 0x2a, 0xa3, 0x7f, 0x34, 0x60, 0xd3, + 0x8e, 0x82, 0xe0, 0xd4, 0x71, 0x5f, 0xd5, 0xe0, 0x34, 0x07, 0x7f, 0x63, 0x35, 0xfc, 0x7a, 0x09, + 0xfc, 0xb9, 0x6b, 0xda, 0x54, 0xae, 0xa9, 0x42, 0x4c, 0xab, 0x9a, 0x98, 0xb6, 0x4a, 0x8c, 0x44, + 0xbd, 0x93, 0x43, 0x3d, 0x83, 0xb4, 0xbb, 0x02, 0xd2, 0xde, 0x32, 0xa4, 0xdf, 0xc1, 0xd6, 0x12, + 0x0e, 0x57, 0x05, 0xf5, 0xdf, 0x06, 0xdc, 0x7c, 0x1c, 0x12, 0xea, 0x04, 0x41, 0x01, 0xd3, 0x2c, + 0x27, 0xb4, 0xda, 0x39, 0xd1, 0xf8, 0x3f, 0x39, 0xa1, 0x2b, 0xa4, 0x48, 0x06, 0x9b, 0x39, 0x06, + 0x6b, 0xe5, 0x89, 0x52, 0x9d, 0xda, 0x85, 0xea, 0x84, 0xee, 0x00, 0xf0, 0x8b, 0xcd, 0x8c, 0x73, + 0xf0, 0x7b, 0x6c, 0xe5, 0x58, 0x14, 0x23, 0xc9, 0x57, 0xb7, 0x9c, 0xaf, 0x7c, 0x96, 0x8c, 0x60, + 0x28, 0xe3, 0x71, 0x63, 0x8f, 0xc5, 0x24, 0x32, 0x65, 0x20, 0xd6, 0x27, 0xb1, 0x97, 0x46, 0x55, + 0xe4, 0xb0, 0xbf, 0xcc, 0xe1, 0x6f, 0x1a, 0x6c, 0x16, 0x71, 0xbf, 0x22, 0x87, 0xe8, 0x4b, 0x80, + 0xcb, 0x86, 0x45, 0x90, 0xb0, 0xa5, 0xee, 0x79, 0x91, 0xca, 0xbf, 0xc5, 0xd8, 0xb3, 0x7b, 0xaf, + 0xe5, 0xd0, 0xfa, 0x4b, 0x83, 0xad, 0x67, 0xa1, 0x5f, 0xca, 0x7e, 0x59, 0x46, 0x2d, 0xf1, 0xd1, + 0x28, 0xe1, 0x63, 0x03, 0x5a, 0x8b, 0x24, 0x7e, 0x89, 0x05, 0xbf, 0x7c, 0x92, 0x07, 0xba, 0xa9, + 0x02, 0x5d, 0x80, 0xaa, 0xb5, 0x0c, 0xd5, 0x14, 0x8c, 0xe5, 0x28, 0xaf, 0x8a, 0x15, 0xca, 0x3d, + 0x7a, 0x3d, 0xfe, 0xc0, 0x59, 0x37, 0xe0, 0xfa, 0x01, 0xa6, 0xcf, 0x79, 0x7e, 0x0b, 0x00, 0xac, + 0x7d, 0x40, 0xf9, 0xc5, 0x4b, 0x7f, 0x62, 0x49, 0xf5, 0x27, 0x3b, 0x42, 0xa9, 0x2f, 0xb5, 0xac, + 0xaf, 0x98, 0xed, 0x43, 0x9f, 0xd0, 0x28, 0xbe, 0x58, 0x05, 0xee, 0x10, 0xf4, 0xb9, 0xf3, 0x46, + 0xbc, 0x89, 0xe9, 0xd0, 0x3a, 0x60, 0x11, 0x64, 0x5b, 0x45, 0x04, 0xf9, 0x0e, 0x43, 0xab, 0xd7, + 0x61, 0xfc, 0x04, 0xe8, 0x29, 0xce, 0x9a, 0x9d, 0xb7, 0x3c, 0xce, 0x92, 0xa6, 0x86, 0x4a, 0x93, + 0x01, 0x1d, 0x37, 0xc0, 0x4e, 0x98, 0x2c, 0x04, 0xb1, 0x72, 0x6a, 0xfd, 0x0c, 0x37, 0x14, 0xeb, + 0x22, 0xce, 0xf4, 0x3c, 0xe4, 0xa5, 0xb0, 0x9e, 0x0e, 0xd1, 0x17, 0xd0, 0xe6, 0x1d, 0x21, 0xb3, + 0x3d, 0xd8, 0xbd, 0xad, 0xc6, 0xcd, 0x8c, 0x24, 0xa1, 0x68, 0x21, 0x6d, 0xa1, 0xbb, 0xfb, 0x4f, + 0x17, 0x06, 0xb2, 0xa7, 0xe1, 0xfd, 0x2a, 0xf2, 0x61, 0x3d, 0xdf, 0xbc, 0xa1, 0x7b, 0xd5, 0xed, + 0x6c, 0xa1, 0x27, 0x37, 0xef, 0xd7, 0x51, 0xe5, 0x27, 0xb0, 0xd6, 0x3e, 0xd5, 0x10, 0x81, 0x61, + 0xb1, 0xa7, 0x42, 0x0f, 0xca, 0x6d, 0x54, 0x34, 0x71, 0xe6, 0xb8, 0xae, 0xba, 0x74, 0x8b, 0xce, + 0xd9, 0x9d, 0x51, 0x1b, 0x21, 0xf4, 0x56, 0x33, 0x6a, 0xef, 0x65, 0xee, 0xd4, 0xd6, 0xcf, 0xfc, + 0xfe, 0x02, 0xd7, 0x94, 0xa7, 0x1a, 0x55, 0xa0, 0x55, 0xd6, 0x56, 0x99, 0x1f, 0xd7, 0xd2, 0xcd, + 0x7c, 0x45, 0x30, 0x50, 0xcb, 0x1f, 0xaa, 0x30, 0x50, 0xfa, 0x38, 0x99, 0x9f, 0xd4, 0x53, 0x56, + 0x99, 0x2c, 0x56, 0x91, 0x2a, 0x26, 0x2b, 0x6a, 0x62, 0x15, 0x93, 0x55, 0xc5, 0xc9, 0x5a, 0x43, + 0x0e, 0xc0, 0x65, 0x11, 0x41, 0x77, 0x2b, 0x29, 0x51, 0x6b, 0x8f, 0x39, 0x7a, 0xbb, 0x62, 0xe6, + 0x62, 0x01, 0xef, 0x14, 0x9a, 0x01, 0x54, 0x01, 0x4e, 0x79, 0xef, 0x64, 0x3e, 0xa8, 0xa9, 0x5d, + 0x38, 0x94, 0xa8, 0x4b, 0x2b, 0x0e, 0xa5, 0x16, 0xbd, 0x15, 0x87, 0x2a, 0x94, 0x38, 0x6b, 0x0d, + 0xf9, 0x30, 0xb0, 0x93, 0x50, 0xb8, 0x4e, 0x0b, 0x03, 0xaa, 0xd8, 0xbd, 0x5c, 0xd7, 0xcc, 0x7b, + 0x35, 0x34, 0x2f, 0xef, 0xc5, 0x43, 0xf8, 0xb1, 0x2b, 0x55, 0x4f, 0xdb, 0xec, 0x0f, 0xfd, 0xe7, + 0xff, 0x05, 0x00, 0x00, 0xff, 0xff, 0x8e, 0x6e, 0xfd, 0x45, 0xdd, 0x10, 0x00, 0x00, } diff --git a/pkg/tiller/environment/environment.go b/pkg/tiller/environment/environment.go index c9ddab3ab..a154f5104 100644 --- a/pkg/tiller/environment/environment.go +++ b/pkg/tiller/environment/environment.go @@ -141,6 +141,8 @@ type KubeClient interface { // WaitAndGetCompletedPodPhase waits up to a timeout until a pod enters a completed phase // and returns said phase (PodSucceeded or PodFailed qualify). WaitAndGetCompletedPodPhase(namespace string, reader io.Reader, timeout time.Duration) (core.PodPhase, error) + + WatchJobsTillDone(namespace string, reader io.Reader, watchFeed kube.WatchFeed, timeout time.Duration) error } // PrintingKubeClient implements KubeClient, but simply prints the reader to diff --git a/pkg/tiller/release_install.go b/pkg/tiller/release_install.go index 973da3581..17daf79e7 100644 --- a/pkg/tiller/release_install.go +++ b/pkg/tiller/release_install.go @@ -18,12 +18,14 @@ package tiller import ( "fmt" + // "io" "strings" - ctx "golang.org/x/net/context" + // ctx "golang.org/x/net/context" "k8s.io/helm/pkg/chartutil" "k8s.io/helm/pkg/hooks" + "k8s.io/helm/pkg/kube" "k8s.io/helm/pkg/proto/hapi/release" "k8s.io/helm/pkg/proto/hapi/services" relutil "k8s.io/helm/pkg/releaseutil" @@ -31,7 +33,7 @@ import ( ) // InstallRelease installs a release and stores the release record. -func (s *ReleaseServer) InstallRelease(c ctx.Context, req *services.InstallReleaseRequest) (*services.InstallReleaseResponse, error) { +func (s *ReleaseServer) InstallRelease(req *services.InstallReleaseRequest, stream services.ReleaseService_InstallReleaseServer) error { s.Log("preparing install for %s", req.Name) rel, err := s.prepareRelease(req) if err != nil { @@ -43,15 +45,22 @@ func (s *ReleaseServer) InstallRelease(c ctx.Context, req *services.InstallRelea if req.DryRun && strings.HasPrefix(err.Error(), "YAML parse error") { err = fmt.Errorf("%s\n%s", err, rel.Manifest) } - return res, err + + if sendErr := stream.Send(res); sendErr != nil { + return sendErr + } + return err } s.Log("performing install for %s", req.Name) - res, err := s.performRelease(rel, req) + res, err := s.performRelease(rel, req, stream) if err != nil { s.Log("failed install perform step: %s", err) } - return res, err + if sendErr := stream.Send(res); sendErr != nil { + return sendErr + } + return err } // prepareRelease builds a release for an install operation. @@ -142,7 +151,7 @@ func hasCRDHook(hs []*release.Hook) bool { } // performRelease runs a release. -func (s *ReleaseServer) performRelease(r *release.Release, req *services.InstallReleaseRequest) (*services.InstallReleaseResponse, error) { +func (s *ReleaseServer) performRelease(r *release.Release, req *services.InstallReleaseRequest, stream services.ReleaseService_InstallReleaseServer) (*services.InstallReleaseResponse, error) { res := &services.InstallReleaseResponse{Release: r} manifestDoc := []byte(r.Manifest) @@ -182,7 +191,28 @@ func (s *ReleaseServer) performRelease(r *release.Release, req *services.Install // pre-install hooks if !req.DisableHooks { - if err := s.execHook(r.Hooks, r.Name, r.Namespace, hooks.PreInstall, req.Timeout); err != nil { + handleLogChunk := kube.WriteJobLogChunkFunc(func(chunk *kube.JobLogChunk) error { + chunkResp := &services.InstallReleaseResponse{ + WatchFeed: &release.WatchFeed{ + JobLogChunk: &release.JobLogChunk{}, + }, + } + chunkResp.WatchFeed.JobLogChunk.LogLines = make([]*release.LogLine, 0) + + for _, line := range chunk.LogLines { + ll := &release.LogLine{ + Timestamp: line.Timestamp, + Data: line.Data, + } + chunkResp.WatchFeed.JobLogChunk.LogLines = append(chunkResp.WatchFeed.JobLogChunk.LogLines, ll) + } + + return stream.Send(chunkResp) + }) + + // TODO watch job with feed only if job have annotation "helm/watch-logs": "true" + // TODO otherwise watch as ordinary hook just like before, using WatchUntilReady + if err := s.execHookWithWatchFeed(r.Hooks, r.Name, r.Namespace, hooks.PreInstall, req.Timeout, handleLogChunk); err != nil { return res, err } } else { diff --git a/pkg/tiller/release_server.go b/pkg/tiller/release_server.go index e562be203..19c021c32 100644 --- a/pkg/tiller/release_server.go +++ b/pkg/tiller/release_server.go @@ -23,6 +23,7 @@ import ( "path" "regexp" "strings" + "time" "github.com/technosophos/moniker" "gopkg.in/yaml.v2" @@ -32,6 +33,7 @@ import ( "k8s.io/helm/pkg/chartutil" "k8s.io/helm/pkg/hooks" + "k8s.io/helm/pkg/kube" "k8s.io/helm/pkg/proto/hapi/chart" "k8s.io/helm/pkg/proto/hapi/release" "k8s.io/helm/pkg/proto/hapi/services" @@ -363,6 +365,10 @@ func (s *ReleaseServer) recordRelease(r *release.Release, reuse bool) { } func (s *ReleaseServer) execHook(hs []*release.Hook, name, namespace, hook string, timeout int64) error { + return s.execHookWithWatchFeed(hs, name, namespace, hook, timeout, kube.WriteJobLogChunkFunc(func(*kube.JobLogChunk) error { return nil })) +} + +func (s *ReleaseServer) execHookWithWatchFeed(hs []*release.Hook, name, namespace, hook string, timeout int64, watchFeed kube.WatchFeed) error { kubeCli := s.env.KubeClient code, ok := events[hook] if !ok { @@ -397,14 +403,13 @@ func (s *ReleaseServer) execHook(hs []*release.Hook, name, namespace, hook strin // We can't watch CRDs if hook != hooks.CRDInstall { - if err := kubeCli.WatchUntilReady(namespace, b, timeout, false); err != nil { + if err := kubeCli.WatchJobsTillDone(namespace, b, watchFeed, time.Duration(timeout)*time.Second); err != nil { s.Log("warning: Release %s %s %s could not complete: %s", name, hook, h.Path, err) // If a hook is failed, checkout the annotation of the hook to determine whether the hook should be deleted // under failed condition. If so, then clear the corresponding resource object in the hook if err := s.deleteHookByPolicy(h, hooks.HookFailed, name, namespace, hook, kubeCli); err != nil { return err } - return err } } }