From d8cc1d3dc93b242e040662cf3ff0bb4624d22130 Mon Sep 17 00:00:00 2001 From: Tamal Saha Date: Tue, 21 Mar 2017 11:23:39 -0700 Subject: [PATCH] Store release data in custom TPR (#12) --- api/extensions/helm.yaml | 7 + api/install/install.go | 24 +++ api/register.go | 43 +++++ api/register_v1beta1.go | 29 +++ api/types.go | 68 +++++++ client/clientset/codec.go | 163 ++++++++++++++++ client/clientset/extensions.go | 99 ++++++++++ client/clientset/fake/README.md | 3 + client/clientset/fake/client.go | 19 ++ client/clientset/fake/extensions.go | 33 ++++ client/clientset/fake/release.go | 95 +++++++++ client/clientset/imports.go | 16 ++ client/clientset/release.go | 107 +++++++++++ cmd/tiller/tiller.go | 13 +- pkg/storage/driver/releases.go | 287 ++++++++++++++++++++++++++++ pkg/storage/driver/releases_test.go | 208 ++++++++++++++++++++ 16 files changed, 1213 insertions(+), 1 deletion(-) create mode 100644 api/extensions/helm.yaml create mode 100644 api/install/install.go create mode 100644 api/register.go create mode 100644 api/register_v1beta1.go create mode 100644 api/types.go create mode 100644 client/clientset/codec.go create mode 100644 client/clientset/extensions.go create mode 100644 client/clientset/fake/README.md create mode 100644 client/clientset/fake/client.go create mode 100644 client/clientset/fake/extensions.go create mode 100644 client/clientset/fake/release.go create mode 100644 client/clientset/imports.go create mode 100644 client/clientset/release.go create mode 100644 pkg/storage/driver/releases.go create mode 100644 pkg/storage/driver/releases_test.go diff --git a/api/extensions/helm.yaml b/api/extensions/helm.yaml new file mode 100644 index 000000000..1479a2295 --- /dev/null +++ b/api/extensions/helm.yaml @@ -0,0 +1,7 @@ +metadata: + name: release.helm.sh +apiVersion: extensions/v1beta1 +kind: ThirdPartyResource +description: "A specification of Helm release" +versions: + - name: v1beta1 diff --git a/api/install/install.go b/api/install/install.go new file mode 100644 index 000000000..e083d0c82 --- /dev/null +++ b/api/install/install.go @@ -0,0 +1,24 @@ +package install + +import ( + aci "k8s.io/helm/api" + "k8s.io/kubernetes/pkg/apimachinery/announced" + "k8s.io/kubernetes/pkg/util/sets" +) + +func init() { + if err := announced.NewGroupMetaFactory( + &announced.GroupMetaFactoryArgs{ + GroupName: aci.GroupName, + VersionPreferenceOrder: []string{aci.V1beta1SchemeGroupVersion.Version}, + ImportPrefix: "k8s.io/helm/api", + RootScopedKinds: sets.NewString("ThirdPartyResource"), + AddInternalObjectsToScheme: aci.AddToScheme, + }, + announced.VersionToSchemeFunc{ + aci.V1beta1SchemeGroupVersion.Version: aci.V1betaAddToScheme, + }, + ).Announce().RegisterAndEnable(); err != nil { + panic(err) + } +} diff --git a/api/register.go b/api/register.go new file mode 100644 index 000000000..39d21faf2 --- /dev/null +++ b/api/register.go @@ -0,0 +1,43 @@ +package api + +import ( + "k8s.io/kubernetes/pkg/api" + schema "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/runtime" +) + +// GroupName is the group name use in this package +const GroupName = "helm.sh" + +// SchemeGroupVersion is group version used to register these objects +var SchemeGroupVersion = schema.GroupVersion{Group: GroupName, Version: runtime.APIVersionInternal} + +// Kind takes an unqualified kind and returns back a Group qualified GroupKind +func Kind(kind string) schema.GroupKind { + return SchemeGroupVersion.WithKind(kind).GroupKind() +} + +// Resource takes an unqualified resource and returns back a Group qualified GroupResource +func Resource(resource string) schema.GroupResource { + return SchemeGroupVersion.WithResource(resource).GroupResource() +} + +var ( + SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes) + AddToScheme = SchemeBuilder.AddToScheme +) + +// Adds the list of known types to api.Scheme. +func addKnownTypes(scheme *runtime.Scheme) error { + scheme.AddKnownTypes(SchemeGroupVersion, + &Release{}, + &ReleaseList{}, + + &api.ListOptions{}, + &api.DeleteOptions{}, + ) + return nil +} + +func (obj *Release) GetObjectKind() schema.ObjectKind { return &obj.TypeMeta } +func (obj *ReleaseList) GetObjectKind() schema.ObjectKind { return &obj.TypeMeta } diff --git a/api/register_v1beta1.go b/api/register_v1beta1.go new file mode 100644 index 000000000..1f5c98929 --- /dev/null +++ b/api/register_v1beta1.go @@ -0,0 +1,29 @@ +package api + +import ( + schema "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/api/v1" + "k8s.io/kubernetes/pkg/runtime" + versionedwatch "k8s.io/kubernetes/pkg/watch/versioned" +) + +// SchemeGroupVersion is group version used to register these objects +var V1beta1SchemeGroupVersion = schema.GroupVersion{Group: GroupName, Version: "v1beta1"} + +var ( + V1beta1SchemeBuilder = runtime.NewSchemeBuilder(v1addKnownTypes) + V1betaAddToScheme = V1beta1SchemeBuilder.AddToScheme +) + +// Adds the list of known types to api.Scheme. +func v1addKnownTypes(scheme *runtime.Scheme) error { + scheme.AddKnownTypes(V1beta1SchemeGroupVersion, + &Release{}, + &ReleaseList{}, + + &v1.ListOptions{}, + &v1.DeleteOptions{}, + ) + versionedwatch.AddToGroupVersion(scheme, V1beta1SchemeGroupVersion) + return nil +} diff --git a/api/types.go b/api/types.go new file mode 100644 index 000000000..a22b56db6 --- /dev/null +++ b/api/types.go @@ -0,0 +1,68 @@ +package api + +import ( + hapi_chart "k8s.io/helm/pkg/proto/hapi/chart" + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/unversioned" +) + +// Release captures the state of a individual release and are immutable. +// Release replaces the version wise configmaps used by Tiller 2.0 +type Release struct { + unversioned.TypeMeta `json:",inline,omitempty"` + api.ObjectMeta `json:"metadata,omitempty"` + Spec ReleaseSpec `json:"spec,omitempty"` + Status ReleaseStatus `json:"status,omitempty"` +} + +type ReleaseSpec struct { + // Description is human-friendly "log entry" about this release. + Description string `json:"Description,omitempty"` + // Chart is the chart that was released. + ChartMetadata *hapi_chart.Metadata `json:"chartMetadata,omitempty"` + // Config is the set of extra Values added to the chart. + // These values override the default values inside of the chart. + Config *hapi_chart.Config `json:"config,omitempty"` + // Version is an int32 which represents the version of the release. + Version int32 `json:"version,omitempty"` + + // TODO(tamal): Store in proper namespace + // Namespace is the kubernetes namespace of the release. + // Namespace string `json:"namespace,omitempty"` + + // The ChartSource represents the location and type of a chart to install. + // This is modelled like Volume in Pods, which allows specifying a chart + // inline (like today) or pulling a chart object from a (potentially private) + // chart registry similar to pulling a Docker image. + Data ReleaseData `json:"data,omitempty"` +} + +//------------------------------------------------------------------------------------------- +// Chart represents a chart that is installed in a Release. +// The ChartSource represents the location and type of a chart to install. +// This is modelled like Volume in Pods, which allows specifying a chart +// inline (like today) or pulling a chart object from a (potentially private) chart registry similar to pulling a Docker image. +// +optional +type ReleaseData struct { + // Inline charts are what is done today with Helm cli. Release request + // contains the chart definition in the release spec, sent by Helm cli. + Inline string `json:"inline,omitempty"` +} + +type ReleaseStatus struct { + Code string `json:"code,omitempty"` + // Cluster resources as kubectl would print them. + Resources string `json:"resources,omitempty"` + // Contains the rendered templates/NOTES.txt if available + Notes string `json:"notes,omitempty"` + FirstDeployed unversioned.Time `json:"first_deployed,omitempty"` + LastDeployed unversioned.Time `json:"last_deployed,omitempty"` + // Deleted tracks when this object was deleted. + Deleted unversioned.Time `json:"deleted,omitempty"` +} + +type ReleaseList struct { + unversioned.TypeMeta `json:",inline"` + unversioned.ListMeta `json:"metadata,omitempty"` + Items []Release `json:"items,omitempty"` +} diff --git a/client/clientset/codec.go b/client/clientset/codec.go new file mode 100644 index 000000000..989c544a4 --- /dev/null +++ b/client/clientset/codec.go @@ -0,0 +1,163 @@ +package clientset + +import ( + "encoding/json" + "io" + "net/url" + "reflect" + "strings" + + "github.com/ghodss/yaml" + aci "k8s.io/helm/api" + "k8s.io/kubernetes/pkg/api" + schema "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/runtime" + kubejson "k8s.io/kubernetes/pkg/runtime/serializer/json" + "log" +) + +// TODO(@sadlil): Find a better way to replace ExtendedCodec to encode and decode objects. +// Follow the guide to replace it with api.Codec and api.ParameterCodecs. +var ExtendedCodec = &extendedCodec{} + +// DirectCodecFactory provides methods for retrieving "DirectCodec"s, which do not do conversion. +type DirectCodecFactory struct { + *extendedCodec +} + +// EncoderForVersion returns an encoder that does not do conversion. gv is ignored. +func (f DirectCodecFactory) EncoderForVersion(serializer runtime.Encoder, _ runtime.GroupVersioner) runtime.Encoder { + return serializer +} + +// DecoderToVersion returns an decoder that does not do conversion. gv is ignored. +func (f DirectCodecFactory) DecoderToVersion(serializer runtime.Decoder, _ runtime.GroupVersioner) runtime.Decoder { + return serializer +} + +// SupportedMediaTypes returns the RFC2046 media types that this factory has serializers for. +func (f DirectCodecFactory) SupportedMediaTypes() []runtime.SerializerInfo { + return []runtime.SerializerInfo{ + { + MediaType: "application/json", + EncodesAsText: true, + Serializer: &extendedCodec{}, + PrettySerializer: &extendedCodec{pretty: true}, + StreamSerializer: &runtime.StreamSerializerInfo{ + Framer: kubejson.Framer, + EncodesAsText: true, + Serializer: &extendedCodec{}, + }, + }, + { + MediaType: "application/yaml", + EncodesAsText: true, + Serializer: &extendedCodec{yaml: true}, + PrettySerializer: &extendedCodec{yaml: true}, + }, + } +} + +type extendedCodec struct { + pretty bool + yaml bool +} + +func (e *extendedCodec) Decode(data []byte, gvk *schema.GroupVersionKind, obj runtime.Object) (runtime.Object, *schema.GroupVersionKind, error) { + if e.yaml { + altered, err := yaml.YAMLToJSON(data) + if err != nil { + return nil, nil, err + } + data = altered + } + if obj == nil { + metadata := &schema.TypeMeta{} + err := json.Unmarshal(data, metadata) + if err != nil { + return obj, gvk, err + } + log.Println("Detected metadata type for nil object, got", metadata.APIVersion, metadata.Kind) + obj, err = setDefaultType(metadata) + if err != nil { + return obj, gvk, err + } + } + err := json.Unmarshal(data, obj) + if err != nil { + return obj, gvk, err + } + return obj, gvk, nil +} + +func (e *extendedCodec) Encode(obj runtime.Object, w io.Writer) error { + setDefaultVersionKind(obj) + if e.yaml { + json, err := json.Marshal(obj) + if err != nil { + return err + } + data, err := yaml.JSONToYAML(json) + if err != nil { + return err + } + _, err = w.Write(data) + } + + if e.pretty { + data, err := json.MarshalIndent(obj, "", " ") + if err != nil { + return err + } + _, err = w.Write(data) + return err + } + return json.NewEncoder(w).Encode(obj) +} + +// DecodeParameters converts the provided url.Values into an object of type From with the kind of into, and then +// converts that object to into (if necessary). Returns an error if the operation cannot be completed. +func (*extendedCodec) DecodeParameters(parameters url.Values, from schema.GroupVersion, into runtime.Object) error { + if len(parameters) == 0 { + return nil + } + _, okDelete := into.(*api.DeleteOptions) + if _, okList := into.(*api.ListOptions); okList || okDelete { + from = schema.GroupVersion{Version: "v1"} + } + return runtime.NewParameterCodec(api.Scheme).DecodeParameters(parameters, from, into) +} + +// EncodeParameters converts the provided object into the to version, then converts that object to url.Values. +// Returns an error if conversion is not possible. +func (c *extendedCodec) EncodeParameters(obj runtime.Object, to schema.GroupVersion) (url.Values, error) { + result := url.Values{} + if obj == nil { + return result, nil + } + _, okDelete := obj.(*api.DeleteOptions) + if _, okList := obj.(*api.ListOptions); okList || okDelete { + to = schema.GroupVersion{Version: "v1"} + } + return runtime.NewParameterCodec(api.Scheme).EncodeParameters(obj, to) +} + +func setDefaultVersionKind(obj runtime.Object) { + // Check the values can are In type Extended Ingress + defaultGVK := schema.GroupVersionKind{ + Group: aci.V1beta1SchemeGroupVersion.Group, + Version: aci.V1beta1SchemeGroupVersion.Version, + } + + fullyQualifiedKind := reflect.ValueOf(obj).Type().String() + lastIndexOfDot := strings.LastIndex(fullyQualifiedKind, ".") + if lastIndexOfDot > 0 { + defaultGVK.Kind = fullyQualifiedKind[lastIndexOfDot+1:] + } + + obj.GetObjectKind().SetGroupVersionKind(defaultGVK) +} + +func setDefaultType(metadata *schema.TypeMeta) (runtime.Object, error) { + return api.Scheme.New(metadata.GroupVersionKind()) +} diff --git a/client/clientset/extensions.go b/client/clientset/extensions.go new file mode 100644 index 000000000..9e76ebdf9 --- /dev/null +++ b/client/clientset/extensions.go @@ -0,0 +1,99 @@ +package clientset + +import ( + "fmt" + + schema "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/apimachinery/registered" + rest "k8s.io/kubernetes/pkg/client/restclient" +) + +const ( + defaultAPIPath = "/apis" +) + +type ExtensionInterface interface { + RESTClient() rest.Interface + ReleaseNamespacer +} + +// ExtensionsClient is used to interact with experimental Kubernetes features. +// Features of Extensions group are not supported and may be changed or removed in +// incompatible ways at any time. +type ExtensionsClient struct { + restClient rest.Interface +} + +func (a *ExtensionsClient) Release(namespace string) ReleaseInterface { + return newRelease(a, namespace) +} + +// NewExtensions creates a new ExtensionsClient for the given config. This client +// provides access to experimental Kubernetes features. +// Features of Extensions group are not supported and may be changed or removed in +// incompatible ways at any time. +func NewExtensionsForConfig(c *rest.Config) (*ExtensionsClient, error) { + config := *c + if err := setExtensionsDefaults(&config); err != nil { + return nil, err + } + client, err := rest.RESTClientFor(&config) + if err != nil { + return nil, err + } + return &ExtensionsClient{client}, nil +} + +// NewExtensionsOrDie creates a new ExtensionsClient for the given config and +// panics if there is an error in the config. +// Features of Extensions group are not supported and may be changed or removed in +// incompatible ways at any time. +func NewExtensionsForConfigOrDie(c *rest.Config) *ExtensionsClient { + client, err := NewExtensionsForConfig(c) + if err != nil { + panic(err) + } + return client +} + +// New creates a new ExtensionsV1beta1Client for the given RESTClient. +func NewNewExtensions(c rest.Interface) *ExtensionsClient { + return &ExtensionsClient{c} +} + +func setExtensionsDefaults(config *rest.Config) error { + gv, err := schema.ParseGroupVersion("helm.sh/v1beta1") + if err != nil { + return err + } + // if helm.sh/v1beta1 is not enabled, return an error + if !registered.IsEnabledVersion(gv) { + return fmt.Errorf("helm.sh/v1beta1 is not enabled") + } + config.APIPath = defaultAPIPath + if config.UserAgent == "" { + config.UserAgent = rest.DefaultKubernetesUserAgent() + } + + if config.GroupVersion == nil || config.GroupVersion.Group != "helm.sh" { + g, err := registered.Group("helm.sh") + if err != nil { + return err + } + copyGroupVersion := g.GroupVersion + config.GroupVersion = ©GroupVersion + } + + config.NegotiatedSerializer = DirectCodecFactory{extendedCodec: ExtendedCodec} + + return nil +} + +// RESTClient returns a RESTClient that is used to communicate +// with API server by this client implementation. +func (c *ExtensionsClient) RESTClient() rest.Interface { + if c == nil { + return nil + } + return c.restClient +} diff --git a/client/clientset/fake/README.md b/client/clientset/fake/README.md new file mode 100644 index 000000000..206ab4cc8 --- /dev/null +++ b/client/clientset/fake/README.md @@ -0,0 +1,3 @@ +## fakes + +This package is a fake kube resource implementation. Should Only be used for Testing purpose. \ No newline at end of file diff --git a/client/clientset/fake/client.go b/client/clientset/fake/client.go new file mode 100644 index 000000000..831f68632 --- /dev/null +++ b/client/clientset/fake/client.go @@ -0,0 +1,19 @@ +package fake + +import ( + _ "github.com/appscode/log" + "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5/fake" + "k8s.io/kubernetes/pkg/runtime" +) + +type ClientSets struct { + *fake.Clientset + ExtensionClient *FakeExtensionClient +} + +func NewFakeClient(objects ...runtime.Object) *ClientSets { + return &ClientSets{ + Clientset: fake.NewSimpleClientset(objects...), + ExtensionClient: NewFakeExtensionClient(objects...), + } +} diff --git a/client/clientset/fake/extensions.go b/client/clientset/fake/extensions.go new file mode 100644 index 000000000..209162f93 --- /dev/null +++ b/client/clientset/fake/extensions.go @@ -0,0 +1,33 @@ +package fake + +import ( + "k8s.io/helm/client/clientset" + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/apimachinery/registered" + testing "k8s.io/kubernetes/pkg/client/testing/core" + "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/watch" +) + +type FakeExtensionClient struct { + *testing.Fake +} + +func NewFakeExtensionClient(objects ...runtime.Object) *FakeExtensionClient { + o := testing.NewObjectTracker(api.Scheme, api.Codecs.UniversalDecoder()) + for _, obj := range objects { + if obj.GetObjectKind().GroupVersionKind().Group == "helm.sh" { + if err := o.Add(obj); err != nil { + panic(err) + } + } + } + fakePtr := testing.Fake{} + fakePtr.AddReactor("*", "*", testing.ObjectReaction(o, registered.RESTMapper())) + fakePtr.AddWatchReactor("*", testing.DefaultWatchReactor(watch.NewFake(), nil)) + return &FakeExtensionClient{&fakePtr} +} + +func (m *FakeExtensionClient) Releases(ns string) clientset.ReleaseInterface { + return &FakeRelease{Fake: m.Fake, ns: ns} +} diff --git a/client/clientset/fake/release.go b/client/clientset/fake/release.go new file mode 100644 index 000000000..c89e8eb1d --- /dev/null +++ b/client/clientset/fake/release.go @@ -0,0 +1,95 @@ +package fake + +import ( + aci "k8s.io/helm/api" + "k8s.io/kubernetes/pkg/api" + schema "k8s.io/kubernetes/pkg/api/unversioned" + testing "k8s.io/kubernetes/pkg/client/testing/core" + "k8s.io/kubernetes/pkg/labels" + "k8s.io/kubernetes/pkg/watch" +) + +type FakeRelease struct { + Fake *testing.Fake + ns string +} + +var certResource = schema.GroupVersionResource{Group: "helm.sh", Version: "v1beta1", Resource: "releases"} + +// Get returns the Release by name. +func (mock *FakeRelease) Get(name string) (*aci.Release, error) { + obj, err := mock.Fake. + Invokes(testing.NewGetAction(certResource, mock.ns, name), &aci.Release{}) + + if obj == nil { + return nil, err + } + return obj.(*aci.Release), err +} + +// List returns the a of Releases. +func (mock *FakeRelease) List(opts api.ListOptions) (*aci.ReleaseList, error) { + obj, err := mock.Fake. + Invokes(testing.NewListAction(certResource, mock.ns, opts), &aci.Release{}) + + if obj == nil { + return nil, err + } + + label, _, _ := testing.ExtractFromListOptions(opts) + if label == nil { + label = labels.Everything() + } + list := &aci.ReleaseList{} + for _, item := range obj.(*aci.ReleaseList).Items { + if label.Matches(labels.Set(item.Labels)) { + list.Items = append(list.Items, item) + } + } + return list, err +} + +// Create creates a new Release. +func (mock *FakeRelease) Create(svc *aci.Release) (*aci.Release, error) { + obj, err := mock.Fake. + Invokes(testing.NewCreateAction(certResource, mock.ns, svc), &aci.Release{}) + + if obj == nil { + return nil, err + } + return obj.(*aci.Release), err +} + +// Update updates a Release. +func (mock *FakeRelease) Update(svc *aci.Release) (*aci.Release, error) { + obj, err := mock.Fake. + Invokes(testing.NewUpdateAction(certResource, mock.ns, svc), &aci.Release{}) + + if obj == nil { + return nil, err + } + return obj.(*aci.Release), err +} + +// Delete deletes a Release by name. +func (mock *FakeRelease) Delete(name string) error { + _, err := mock.Fake. + Invokes(testing.NewDeleteAction(certResource, mock.ns, name), &aci.Release{}) + + return err +} + +func (mock *FakeRelease) UpdateStatus(srv *aci.Release) (*aci.Release, error) { + obj, err := mock.Fake. + Invokes(testing.NewUpdateSubresourceAction(certResource, "status", mock.ns, srv), &aci.Release{}) + + if obj == nil { + return nil, err + } + return obj.(*aci.Release), err +} + +func (mock *FakeRelease) Watch(opts api.ListOptions) (watch.Interface, error) { + return mock.Fake. + InvokesWatch(testing.NewWatchAction(certResource, mock.ns, opts)) +} diff --git a/client/clientset/imports.go b/client/clientset/imports.go new file mode 100644 index 000000000..b53ab418b --- /dev/null +++ b/client/clientset/imports.go @@ -0,0 +1,16 @@ +package clientset + +// These imports are the API groups the client will support. +import ( + "fmt" + + _ "k8s.io/helm/api/install" + _ "k8s.io/kubernetes/pkg/api/install" + "k8s.io/kubernetes/pkg/apimachinery/registered" +) + +func init() { + if missingVersions := registered.ValidateEnvRequestedVersions(); len(missingVersions) != 0 { + panic(fmt.Sprintf("KUBE_API_VERSIONS contains versions that are not installed: %q.", missingVersions)) + } +} diff --git a/client/clientset/release.go b/client/clientset/release.go new file mode 100644 index 000000000..077e215f8 --- /dev/null +++ b/client/clientset/release.go @@ -0,0 +1,107 @@ +package clientset + +import ( + aci "k8s.io/helm/api" + "k8s.io/kubernetes/pkg/api" + rest "k8s.io/kubernetes/pkg/client/restclient" + "k8s.io/kubernetes/pkg/watch" +) + +type ReleaseNamespacer interface { + Release(namespace string) ReleaseInterface +} + +type ReleaseInterface interface { + List(opts api.ListOptions) (*aci.ReleaseList, error) + Get(name string) (*aci.Release, error) + Create(release *aci.Release) (*aci.Release, error) + Update(release *aci.Release) (*aci.Release, error) + Delete(name string) error + Watch(opts api.ListOptions) (watch.Interface, error) + UpdateStatus(release *aci.Release) (*aci.Release, error) +} + +type ReleaseImpl struct { + r rest.Interface + ns string +} + +func newRelease(c *ExtensionsClient, namespace string) *ReleaseImpl { + return &ReleaseImpl{c.restClient, namespace} +} + +func (c *ReleaseImpl) List(opts api.ListOptions) (result *aci.ReleaseList, err error) { + result = &aci.ReleaseList{} + err = c.r.Get(). + Namespace(c.ns). + Resource("releases"). + VersionedParams(&opts, ExtendedCodec). + Do(). + Into(result) + return +} + +func (c *ReleaseImpl) Get(name string) (result *aci.Release, err error) { + result = &aci.Release{} + err = c.r.Get(). + Namespace(c.ns). + Resource("releases"). + Name(name). + Do(). + Into(result) + return +} + +func (c *ReleaseImpl) Create(release *aci.Release) (result *aci.Release, err error) { + result = &aci.Release{} + err = c.r.Post(). + Namespace(c.ns). + Resource("releases"). + Body(release). + Do(). + Into(result) + return +} + +func (c *ReleaseImpl) Update(release *aci.Release) (result *aci.Release, err error) { + result = &aci.Release{} + err = c.r.Put(). + Namespace(c.ns). + Resource("releases"). + Name(release.Name). + Body(release). + Do(). + Into(result) + return +} + +func (c *ReleaseImpl) Delete(name string) (err error) { + return c.r.Delete(). + Namespace(c.ns). + Resource("releases"). + Name(name). + Do(). + Error() +} + +func (c *ReleaseImpl) Watch(opts api.ListOptions) (watch.Interface, error) { + return c.r.Get(). + Prefix("watch"). + Namespace(c.ns). + Resource("releases"). + VersionedParams(&opts, ExtendedCodec). + Watch() +} + +func (c *ReleaseImpl) UpdateStatus(release *aci.Release) (result *aci.Release, err error) { + result = &aci.Release{} + err = c.r.Put(). + Namespace(c.ns). + Resource("releases"). + Name(release.Name). + SubResource("status"). + Body(release). + Do(). + Into(result) + return +} diff --git a/cmd/tiller/tiller.go b/cmd/tiller/tiller.go index 72388d307..8ab31f7c6 100644 --- a/cmd/tiller/tiller.go +++ b/cmd/tiller/tiller.go @@ -27,6 +27,7 @@ import ( "github.com/spf13/cobra" + rcs "k8s.io/helm/client/clientset" "k8s.io/helm/pkg/kube" "k8s.io/helm/pkg/proto/hapi/services" "k8s.io/helm/pkg/storage" @@ -39,6 +40,7 @@ import ( const ( storageMemory = "memory" storageConfigMap = "configmap" + storageInlineTPR = "inline-tpr" ) // rootServer is the root gRPC server. @@ -90,7 +92,13 @@ func main() { } func start(c *cobra.Command, args []string) { - clientset, err := kube.New(nil).ClientSet() + kc := kube.New(nil) + clientcfg, err := kc.ClientConfig() + if err != nil { + fmt.Fprintf(os.Stderr, "Cannot initialize Kubernetes connection: %s\n", err) + os.Exit(1) + } + clientset, err := kc.ClientSet() if err != nil { fmt.Fprintf(os.Stderr, "Cannot initialize Kubernetes connection: %s\n", err) os.Exit(1) @@ -101,6 +109,9 @@ func start(c *cobra.Command, args []string) { env.Releases = storage.Init(driver.NewMemory()) case storageConfigMap: env.Releases = storage.Init(driver.NewConfigMaps(clientset.Core().ConfigMaps(namespace()))) + case storageInlineTPR: + cs := rcs.NewExtensionsForConfigOrDie(clientcfg) + env.Releases = storage.Init(driver.NewReleases(cs.Release(namespace()))) } lstn, err := net.Listen("tcp", grpcAddr) diff --git a/pkg/storage/driver/releases.go b/pkg/storage/driver/releases.go new file mode 100644 index 000000000..459b5732c --- /dev/null +++ b/pkg/storage/driver/releases.go @@ -0,0 +1,287 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package driver // import "k8s.io/helm/pkg/storage/driver" + +import ( + "fmt" + rapi "k8s.io/helm/api" + "strconv" + "strings" + "time" + + google_protobuf "github.com/golang/protobuf/ptypes/timestamp" + "k8s.io/kubernetes/pkg/api" + kberrs "k8s.io/kubernetes/pkg/api/errors" + kblabels "k8s.io/kubernetes/pkg/labels" + "k8s.io/kubernetes/pkg/util/validation" + + "k8s.io/helm/client/clientset" + rspb "k8s.io/helm/pkg/proto/hapi/release" + "k8s.io/kubernetes/pkg/api/unversioned" +) + +var _ Driver = (*Releases)(nil) + +// ReleasesDriverName is the string name of the driver. +const ReleasesDriverName = "helm.sh/Release" + +// Releases is a wrapper around an implementation of a kubernetes +// ReleasesInterface. +type Releases struct { + impl clientset.ReleaseInterface +} + +// NewReleases initializes a new Releases wrapping an implmenetation of +// the kubernetes ReleasesInterface. +func NewReleases(impl clientset.ReleaseInterface) *Releases { + return &Releases{impl: impl} +} + +// Name returns the name of the driver. +func (releases *Releases) Name() string { + return ReleasesDriverName +} + +// Get fetches the release named by key. The corresponding release is returned +// or error if not found. +func (releases *Releases) Get(key string) (*rspb.Release, error) { + // fetch the release holding the release named by key + obj, err := releases.impl.Get(key) + if err != nil { + if kberrs.IsNotFound(err) { + return nil, ErrReleaseNotFound + } + + logerrf(err, "get: failed to get %q", key) + return nil, err + } + // found the release, decode the base64 data string + r, err := decodeRelease(obj.Spec.Data.Inline) + if err != nil { + logerrf(err, "get: failed to decode data %q", key) + return nil, err + } + // return the release object + return r, nil +} + +// List fetches all releases and returns the list releases such +// that filter(release) == true. An error is returned if the +// release fails to retrieve the releases. +func (releases *Releases) List(filter func(*rspb.Release) bool) ([]*rspb.Release, error) { + lsel := kblabels.Set{"OWNER": "TILLER"}.AsSelector() + opts := api.ListOptions{LabelSelector: lsel} + + list, err := releases.impl.List(opts) + if err != nil { + logerrf(err, "list: failed to list") + return nil, err + } + + var results []*rspb.Release + + // iterate over the releases object list + // and decode each release + for _, item := range list.Items { + rls, err := decodeRelease(item.Spec.Data.Inline) + if err != nil { + logerrf(err, "list: failed to decode release: %v", item) + continue + } + if filter(rls) { + results = append(results, rls) + } + } + return results, nil +} + +// Query fetches all releases that match the provided map of labels. +// An error is returned if the release fails to retrieve the releases. +func (releases *Releases) Query(labels map[string]string) ([]*rspb.Release, error) { + ls := kblabels.Set{} + for k, v := range labels { + if errs := validation.IsValidLabelValue(v); len(errs) != 0 { + return nil, fmt.Errorf("invalid label value: %q: %s", v, strings.Join(errs, "; ")) + } + ls[k] = v + } + + opts := api.ListOptions{LabelSelector: ls.AsSelector()} + + list, err := releases.impl.List(opts) + if err != nil { + logerrf(err, "query: failed to query with labels") + return nil, err + } + + if len(list.Items) == 0 { + return nil, ErrReleaseNotFound + } + + var results []*rspb.Release + for _, item := range list.Items { + rls, err := decodeRelease(item.Spec.Data.Inline) + if err != nil { + logerrf(err, "query: failed to decode release: %s", err) + continue + } + results = append(results, rls) + } + return results, nil +} + +// Create creates a new Release holding the release. If the +// Release already exists, ErrReleaseExists is returned. +func (releases *Releases) Create(key string, rls *rspb.Release) error { + // set labels for releases object meta data + var lbs labels + + lbs.init() + lbs.set("CREATED_AT", strconv.Itoa(int(time.Now().Unix()))) + + // create a new release to hold the release + obj, err := newReleasesObject(key, rls, lbs) + if err != nil { + logerrf(err, "create: failed to encode release %q", rls.Name) + return err + } + // push the release object out into the kubiverse + if _, err := releases.impl.Create(obj); err != nil { + if kberrs.IsAlreadyExists(err) { + return ErrReleaseExists + } + + logerrf(err, "create: failed to create") + return err + } + return nil +} + +// Update updates the Release holding the release. If not found +// the Release is created to hold the release. +func (releases *Releases) Update(key string, rls *rspb.Release) error { + // set labels for releases object meta data + var lbs labels + + lbs.init() + lbs.set("MODIFIED_AT", strconv.Itoa(int(time.Now().Unix()))) + + // create a new release object to hold the release + obj, err := newReleasesObject(key, rls, lbs) + if err != nil { + logerrf(err, "update: failed to encode release %q", rls.Name) + return err + } + // push the release object out into the kubiverse + _, err = releases.impl.Update(obj) + if err != nil { + logerrf(err, "update: failed to update") + return err + } + return nil +} + +// Delete deletes the Release holding the release named by key. +func (releases *Releases) Delete(key string) (rls *rspb.Release, err error) { + // fetch the release to check existence + if rls, err = releases.Get(key); err != nil { + if kberrs.IsNotFound(err) { + return nil, ErrReleaseNotFound + } + + logerrf(err, "delete: failed to get release %q", key) + return nil, err + } + // delete the release + if err = releases.impl.Delete(key); err != nil { + return rls, err + } + return rls, nil +} + +// newReleasesObject constructs a kubernetes Release object +// to store a release. Each release data entry is the base64 +// encoded string of a release's binary protobuf encoding. +// +// The following labels are used within each release: +// +// "MODIFIED_AT" - timestamp indicating when this release was last modified. (set in Update) +// "CREATED_AT" - timestamp indicating when this release was created. (set in Create) +// "VERSION" - version of the release. +// "STATUS" - status of the release (see proto/hapi/release.status.pb.go for variants) +// "OWNER" - owner of the release, currently "TILLER". +// "NAME" - name of the release. +// +func newReleasesObject(key string, rls *rspb.Release, lbs labels) (*rapi.Release, error) { + const owner = "TILLER" + + // encode the release + s, err := encodeRelease(rls) + if err != nil { + return nil, err + } + + if lbs == nil { + lbs.init() + } + + // apply labels + lbs.set("NAME", rls.Name) + lbs.set("OWNER", owner) + lbs.set("STATUS", rspb.Status_Code_name[int32(rls.Info.Status.Code)]) + lbs.set("VERSION", strconv.Itoa(int(rls.Version))) + + // create and return release object + r := &rapi.Release{ + ObjectMeta: api.ObjectMeta{ + Name: key, + Labels: lbs.toMap(), + }, + Spec: rapi.ReleaseSpec{ + Config: rls.Config, + Version: rls.Version, + Data: rapi.ReleaseData{ + Inline: s, + }, + }, + Status: rapi.ReleaseStatus{ + FirstDeployed: toKubeTime(rls.Info.FirstDeployed), + LastDeployed: toKubeTime(rls.Info.LastDeployed), + Deleted: toKubeTime(rls.Info.Deleted), + }, + } + if rls.Info != nil { + r.Spec.Description = rls.Info.Description + if rls.Info.Status != nil { + r.Status.Code = rls.Info.Status.Code.String() + r.Status.Resources = rls.Info.Status.Resources + r.Status.Notes = rls.Info.Status.Notes + } + } + if rls.Chart != nil { + r.Spec.ChartMetadata = rls.Chart.Metadata + } + return r, nil +} + +func toKubeTime(pbt *google_protobuf.Timestamp) unversioned.Time { + var t unversioned.Time + if pbt != nil { + t = unversioned.NewTime(time.Unix(pbt.Seconds, int64(pbt.Nanos))) + } + return t +} diff --git a/pkg/storage/driver/releases_test.go b/pkg/storage/driver/releases_test.go new file mode 100644 index 000000000..4e51424a3 --- /dev/null +++ b/pkg/storage/driver/releases_test.go @@ -0,0 +1,208 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package driver + +import ( + "encoding/base64" + "reflect" + "testing" + + "github.com/gogo/protobuf/proto" + + _ "k8s.io/helm/api/install" + "k8s.io/helm/client/clientset/fake" + rspb "k8s.io/helm/pkg/proto/hapi/release" + "k8s.io/kubernetes/pkg/runtime" +) + +// newTestFixture initializes a FakeReleaseInterface. +// ConfigMaps are created for each release provided. +func newTestFixtureReleases(t *testing.T, releases ...*rspb.Release) *Releases { + return NewReleases(fake.NewFakeExtensionClient(initFakeTPRs(t, releases...)...).Releases("default")) +} + +// initFakeTPRs initializes the FakeReleaseInterface with the set of releases. +func initFakeTPRs(t *testing.T, releases ...*rspb.Release) []runtime.Object { + var objects []runtime.Object + for _, rls := range releases { + objkey := testKey(rls.Name, rls.Version) + + r, err := newReleasesObject(objkey, rls, nil) + if err != nil { + t.Fatalf("Failed to create configmap: %s", err) + } + var obj runtime.Object = r + objects = append(objects, obj) + } + return objects +} + +func TestReleaseName(t *testing.T) { + c := newTestFixtureReleases(t) + if c.Name() != ReleasesDriverName { + t.Errorf("Expected name to be %q, got %q", ReleasesDriverName, c.Name()) + } +} + +func TestReleaseGet(t *testing.T) { + vers := int32(1) + name := "smug-pigeon" + namespace := "default" + key := testKey(name, vers) + rel := releaseStub(name, vers, namespace, rspb.Status_DEPLOYED) + + releases := newTestFixtureReleases(t, []*rspb.Release{rel}...) + + // get release with key + got, err := releases.Get(key) + if err != nil { + t.Fatalf("Failed to get release: %s", err) + } + // compare fetched release with original + if !reflect.DeepEqual(rel, got) { + t.Errorf("Expected {%q}, got {%q}", rel, got) + } +} + +func TestUNcompressedReleaseGet(t *testing.T) { + vers := int32(1) + name := "smug-pigeon" + namespace := "default" + key := testKey(name, vers) + rel := releaseStub(name, vers, namespace, rspb.Status_DEPLOYED) + + // Create a test fixture which contains an uncompressed release + cfgmap, err := newReleasesObject(key, rel, nil) + if err != nil { + t.Fatalf("Failed to create configmap: %s", err) + } + b, err := proto.Marshal(rel) + if err != nil { + t.Fatalf("Failed to marshal release: %s", err) + } + cfgmap.Spec.Data.Inline = base64.StdEncoding.EncodeToString(b) + releases := NewReleases(fake.NewFakeExtensionClient(initFakeTPRs(t, rel)...).Releases("test")) + + // get release with key + got, err := releases.Get(key) + if err != nil { + t.Fatalf("Failed to get release: %s", err) + } + // compare fetched release with original + if !reflect.DeepEqual(rel, got) { + t.Errorf("Expected {%q}, got {%q}", rel, got) + } +} + +func TestReleaseList(t *testing.T) { + releases := newTestFixtureReleases(t, []*rspb.Release{ + releaseStub("key-1", 1, "default", rspb.Status_DELETED), + releaseStub("key-2", 1, "default", rspb.Status_DELETED), + releaseStub("key-3", 1, "default", rspb.Status_DEPLOYED), + releaseStub("key-4", 1, "default", rspb.Status_DEPLOYED), + releaseStub("key-5", 1, "default", rspb.Status_SUPERSEDED), + releaseStub("key-6", 1, "default", rspb.Status_SUPERSEDED), + }...) + + // list all deleted releases + del, err := releases.List(func(rel *rspb.Release) bool { + return rel.Info.Status.Code == rspb.Status_DELETED + }) + // check + if err != nil { + t.Errorf("Failed to list deleted: %s", err) + } + if len(del) != 2 { + t.Errorf("Expected 2 deleted, got %d:\n%v\n", len(del), del) + } + + // list all deployed releases + dpl, err := releases.List(func(rel *rspb.Release) bool { + return rel.Info.Status.Code == rspb.Status_DEPLOYED + }) + // check + if err != nil { + t.Errorf("Failed to list deployed: %s", err) + } + if len(dpl) != 2 { + t.Errorf("Expected 2 deployed, got %d", len(dpl)) + } + + // list all superseded releases + ssd, err := releases.List(func(rel *rspb.Release) bool { + return rel.Info.Status.Code == rspb.Status_SUPERSEDED + }) + // check + if err != nil { + t.Errorf("Failed to list superseded: %s", err) + } + if len(ssd) != 2 { + t.Errorf("Expected 2 superseded, got %d", len(ssd)) + } +} + +func TestReleaseCreate(t *testing.T) { + releases := newTestFixtureReleases(t) + + vers := int32(1) + name := "smug-pigeon" + namespace := "default" + key := testKey(name, vers) + rel := releaseStub(name, vers, namespace, rspb.Status_DEPLOYED) + + // store the release in a configmap + if err := releases.Create(key, rel); err != nil { + t.Fatalf("Failed to create release with key %q: %s", key, err) + } + + // get the release back + got, err := releases.Get(key) + if err != nil { + t.Fatalf("Failed to get release with key %q: %s", key, err) + } + + // compare created release with original + if !reflect.DeepEqual(rel, got) { + t.Errorf("Expected {%q}, got {%q}", rel, got) + } +} + +func TestReleaseUpdate(t *testing.T) { + vers := int32(1) + name := "smug-pigeon" + namespace := "default" + key := testKey(name, vers) + rel := releaseStub(name, vers, namespace, rspb.Status_DEPLOYED) + + releases := newTestFixtureReleases(t, []*rspb.Release{rel}...) + + // modify release status code + rel.Info.Status.Code = rspb.Status_SUPERSEDED + + // perform the update + if err := releases.Update(key, rel); err != nil { + t.Fatalf("Failed to update release: %s", err) + } + + // fetch the updated release + got, err := releases.Get(key) + if err != nil { + t.Fatalf("Failed to get release with key %q: %s", key, err) + } + + // check release has actually been updated by comparing modified fields + if rel.Info.Status.Code != got.Info.Status.Code { + t.Errorf("Expected status %s, got status %s", rel.Info.Status.Code, got.Info.Status.Code) + } +}