Store release data in custom TPR (#12)

pull/2160/head
Tamal Saha 9 years ago committed by GitHub
parent e9e6dd3927
commit d8cc1d3dc9

@ -0,0 +1,7 @@
metadata:
name: release.helm.sh
apiVersion: extensions/v1beta1
kind: ThirdPartyResource
description: "A specification of Helm release"
versions:
- name: v1beta1

@ -0,0 +1,24 @@
package install
import (
aci "k8s.io/helm/api"
"k8s.io/kubernetes/pkg/apimachinery/announced"
"k8s.io/kubernetes/pkg/util/sets"
)
func init() {
if err := announced.NewGroupMetaFactory(
&announced.GroupMetaFactoryArgs{
GroupName: aci.GroupName,
VersionPreferenceOrder: []string{aci.V1beta1SchemeGroupVersion.Version},
ImportPrefix: "k8s.io/helm/api",
RootScopedKinds: sets.NewString("ThirdPartyResource"),
AddInternalObjectsToScheme: aci.AddToScheme,
},
announced.VersionToSchemeFunc{
aci.V1beta1SchemeGroupVersion.Version: aci.V1betaAddToScheme,
},
).Announce().RegisterAndEnable(); err != nil {
panic(err)
}
}

@ -0,0 +1,43 @@
package api
import (
"k8s.io/kubernetes/pkg/api"
schema "k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/runtime"
)
// GroupName is the group name use in this package
const GroupName = "helm.sh"
// SchemeGroupVersion is group version used to register these objects
var SchemeGroupVersion = schema.GroupVersion{Group: GroupName, Version: runtime.APIVersionInternal}
// Kind takes an unqualified kind and returns back a Group qualified GroupKind
func Kind(kind string) schema.GroupKind {
return SchemeGroupVersion.WithKind(kind).GroupKind()
}
// Resource takes an unqualified resource and returns back a Group qualified GroupResource
func Resource(resource string) schema.GroupResource {
return SchemeGroupVersion.WithResource(resource).GroupResource()
}
var (
SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes)
AddToScheme = SchemeBuilder.AddToScheme
)
// Adds the list of known types to api.Scheme.
func addKnownTypes(scheme *runtime.Scheme) error {
scheme.AddKnownTypes(SchemeGroupVersion,
&Release{},
&ReleaseList{},
&api.ListOptions{},
&api.DeleteOptions{},
)
return nil
}
func (obj *Release) GetObjectKind() schema.ObjectKind { return &obj.TypeMeta }
func (obj *ReleaseList) GetObjectKind() schema.ObjectKind { return &obj.TypeMeta }

@ -0,0 +1,29 @@
package api
import (
schema "k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/runtime"
versionedwatch "k8s.io/kubernetes/pkg/watch/versioned"
)
// SchemeGroupVersion is group version used to register these objects
var V1beta1SchemeGroupVersion = schema.GroupVersion{Group: GroupName, Version: "v1beta1"}
var (
V1beta1SchemeBuilder = runtime.NewSchemeBuilder(v1addKnownTypes)
V1betaAddToScheme = V1beta1SchemeBuilder.AddToScheme
)
// Adds the list of known types to api.Scheme.
func v1addKnownTypes(scheme *runtime.Scheme) error {
scheme.AddKnownTypes(V1beta1SchemeGroupVersion,
&Release{},
&ReleaseList{},
&v1.ListOptions{},
&v1.DeleteOptions{},
)
versionedwatch.AddToGroupVersion(scheme, V1beta1SchemeGroupVersion)
return nil
}

