add filtering mechanism to storage and embed Driver directly into Storage

pull/1007/head
fibonacci1729 9 years ago
parent 061b534885
commit bfd6712c94

@ -0,0 +1,51 @@
package storage
import rspb "k8s.io/helm/pkg/proto/hapi/release"
// FilterFunc returns true if the release object satisfies
// the predicate of the underlying func.
type FilterFunc func(*rspb.Release) bool
// Check applies the FilterFunc to the release object.
func (fn FilterFunc) Check(rls *rspb.Release) bool {
if rls == nil {
return false
}
return fn(rls)
}
// Any returns a FilterFunc that filters a list of releases
// determined by the predicate 'f0 || f1 || ... || fn'.
func Any(filters ...FilterFunc) FilterFunc {
return func(rls *rspb.Release) bool {
for _, filter := range filters {
if filter(rls) {
return true
}
}
return false
}
}
// All returns a FilterFunc that filters a list of releases
// determined by the predicate 'f0 && f1 && ... && fn'.
func All(filters ...FilterFunc) FilterFunc {
return func(rls *rspb.Release) bool {
for _, filter := range filters {
if !filter(rls) {
return false
}
}
return true
}
}
// StatusFilter filters a set of releases by status code.
func StatusFilter(status rspb.Status_Code) FilterFunc {
return FilterFunc(func(rls *rspb.Release) bool {
if rls == nil {
return true
}
return rls.GetInfo().GetStatus().Code == status
})
}

@ -21,28 +21,76 @@ import (
) )
type Storage struct { type Storage struct {
drv driver.Driver driver.Driver
} }
func (st *Storage) StoreRelease(rls *rspb.Release) error { // Create creates a new storage entry holding the release.
return st.drv.Create(rls) // An error is returned if storage failed to store the release.
//
// TODO: Is marking the release as deployed the only task here?
// What happens if an identical release already exists?
func (s *Storage) Create(rls *rspb.Release) error {
return s.Driver.Create(rls)
} }
func (st *Storage) UpdateRelease(rls *rspb.Release) error { // Update update the release in storage. An error is returned if the
return st.drv.Update(rls) // storage backend fails to update the release or if the release does not exist.
//
// TODO: Fetch most recent deployed release, if it exists, mark
// as SUPERSEDED, then store both new and old.
func (s *Storage) Update(rls *rspb.Release) error {
return s.Driver.Update(rls)
} }
func (st *Storage) QueryRelease(key string) (*rspb.Release, error) { // Delete deletes the release from storage. An error is returned if the
return st.drv.Get(key) // storage backend fails to delete the release or if the release does not exist.
//
// TODO: The release status should be modified to reflect the DELETED status.
func (s *Storage) Delete(key string) (*rspb.Release, error) {
return s.Driver.Delete(key)
} }
func (st *Storage) DeleteRelease(key string) (*rspb.Release, error) { // ListReleases returns all releases from storage. An error is returned if the
return st.drv.Delete(key) // storage backend fails to retrieve the releases.
func (s *Storage) ListReleases() ([]*rspb.Release, error) {
return s.Driver.List(func(_ *rspb.Release) bool { return true })
} }
func Init(drv driver.Driver) *Storage { // ListDeleted returns all releases with Status == DELETED. An error is returned
if drv == nil { // if the storage backend fails to retrieve the releases.
drv = driver.NewMemory() func (s *Storage) ListDeleted() ([]*rspb.Release, error) {
} return s.Driver.List(func(rls *rspb.Release) bool {
return &Storage{drv: drv} return StatusFilter(rspb.Status_DELETED).Check(rls)
})
} }
// ListDeployed returns all releases with Status == DEPLOYED. An error is returned
// if the storage backend fails to retrieve the releases.
func (s *Storage) ListDeployed() ([]*rspb.Release, error) {
return s.Driver.List(func(rls *rspb.Release) bool {
return StatusFilter(rspb.Status_DEPLOYED).Check(rls)
})
}
// ListFilterAll returns the set of releases satisfying satisfying the predicate
// (filter0 && filter1 && ... && filterN), i.e. a Release is included in the results
// if and only if all filters return true.
func (s *Storage) ListFilterAll(filters ...FilterFunc) ([]*rspb.Release, error) {
return s.Driver.List(func(rls *rspb.Release) bool {
return All(filters...).Check(rls)
})
}
// ListFilterAny returns the set of releases satisfying satisfying the predicate
// (filter0 || filter1 || ... || filterN), i.e. a Release is included in the results
// if at least one of the filters returns true.
func (s *Storage) ListFilterAny(filters ...FilterFunc) ([]*rspb.Release, error) {
return s.Driver.List(func(rls *rspb.Release) bool {
return Any(filters...).Check(rls)
})
}
func Init(d driver.Driver) *Storage {
if d == nil { d = driver.NewMemory() }
return &Storage{Driver: d}
}

