From ad99c1b749df43c6dcdbc1df278a88e8093290fb Mon Sep 17 00:00:00 2001 From: fibonacci1729 Date: Fri, 30 Mar 2018 09:28:42 -0600 Subject: [PATCH] release list paritioning --- pkg/tiller/release_list.go | 51 ++++++++++++++++++++++++++++++-------- 1 file changed, 41 insertions(+), 10 deletions(-) diff --git a/pkg/tiller/release_list.go b/pkg/tiller/release_list.go index ec4dbfb39..72c21d97c 100644 --- a/pkg/tiller/release_list.go +++ b/pkg/tiller/release_list.go @@ -18,11 +18,11 @@ package tiller import ( "fmt" - "regexp" - + "github.com/golang/protobuf/proto" "k8s.io/helm/pkg/proto/hapi/release" "k8s.io/helm/pkg/proto/hapi/services" relutil "k8s.io/helm/pkg/releaseutil" + "regexp" ) // ListReleases lists the releases found by the server. @@ -107,21 +107,52 @@ func (s *ReleaseServer) ListReleases(req *services.ListReleasesRequest, stream s rels = rels[0:req.Limit] l = int64(len(rels)) } - - for i := 0; i < min(len(rels), int(req.Limit)); i++ { - res := &services.ListReleasesResponse{ - Next: next, - Count: l, - Total: total, - Releases: []*release.Release{rels[i]}, - } + res := &services.ListReleasesResponse{ + Next: next, + Count: l, + Total: total, + } + chunks := s.partition(rels[:min(len(rels), int(req.Limit))], maxMsgSize-proto.Size(res)) + for res.Releases = range chunks { if err := stream.Send(res); err != nil { + for range chunks { // drain + } return err } } return nil } +// partition packs releases into slices upto the capacity cap in bytes. +func (s *ReleaseServer) partition(rels []*release.Release, cap int) <-chan []*release.Release { + chunks := make(chan []*release.Release, 1) + go func() { + var ( + fill = 0 // fill is space available to fill + size int // size is size of a release + ) + var chunk []*release.Release + for _, rls := range rels { + if size = proto.Size(rls); size+fill > cap { + // Over-cap, push chunk onto channel to send over gRPC stream + s.Log("partitioned at %d with %d releases (cap=%d)", fill, len(chunk), cap) + chunks <- chunk + // reset paritioning state + chunk = chunk[:0] + fill = 0 + } + chunk = append(chunk, rls) + fill += size + } + if len(chunk) > 0 { + // send remaining if any + chunks <- chunk + } + close(chunks) + }() + return chunks +} + func filterByNamespace(namespace string, rels []*release.Release) ([]*release.Release, error) { matches := []*release.Release{} for _, r := range rels {