@ -0,0 +1,68 @@
package api
import (
hapi_chart "k8s.io/helm/pkg/proto/hapi/chart"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
)
// Release captures the state of a individual release and are immutable.
// Release replaces the version wise configmaps used by Tiller 2.0
type Release struct {
unversioned.TypeMeta `json:",inline,omitempty"`
api.ObjectMeta `json:"metadata,omitempty"`
Spec ReleaseSpec `json:"spec,omitempty"`
Status ReleaseStatus `json:"status,omitempty"`
}
type ReleaseSpec struct {
// Description is human-friendly "log entry" about this release.
Description string `json:"Description,omitempty"`
// Chart is the chart that was released.
ChartMetadata *hapi_chart.Metadata `json:"chartMetadata,omitempty"`
// Config is the set of extra Values added to the chart.
// These values override the default values inside of the chart.
Config *hapi_chart.Config `json:"config,omitempty"`
// Version is an int32 which represents the version of the release.
Version int32 `json:"version,omitempty"`
// TODO(tamal): Store in proper namespace
// Namespace is the kubernetes namespace of the release.
// Namespace string `json:"namespace,omitempty"`
// The ChartSource represents the location and type of a chart to install.
// This is modelled like Volume in Pods, which allows specifying a chart
// inline (like today) or pulling a chart object from a (potentially private)
// chart registry similar to pulling a Docker image.
Data ReleaseData `json:"data,omitempty"`
}
//-------------------------------------------------------------------------------------------
// Chart represents a chart that is installed in a Release.
// The ChartSource represents the location and type of a chart to install.
// This is modelled like Volume in Pods, which allows specifying a chart
// inline (like today) or pulling a chart object from a (potentially private) chart registry similar to pulling a Docker image.
// +optional
type ReleaseData struct {
// Inline charts are what is done today with Helm cli. Release request
// contains the chart definition in the release spec, sent by Helm cli.
Inline string `json:"inline,omitempty"`
}
type ReleaseStatus struct {
Code string `json:"code,omitempty"`
// Cluster resources as kubectl would print them.
Resources string `json:"resources,omitempty"`
// Contains the rendered templates/NOTES.txt if available
Notes string `json:"notes,omitempty"`
FirstDeployed unversioned.Time `json:"first_deployed,omitempty"`
LastDeployed unversioned.Time `json:"last_deployed,omitempty"`
// Deleted tracks when this object was deleted.
Deleted unversioned.Time `json:"deleted,omitempty"`
}
type ReleaseList struct {
unversioned.TypeMeta `json:",inline"`
unversioned.ListMeta `json:"metadata,omitempty"`
Items []Release `json:"items,omitempty"`
}

@ -0,0 +1,163 @@
package clientset
import (
"encoding/json"
"io"
"net/url"
"reflect"
"strings"
"github.com/ghodss/yaml"
aci "k8s.io/helm/api"
"k8s.io/kubernetes/pkg/api"
schema "k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/runtime"
kubejson "k8s.io/kubernetes/pkg/runtime/serializer/json"
"log"
)
// TODO(@sadlil): Find a better way to replace ExtendedCodec to encode and decode objects.
// Follow the guide to replace it with api.Codec and api.ParameterCodecs.
var ExtendedCodec = &extendedCodec{}
// DirectCodecFactory provides methods for retrieving "DirectCodec"s, which do not do conversion.
type DirectCodecFactory struct {
*extendedCodec
}
// EncoderForVersion returns an encoder that does not do conversion. gv is ignored.
func (f DirectCodecFactory) EncoderForVersion(serializer runtime.Encoder, _ runtime.GroupVersioner) runtime.Encoder {
return serializer
}
// DecoderToVersion returns an decoder that does not do conversion. gv is ignored.
func (f DirectCodecFactory) DecoderToVersion(serializer runtime.Decoder, _ runtime.GroupVersioner) runtime.Decoder {
return serializer
}
// SupportedMediaTypes returns the RFC2046 media types that this factory has serializers for.
func (f DirectCodecFactory) SupportedMediaTypes() []runtime.SerializerInfo {
return []runtime.SerializerInfo{
{
MediaType: "application/json",
EncodesAsText: true,
Serializer: &extendedCodec{},
PrettySerializer: &extendedCodec{pretty: true},
StreamSerializer: &runtime.StreamSerializerInfo{
Framer: kubejson.Framer,
EncodesAsText: true,
Serializer: &extendedCodec{},
},
},
{
MediaType: "application/yaml",
EncodesAsText: true,
Serializer: &extendedCodec{yaml: true},
PrettySerializer: &extendedCodec{yaml: true},
},
}
}
type extendedCodec struct {
pretty bool
yaml bool
}
func (e *extendedCodec) Decode(data []byte, gvk *schema.GroupVersionKind, obj runtime.Object) (runtime.Object, *schema.GroupVersionKind, error) {
if e.yaml {
altered, err := yaml.YAMLToJSON(data)
if err != nil {
return nil, nil, err
}
data = altered
}
if obj == nil {
metadata := &schema.TypeMeta{}
err := json.Unmarshal(data, metadata)
if err != nil {
return obj, gvk, err
}
log.Println("Detected metadata type for nil object, got", metadata.APIVersion, metadata.Kind)
obj, err = setDefaultType(metadata)
if err != nil {
return obj, gvk, err
}
}
err := json.Unmarshal(data, obj)
if err != nil {
return obj, gvk, err
}
return obj, gvk, nil
}
func (e *extendedCodec) Encode(obj runtime.Object, w io.Writer) error {
setDefaultVersionKind(obj)
if e.yaml {
json, err := json.Marshal(obj)
if err != nil {
return err
}
data, err := yaml.JSONToYAML(json)
if err != nil {
return err
}
_, err = w.Write(data)
}
if e.pretty {
data, err := json.MarshalIndent(obj, "", " ")
if err != nil {
return err
}
_, err = w.Write(data)
return err
}
return json.NewEncoder(w).Encode(obj)
}
// DecodeParameters converts the provided url.Values into an object of type From with the kind of into, and then
// converts that object to into (if necessary). Returns an error if the operation cannot be completed.
func (*extendedCodec) DecodeParameters(parameters url.Values, from schema.GroupVersion, into runtime.Object) error {
if len(parameters) == 0 {
return nil
}
_, okDelete := into.(*api.DeleteOptions)
if _, okList := into.(*api.ListOptions); okList || okDelete {
from = schema.GroupVersion{Version: "v1"}
}
return runtime.NewParameterCodec(api.Scheme).DecodeParameters(parameters, from, into)
}
// EncodeParameters converts the provided object into the to version, then converts that object to url.Values.
// Returns an error if conversion is not possible.
func (c *extendedCodec) EncodeParameters(obj runtime.Object, to schema.GroupVersion) (url.Values, error) {
result := url.Values{}
if obj == nil {
return result, nil
}
_, okDelete := obj.(*api.DeleteOptions)
if _, okList := obj.(*api.ListOptions); okList || okDelete {
to = schema.GroupVersion{Version: "v1"}
}
return runtime.NewParameterCodec(api.Scheme).EncodeParameters(obj, to)
}
func setDefaultVersionKind(obj runtime.Object) {
// Check the values can are In type Extended Ingress
defaultGVK := schema.GroupVersionKind{
Group: aci.V1beta1SchemeGroupVersion.Group,
Version: aci.V1beta1SchemeGroupVersion.Version,
}
fullyQualifiedKind := reflect.ValueOf(obj).Type().String()
lastIndexOfDot := strings.LastIndex(fullyQualifiedKind, ".")
if lastIndexOfDot > 0 {
defaultGVK.Kind = fullyQualifiedKind[lastIndexOfDot+1:]
}
obj.GetObjectKind().SetGroupVersionKind(defaultGVK)
}
func setDefaultType(metadata *schema.TypeMeta) (runtime.Object, error) {
return api.Scheme.New(metadata.GroupVersionKind())
}