@ -19,123 +19,159 @@ import (
"fmt" "fmt"
"reflect" "reflect"
"testing" "testing"
"time"
rspb "k8s.io/helm/pkg/proto/hapi/release" rspb "k8s.io/helm/pkg/proto/hapi/release"
"k8s.io/helm/pkg/storage/driver" "k8s.io/helm/pkg/storage/driver"
tspb "github.com/golang/protobuf/ptypes"
) )
var storage = Init(driver.NewMemory()) var storage = Init(driver.NewMemory())
func releaseData() *rspb.Release { func TestStorageCreate(t *testing.T) {
var manifest = `apiVersion: v1 // create fake release
kind: ConfigMap rls := ReleaseTestData{Name: "angry-beaver"}.ToRelease()
metadata: assertErrNil(t.Fatal, storage.Create(rls), "StoreRelease")
name: configmap-storage-test
data:
count: "100"
limit: "200"
state: "new"
token: "abc"
`
tm, _ := tspb.TimestampProto(time.Now()) // fetch the release
return &rspb.Release{ res, err := storage.Get(rls.Name)
Name: "hungry-hippo", assertErrNil(t.Fatal, err, "QueryRelease")
Info: &rspb.Info{
FirstDeployed: tm, // verify the fetched and created release are the same
LastDeployed: tm, if !reflect.DeepEqual(rls, res) {
Status: &rspb.Status{Code: rspb.Status_DEPLOYED}, t.Fatalf("Expected %q, got %q", rls, res)
},
Version: 2,
Manifest: manifest,
Namespace: "kube-system",
} }
} }
func TestStoreRelease(t *testing.T) { func TestStorageUpdate(t *testing.T) {
ckerr := func(err error, msg string) { // create fake release
if err != nil { rls := ReleaseTestData{Name: "angry-beaver"}.ToRelease()
t.Fatalf(fmt.Sprintf("Failed to %s: %q", msg, err)) assertErrNil(t.Fatal, storage.Create(rls), "StoreRelease")
}
}
rls := releaseData() // modify the release
ckerr(storage.StoreRelease(rls), "StoreRelease") rls.Version = 2
rls.Manifest = "new-manifest"
assertErrNil(t.Fatal, storage.Update(rls), "UpdateRelease")
res, err := storage.QueryRelease(rls.Name) // retrieve the updated release
ckerr(err, "QueryRelease") res, err := storage.Get(rls.Name)
assertErrNil(t.Fatal, err, "QueryRelease")
// verify updated and fetched releases are the same.
if !reflect.DeepEqual(rls, res) { if !reflect.DeepEqual(rls, res) {
t.Fatalf("Expected %q, got %q", rls, res) t.Fatalf("Expected %q, got %q", rls, res)
} }
} }
func TestQueryRelease(t *testing.T) { func TestStorageDelete(t *testing.T) {
ckerr := func(err error, msg string) { // create fake release
if err != nil { rls := ReleaseTestData{Name: "angry-beaver"}.ToRelease()
t.Fatalf(fmt.Sprintf("Failed to %s: %q", msg, err)) assertErrNil(t.Fatal, storage.Create(rls), "StoreRelease")
}
}
rls := releaseData()
ckerr(storage.StoreRelease(rls), "StoreRelease")
res, err := storage.QueryRelease(rls.Name) // delete the release
ckerr(err, "QueryRelease") res, err := storage.Delete(rls.Name)
assertErrNil(t.Fatal, err, "DeleteRelease")
// verify updated and fetched releases are the same.
if !reflect.DeepEqual(rls, res) { if !reflect.DeepEqual(rls, res) {
t.Fatalf("Expected %q, got %q", rls, res) t.Fatalf("Expected %q, got %q", rls, res)
} }
} }
func TestDeleteRelease(t *testing.T) { func TestStorageList(t *testing.T) {
ckerr := func(err error, msg string) { // setup storage with test releases
if err != nil { setup := func() {
t.Fatalf(fmt.Sprintf("Failed to %s: %q", msg, err)) // release records
} rls0 := ReleaseTestData{Name: "happy-catdog", Status: rspb.Status_SUPERSEDED}.ToRelease()
rls1 := ReleaseTestData{Name: "livid-human", Status: rspb.Status_SUPERSEDED}.ToRelease()
rls2 := ReleaseTestData{Name: "relaxed-cat", Status: rspb.Status_SUPERSEDED}.ToRelease()
rls3 := ReleaseTestData{Name: "hungry-hippo", Status: rspb.Status_DEPLOYED}.ToRelease()
rls4 := ReleaseTestData{Name: "angry-beaver", Status: rspb.Status_DEPLOYED}.ToRelease()
rls5 := ReleaseTestData{Name: "opulent-frog", Status: rspb.Status_DELETED}.ToRelease()
rls6 := ReleaseTestData{Name: "happy-liger", Status: rspb.Status_DELETED}.ToRelease()
// create the release records in the storage
assertErrNil(t.Fatal, storage.Create(rls0), "Storing release 'rls0'")
assertErrNil(t.Fatal, storage.Create(rls1), "Storing release 'rls1'")
assertErrNil(t.Fatal, storage.Create(rls2), "Storing release 'rls2'")
assertErrNil(t.Fatal, storage.Create(rls3), "Storing release 'rls3'")
assertErrNil(t.Fatal, storage.Create(rls4), "Storing release 'rls4'")
assertErrNil(t.Fatal, storage.Create(rls5), "Storing release 'rls5'")
assertErrNil(t.Fatal, storage.Create(rls6), "Storing release 'rls6'")
} }
rls := releaseData() var listTests = []struct{
ckerr(storage.StoreRelease(rls), "StoreRelease") Description string
NumExpected int
res, err := storage.DeleteRelease(rls.Name) ListFunc func() ([]*rspb.Release,error)
ckerr(err, "DeleteRelease") }{
{"ListDeleted", 2, storage.ListDeleted},
if !reflect.DeepEqual(rls, res) { {"ListDeployed", 2, storage.ListDeployed},
t.Fatalf("Expected %q, got %q", rls, res) {"ListReleases", 7, storage.ListReleases},
} }
}
func TestUpdateRelease(t *testing.T) { setup()
ckeql := func(got, want interface{}, msg string) {
if !reflect.DeepEqual(got, want) { for _, tt := range listTests {
t.Fatalf(fmt.Sprintf("%s: got %T, want %T", msg, got, want)) list, err := tt.ListFunc()
assertErrNil(t.Fatal, err, tt.Description)
// verify the count of releases returned
if len(list) != tt.NumExpected {
t.Errorf("ListReleases(%s): expected %d, actual %d",
tt.Description,
tt.NumExpected,
len(list))
} }
} }
}
ckerr := func(err error, msg string) { type ReleaseTestData struct {
if err != nil { Name string
t.Fatalf(fmt.Sprintf("Failed to %s: %q", msg, err)) Version int32
} Manifest string
Namespace string
Status rspb.Status_Code
}
func (test ReleaseTestData) ToRelease() *rspb.Release {
return &rspb.Release{
Name: test.Name,
Version: test.Version,
Manifest: test.Manifest,
Namespace: test.Namespace,
Info: &rspb.Info{Status: &rspb.Status{Code: test.Status}},
} }
}
rls := releaseData() func assertErrNil(eh func(args ...interface{}), err error, message string) {
ckerr(storage.StoreRelease(rls), "StoreRelease") if err != nil {
eh(fmt.Sprintf("%s: %q", message, err))
}
}
rls.Name = "hungry-hippo"
rls.Version = 2
rls.Manifest = "old-manifest"
err := storage.UpdateRelease(rls) /*
ckerr(err, "UpdateRelease") func releaseData() *rspb.Release {
var manifest = `apiVersion: v1
kind: ConfigMap
metadata:
name: configmap-storage-test
data:
count: "100"
limit: "200"
state: "new"
token: "abc"
`
res, err := storage.QueryRelease(rls.Name) tm, _ := tspb.TimestampProto(time.Now())
ckerr(err, "QueryRelease") return &rspb.Release{
ckeql(res, rls, "Expected Release") Name: "hungry-hippo",
ckeql(res.Name, rls.Name, "Expected Name") Info: &rspb.Info{
ckeql(res.Version, rls.Version, "Expected Version") FirstDeployed: tm,
ckeql(res.Manifest, rls.Manifest, "Expected Manifest") LastDeployed: tm,
Status: &rspb.Status{Code: rspb.Status_DEPLOYED},
},
Version: 2,
Manifest: manifest,
Namespace: "kube-system",
}
} }
*/
Loading…
Cancel
Save