From 5672d5d3baca2b4c5872fe8bf8ae3e74d6d9bef3 Mon Sep 17 00:00:00 2001 From: John Welsh Date: Thu, 4 May 2017 15:28:24 -0700 Subject: [PATCH] Cleaned up stream closure --- cmd/helm/logs.go | 1 + pkg/helm/client.go | 29 +++++++++-------------------- 2 files changed, 10 insertions(+), 20 deletions(-) diff --git a/cmd/helm/logs.go b/cmd/helm/logs.go index 7dd320f86..4bba63bb3 100644 --- a/cmd/helm/logs.go +++ b/cmd/helm/logs.go @@ -78,6 +78,7 @@ func (l *logsCmd) run() error { return nil } fmt.Println(l.Log.Log) + done<- struct{}{} } } diff --git a/pkg/helm/client.go b/pkg/helm/client.go index 100a4dd48..9392e0700 100644 --- a/pkg/helm/client.go +++ b/pkg/helm/client.go @@ -396,46 +396,35 @@ func (h *Client) logs(ctx context.Context, req *rls.GetReleaseLogsRequest, done rlc := rls.NewReleaseServiceClient(c) s, err := rlc.GetReleaseLogs(ctx) - fmt.Println("Got s: ", s, " err: ", err) if err != nil { return nil, err } - s.Send(req) - fmt.Println("Sent req") - out := make(chan *rls.GetReleaseLogsResponse) + go func() { + <-done + s.CloseSend() + }() + go func() { defer close(out) defer c.Close() for { - fmt.Println("Waiting on recv") rs, err := s.Recv() - fmt.Println("Got rs: ", s, " err: ", err) if err == io.EOF { return } if err != nil { - fmt.Println() + fmt.Println("gRPC error streaming logs: ", grpc.ErrorDesc(err)) + return } out <- rs - //select { - ////case rs, err := s.Recv(): - //// if err == io.EOF { - //// return - //// } - //// if err != nil { - //// fmt.Println("gRPC error streaming logs: ", grpc.ErrorDesc(err)) - //// return - //// } - //// out <- rs - //case <-done: - // return - //} } }() + s.Send(req) + return out, nil }