@ -0,0 +1,99 @@
package clientset
import (
"fmt"
schema "k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apimachinery/registered"
rest "k8s.io/kubernetes/pkg/client/restclient"
)
const (
defaultAPIPath = "/apis"
)
type ExtensionInterface interface {
RESTClient() rest.Interface
ReleaseNamespacer
}
// ExtensionsClient is used to interact with experimental Kubernetes features.
// Features of Extensions group are not supported and may be changed or removed in
// incompatible ways at any time.
type ExtensionsClient struct {
restClient rest.Interface
}
func (a *ExtensionsClient) Release(namespace string) ReleaseInterface {
return newRelease(a, namespace)
}
// NewExtensions creates a new ExtensionsClient for the given config. This client
// provides access to experimental Kubernetes features.
// Features of Extensions group are not supported and may be changed or removed in
// incompatible ways at any time.
func NewExtensionsForConfig(c *rest.Config) (*ExtensionsClient, error) {
config := *c
if err := setExtensionsDefaults(&config); err != nil {
return nil, err
}
client, err := rest.RESTClientFor(&config)
if err != nil {
return nil, err
}
return &ExtensionsClient{client}, nil
}
// NewExtensionsOrDie creates a new ExtensionsClient for the given config and
// panics if there is an error in the config.
// Features of Extensions group are not supported and may be changed or removed in
// incompatible ways at any time.
func NewExtensionsForConfigOrDie(c *rest.Config) *ExtensionsClient {
client, err := NewExtensionsForConfig(c)
if err != nil {
panic(err)
}
return client
}
// New creates a new ExtensionsV1beta1Client for the given RESTClient.
func NewNewExtensions(c rest.Interface) *ExtensionsClient {
return &ExtensionsClient{c}
}
func setExtensionsDefaults(config *rest.Config) error {
gv, err := schema.ParseGroupVersion("helm.sh/v1beta1")
if err != nil {
return err
}
// if helm.sh/v1beta1 is not enabled, return an error
if !registered.IsEnabledVersion(gv) {
return fmt.Errorf("helm.sh/v1beta1 is not enabled")
}
config.APIPath = defaultAPIPath
if config.UserAgent == "" {
config.UserAgent = rest.DefaultKubernetesUserAgent()
}
if config.GroupVersion == nil || config.GroupVersion.Group != "helm.sh" {
g, err := registered.Group("helm.sh")
if err != nil {
return err
}
copyGroupVersion := g.GroupVersion
config.GroupVersion = &copyGroupVersion
}
config.NegotiatedSerializer = DirectCodecFactory{extendedCodec: ExtendedCodec}
return nil
}
// RESTClient returns a RESTClient that is used to communicate
// with API server by this client implementation.
func (c *ExtensionsClient) RESTClient() rest.Interface {
if c == nil {
return nil
}
return c.restClient
}

