Merge pull request #3782 from fibonacci1729/master

stream releases when listing
pull/3812/head
Brian 7 years ago committed by GitHub
commit 58e4b6ac61
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -148,7 +148,7 @@ func (l *listCmd) run() error {
return prettyError(err) return prettyError(err)
} }
if len(res.Releases) == 0 { if len(res.GetReleases()) == 0 {
return nil return nil
} }
@ -239,12 +239,16 @@ func formatList(rels []*release.Release, colWidth uint) string {
table.MaxColWidth = colWidth table.MaxColWidth = colWidth
table.AddRow("NAME", "REVISION", "UPDATED", "STATUS", "CHART", "NAMESPACE") table.AddRow("NAME", "REVISION", "UPDATED", "STATUS", "CHART", "NAMESPACE")
for _, r := range rels { for _, r := range rels {
c := fmt.Sprintf("%s-%s", r.Chart.Metadata.Name, r.Chart.Metadata.Version) md := r.GetChart().GetMetadata()
t := timeconv.String(r.Info.LastDeployed) c := fmt.Sprintf("%s-%s", md.GetName(), md.GetVersion())
s := r.Info.Status.Code.String() t := "-"
v := r.Version if tspb := r.GetInfo().GetLastDeployed(); tspb != nil {
n := r.Namespace t = timeconv.String(tspb)
table.AddRow(r.Name, v, t, s, c, n) }
s := r.GetInfo().GetStatus().GetCode().String()
v := r.GetVersion()
n := r.GetNamespace()
table.AddRow(r.GetName(), v, t, s, c, n)
} }
return table.String() return table.String()
} }

@ -346,8 +346,22 @@ func (h *Client) list(ctx context.Context, req *rls.ListReleasesRequest) (*rls.L
if err != nil { if err != nil {
return nil, err return nil, err
} }
var resp *rls.ListReleasesResponse
return s.Recv() for {
r, err := s.Recv()
if err == io.EOF {
break
}
if err != nil {
return nil, err
}
if resp == nil {
resp = r
continue
}
resp.Releases = append(resp.Releases, r.GetReleases()[0])
}
return resp, nil
} }
// Executes tiller.InstallRelease RPC. // Executes tiller.InstallRelease RPC.

@ -18,11 +18,11 @@ package tiller
import ( import (
"fmt" "fmt"
"regexp" "github.com/golang/protobuf/proto"
"k8s.io/helm/pkg/proto/hapi/release" "k8s.io/helm/pkg/proto/hapi/release"
"k8s.io/helm/pkg/proto/hapi/services" "k8s.io/helm/pkg/proto/hapi/services"
relutil "k8s.io/helm/pkg/releaseutil" relutil "k8s.io/helm/pkg/releaseutil"
"regexp"
) )
// ListReleases lists the releases found by the server. // ListReleases lists the releases found by the server.
@ -107,14 +107,50 @@ func (s *ReleaseServer) ListReleases(req *services.ListReleasesRequest, stream s
rels = rels[0:req.Limit] rels = rels[0:req.Limit]
l = int64(len(rels)) l = int64(len(rels))
} }
res := &services.ListReleasesResponse{ res := &services.ListReleasesResponse{
Next: next, Next: next,
Count: l, Count: l,
Total: total, Total: total,
Releases: rels,
} }
return stream.Send(res) chunks := s.partition(rels[:min(len(rels), int(req.Limit))], maxMsgSize-proto.Size(res))
for res.Releases = range chunks {
if err := stream.Send(res); err != nil {
for range chunks { // drain
}
return err
}
}
return nil
}
// partition packs releases into slices upto the capacity cap in bytes.
func (s *ReleaseServer) partition(rels []*release.Release, cap int) <-chan []*release.Release {
chunks := make(chan []*release.Release, 1)
go func() {
var (
fill = 0 // fill is space available to fill
size int // size is size of a release
)
var chunk []*release.Release
for _, rls := range rels {
if size = proto.Size(rls); size+fill > cap {
// Over-cap, push chunk onto channel to send over gRPC stream
s.Log("partitioned at %d with %d releases (cap=%d)", fill, len(chunk), cap)
chunks <- chunk
// reset paritioning state
chunk = chunk[:0]
fill = 0
}
chunk = append(chunk, rls)
fill += size
}
if len(chunk) > 0 {
// send remaining if any
chunks <- chunk
}
close(chunks)
}()
return chunks
} }
func filterByNamespace(namespace string, rels []*release.Release) ([]*release.Release, error) { func filterByNamespace(namespace string, rels []*release.Release) ([]*release.Release, error) {

Loading…
Cancel
Save