Apply label selector at server side

Signed-off-by: Nandor Kollar <nkollar@cloudera.com>
pull/11881/head
Nandor Kollar 3 years ago
parent e007c900ce
commit 98d04fae8a

@ -159,14 +159,21 @@ func (l *List) Run() ([]*release.Release, error) {
} }
} }
results, err := l.cfg.Releases.List(func(rel *release.Release) bool { filterFunction := func(rel *release.Release) bool {
// Skip anything that doesn't match the filter. // Skip anything that doesn't match the filter.
if filter != nil && !filter.MatchString(rel.Name) { if filter != nil && !filter.MatchString(rel.Name) {
return false return false
} }
return true return true
}) }
var results []*release.Release
var err error
if l.cfg.Releases.CanPushDownLabelSelector() {
results, err = l.cfg.Releases.ListWithSelector(filterFunction, l.Selector)
} else {
results, err = l.cfg.Releases.List(filterFunction)
}
if err != nil { if err != nil {
return nil, err return nil, err
@ -187,12 +194,16 @@ func (l *List) Run() ([]*release.Release, error) {
// latest releases, otherwise outdated entries can be returned // latest releases, otherwise outdated entries can be returned
results = l.filterStateMask(results) results = l.filterStateMask(results)
// If storage layer can't perform label selector filtering,
// then fall back to client side filtering
if !l.cfg.Releases.CanPushDownLabelSelector() {
// Skip anything that doesn't match the selector // Skip anything that doesn't match the selector
selectorObj, err := labels.Parse(l.Selector) selectorObj, err := labels.Parse(l.Selector)
if err != nil { if err != nil {
return nil, err return nil, err
} }
results = l.filterSelector(results, selectorObj) results = l.filterSelector(results, selectorObj)
}
// Unfortunately, we have to sort before truncating, which can incur substantial overhead // Unfortunately, we have to sort before truncating, which can incur substantial overhead
l.sort(results) l.sort(results)

@ -350,6 +350,14 @@ func TestSelectorList(t *testing.T) {
is.Error(err) is.Error(err)
}) })
t.Run("should select two releases with set selector", func(t *testing.T) {
lister.Selector = "key in (value1, value2)"
res, _ := lister.Run()
expectedFilteredList := []*release.Release{r1, r2}
assert.ElementsMatch(t, expectedFilteredList, res)
})
t.Run("should select one release with matching label", func(t *testing.T) { t.Run("should select one release with matching label", func(t *testing.T) {
lister.Selector = "key==value1" lister.Selector = "key==value1"
res, _ := lister.Run() res, _ := lister.Run()

@ -45,6 +45,10 @@ type ConfigMaps struct {
Log func(string, ...interface{}) Log func(string, ...interface{})
} }
func (cfgmaps *ConfigMaps) CanPushDownLabelSelector() bool {
return true
}
// NewConfigMaps initializes a new ConfigMaps wrapping an implementation of // NewConfigMaps initializes a new ConfigMaps wrapping an implementation of
// the kubernetes ConfigMapsInterface. // the kubernetes ConfigMapsInterface.
func NewConfigMaps(impl corev1.ConfigMapInterface) *ConfigMaps { func NewConfigMaps(impl corev1.ConfigMapInterface) *ConfigMaps {
@ -82,12 +86,8 @@ func (cfgmaps *ConfigMaps) Get(key string) (*rspb.Release, error) {
return r, nil return r, nil
} }
// List fetches all releases and returns the list releases such func (cfgmaps *ConfigMaps) listInternal(filter func(*rspb.Release) bool, selector string) ([]*rspb.Release, error) {
// that filter(release) == true. An error is returned if the opts := metav1.ListOptions{LabelSelector: selector}
// configmap fails to retrieve the releases.
func (cfgmaps *ConfigMaps) List(filter func(*rspb.Release) bool) ([]*rspb.Release, error) {
lsel := kblabels.Set{"owner": "helm"}.AsSelector()
opts := metav1.ListOptions{LabelSelector: lsel.String()}
list, err := cfgmaps.impl.List(context.Background(), opts) list, err := cfgmaps.impl.List(context.Background(), opts)
if err != nil { if err != nil {
@ -115,6 +115,23 @@ func (cfgmaps *ConfigMaps) List(filter func(*rspb.Release) bool) ([]*rspb.Releas
return results, nil return results, nil
} }
func (cfgmaps *ConfigMaps) ListWithSelector(filter func(*rspb.Release) bool, selector string) ([]*rspb.Release, error) {
lsel := kblabels.Set{"owner": "helm"}.AsSelector()
labelSelector := lsel.String()
if selector != "" {
labelSelector += "," + selector
}
return cfgmaps.listInternal(filter, labelSelector)
}
// List fetches all releases and returns the list releases such
// that filter(release) == true. An error is returned if the
// configmap fails to retrieve the releases.
func (cfgmaps *ConfigMaps) List(filter func(*rspb.Release) bool) ([]*rspb.Release, error) {
lsel := kblabels.Set{"owner": "helm"}.AsSelector()
return cfgmaps.listInternal(filter, lsel.String())
}
// Query fetches all releases that match the provided map of labels. // Query fetches all releases that match the provided map of labels.
// An error is returned if the configmap fails to retrieve the releases. // An error is returned if the configmap fails to retrieve the releases.
func (cfgmaps *ConfigMaps) Query(labels map[string]string) ([]*rspb.Release, error) { func (cfgmaps *ConfigMaps) Query(labels map[string]string) ([]*rspb.Release, error) {
@ -226,7 +243,6 @@ func (cfgmaps *ConfigMaps) Delete(key string) (rls *rspb.Release, err error) {
// "status" - status of the release (see pkg/release/status.go for variants) // "status" - status of the release (see pkg/release/status.go for variants)
// "owner" - owner of the configmap, currently "helm". // "owner" - owner of the configmap, currently "helm".
// "name" - name of the release. // "name" - name of the release.
//
func newConfigMapsObject(key string, rls *rspb.Release, lbs labels) (*v1.ConfigMap, error) { func newConfigMapsObject(key string, rls *rspb.Release, lbs labels) (*v1.ConfigMap, error) {
const owner = "helm" const owner = "helm"

@ -128,6 +128,26 @@ func TestConfigMapList(t *testing.T) {
if len(ssd) != 2 { if len(ssd) != 2 {
t.Errorf("Expected 2 superseded, got %d", len(ssd)) t.Errorf("Expected 2 superseded, got %d", len(ssd))
} }
// list all uninstalled releases with label selectors
delSel, err := cfgmaps.ListWithSelector(func(rel *rspb.Release) bool { return true }, "status=uninstalled")
// check
if err != nil {
t.Errorf("Failed to list uninstalled: %s", err)
}
if len(delSel) != 2 {
t.Errorf("Expected 2 uninstalled, got %d:\n%v\n", len(del), del)
}
// list all uninstalled or deployed releases with label selectors
dplOrDelSel, err := cfgmaps.ListWithSelector(func(rel *rspb.Release) bool { return true }, "status in (uninstalled, deployed)")
// check
if err != nil {
t.Errorf("Failed to list uninstalled or deployed: %s", err)
}
if len(dplOrDelSel) != 4 {
t.Errorf("Expected 4 uninstalled or deployed, got %d:\n%v\n", len(del), del)
}
} }
func TestConfigMapQuery(t *testing.T) { func TestConfigMapQuery(t *testing.T) {

@ -85,11 +85,16 @@ type Deletor interface {
// //
// List returns the set of all releases that satisfy the filter predicate. // List returns the set of all releases that satisfy the filter predicate.
// //
// ListWithSelector returns the set of all releases that satisfy the filter predicate.
// and label selector expression.
//
// Query returns the set of all releases that match the provided label set. // Query returns the set of all releases that match the provided label set.
type Queryor interface { type Queryor interface {
Get(key string) (*rspb.Release, error) Get(key string) (*rspb.Release, error)
List(filter func(*rspb.Release) bool) ([]*rspb.Release, error) List(filter func(*rspb.Release) bool) ([]*rspb.Release, error)
ListWithSelector(filter func(*rspb.Release) bool, selector string) ([]*rspb.Release, error)
Query(labels map[string]string) ([]*rspb.Release, error) Query(labels map[string]string) ([]*rspb.Release, error)
CanPushDownLabelSelector() bool
} }
// Driver is the interface composed of Creator, Updator, Deletor, and Queryor // Driver is the interface composed of Creator, Updator, Deletor, and Queryor

@ -17,6 +17,7 @@ limitations under the License.
package driver package driver
import ( import (
"fmt"
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
@ -44,6 +45,14 @@ type Memory struct {
cache map[string]memReleases cache map[string]memReleases
} }
func (mem *Memory) ListWithSelector(filter func(*rspb.Release) bool, selector string) ([]*rspb.Release, error) {
return nil, fmt.Errorf("selector based release list is not supported by this storage")
}
func (mem *Memory) CanPushDownLabelSelector() bool {
return false
}
// NewMemory initializes a new memory driver. // NewMemory initializes a new memory driver.
func NewMemory() *Memory { func NewMemory() *Memory {
return &Memory{cache: map[string]memReleases{}, namespace: "default"} return &Memory{cache: map[string]memReleases{}, namespace: "default"}

@ -45,6 +45,10 @@ type Secrets struct {
Log func(string, ...interface{}) Log func(string, ...interface{})
} }
func (secrets *Secrets) CanPushDownLabelSelector() bool {
return true
}
// NewSecrets initializes a new Secrets wrapping an implementation of // NewSecrets initializes a new Secrets wrapping an implementation of
// the kubernetes SecretsInterface. // the kubernetes SecretsInterface.
func NewSecrets(impl corev1.SecretInterface) *Secrets { func NewSecrets(impl corev1.SecretInterface) *Secrets {
@ -75,13 +79,25 @@ func (secrets *Secrets) Get(key string) (*rspb.Release, error) {
return r, errors.Wrapf(err, "get: failed to decode data %q", key) return r, errors.Wrapf(err, "get: failed to decode data %q", key)
} }
func (secrets *Secrets) ListWithSelector(filter func(*rspb.Release) bool, selector string) ([]*rspb.Release, error) {
lsel := kblabels.Set{"owner": "helm"}.AsSelector()
labelSelector := lsel.String()
if selector != "" {
labelSelector += "," + selector
}
return secrets.listInternal(filter, labelSelector)
}
// List fetches all releases and returns the list releases such // List fetches all releases and returns the list releases such
// that filter(release) == true. An error is returned if the // that filter(release) == true. An error is returned if the
// secret fails to retrieve the releases. // secret fails to retrieve the releases.
func (secrets *Secrets) List(filter func(*rspb.Release) bool) ([]*rspb.Release, error) { func (secrets *Secrets) List(filter func(*rspb.Release) bool) ([]*rspb.Release, error) {
lsel := kblabels.Set{"owner": "helm"}.AsSelector() lsel := kblabels.Set{"owner": "helm"}.AsSelector()
opts := metav1.ListOptions{LabelSelector: lsel.String()} return secrets.listInternal(filter, lsel.String())
}
func (secrets *Secrets) listInternal(filter func(*rspb.Release) bool, selector string) ([]*rspb.Release, error) {
opts := metav1.ListOptions{LabelSelector: selector}
list, err := secrets.impl.List(context.Background(), opts) list, err := secrets.impl.List(context.Background(), opts)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "list: failed to list") return nil, errors.Wrap(err, "list: failed to list")
@ -208,7 +224,6 @@ func (secrets *Secrets) Delete(key string) (rls *rspb.Release, err error) {
// "status" - status of the release (see pkg/release/status.go for variants) // "status" - status of the release (see pkg/release/status.go for variants)
// "owner" - owner of the secret, currently "helm". // "owner" - owner of the secret, currently "helm".
// "name" - name of the release. // "name" - name of the release.
//
func newSecretsObject(key string, rls *rspb.Release, lbs labels) (*v1.Secret, error) { func newSecretsObject(key string, rls *rspb.Release, lbs labels) (*v1.Secret, error) {
const owner = "helm" const owner = "helm"

@ -128,6 +128,26 @@ func TestSecretList(t *testing.T) {
if len(ssd) != 2 { if len(ssd) != 2 {
t.Errorf("Expected 2 superseded, got %d", len(ssd)) t.Errorf("Expected 2 superseded, got %d", len(ssd))
} }
// list all uninstalled releases with label selectors
delSel, err := secrets.ListWithSelector(func(rel *rspb.Release) bool { return true }, "status=uninstalled")
// check
if err != nil {
t.Errorf("Failed to list uninstalled: %s", err)
}
if len(delSel) != 2 {
t.Errorf("Expected 2 uninstalled, got %d:\n%v\n", len(del), del)
}
// list all uninstalled or deployed releases with label selectors
dplOrDelSel, err := secrets.ListWithSelector(func(rel *rspb.Release) bool { return true }, "status in (uninstalled, deployed)")
// check
if err != nil {
t.Errorf("Failed to list uninstalled or deployed: %s", err)
}
if len(dplOrDelSel) != 4 {
t.Errorf("Expected 4 uninstalled or deployed, got %d:\n%v\n", len(del), del)
}
} }
func TestSecretQuery(t *testing.T) { func TestSecretQuery(t *testing.T) {

@ -77,6 +77,14 @@ type SQL struct {
Log func(string, ...interface{}) Log func(string, ...interface{})
} }
func (s *SQL) ListWithSelector(filter func(*rspb.Release) bool, selector string) ([]*rspb.Release, error) {
return nil, fmt.Errorf("selector based release list is not supported by this storage")
}
func (s *SQL) CanPushDownLabelSelector() bool {
return false
}
// Name returns the name of the driver. // Name returns the name of the driver.
func (s *SQL) Name() string { func (s *SQL) Name() string {
return SQLDriverName return SQLDriverName

@ -308,6 +308,12 @@ func (d *MaxHistoryMockDriver) Query(labels map[string]string) ([]*rspb.Release,
func (d *MaxHistoryMockDriver) Name() string { func (d *MaxHistoryMockDriver) Name() string {
return d.Driver.Name() return d.Driver.Name()
} }
func (d *MaxHistoryMockDriver) ListWithSelector(filter func(*rspb.Release) bool, selector string) ([]*rspb.Release, error) {
return d.Driver.ListWithSelector(filter, selector)
}
func (d *MaxHistoryMockDriver) CanPushDownLabelSelector() bool {
return d.Driver.CanPushDownLabelSelector()
}
func TestMaxHistoryErrorHandling(t *testing.T) { func TestMaxHistoryErrorHandling(t *testing.T) {
//func TestStorageRemoveLeastRecentWithError(t *testing.T) { //func TestStorageRemoveLeastRecentWithError(t *testing.T) {

Loading…
Cancel
Save