@ -0,0 +1,3 @@
## fakes
This package is a fake kube resource implementation. Should Only be used for Testing purpose.

@ -0,0 +1,19 @@
package fake
import (
_ "github.com/appscode/log"
"k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5/fake"
"k8s.io/kubernetes/pkg/runtime"
)
type ClientSets struct {
*fake.Clientset
ExtensionClient *FakeExtensionClient
}
func NewFakeClient(objects ...runtime.Object) *ClientSets {
return &ClientSets{
Clientset: fake.NewSimpleClientset(objects...),
ExtensionClient: NewFakeExtensionClient(objects...),
}
}

@ -0,0 +1,33 @@
package fake
import (
"k8s.io/helm/client/clientset"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/apimachinery/registered"
testing "k8s.io/kubernetes/pkg/client/testing/core"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/watch"
)
type FakeExtensionClient struct {
*testing.Fake
}
func NewFakeExtensionClient(objects ...runtime.Object) *FakeExtensionClient {
o := testing.NewObjectTracker(api.Scheme, api.Codecs.UniversalDecoder())
for _, obj := range objects {
if obj.GetObjectKind().GroupVersionKind().Group == "helm.sh" {
if err := o.Add(obj); err != nil {
panic(err)
}
}
}
fakePtr := testing.Fake{}
fakePtr.AddReactor("*", "*", testing.ObjectReaction(o, registered.RESTMapper()))
fakePtr.AddWatchReactor("*", testing.DefaultWatchReactor(watch.NewFake(), nil))
return &FakeExtensionClient{&fakePtr}
}
func (m *FakeExtensionClient) Releases(ns string) clientset.ReleaseInterface {
return &FakeRelease{Fake: m.Fake, ns: ns}
}

@ -0,0 +1,95 @@
package fake
import (
aci "k8s.io/helm/api"
"k8s.io/kubernetes/pkg/api"
schema "k8s.io/kubernetes/pkg/api/unversioned"
testing "k8s.io/kubernetes/pkg/client/testing/core"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/watch"
)
type FakeRelease struct {
Fake *testing.Fake
ns string
}
var certResource = schema.GroupVersionResource{Group: "helm.sh", Version: "v1beta1", Resource: "releases"}
// Get returns the Release by name.
func (mock *FakeRelease) Get(name string) (*aci.Release, error) {
obj, err := mock.Fake.
Invokes(testing.NewGetAction(certResource, mock.ns, name), &aci.Release{})
if obj == nil {
return nil, err
}
return obj.(*aci.Release), err
}
// List returns the a of Releases.
func (mock *FakeRelease) List(opts api.ListOptions) (*aci.ReleaseList, error) {
obj, err := mock.Fake.
Invokes(testing.NewListAction(certResource, mock.ns, opts), &aci.Release{})
if obj == nil {
return nil, err
}
label, _, _ := testing.ExtractFromListOptions(opts)
if label == nil {
label = labels.Everything()
}
list := &aci.ReleaseList{}
for _, item := range obj.(*aci.ReleaseList).Items {
if label.Matches(labels.Set(item.Labels)) {
list.Items = append(list.Items, item)
}
}
return list, err
}
// Create creates a new Release.
func (mock *FakeRelease) Create(svc *aci.Release) (*aci.Release, error) {
obj, err := mock.Fake.
Invokes(testing.NewCreateAction(certResource, mock.ns, svc), &aci.Release{})
if obj == nil {
return nil, err
}
return obj.(*aci.Release), err
}
// Update updates a Release.
func (mock *FakeRelease) Update(svc *aci.Release) (*aci.Release, error) {
obj, err := mock.Fake.
Invokes(testing.NewUpdateAction(certResource, mock.ns, svc), &aci.Release{})
if obj == nil {
return nil, err
}
return obj.(*aci.Release), err
}
// Delete deletes a Release by name.
func (mock *FakeRelease) Delete(name string) error {
_, err := mock.Fake.
Invokes(testing.NewDeleteAction(certResource, mock.ns, name), &aci.Release{})
return err
}
func (mock *FakeRelease) UpdateStatus(srv *aci.Release) (*aci.Release, error) {
obj, err := mock.Fake.
Invokes(testing.NewUpdateSubresourceAction(certResource, "status", mock.ns, srv), &aci.Release{})
if obj == nil {
return nil, err
}
return obj.(*aci.Release), err
}
func (mock *FakeRelease) Watch(opts api.ListOptions) (watch.Interface, error) {
return mock.Fake.
InvokesWatch(testing.NewWatchAction(certResource, mock.ns, opts))
}

