ref(*): bypass grpc when invoking helm list

TODO reimplement any paging
pull/3945/head
Adam Reese 7 years ago
parent 62804848c6
commit 5715ee43d6
No known key found for this signature in database
GPG Key ID: 06F35E60A7A18DD6

@ -148,15 +148,11 @@ func (l *listCmd) run() error {
return prettyError(err) return prettyError(err)
} }
if len(res.GetReleases()) == 0 { if len(res) == 0 {
return nil return nil
} }
if res.Next != "" && !l.short { rels := filterList(res)
fmt.Fprintf(l.out, "\tnext: %s\n", res.Next)
}
rels := filterList(res.Releases)
if l.short { if l.short {
for _, r := range rels { for _, r := range rels {

@ -30,16 +30,20 @@ import (
rls "k8s.io/helm/pkg/proto/hapi/services" rls "k8s.io/helm/pkg/proto/hapi/services"
"k8s.io/helm/pkg/storage" "k8s.io/helm/pkg/storage"
"k8s.io/helm/pkg/storage/driver" "k8s.io/helm/pkg/storage/driver"
"k8s.io/helm/pkg/tiller"
) )
// maxMsgSize use 20MB as the default message size limit. // maxMsgSize use 20MB as the default message size limit.
// grpc library default is 4MB // grpc library default is 4MB
const maxMsgSize = 1024 * 1024 * 20 const maxMsgSize = 1024 * 1024 * 20
type Tiller = tiller.ReleaseServer
// Client manages client side of the Helm-Tiller protocol. // Client manages client side of the Helm-Tiller protocol.
type Client struct { type Client struct {
opts options opts options
store *storage.Storage store *storage.Storage
tiller *Tiller
} }
// NewClient creates a new client. // NewClient creates a new client.
@ -60,7 +64,7 @@ func (h *Client) Option(opts ...Option) *Client {
} }
// ListReleases lists the current releases. // ListReleases lists the current releases.
func (h *Client) ListReleases(opts ...ReleaseListOption) (*rls.ListReleasesResponse, error) { func (h *Client) ListReleases(opts ...ReleaseListOption) ([]*release.Release, error) {
reqOpts := h.opts reqOpts := h.opts
for _, opt := range opts { for _, opt := range opts {
opt(&reqOpts) opt(&reqOpts)
@ -73,7 +77,7 @@ func (h *Client) ListReleases(opts ...ReleaseListOption) (*rls.ListReleasesRespo
return nil, err return nil, err
} }
} }
return h.list(ctx, req) return h.tiller.ListReleases(req)
} }
// InstallRelease loads a chart from chstr, installs it, and returns the release response. // InstallRelease loads a chart from chstr, installs it, and returns the release response.

@ -48,12 +48,8 @@ var _ Interface = &FakeClient{}
var _ Interface = (*FakeClient)(nil) var _ Interface = (*FakeClient)(nil)
// ListReleases lists the current releases // ListReleases lists the current releases
func (c *FakeClient) ListReleases(opts ...ReleaseListOption) (*rls.ListReleasesResponse, error) { func (c *FakeClient) ListReleases(opts ...ReleaseListOption) ([]*release.Release, error) {
resp := &rls.ListReleasesResponse{ return c.Rels, nil
Count: int64(len(c.Rels)),
Releases: c.Rels,
}
return resp, nil
} }
// InstallRelease creates a new release and returns a InstallReleaseResponse containing that release // InstallRelease creates a new release and returns a InstallReleaseResponse containing that release

@ -24,7 +24,7 @@ import (
// Interface for helm client for mocking in tests // Interface for helm client for mocking in tests
type Interface interface { type Interface interface {
ListReleases(opts ...ReleaseListOption) (*rls.ListReleasesResponse, error) ListReleases(opts ...ReleaseListOption) ([]*release.Release, error)
InstallRelease(chStr, namespace string, opts ...InstallOption) (*rls.InstallReleaseResponse, error) InstallRelease(chStr, namespace string, opts ...InstallOption) (*rls.InstallReleaseResponse, error)
InstallReleaseFromChart(chart *chart.Chart, namespace string, opts ...InstallOption) (*rls.InstallReleaseResponse, error) InstallReleaseFromChart(chart *chart.Chart, namespace string, opts ...InstallOption) (*rls.InstallReleaseResponse, error)
DeleteRelease(rlsName string, opts ...DeleteOption) (*rls.UninstallReleaseResponse, error) DeleteRelease(rlsName string, opts ...DeleteOption) (*rls.UninstallReleaseResponse, error)

@ -17,21 +17,19 @@ limitations under the License.
package tiller package tiller
import ( import (
"fmt" "regexp"
"github.com/golang/protobuf/proto"
"k8s.io/helm/pkg/proto/hapi/release" "k8s.io/helm/pkg/proto/hapi/release"
"k8s.io/helm/pkg/proto/hapi/services" "k8s.io/helm/pkg/proto/hapi/services"
relutil "k8s.io/helm/pkg/releaseutil" relutil "k8s.io/helm/pkg/releaseutil"
"regexp"
) )
// ListReleases lists the releases found by the server. // ListReleases lists the releases found by the server.
func (s *ReleaseServer) ListReleases(req *services.ListReleasesRequest, stream services.ReleaseService_ListReleasesServer) error { func (s *ReleaseServer) ListReleases(req *services.ListReleasesRequest) ([]*release.Release, error) {
if len(req.StatusCodes) == 0 { if len(req.StatusCodes) == 0 {
req.StatusCodes = []release.Status_Code{release.Status_DEPLOYED} req.StatusCodes = []release.Status_Code{release.Status_DEPLOYED}
} }
//rels, err := s.env.Releases.ListDeployed()
rels, err := s.env.Releases.ListFilterAll(func(r *release.Release) bool { rels, err := s.env.Releases.ListFilterAll(func(r *release.Release) bool {
for _, sc := range req.StatusCodes { for _, sc := range req.StatusCodes {
if sc == r.Info.Status.Code { if sc == r.Info.Status.Code {
@ -41,25 +39,20 @@ func (s *ReleaseServer) ListReleases(req *services.ListReleasesRequest, stream s
return false return false
}) })
if err != nil { if err != nil {
return err return nil, err
} }
if req.Namespace != "" { if req.Namespace != "" {
rels, err = filterByNamespace(req.Namespace, rels) rels = filterByNamespace(req.Namespace, rels)
if err != nil {
return err
}
} }
if len(req.Filter) != 0 { if len(req.Filter) != 0 {
rels, err = filterReleases(req.Filter, rels) rels, err = filterReleases(req.Filter, rels)
if err != nil { if err != nil {
return err return nil, err
} }
} }
total := int64(len(rels))
switch req.SortBy { switch req.SortBy {
case services.ListSort_NAME: case services.ListSort_NAME:
relutil.SortByName(rels) relutil.SortByName(rels)
@ -76,91 +69,17 @@ func (s *ReleaseServer) ListReleases(req *services.ListReleasesRequest, stream s
rels = rr rels = rr
} }
l := int64(len(rels)) return rels, nil
if req.Offset != "" {
i := -1
for ii, cur := range rels {
if cur.Name == req.Offset {
i = ii
}
}
if i == -1 {
return fmt.Errorf("offset %q not found", req.Offset)
}
if len(rels) < i {
return fmt.Errorf("no items after %q", req.Offset)
}
rels = rels[i:]
l = int64(len(rels))
}
if req.Limit == 0 {
req.Limit = ListDefaultLimit
}
next := ""
if l > req.Limit {
next = rels[req.Limit].Name
rels = rels[0:req.Limit]
l = int64(len(rels))
}
res := &services.ListReleasesResponse{
Next: next,
Count: l,
Total: total,
}
chunks := s.partition(rels[:min(len(rels), int(req.Limit))], maxMsgSize-proto.Size(res))
for res.Releases = range chunks {
if err := stream.Send(res); err != nil {
for range chunks { // drain
}
return err
}
}
return nil
}
// partition packs releases into slices upto the capacity cap in bytes.
func (s *ReleaseServer) partition(rels []*release.Release, cap int) <-chan []*release.Release {
chunks := make(chan []*release.Release, 1)
go func() {
var (
fill = 0 // fill is space available to fill
size int // size is size of a release
)
var chunk []*release.Release
for _, rls := range rels {
if size = proto.Size(rls); size+fill > cap {
// Over-cap, push chunk onto channel to send over gRPC stream
s.Log("partitioned at %d with %d releases (cap=%d)", fill, len(chunk), cap)
chunks <- chunk
// reset paritioning state
chunk = chunk[:0]
fill = 0
}
chunk = append(chunk, rls)
fill += size
}
if len(chunk) > 0 {
// send remaining if any
chunks <- chunk
}
close(chunks)
}()
return chunks
} }
func filterByNamespace(namespace string, rels []*release.Release) ([]*release.Release, error) { func filterByNamespace(namespace string, rels []*release.Release) []*release.Release {
matches := []*release.Release{} matches := []*release.Release{}
for _, r := range rels { for _, r := range rels {
if namespace == r.Namespace { if namespace == r.Namespace {
matches = append(matches, r) matches = append(matches, r)
} }
} }
return matches, nil return matches
} }
func filterReleases(filter string, rels []*release.Release) ([]*release.Release, error) { func filterReleases(filter string, rels []*release.Release) ([]*release.Release, error) {

@ -35,13 +35,13 @@ func TestListReleases(t *testing.T) {
} }
} }
mrs := &mockListServer{} rels, err := rs.ListReleases(&services.ListReleasesRequest{})
if err := rs.ListReleases(&services.ListReleasesRequest{Offset: "", Limit: 64}, mrs); err != nil { if err != nil {
t.Fatalf("Failed listing: %s", err) t.Fatalf("Failed listing: %s", err)
} }
if len(mrs.val.Releases) != num { if len(rels) != num {
t.Errorf("Expected %d releases, got %d", num, len(mrs.val.Releases)) t.Errorf("Expected %d releases, got %d", num, len(rels))
} }
} }
@ -87,18 +87,18 @@ func TestListReleasesByStatus(t *testing.T) {
} }
for i, tt := range tests { for i, tt := range tests {
mrs := &mockListServer{} rels, err := rs.ListReleases(&services.ListReleasesRequest{StatusCodes: tt.statusCodes, Offset: "", Limit: 64})
if err := rs.ListReleases(&services.ListReleasesRequest{StatusCodes: tt.statusCodes, Offset: "", Limit: 64}, mrs); err != nil { if err != nil {
t.Fatalf("Failed listing %d: %s", i, err) t.Fatalf("Failed listing %d: %s", i, err)
} }
if len(tt.names) != len(mrs.val.Releases) { if len(tt.names) != len(rels) {
t.Fatalf("Expected %d releases, got %d", len(tt.names), len(mrs.val.Releases)) t.Fatalf("Expected %d releases, got %d", len(tt.names), len(rels))
} }
for _, name := range tt.names { for _, name := range tt.names {
found := false found := false
for _, rel := range mrs.val.Releases { for _, rel := range rels {
if rel.Name == name { if rel.Name == name {
found = true found = true
} }
@ -125,24 +125,24 @@ func TestListReleasesSort(t *testing.T) {
} }
limit := 6 limit := 6
mrs := &mockListServer{}
req := &services.ListReleasesRequest{ req := &services.ListReleasesRequest{
Offset: "", Offset: "",
Limit: int64(limit), Limit: int64(limit),
SortBy: services.ListSort_NAME, SortBy: services.ListSort_NAME,
} }
if err := rs.ListReleases(req, mrs); err != nil { rels, err := rs.ListReleases(req)
if err != nil {
t.Fatalf("Failed listing: %s", err) t.Fatalf("Failed listing: %s", err)
} }
if len(mrs.val.Releases) != limit { // if len(rels) != limit {
t.Errorf("Expected %d releases, got %d", limit, len(mrs.val.Releases)) // t.Errorf("Expected %d releases, got %d", limit, len(rels))
} // }
for i := 0; i < limit; i++ { for i := 0; i < limit; i++ {
n := fmt.Sprintf("rel-%d", i+1) n := fmt.Sprintf("rel-%d", i+1)
if mrs.val.Releases[i].Name != n { if rels[i].Name != n {
t.Errorf("Expected %q, got %q", n, mrs.val.Releases[i].Name) t.Errorf("Expected %q, got %q", n, rels[i].Name)
} }
} }
} }
@ -167,26 +167,26 @@ func TestListReleasesFilter(t *testing.T) {
} }
} }
mrs := &mockListServer{}
req := &services.ListReleasesRequest{ req := &services.ListReleasesRequest{
Offset: "", Offset: "",
Limit: 64, Limit: 64,
Filter: "neuro[a-z]+", Filter: "neuro[a-z]+",
SortBy: services.ListSort_NAME, SortBy: services.ListSort_NAME,
} }
if err := rs.ListReleases(req, mrs); err != nil { rels, err := rs.ListReleases(req)
if err != nil {
t.Fatalf("Failed listing: %s", err) t.Fatalf("Failed listing: %s", err)
} }
if len(mrs.val.Releases) != 2 { if len(rels) != 2 {
t.Errorf("Expected 2 releases, got %d", len(mrs.val.Releases)) t.Errorf("Expected 2 releases, got %d", len(rels))
} }
if mrs.val.Releases[0].Name != "neuroglia" { if rels[0].Name != "neuroglia" {
t.Errorf("Unexpected sort order: %v.", mrs.val.Releases) t.Errorf("Unexpected sort order: %v.", rels)
} }
if mrs.val.Releases[1].Name != "neuron" { if rels[1].Name != "neuron" {
t.Errorf("Unexpected sort order: %v.", mrs.val.Releases) t.Errorf("Unexpected sort order: %v.", rels)
} }
} }
@ -216,18 +216,18 @@ func TestReleasesNamespace(t *testing.T) {
} }
} }
mrs := &mockListServer{}
req := &services.ListReleasesRequest{ req := &services.ListReleasesRequest{
Offset: "", Offset: "",
Limit: 64, Limit: 64,
Namespace: "test123", Namespace: "test123",
} }
if err := rs.ListReleases(req, mrs); err != nil { rels, err := rs.ListReleases(req)
if err != nil {
t.Fatalf("Failed listing: %s", err) t.Fatalf("Failed listing: %s", err)
} }
if len(mrs.val.Releases) != 2 { if len(rels) != 2 {
t.Errorf("Expected 2 releases, got %d", len(mrs.val.Releases)) t.Errorf("Expected 2 releases, got %d", len(rels))
} }
} }

@ -429,22 +429,6 @@ func (h *hookFailingKubeClient) WatchUntilReady(ns string, r io.Reader, timeout
return errors.New("Failed watch") return errors.New("Failed watch")
} }
type mockListServer struct {
val *services.ListReleasesResponse
}
func (l *mockListServer) Send(res *services.ListReleasesResponse) error {
l.val = res
return nil
}
func (l *mockListServer) Context() context.Context { return context.TODO() }
func (l *mockListServer) SendMsg(v interface{}) error { return nil }
func (l *mockListServer) RecvMsg(v interface{}) error { return nil }
func (l *mockListServer) SendHeader(m metadata.MD) error { return nil }
func (l *mockListServer) SetTrailer(m metadata.MD) {}
func (l *mockListServer) SetHeader(m metadata.MD) error { return nil }
type mockRunReleaseTestServer struct{} type mockRunReleaseTestServer struct{}
func (rs mockRunReleaseTestServer) Send(m *services.TestReleaseResponse) error { func (rs mockRunReleaseTestServer) Send(m *services.TestReleaseResponse) error {

Loading…
Cancel
Save