Cleanup log, pre/post install hooks with extended watch

pull/3479/head
Timofey Kirillov 8 years ago
parent d3d9595f88
commit cb3a0fd90a

@@ -389,6 +389,7 @@ func (h *Client) install(ctx context.Context, req *rls.InstallReleaseRequest) (*
         }
     }
     formatJobHeader := func(jobName string, podName string, containerName string) string {
+        // tail -f on multiple files prints similar headers
        return fmt.Sprintf("==> Job \"%s\", Pod \"%s\", Container \"%s\" <==", jobName, podName, containerName)
     }
@@ -415,7 +416,7 @@ func (h *Client) install(ctx context.Context, req *rls.InstallReleaseRequest) (*
             setLogHeader(formatJobHeader(jobPodError.JobName, jobPodError.PodName, jobPodError.ContainerName))
-            fmt.Fprintf(os.Stderr, "ERROR: %s", jobPodError.Message)
+            fmt.Fprintf(os.Stderr, "Error: %s\n", jobPodError.Message)
         } else {
             finalResp = resp // TODO verify/debug this code
         }
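For reference, the header added above copies the style that `tail -f` prints when following several files at once. A standalone sketch of the same formatting (the job, pod, and container names here are made up for illustration):

package main

import "fmt"

// formatJobHeader reproduces the helper from the hunk above: a tail -f style
// section header printed before each streamed log chunk.
func formatJobHeader(jobName, podName, containerName string) string {
    return fmt.Sprintf("==> Job %q, Pod %q, Container %q <==", jobName, podName, containerName)
}

func main() {
    // Hypothetical names, only to show the output shape.
    fmt.Println(formatJobHeader("db-migrate", "db-migrate-x7k2p", "main"))
    // Output: ==> Job "db-migrate", Pod "db-migrate-x7k2p", Container "main" <==
}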

@@ -14,7 +14,7 @@ import (
     "k8s.io/apimachinery/pkg/watch"
     "k8s.io/kubernetes/pkg/apis/batch"
     "k8s.io/kubernetes/pkg/apis/core"
-    "k8s.io/kubernetes/pkg/kubectl/resource"
+    "k8s.io/kubernetes/pkg/kubectl/genericclioptions/resource"
 )

 var (
@@ -198,15 +198,20 @@ func (pod *PodWatchMonitor) Watch() error {
     }

     _, err = watch.Until(pod.Timeout, watcher, func(e watch.Event) (bool, error) {
-        pod.Kube.Log("[DEBUG] Pod %s event: %+v", pod.ResourceName, e)
         object, ok := e.Object.(*core.Pod)
         if !ok {
             return true, fmt.Errorf("Expected %s to be a *core.Pod, got %T", pod.ResourceName, e.Object)
         }

-        // TODO: enable InitContainerStatuses
+        allContainerStatuses := make([]core.ContainerStatus, 0)
+        for _, cs := range object.Status.InitContainerStatuses {
+            allContainerStatuses = append(allContainerStatuses, cs)
+        }
         for _, cs := range object.Status.ContainerStatuses {
+            allContainerStatuses = append(allContainerStatuses, cs)
+        }
+
+        for _, cs := range allContainerStatuses {
             oldState := pod.ContainerMonitorStates[cs.Name]

             if cs.State.Waiting != nil {
@@ -252,6 +257,8 @@ type JobWatchMonitor struct {
     PodError chan PodError

     MonitoredPods []*PodWatchMonitor
+
+    FinalJobStatus batch.JobStatus
 }

 func (job *JobWatchMonitor) Watch() error {
@@ -271,18 +278,15 @@ func (job *JobWatchMonitor) Watch() error {
     }

     _, err = watch.Until(job.Timeout, watcher, func(e watch.Event) (bool, error) {
-        job.Kube.Log("[DEBUG] Job %s event: %+v", job.ResourceName, e)
         switch job.State {
         case "":
             if e.Type == watch.Added {
                 job.Started <- true

-                oldState := job.State
                 job.State = "Started"
-                job.Kube.Log("[DEBUG] Job %s watcher state changed %v -> %v", job.ResourceName, oldState, job.State)

-                job.Kube.Log("[DEBUG] Starting job %s pods watcher", job.ResourceName)
+                job.Kube.Log("Starting to watch job %s pods", job.ResourceName)
                 go func() {
                     err := job.WatchPods()
                     if err != nil {
@@ -299,19 +303,15 @@ func (job *JobWatchMonitor) Watch() error {
             for _, c := range object.Status.Conditions {
                 if c.Type == batch.JobComplete && c.Status == core.ConditionTrue {
-                    oldState := job.State
                     job.State = "Succeeded"
-                    job.Kube.Log("[DEBUG] Job %s watcher state changed %v -> %v", job.ResourceName, oldState, job.State)

-                    job.Kube.Log("%s: Jobs active: %d, jobs failed: %d, jobs succeeded: %d", job.ResourceName, object.Status.Active, object.Status.Failed, object.Status.Succeeded)
+                    job.FinalJobStatus = object.Status
                     job.Succeeded <- true
                     return true, nil
                 } else if c.Type == batch.JobFailed && c.Status == core.ConditionTrue {
-                    oldState := job.State
                     job.State = "Failed"
-                    job.Kube.Log("[DEBUG] Job %s watcher state changed %v -> %v", job.ResourceName, oldState, job.State)
                     return true, fmt.Errorf("Job failed: %s", c.Reason)
                 }
@@ -359,10 +359,8 @@ func (job *JobWatchMonitor) WatchPods() error {
         return err
     }

-    // TODO calculate timeout since job-watch started
+    // TODO: calculate timeout since job-watch started
     _, err = watch.Until(job.Timeout, podListWatcher, func(e watch.Event) (bool, error) {
-        job.Kube.Log("[DEBUG] Job %s pods list event: %+v", job.ResourceName, e)
-
         podObject, ok := e.Object.(*core.Pod)
         if !ok {
             return true, fmt.Errorf("Expected %s to be a *core.Pod, got %T", job.ResourceName, e.Object)
@@ -375,7 +373,7 @@ func (job *JobWatchMonitor) WatchPods() error {
             }
         }

-        // TODO constructor from job & podObject
+        // TODO: constructor from job & podObject
         pod := &PodWatchMonitor{
             WatchMonitor: WatchMonitor{
                 Kube: job.Kube,
@@ -383,7 +381,7 @@ func (job *JobWatchMonitor) WatchPods() error {
                 Namespace:    job.Namespace,
                 ResourceName: podObject.Name,
-                InitialResourceVersion: "",
+                InitialResourceVersion: "", // this will make PodWatchMonitor receive podObject again and handle its state properly by itself
             },

             PodLogChunk: job.PodLogChunk,
@@ -418,18 +416,18 @@ func (job *JobWatchMonitor) WatchPods() error {
     return nil
 }

-func (c *Client) WatchJobsTillDone(namespace string, reader io.Reader, watchFeed WatchFeed, timeout time.Duration) error {
+func (c *Client) WatchJobsUntilReady(namespace string, reader io.Reader, watchFeed WatchFeed, timeout time.Duration) error {
     infos, err := c.Build(namespace, reader)
     if err != nil {
         return err
     }

     return perform(infos, func(info *resource.Info) error {
-        return c.watchJobTillDone(info, watchFeed, timeout)
+        return c.watchJobUntilReady(info, watchFeed, timeout)
     })
 }

-func (c *Client) watchJobTillDone(jobInfo *resource.Info, watchFeed WatchFeed, timeout time.Duration) error {
+func (c *Client) watchJobUntilReady(jobInfo *resource.Info, watchFeed WatchFeed, timeout time.Duration) error {
     if jobInfo.Mapping.GroupVersionKind.Kind != "Job" {
         return nil
     }
@@ -454,7 +452,6 @@ func (c *Client) watchJobTillDone(jobInfo *resource.Info, watchFeed WatchFeed, t
         Error: make(chan error, 0),
     }

-    c.Log("[DEBUG] Starting job %s watcher", job.ResourceName)
     go func() {
         err := job.Watch()
         if err != nil {
@@ -467,7 +464,7 @@ func (c *Client) watchJobTillDone(jobInfo *resource.Info, watchFeed WatchFeed, t
     case <-job.Started:
         c.Log("Job %s started", job.ResourceName)
     case <-job.Succeeded:
-        c.Log("Job %s succeeded", job.ResourceName)
+        c.Log("%s: Jobs active: %d, jobs failed: %d, jobs succeeded: %d", job.ResourceName, job.FinalJobStatus.Active, job.FinalJobStatus.Failed, job.FinalJobStatus.Succeeded)
         return nil
     case err := <-job.Error:
         return err
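The new FinalJobStatus field exists so the goroutine running Watch() can stash the job's final counters before signalling Succeeded, letting the caller read them after the channel receive (see the select in watchJobUntilReady above). A simplified, self-contained sketch of that handoff; the types below are stand-ins, not the ones from this patch:

package main

import (
    "fmt"
    "time"
)

// jobStatus is a stand-in for batch.JobStatus.
type jobStatus struct{ Active, Failed, Succeeded int32 }

// jobMonitor mimics the channel protocol of JobWatchMonitor: the watcher
// stores the final status on the struct before signalling Succeeded, so the
// receiver may read it after the channel receive without extra locking.
type jobMonitor struct {
    FinalJobStatus jobStatus
    Started        chan bool
    Succeeded      chan bool
    Error          chan error
}

// watch stands in for JobWatchMonitor.Watch: report start, "run" the job,
// record the final status, then signal success.
func (m *jobMonitor) watch() {
    m.Started <- true
    time.Sleep(10 * time.Millisecond) // pretend the job runs for a while
    m.FinalJobStatus = jobStatus{Succeeded: 1}
    m.Succeeded <- true
}

func main() {
    m := &jobMonitor{
        Started:   make(chan bool),
        Succeeded: make(chan bool),
        Error:     make(chan error),
    }
    go m.watch()

    for {
        select {
        case <-m.Started:
            fmt.Println("job started")
        case <-m.Succeeded:
            fmt.Printf("jobs active: %d, failed: %d, succeeded: %d\n",
                m.FinalJobStatus.Active, m.FinalJobStatus.Failed, m.FinalJobStatus.Succeeded)
            return
        case err := <-m.Error:
            fmt.Println("error:", err)
            return
        }
    }
}

The unbuffered send/receive pair is what makes the plain read of FinalJobStatus safe here: the field is written strictly before the Succeeded signal is observed.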

@@ -142,7 +142,7 @@ type KubeClient interface {
     // and returns said phase (PodSucceeded or PodFailed qualify).
     WaitAndGetCompletedPodPhase(namespace string, reader io.Reader, timeout time.Duration) (core.PodPhase, error)

-    WatchJobsTillDone(namespace string, reader io.Reader, watchFeed kube.WatchFeed, timeout time.Duration) error
+    WatchJobsUntilReady(namespace string, reader io.Reader, watchFeed kube.WatchFeed, timeout time.Duration) error
 }

 // PrintingKubeClient implements KubeClient, but simply prints the reader to

@@ -155,6 +155,44 @@ func (s *ReleaseServer) performRelease(r *release.Release, req *services.Install
     res := &services.InstallReleaseResponse{Release: r}
     manifestDoc := []byte(r.Manifest)

+    watchFeed := &kube.WatchFeedProto{
+        WriteJobLogChunkFunc: func(chunk kube.JobLogChunk) error {
+            chunkResp := &services.InstallReleaseResponse{
+                WatchFeed: &release.WatchFeed{
+                    JobLogChunk: &release.JobLogChunk{
+                        JobName:       chunk.JobName,
+                        PodName:       chunk.PodName,
+                        ContainerName: chunk.ContainerName,
+                        LogLines:      make([]*release.LogLine, 0),
+                    },
+                },
+            }
+            for _, line := range chunk.LogLines {
+                ll := &release.LogLine{
+                    Timestamp: line.Timestamp,
+                    Data:      line.Data,
+                }
+                chunkResp.WatchFeed.JobLogChunk.LogLines = append(chunkResp.WatchFeed.JobLogChunk.LogLines, ll)
+            }
+            return stream.Send(chunkResp)
+        },
+        WriteJobPodErrorFunc: func(obj kube.JobPodError) error {
+            chunkResp := &services.InstallReleaseResponse{
+                WatchFeed: &release.WatchFeed{
+                    JobPodError: &release.JobPodError{
+                        JobName:       obj.JobName,
+                        PodName:       obj.PodName,
+                        ContainerName: obj.ContainerName,
+                        Message:       obj.Message,
+                    },
+                },
+            }
+            return stream.Send(chunkResp)
+        },
+    }
+
     if req.DryRun {
         s.Log("dry run for %s", r.Name)
@@ -191,46 +229,6 @@ func (s *ReleaseServer) performRelease(r *release.Release, req *services.Install

     // pre-install hooks
     if !req.DisableHooks {
-        watchFeed := &kube.WatchFeedProto{
-            WriteJobLogChunkFunc: func(chunk kube.JobLogChunk) error {
-                chunkResp := &services.InstallReleaseResponse{
-                    WatchFeed: &release.WatchFeed{
-                        JobLogChunk: &release.JobLogChunk{
-                            JobName:       chunk.JobName,
-                            PodName:       chunk.PodName,
-                            ContainerName: chunk.ContainerName,
-                            LogLines:      make([]*release.LogLine, 0),
-                        },
-                    },
-                }
-                for _, line := range chunk.LogLines {
-                    ll := &release.LogLine{
-                        Timestamp: line.Timestamp,
-                        Data:      line.Data,
-                    }
-                    chunkResp.WatchFeed.JobLogChunk.LogLines = append(chunkResp.WatchFeed.JobLogChunk.LogLines, ll)
-                }
-                return stream.Send(chunkResp)
-            },
-            WriteJobPodErrorFunc: func(obj kube.JobPodError) error {
-                chunkResp := &services.InstallReleaseResponse{
-                    WatchFeed: &release.WatchFeed{
-                        JobPodError: &release.JobPodError{
-                            JobName:       obj.JobName,
-                            PodName:       obj.PodName,
-                            ContainerName: obj.ContainerName,
-                            Message:       obj.Message,
-                        },
-                    },
-                }
-                return stream.Send(chunkResp)
-            },
-        }
-
-        // TODO watch job with feed only if job have annotation "helm/watch-logs": "true"
-        // TODO otherwise watch as ordinary hook just like before, using WatchUntilReady
         if err := s.execHookWithWatchFeed(r.Hooks, r.Name, r.Namespace, hooks.PreInstall, req.Timeout, watchFeed); err != nil {
             return res, err
         }
@@ -288,7 +286,7 @@ func (s *ReleaseServer) performRelease(r *release.Release, req *services.Install

     // post-install hooks
     if !req.DisableHooks {
-        if err := s.execHook(r.Hooks, r.Name, r.Namespace, hooks.PostInstall, req.Timeout); err != nil {
+        if err := s.execHookWithWatchFeed(r.Hooks, r.Name, r.Namespace, hooks.PostInstall, req.Timeout, watchFeed); err != nil {
             msg := fmt.Sprintf("Release %q failed post-install: %s", r.Name, err)
             s.Log("warning: %s", msg)
             r.Info.Status.Code = release.Status_FAILED
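The kube.WatchFeedProto value built above is just a pair of callbacks, so the same watch code can stream job logs into the gRPC response on the Tiller side or print them locally on a client. A minimal stand-in sketch of that callback shape; the types are simplified local copies for illustration, not the actual kube package:

package main

import "fmt"

// Stand-ins for the chunk and error types consumed by the watch feed above.
type logLine struct{ Timestamp, Data string }

type jobLogChunk struct {
    JobName, PodName, ContainerName string
    LogLines                        []logLine
}

type jobPodError struct{ JobName, PodName, ContainerName, Message string }

// watchFeed mirrors the WatchFeedProto callback shape: the constructor decides
// where chunks go (a gRPC stream inside Tiller, stdout here).
type watchFeed struct {
    WriteJobLogChunkFunc func(jobLogChunk) error
    WriteJobPodErrorFunc func(jobPodError) error
}

func main() {
    feed := &watchFeed{
        WriteJobLogChunkFunc: func(c jobLogChunk) error {
            fmt.Printf("==> Job %q, Pod %q, Container %q <==\n", c.JobName, c.PodName, c.ContainerName)
            for _, l := range c.LogLines {
                fmt.Printf("%s %s\n", l.Timestamp, l.Data)
            }
            return nil
        },
        WriteJobPodErrorFunc: func(e jobPodError) error {
            fmt.Printf("Error: %s\n", e.Message)
            return nil
        },
    }

    // Hypothetical data, only to exercise the callbacks.
    _ = feed.WriteJobLogChunkFunc(jobLogChunk{
        JobName: "db-migrate", PodName: "db-migrate-x7k2p", ContainerName: "main",
        LogLines: []logLine{{Timestamp: "2018-02-06T10:00:00Z", Data: "applying migrations"}},
    })
    _ = feed.WriteJobPodErrorFunc(jobPodError{
        JobName: "db-migrate", PodName: "db-migrate-x7k2p", ContainerName: "main",
        Message: "container exited with code 1",
    })
}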

@@ -403,7 +403,17 @@ func (s *ReleaseServer) execHookWithWatchFeed(hs []*release.Hook, name, namespac
         // We can't watch CRDs
         if hook != hooks.CRDInstall {
-            if err := kubeCli.WatchJobsTillDone(namespace, b, watchFeed, time.Duration(timeout)*time.Second); err != nil {
+            var err error
+            if true {
+                // TODO: Watch with watchFeed only if helm/watch=true annotation is set,
+                // TODO: because this code is new and experimental, so WatchUntilReady
+                // TODO: will be used by default.
+                err = kubeCli.WatchJobsUntilReady(namespace, b, watchFeed, time.Duration(timeout)*time.Second)
+            } else {
+                err = kubeCli.WatchUntilReady(namespace, b, timeout, false)
+            }
+            if err != nil {
                 s.Log("warning: Release %s %s %s could not complete: %s", name, hook, h.Path, err)
                 // If a hook is failed, checkout the annotation of the hook to determine whether the hook should be deleted
                 // under failed condition. If so, then clear the corresponding resource object in the hook