@ -0,0 +1,16 @@
package clientset
// These imports are the API groups the client will support.
import (
"fmt"
_ "k8s.io/helm/api/install"
_ "k8s.io/kubernetes/pkg/api/install"
"k8s.io/kubernetes/pkg/apimachinery/registered"
)
func init() {
if missingVersions := registered.ValidateEnvRequestedVersions(); len(missingVersions) != 0 {
panic(fmt.Sprintf("KUBE_API_VERSIONS contains versions that are not installed: %q.", missingVersions))
}
}

@ -0,0 +1,107 @@
package clientset
import (
aci "k8s.io/helm/api"
"k8s.io/kubernetes/pkg/api"
rest "k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/watch"
)
type ReleaseNamespacer interface {
Release(namespace string) ReleaseInterface
}
type ReleaseInterface interface {
List(opts api.ListOptions) (*aci.ReleaseList, error)
Get(name string) (*aci.Release, error)
Create(release *aci.Release) (*aci.Release, error)
Update(release *aci.Release) (*aci.Release, error)
Delete(name string) error
Watch(opts api.ListOptions) (watch.Interface, error)
UpdateStatus(release *aci.Release) (*aci.Release, error)
}
type ReleaseImpl struct {
r rest.Interface
ns string
}
func newRelease(c *ExtensionsClient, namespace string) *ReleaseImpl {
return &ReleaseImpl{c.restClient, namespace}
}
func (c *ReleaseImpl) List(opts api.ListOptions) (result *aci.ReleaseList, err error) {
result = &aci.ReleaseList{}
err = c.r.Get().
Namespace(c.ns).
Resource("releases").
VersionedParams(&opts, ExtendedCodec).
Do().
Into(result)
return
}
func (c *ReleaseImpl) Get(name string) (result *aci.Release, err error) {
result = &aci.Release{}
err = c.r.Get().
Namespace(c.ns).
Resource("releases").
Name(name).
Do().
Into(result)
return
}
func (c *ReleaseImpl) Create(release *aci.Release) (result *aci.Release, err error) {
result = &aci.Release{}
err = c.r.Post().
Namespace(c.ns).
Resource("releases").
Body(release).
Do().
Into(result)
return
}
func (c *ReleaseImpl) Update(release *aci.Release) (result *aci.Release, err error) {
result = &aci.Release{}
err = c.r.Put().
Namespace(c.ns).
Resource("releases").
Name(release.Name).
Body(release).
Do().
Into(result)
return
}
func (c *ReleaseImpl) Delete(name string) (err error) {
return c.r.Delete().
Namespace(c.ns).
Resource("releases").
Name(name).
Do().
Error()
}
func (c *ReleaseImpl) Watch(opts api.ListOptions) (watch.Interface, error) {
return c.r.Get().
Prefix("watch").
Namespace(c.ns).
Resource("releases").
VersionedParams(&opts, ExtendedCodec).
Watch()
}
func (c *ReleaseImpl) UpdateStatus(release *aci.Release) (result *aci.Release, err error) {
result = &aci.Release{}
err = c.r.Put().
Namespace(c.ns).
Resource("releases").
Name(release.Name).
SubResource("status").
Body(release).
Do().
Into(result)
return
}

@ -27,6 +27,7 @@ import (
"github.com/spf13/cobra"
rcs "k8s.io/helm/client/clientset"
"k8s.io/helm/pkg/kube"
"k8s.io/helm/pkg/proto/hapi/services"
"k8s.io/helm/pkg/storage"
@ -39,6 +40,7 @@ import (
const (
storageMemory = "memory"
storageConfigMap = "configmap"
storageInlineTPR = "inline-tpr"
)
// rootServer is the root gRPC server.
@ -90,7 +92,13 @@ func main() {
}
func start(c *cobra.Command, args []string) {
clientset, err := kube.New(nil).ClientSet()
kc := kube.New(nil)
clientcfg, err := kc.ClientConfig()
if err != nil {
fmt.Fprintf(os.Stderr, "Cannot initialize Kubernetes connection: %s\n", err)
os.Exit(1)
}
clientset, err := kc.ClientSet()
if err != nil {
fmt.Fprintf(os.Stderr, "Cannot initialize Kubernetes connection: %s\n", err)
os.Exit(1)
@ -101,6 +109,9 @@ func start(c *cobra.Command, args []string) {
env.Releases = storage.Init(driver.NewMemory())
case storageConfigMap:
env.Releases = storage.Init(driver.NewConfigMaps(clientset.Core().ConfigMaps(namespace())))
case storageInlineTPR:
cs := rcs.NewExtensionsForConfigOrDie(clientcfg)
env.Releases = storage.Init(driver.NewReleases(cs.Release(namespace())))
}
lstn, err := net.Listen("tcp", grpcAddr)

@ -0,0 +1,287 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package driver // import "k8s.io/helm/pkg/storage/driver"
import (
"fmt"
rapi "k8s.io/helm/api"
"strconv"
"strings"
"time"
google_protobuf "github.com/golang/protobuf/ptypes/timestamp"
"k8s.io/kubernetes/pkg/api"
kberrs "k8s.io/kubernetes/pkg/api/errors"
kblabels "k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/util/validation"
"k8s.io/helm/client/clientset"
rspb "k8s.io/helm/pkg/proto/hapi/release"
"k8s.io/kubernetes/pkg/api/unversioned"
)
var _ Driver = (*Releases)(nil)
// ReleasesDriverName is the string name of the driver.
const ReleasesDriverName = "helm.sh/Release"
// Releases is a wrapper around an implementation of a kubernetes
// ReleasesInterface.
type Releases struct {
impl clientset.ReleaseInterface
}
// NewReleases initializes a new Releases wrapping an implmenetation of
// the kubernetes ReleasesInterface.
func NewReleases(impl clientset.ReleaseInterface) *Releases {
return &Releases{impl: impl}
}
// Name returns the name of the driver.
func (releases *Releases) Name() string {
return ReleasesDriverName
}
// Get fetches the release named by key. The corresponding release is returned
// or error if not found.
func (releases *Releases) Get(key string) (*rspb.Release, error) {
// fetch the release holding the release named by key
obj, err := releases.impl.Get(key)
if err != nil {
if kberrs.IsNotFound(err) {
return nil, ErrReleaseNotFound
}
logerrf(err, "get: failed to get %q", key)
return nil, err
}
// found the release, decode the base64 data string
r, err := decodeRelease(obj.Spec.Data.Inline)
if err != nil {
logerrf(err, "get: failed to decode data %q", key)
return nil, err
}
// return the release object
return r, nil
}
// List fetches all releases and returns the list releases such
// that filter(release) == true. An error is returned if the
// release fails to retrieve the releases.
func (releases *Releases) List(filter func(*rspb.Release) bool) ([]*rspb.Release, error) {
lsel := kblabels.Set{"OWNER": "TILLER"}.AsSelector()
opts := api.ListOptions{LabelSelector: lsel}
list, err := releases.impl.List(opts)
if err != nil {
logerrf(err, "list: failed to list")
return nil, err
}
var results []*rspb.Release
// iterate over the releases object list
// and decode each release
for _, item := range list.Items {
rls, err := decodeRelease(item.Spec.Data.Inline)
if err != nil {
logerrf(err, "list: failed to decode release: %v", item)
continue
}
if filter(rls) {
results = append(results, rls)
}
}
return results, nil
}
// Query fetches all releases that match the provided map of labels.
// An error is returned if the release fails to retrieve the releases.
func (releases *Releases) Query(labels map[string]string) ([]*rspb.Release, error) {
ls := kblabels.Set{}
for k, v := range labels {
if errs := validation.IsValidLabelValue(v); len(errs) != 0 {
return nil, fmt.Errorf("invalid label value: %q: %s", v, strings.Join(errs, "; "))
}
ls[k] = v
}
opts := api.ListOptions{LabelSelector: ls.AsSelector()}
list, err := releases.impl.List(opts)
if err != nil {
logerrf(err, "query: failed to query with labels")
return nil, err
}
if len(list.Items) == 0 {
return nil, ErrReleaseNotFound
}
var results []*rspb.Release
for _, item := range list.Items {
rls, err := decodeRelease(item.Spec.Data.Inline)
if err != nil {
logerrf(err, "query: failed to decode release: %s", err)
continue
}
results = append(results, rls)
}
return results, nil
}
// Create creates a new Release holding the release. If the
// Release already exists, ErrReleaseExists is returned.
func (releases *Releases) Create(key string, rls *rspb.Release) error {
// set labels for releases object meta data
var lbs labels
lbs.init()
lbs.set("CREATED_AT", strconv.Itoa(int(time.Now().Unix())))
// create a new release to hold the release
obj, err := newReleasesObject(key, rls, lbs)
if err != nil {
logerrf(err, "create: failed to encode release %q", rls.Name)
return err
}
// push the release object out into the kubiverse
if _, err := releases.impl.Create(obj); err != nil {
if kberrs.IsAlreadyExists(err) {
return ErrReleaseExists
}
logerrf(err, "create: failed to create")
return err
}
return nil
}
// Update updates the Release holding the release. If not found
// the Release is created to hold the release.
func (releases *Releases) Update(key string, rls *rspb.Release) error {
// set labels for releases object meta data
var lbs labels
lbs.init()
lbs.set("MODIFIED_AT", strconv.Itoa(int(time.Now().Unix())))
// create a new release object to hold the release
obj, err := newReleasesObject(key, rls, lbs)
if err != nil {
logerrf(err, "update: failed to encode release %q", rls.Name)
return err
}
// push the release object out into the kubiverse
_, err = releases.impl.Update(obj)
if err != nil {
logerrf(err, "update: failed to update")
return err
}
return nil
}
// Delete deletes the Release holding the release named by key.
func (releases *Releases) Delete(key string) (rls *rspb.Release, err error) {
// fetch the release to check existence
if rls, err = releases.Get(key); err != nil {
if kberrs.IsNotFound(err) {
return nil, ErrReleaseNotFound
}
logerrf(err, "delete: failed to get release %q", key)
return nil, err
}
// delete the release
if err = releases.impl.Delete(key); err != nil {
return rls, err
}
return rls, nil
}
// newReleasesObject constructs a kubernetes Release object
// to store a release. Each release data entry is the base64
// encoded string of a release's binary protobuf encoding.
//
// The following labels are used within each release:
//
// "MODIFIED_AT" - timestamp indicating when this release was last modified. (set in Update)
// "CREATED_AT" - timestamp indicating when this release was created. (set in Create)
// "VERSION" - version of the release.
// "STATUS" - status of the release (see proto/hapi/release.status.pb.go for variants)
// "OWNER" - owner of the release, currently "TILLER".
// "NAME" - name of the release.
//
func newReleasesObject(key string, rls *rspb.Release, lbs labels) (*rapi.Release, error) {
const owner = "TILLER"
// encode the release
s, err := encodeRelease(rls)
if err != nil {
return nil, err
}
if lbs == nil {
lbs.init()
}
// apply labels
lbs.set("NAME", rls.Name)
lbs.set("OWNER", owner)
lbs.set("STATUS", rspb.Status_Code_name[int32(rls.Info.Status.Code)])
lbs.set("VERSION", strconv.Itoa(int(rls.Version)))
// create and return release object
r := &rapi.Release{
ObjectMeta: api.ObjectMeta{
Name: key,
Labels: lbs.toMap(),
},
Spec: rapi.ReleaseSpec{
Config: rls.Config,
Version: rls.Version,
Data: rapi.ReleaseData{
Inline: s,
},
},
Status: rapi.ReleaseStatus{
FirstDeployed: toKubeTime(rls.Info.FirstDeployed),
LastDeployed: toKubeTime(rls.Info.LastDeployed),
Deleted: toKubeTime(rls.Info.Deleted),
},
}
if rls.Info != nil {
r.Spec.Description = rls.Info.Description
if rls.Info.Status != nil {
r.Status.Code = rls.Info.Status.Code.String()
r.Status.Resources = rls.Info.Status.Resources
r.Status.Notes = rls.Info.Status.Notes
}
}
if rls.Chart != nil {
r.Spec.ChartMetadata = rls.Chart.Metadata
}
return r, nil
}
func toKubeTime(pbt *google_protobuf.Timestamp) unversioned.Time {
var t unversioned.Time
if pbt != nil {
t = unversioned.NewTime(time.Unix(pbt.Seconds, int64(pbt.Nanos)))
}
return t
}

@ -0,0 +1,208 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package driver
import (
"encoding/base64"
"reflect"
"testing"
"github.com/gogo/protobuf/proto"
_ "k8s.io/helm/api/install"
"k8s.io/helm/client/clientset/fake"
rspb "k8s.io/helm/pkg/proto/hapi/release"
"k8s.io/kubernetes/pkg/runtime"
)
// newTestFixture initializes a FakeReleaseInterface.
// ConfigMaps are created for each release provided.
func newTestFixtureReleases(t *testing.T, releases ...*rspb.Release) *Releases {
return NewReleases(fake.NewFakeExtensionClient(initFakeTPRs(t, releases...)...).Releases("default"))
}
// initFakeTPRs initializes the FakeReleaseInterface with the set of releases.
func initFakeTPRs(t *testing.T, releases ...*rspb.Release) []runtime.Object {
var objects []runtime.Object
for _, rls := range releases {
objkey := testKey(rls.Name, rls.Version)
r, err := newReleasesObject(objkey, rls, nil)
if err != nil {
t.Fatalf("Failed to create configmap: %s", err)
}
var obj runtime.Object = r
objects = append(objects, obj)
}
return objects
}
func TestReleaseName(t *testing.T) {
c := newTestFixtureReleases(t)
if c.Name() != ReleasesDriverName {
t.Errorf("Expected name to be %q, got %q", ReleasesDriverName, c.Name())
}
}
func TestReleaseGet(t *testing.T) {
vers := int32(1)
name := "smug-pigeon"
namespace := "default"
key := testKey(name, vers)
rel := releaseStub(name, vers, namespace, rspb.Status_DEPLOYED)
releases := newTestFixtureReleases(t, []*rspb.Release{rel}...)
// get release with key
got, err := releases.Get(key)
if err != nil {
t.Fatalf("Failed to get release: %s", err)
}
// compare fetched release with original
if !reflect.DeepEqual(rel, got) {
t.Errorf("Expected {%q}, got {%q}", rel, got)
}
}
func TestUNcompressedReleaseGet(t *testing.T) {
vers := int32(1)
name := "smug-pigeon"
namespace := "default"
key := testKey(name, vers)
rel := releaseStub(name, vers, namespace, rspb.Status_DEPLOYED)
// Create a test fixture which contains an uncompressed release
cfgmap, err := newReleasesObject(key, rel, nil)
if err != nil {
t.Fatalf("Failed to create configmap: %s", err)
}
b, err := proto.Marshal(rel)
if err != nil {
t.Fatalf("Failed to marshal release: %s", err)
}
cfgmap.Spec.Data.Inline = base64.StdEncoding.EncodeToString(b)
releases := NewReleases(fake.NewFakeExtensionClient(initFakeTPRs(t, rel)...).Releases("test"))
// get release with key
got, err := releases.Get(key)
if err != nil {
t.Fatalf("Failed to get release: %s", err)
}
// compare fetched release with original
if !reflect.DeepEqual(rel, got) {
t.Errorf("Expected {%q}, got {%q}", rel, got)
}
}
func TestReleaseList(t *testing.T) {
releases := newTestFixtureReleases(t, []*rspb.Release{
releaseStub("key-1", 1, "default", rspb.Status_DELETED),
releaseStub("key-2", 1, "default", rspb.Status_DELETED),
releaseStub("key-3", 1, "default", rspb.Status_DEPLOYED),
releaseStub("key-4", 1, "default", rspb.Status_DEPLOYED),
releaseStub("key-5", 1, "default", rspb.Status_SUPERSEDED),
releaseStub("key-6", 1, "default", rspb.Status_SUPERSEDED),
}...)
// list all deleted releases
del, err := releases.List(func(rel *rspb.Release) bool {
return rel.Info.Status.Code == rspb.Status_DELETED
})
// check
if err != nil {
t.Errorf("Failed to list deleted: %s", err)
}
if len(del) != 2 {
t.Errorf("Expected 2 deleted, got %d:\n%v\n", len(del), del)
}
// list all deployed releases
dpl, err := releases.List(func(rel *rspb.Release) bool {
return rel.Info.Status.Code == rspb.Status_DEPLOYED
})
// check
if err != nil {
t.Errorf("Failed to list deployed: %s", err)
}
if len(dpl) != 2 {
t.Errorf("Expected 2 deployed, got %d", len(dpl))
}
// list all superseded releases
ssd, err := releases.List(func(rel *rspb.Release) bool {
return rel.Info.Status.Code == rspb.Status_SUPERSEDED
})
// check
if err != nil {
t.Errorf("Failed to list superseded: %s", err)
}
if len(ssd) != 2 {
t.Errorf("Expected 2 superseded, got %d", len(ssd))
}
}
func TestReleaseCreate(t *testing.T) {
releases := newTestFixtureReleases(t)
vers := int32(1)
name := "smug-pigeon"
namespace := "default"
key := testKey(name, vers)
rel := releaseStub(name, vers, namespace, rspb.Status_DEPLOYED)
// store the release in a configmap
if err := releases.Create(key, rel); err != nil {
t.Fatalf("Failed to create release with key %q: %s", key, err)
}
// get the release back
got, err := releases.Get(key)
if err != nil {
t.Fatalf("Failed to get release with key %q: %s", key, err)
}
// compare created release with original
if !reflect.DeepEqual(rel, got) {
t.Errorf("Expected {%q}, got {%q}", rel, got)
}
}
func TestReleaseUpdate(t *testing.T) {
vers := int32(1)
name := "smug-pigeon"
namespace := "default"
key := testKey(name, vers)
rel := releaseStub(name, vers, namespace, rspb.Status_DEPLOYED)
releases := newTestFixtureReleases(t, []*rspb.Release{rel}...)
// modify release status code
rel.Info.Status.Code = rspb.Status_SUPERSEDED
// perform the update
if err := releases.Update(key, rel); err != nil {
t.Fatalf("Failed to update release: %s", err)
}
// fetch the updated release
got, err := releases.Get(key)
if err != nil {
t.Fatalf("Failed to get release with key %q: %s", key, err)
}
// check release has actually been updated by comparing modified fields
if rel.Info.Status.Code != got.Info.Status.Code {
t.Errorf("Expected status %s, got status %s", rel.Info.Status.Code, got.Info.Status.Code)
}
}
Loading…
Cancel
Save