From d8cc1d3dc93b242e040662cf3ff0bb4624d22130 Mon Sep 17 00:00:00 2001 From: Tamal Saha Date: Tue, 21 Mar 2017 11:23:39 -0700 Subject: [PATCH 01/12] Store release data in custom TPR (#12) --- api/extensions/helm.yaml | 7 + api/install/install.go | 24 +++ api/register.go | 43 +++++ api/register_v1beta1.go | 29 +++ api/types.go | 68 +++++++ client/clientset/codec.go | 163 ++++++++++++++++ client/clientset/extensions.go | 99 ++++++++++ client/clientset/fake/README.md | 3 + client/clientset/fake/client.go | 19 ++ client/clientset/fake/extensions.go | 33 ++++ client/clientset/fake/release.go | 95 +++++++++ client/clientset/imports.go | 16 ++ client/clientset/release.go | 107 +++++++++++ cmd/tiller/tiller.go | 13 +- pkg/storage/driver/releases.go | 287 ++++++++++++++++++++++++++++ pkg/storage/driver/releases_test.go | 208 ++++++++++++++++++++ 16 files changed, 1213 insertions(+), 1 deletion(-) create mode 100644 api/extensions/helm.yaml create mode 100644 api/install/install.go create mode 100644 api/register.go create mode 100644 api/register_v1beta1.go create mode 100644 api/types.go create mode 100644 client/clientset/codec.go create mode 100644 client/clientset/extensions.go create mode 100644 client/clientset/fake/README.md create mode 100644 client/clientset/fake/client.go create mode 100644 client/clientset/fake/extensions.go create mode 100644 client/clientset/fake/release.go create mode 100644 client/clientset/imports.go create mode 100644 client/clientset/release.go create mode 100644 pkg/storage/driver/releases.go create mode 100644 pkg/storage/driver/releases_test.go diff --git a/api/extensions/helm.yaml b/api/extensions/helm.yaml new file mode 100644 index 000000000..1479a2295 --- /dev/null +++ b/api/extensions/helm.yaml @@ -0,0 +1,7 @@ +metadata: + name: release.helm.sh +apiVersion: extensions/v1beta1 +kind: ThirdPartyResource +description: "A specification of Helm release" +versions: + - name: v1beta1 diff --git a/api/install/install.go b/api/install/install.go new file mode 100644 index 000000000..e083d0c82 --- /dev/null +++ b/api/install/install.go @@ -0,0 +1,24 @@ +package install + +import ( + aci "k8s.io/helm/api" + "k8s.io/kubernetes/pkg/apimachinery/announced" + "k8s.io/kubernetes/pkg/util/sets" +) + +func init() { + if err := announced.NewGroupMetaFactory( + &announced.GroupMetaFactoryArgs{ + GroupName: aci.GroupName, + VersionPreferenceOrder: []string{aci.V1beta1SchemeGroupVersion.Version}, + ImportPrefix: "k8s.io/helm/api", + RootScopedKinds: sets.NewString("ThirdPartyResource"), + AddInternalObjectsToScheme: aci.AddToScheme, + }, + announced.VersionToSchemeFunc{ + aci.V1beta1SchemeGroupVersion.Version: aci.V1betaAddToScheme, + }, + ).Announce().RegisterAndEnable(); err != nil { + panic(err) + } +} diff --git a/api/register.go b/api/register.go new file mode 100644 index 000000000..39d21faf2 --- /dev/null +++ b/api/register.go @@ -0,0 +1,43 @@ +package api + +import ( + "k8s.io/kubernetes/pkg/api" + schema "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/runtime" +) + +// GroupName is the group name use in this package +const GroupName = "helm.sh" + +// SchemeGroupVersion is group version used to register these objects +var SchemeGroupVersion = schema.GroupVersion{Group: GroupName, Version: runtime.APIVersionInternal} + +// Kind takes an unqualified kind and returns back a Group qualified GroupKind +func Kind(kind string) schema.GroupKind { + return SchemeGroupVersion.WithKind(kind).GroupKind() +} + +// Resource takes an unqualified resource and returns back a Group qualified GroupResource +func Resource(resource string) schema.GroupResource { + return SchemeGroupVersion.WithResource(resource).GroupResource() +} + +var ( + SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes) + AddToScheme = SchemeBuilder.AddToScheme +) + +// Adds the list of known types to api.Scheme. +func addKnownTypes(scheme *runtime.Scheme) error { + scheme.AddKnownTypes(SchemeGroupVersion, + &Release{}, + &ReleaseList{}, + + &api.ListOptions{}, + &api.DeleteOptions{}, + ) + return nil +} + +func (obj *Release) GetObjectKind() schema.ObjectKind { return &obj.TypeMeta } +func (obj *ReleaseList) GetObjectKind() schema.ObjectKind { return &obj.TypeMeta } diff --git a/api/register_v1beta1.go b/api/register_v1beta1.go new file mode 100644 index 000000000..1f5c98929 --- /dev/null +++ b/api/register_v1beta1.go @@ -0,0 +1,29 @@ +package api + +import ( + schema "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/api/v1" + "k8s.io/kubernetes/pkg/runtime" + versionedwatch "k8s.io/kubernetes/pkg/watch/versioned" +) + +// SchemeGroupVersion is group version used to register these objects +var V1beta1SchemeGroupVersion = schema.GroupVersion{Group: GroupName, Version: "v1beta1"} + +var ( + V1beta1SchemeBuilder = runtime.NewSchemeBuilder(v1addKnownTypes) + V1betaAddToScheme = V1beta1SchemeBuilder.AddToScheme +) + +// Adds the list of known types to api.Scheme. +func v1addKnownTypes(scheme *runtime.Scheme) error { + scheme.AddKnownTypes(V1beta1SchemeGroupVersion, + &Release{}, + &ReleaseList{}, + + &v1.ListOptions{}, + &v1.DeleteOptions{}, + ) + versionedwatch.AddToGroupVersion(scheme, V1beta1SchemeGroupVersion) + return nil +} diff --git a/api/types.go b/api/types.go new file mode 100644 index 000000000..a22b56db6 --- /dev/null +++ b/api/types.go @@ -0,0 +1,68 @@ +package api + +import ( + hapi_chart "k8s.io/helm/pkg/proto/hapi/chart" + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/unversioned" +) + +// Release captures the state of a individual release and are immutable. +// Release replaces the version wise configmaps used by Tiller 2.0 +type Release struct { + unversioned.TypeMeta `json:",inline,omitempty"` + api.ObjectMeta `json:"metadata,omitempty"` + Spec ReleaseSpec `json:"spec,omitempty"` + Status ReleaseStatus `json:"status,omitempty"` +} + +type ReleaseSpec struct { + // Description is human-friendly "log entry" about this release. + Description string `json:"Description,omitempty"` + // Chart is the chart that was released. + ChartMetadata *hapi_chart.Metadata `json:"chartMetadata,omitempty"` + // Config is the set of extra Values added to the chart. + // These values override the default values inside of the chart. + Config *hapi_chart.Config `json:"config,omitempty"` + // Version is an int32 which represents the version of the release. + Version int32 `json:"version,omitempty"` + + // TODO(tamal): Store in proper namespace + // Namespace is the kubernetes namespace of the release. + // Namespace string `json:"namespace,omitempty"` + + // The ChartSource represents the location and type of a chart to install. + // This is modelled like Volume in Pods, which allows specifying a chart + // inline (like today) or pulling a chart object from a (potentially private) + // chart registry similar to pulling a Docker image. + Data ReleaseData `json:"data,omitempty"` +} + +//------------------------------------------------------------------------------------------- +// Chart represents a chart that is installed in a Release. +// The ChartSource represents the location and type of a chart to install. +// This is modelled like Volume in Pods, which allows specifying a chart +// inline (like today) or pulling a chart object from a (potentially private) chart registry similar to pulling a Docker image. +// +optional +type ReleaseData struct { + // Inline charts are what is done today with Helm cli. Release request + // contains the chart definition in the release spec, sent by Helm cli. + Inline string `json:"inline,omitempty"` +} + +type ReleaseStatus struct { + Code string `json:"code,omitempty"` + // Cluster resources as kubectl would print them. + Resources string `json:"resources,omitempty"` + // Contains the rendered templates/NOTES.txt if available + Notes string `json:"notes,omitempty"` + FirstDeployed unversioned.Time `json:"first_deployed,omitempty"` + LastDeployed unversioned.Time `json:"last_deployed,omitempty"` + // Deleted tracks when this object was deleted. + Deleted unversioned.Time `json:"deleted,omitempty"` +} + +type ReleaseList struct { + unversioned.TypeMeta `json:",inline"` + unversioned.ListMeta `json:"metadata,omitempty"` + Items []Release `json:"items,omitempty"` +} diff --git a/client/clientset/codec.go b/client/clientset/codec.go new file mode 100644 index 000000000..989c544a4 --- /dev/null +++ b/client/clientset/codec.go @@ -0,0 +1,163 @@ +package clientset + +import ( + "encoding/json" + "io" + "net/url" + "reflect" + "strings" + + "github.com/ghodss/yaml" + aci "k8s.io/helm/api" + "k8s.io/kubernetes/pkg/api" + schema "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/runtime" + kubejson "k8s.io/kubernetes/pkg/runtime/serializer/json" + "log" +) + +// TODO(@sadlil): Find a better way to replace ExtendedCodec to encode and decode objects. +// Follow the guide to replace it with api.Codec and api.ParameterCodecs. +var ExtendedCodec = &extendedCodec{} + +// DirectCodecFactory provides methods for retrieving "DirectCodec"s, which do not do conversion. +type DirectCodecFactory struct { + *extendedCodec +} + +// EncoderForVersion returns an encoder that does not do conversion. gv is ignored. +func (f DirectCodecFactory) EncoderForVersion(serializer runtime.Encoder, _ runtime.GroupVersioner) runtime.Encoder { + return serializer +} + +// DecoderToVersion returns an decoder that does not do conversion. gv is ignored. +func (f DirectCodecFactory) DecoderToVersion(serializer runtime.Decoder, _ runtime.GroupVersioner) runtime.Decoder { + return serializer +} + +// SupportedMediaTypes returns the RFC2046 media types that this factory has serializers for. +func (f DirectCodecFactory) SupportedMediaTypes() []runtime.SerializerInfo { + return []runtime.SerializerInfo{ + { + MediaType: "application/json", + EncodesAsText: true, + Serializer: &extendedCodec{}, + PrettySerializer: &extendedCodec{pretty: true}, + StreamSerializer: &runtime.StreamSerializerInfo{ + Framer: kubejson.Framer, + EncodesAsText: true, + Serializer: &extendedCodec{}, + }, + }, + { + MediaType: "application/yaml", + EncodesAsText: true, + Serializer: &extendedCodec{yaml: true}, + PrettySerializer: &extendedCodec{yaml: true}, + }, + } +} + +type extendedCodec struct { + pretty bool + yaml bool +} + +func (e *extendedCodec) Decode(data []byte, gvk *schema.GroupVersionKind, obj runtime.Object) (runtime.Object, *schema.GroupVersionKind, error) { + if e.yaml { + altered, err := yaml.YAMLToJSON(data) + if err != nil { + return nil, nil, err + } + data = altered + } + if obj == nil { + metadata := &schema.TypeMeta{} + err := json.Unmarshal(data, metadata) + if err != nil { + return obj, gvk, err + } + log.Println("Detected metadata type for nil object, got", metadata.APIVersion, metadata.Kind) + obj, err = setDefaultType(metadata) + if err != nil { + return obj, gvk, err + } + } + err := json.Unmarshal(data, obj) + if err != nil { + return obj, gvk, err + } + return obj, gvk, nil +} + +func (e *extendedCodec) Encode(obj runtime.Object, w io.Writer) error { + setDefaultVersionKind(obj) + if e.yaml { + json, err := json.Marshal(obj) + if err != nil { + return err + } + data, err := yaml.JSONToYAML(json) + if err != nil { + return err + } + _, err = w.Write(data) + } + + if e.pretty { + data, err := json.MarshalIndent(obj, "", " ") + if err != nil { + return err + } + _, err = w.Write(data) + return err + } + return json.NewEncoder(w).Encode(obj) +} + +// DecodeParameters converts the provided url.Values into an object of type From with the kind of into, and then +// converts that object to into (if necessary). Returns an error if the operation cannot be completed. +func (*extendedCodec) DecodeParameters(parameters url.Values, from schema.GroupVersion, into runtime.Object) error { + if len(parameters) == 0 { + return nil + } + _, okDelete := into.(*api.DeleteOptions) + if _, okList := into.(*api.ListOptions); okList || okDelete { + from = schema.GroupVersion{Version: "v1"} + } + return runtime.NewParameterCodec(api.Scheme).DecodeParameters(parameters, from, into) +} + +// EncodeParameters converts the provided object into the to version, then converts that object to url.Values. +// Returns an error if conversion is not possible. +func (c *extendedCodec) EncodeParameters(obj runtime.Object, to schema.GroupVersion) (url.Values, error) { + result := url.Values{} + if obj == nil { + return result, nil + } + _, okDelete := obj.(*api.DeleteOptions) + if _, okList := obj.(*api.ListOptions); okList || okDelete { + to = schema.GroupVersion{Version: "v1"} + } + return runtime.NewParameterCodec(api.Scheme).EncodeParameters(obj, to) +} + +func setDefaultVersionKind(obj runtime.Object) { + // Check the values can are In type Extended Ingress + defaultGVK := schema.GroupVersionKind{ + Group: aci.V1beta1SchemeGroupVersion.Group, + Version: aci.V1beta1SchemeGroupVersion.Version, + } + + fullyQualifiedKind := reflect.ValueOf(obj).Type().String() + lastIndexOfDot := strings.LastIndex(fullyQualifiedKind, ".") + if lastIndexOfDot > 0 { + defaultGVK.Kind = fullyQualifiedKind[lastIndexOfDot+1:] + } + + obj.GetObjectKind().SetGroupVersionKind(defaultGVK) +} + +func setDefaultType(metadata *schema.TypeMeta) (runtime.Object, error) { + return api.Scheme.New(metadata.GroupVersionKind()) +} diff --git a/client/clientset/extensions.go b/client/clientset/extensions.go new file mode 100644 index 000000000..9e76ebdf9 --- /dev/null +++ b/client/clientset/extensions.go @@ -0,0 +1,99 @@ +package clientset + +import ( + "fmt" + + schema "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/apimachinery/registered" + rest "k8s.io/kubernetes/pkg/client/restclient" +) + +const ( + defaultAPIPath = "/apis" +) + +type ExtensionInterface interface { + RESTClient() rest.Interface + ReleaseNamespacer +} + +// ExtensionsClient is used to interact with experimental Kubernetes features. +// Features of Extensions group are not supported and may be changed or removed in +// incompatible ways at any time. +type ExtensionsClient struct { + restClient rest.Interface +} + +func (a *ExtensionsClient) Release(namespace string) ReleaseInterface { + return newRelease(a, namespace) +} + +// NewExtensions creates a new ExtensionsClient for the given config. This client +// provides access to experimental Kubernetes features. +// Features of Extensions group are not supported and may be changed or removed in +// incompatible ways at any time. +func NewExtensionsForConfig(c *rest.Config) (*ExtensionsClient, error) { + config := *c + if err := setExtensionsDefaults(&config); err != nil { + return nil, err + } + client, err := rest.RESTClientFor(&config) + if err != nil { + return nil, err + } + return &ExtensionsClient{client}, nil +} + +// NewExtensionsOrDie creates a new ExtensionsClient for the given config and +// panics if there is an error in the config. +// Features of Extensions group are not supported and may be changed or removed in +// incompatible ways at any time. +func NewExtensionsForConfigOrDie(c *rest.Config) *ExtensionsClient { + client, err := NewExtensionsForConfig(c) + if err != nil { + panic(err) + } + return client +} + +// New creates a new ExtensionsV1beta1Client for the given RESTClient. +func NewNewExtensions(c rest.Interface) *ExtensionsClient { + return &ExtensionsClient{c} +} + +func setExtensionsDefaults(config *rest.Config) error { + gv, err := schema.ParseGroupVersion("helm.sh/v1beta1") + if err != nil { + return err + } + // if helm.sh/v1beta1 is not enabled, return an error + if !registered.IsEnabledVersion(gv) { + return fmt.Errorf("helm.sh/v1beta1 is not enabled") + } + config.APIPath = defaultAPIPath + if config.UserAgent == "" { + config.UserAgent = rest.DefaultKubernetesUserAgent() + } + + if config.GroupVersion == nil || config.GroupVersion.Group != "helm.sh" { + g, err := registered.Group("helm.sh") + if err != nil { + return err + } + copyGroupVersion := g.GroupVersion + config.GroupVersion = ©GroupVersion + } + + config.NegotiatedSerializer = DirectCodecFactory{extendedCodec: ExtendedCodec} + + return nil +} + +// RESTClient returns a RESTClient that is used to communicate +// with API server by this client implementation. +func (c *ExtensionsClient) RESTClient() rest.Interface { + if c == nil { + return nil + } + return c.restClient +} diff --git a/client/clientset/fake/README.md b/client/clientset/fake/README.md new file mode 100644 index 000000000..206ab4cc8 --- /dev/null +++ b/client/clientset/fake/README.md @@ -0,0 +1,3 @@ +## fakes + +This package is a fake kube resource implementation. Should Only be used for Testing purpose. \ No newline at end of file diff --git a/client/clientset/fake/client.go b/client/clientset/fake/client.go new file mode 100644 index 000000000..831f68632 --- /dev/null +++ b/client/clientset/fake/client.go @@ -0,0 +1,19 @@ +package fake + +import ( + _ "github.com/appscode/log" + "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5/fake" + "k8s.io/kubernetes/pkg/runtime" +) + +type ClientSets struct { + *fake.Clientset + ExtensionClient *FakeExtensionClient +} + +func NewFakeClient(objects ...runtime.Object) *ClientSets { + return &ClientSets{ + Clientset: fake.NewSimpleClientset(objects...), + ExtensionClient: NewFakeExtensionClient(objects...), + } +} diff --git a/client/clientset/fake/extensions.go b/client/clientset/fake/extensions.go new file mode 100644 index 000000000..209162f93 --- /dev/null +++ b/client/clientset/fake/extensions.go @@ -0,0 +1,33 @@ +package fake + +import ( + "k8s.io/helm/client/clientset" + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/apimachinery/registered" + testing "k8s.io/kubernetes/pkg/client/testing/core" + "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/watch" +) + +type FakeExtensionClient struct { + *testing.Fake +} + +func NewFakeExtensionClient(objects ...runtime.Object) *FakeExtensionClient { + o := testing.NewObjectTracker(api.Scheme, api.Codecs.UniversalDecoder()) + for _, obj := range objects { + if obj.GetObjectKind().GroupVersionKind().Group == "helm.sh" { + if err := o.Add(obj); err != nil { + panic(err) + } + } + } + fakePtr := testing.Fake{} + fakePtr.AddReactor("*", "*", testing.ObjectReaction(o, registered.RESTMapper())) + fakePtr.AddWatchReactor("*", testing.DefaultWatchReactor(watch.NewFake(), nil)) + return &FakeExtensionClient{&fakePtr} +} + +func (m *FakeExtensionClient) Releases(ns string) clientset.ReleaseInterface { + return &FakeRelease{Fake: m.Fake, ns: ns} +} diff --git a/client/clientset/fake/release.go b/client/clientset/fake/release.go new file mode 100644 index 000000000..c89e8eb1d --- /dev/null +++ b/client/clientset/fake/release.go @@ -0,0 +1,95 @@ +package fake + +import ( + aci "k8s.io/helm/api" + "k8s.io/kubernetes/pkg/api" + schema "k8s.io/kubernetes/pkg/api/unversioned" + testing "k8s.io/kubernetes/pkg/client/testing/core" + "k8s.io/kubernetes/pkg/labels" + "k8s.io/kubernetes/pkg/watch" +) + +type FakeRelease struct { + Fake *testing.Fake + ns string +} + +var certResource = schema.GroupVersionResource{Group: "helm.sh", Version: "v1beta1", Resource: "releases"} + +// Get returns the Release by name. +func (mock *FakeRelease) Get(name string) (*aci.Release, error) { + obj, err := mock.Fake. + Invokes(testing.NewGetAction(certResource, mock.ns, name), &aci.Release{}) + + if obj == nil { + return nil, err + } + return obj.(*aci.Release), err +} + +// List returns the a of Releases. +func (mock *FakeRelease) List(opts api.ListOptions) (*aci.ReleaseList, error) { + obj, err := mock.Fake. + Invokes(testing.NewListAction(certResource, mock.ns, opts), &aci.Release{}) + + if obj == nil { + return nil, err + } + + label, _, _ := testing.ExtractFromListOptions(opts) + if label == nil { + label = labels.Everything() + } + list := &aci.ReleaseList{} + for _, item := range obj.(*aci.ReleaseList).Items { + if label.Matches(labels.Set(item.Labels)) { + list.Items = append(list.Items, item) + } + } + return list, err +} + +// Create creates a new Release. +func (mock *FakeRelease) Create(svc *aci.Release) (*aci.Release, error) { + obj, err := mock.Fake. + Invokes(testing.NewCreateAction(certResource, mock.ns, svc), &aci.Release{}) + + if obj == nil { + return nil, err + } + return obj.(*aci.Release), err +} + +// Update updates a Release. +func (mock *FakeRelease) Update(svc *aci.Release) (*aci.Release, error) { + obj, err := mock.Fake. + Invokes(testing.NewUpdateAction(certResource, mock.ns, svc), &aci.Release{}) + + if obj == nil { + return nil, err + } + return obj.(*aci.Release), err +} + +// Delete deletes a Release by name. +func (mock *FakeRelease) Delete(name string) error { + _, err := mock.Fake. + Invokes(testing.NewDeleteAction(certResource, mock.ns, name), &aci.Release{}) + + return err +} + +func (mock *FakeRelease) UpdateStatus(srv *aci.Release) (*aci.Release, error) { + obj, err := mock.Fake. + Invokes(testing.NewUpdateSubresourceAction(certResource, "status", mock.ns, srv), &aci.Release{}) + + if obj == nil { + return nil, err + } + return obj.(*aci.Release), err +} + +func (mock *FakeRelease) Watch(opts api.ListOptions) (watch.Interface, error) { + return mock.Fake. + InvokesWatch(testing.NewWatchAction(certResource, mock.ns, opts)) +} diff --git a/client/clientset/imports.go b/client/clientset/imports.go new file mode 100644 index 000000000..b53ab418b --- /dev/null +++ b/client/clientset/imports.go @@ -0,0 +1,16 @@ +package clientset + +// These imports are the API groups the client will support. +import ( + "fmt" + + _ "k8s.io/helm/api/install" + _ "k8s.io/kubernetes/pkg/api/install" + "k8s.io/kubernetes/pkg/apimachinery/registered" +) + +func init() { + if missingVersions := registered.ValidateEnvRequestedVersions(); len(missingVersions) != 0 { + panic(fmt.Sprintf("KUBE_API_VERSIONS contains versions that are not installed: %q.", missingVersions)) + } +} diff --git a/client/clientset/release.go b/client/clientset/release.go new file mode 100644 index 000000000..077e215f8 --- /dev/null +++ b/client/clientset/release.go @@ -0,0 +1,107 @@ +package clientset + +import ( + aci "k8s.io/helm/api" + "k8s.io/kubernetes/pkg/api" + rest "k8s.io/kubernetes/pkg/client/restclient" + "k8s.io/kubernetes/pkg/watch" +) + +type ReleaseNamespacer interface { + Release(namespace string) ReleaseInterface +} + +type ReleaseInterface interface { + List(opts api.ListOptions) (*aci.ReleaseList, error) + Get(name string) (*aci.Release, error) + Create(release *aci.Release) (*aci.Release, error) + Update(release *aci.Release) (*aci.Release, error) + Delete(name string) error + Watch(opts api.ListOptions) (watch.Interface, error) + UpdateStatus(release *aci.Release) (*aci.Release, error) +} + +type ReleaseImpl struct { + r rest.Interface + ns string +} + +func newRelease(c *ExtensionsClient, namespace string) *ReleaseImpl { + return &ReleaseImpl{c.restClient, namespace} +} + +func (c *ReleaseImpl) List(opts api.ListOptions) (result *aci.ReleaseList, err error) { + result = &aci.ReleaseList{} + err = c.r.Get(). + Namespace(c.ns). + Resource("releases"). + VersionedParams(&opts, ExtendedCodec). + Do(). + Into(result) + return +} + +func (c *ReleaseImpl) Get(name string) (result *aci.Release, err error) { + result = &aci.Release{} + err = c.r.Get(). + Namespace(c.ns). + Resource("releases"). + Name(name). + Do(). + Into(result) + return +} + +func (c *ReleaseImpl) Create(release *aci.Release) (result *aci.Release, err error) { + result = &aci.Release{} + err = c.r.Post(). + Namespace(c.ns). + Resource("releases"). + Body(release). + Do(). + Into(result) + return +} + +func (c *ReleaseImpl) Update(release *aci.Release) (result *aci.Release, err error) { + result = &aci.Release{} + err = c.r.Put(). + Namespace(c.ns). + Resource("releases"). + Name(release.Name). + Body(release). + Do(). + Into(result) + return +} + +func (c *ReleaseImpl) Delete(name string) (err error) { + return c.r.Delete(). + Namespace(c.ns). + Resource("releases"). + Name(name). + Do(). + Error() +} + +func (c *ReleaseImpl) Watch(opts api.ListOptions) (watch.Interface, error) { + return c.r.Get(). + Prefix("watch"). + Namespace(c.ns). + Resource("releases"). + VersionedParams(&opts, ExtendedCodec). + Watch() +} + +func (c *ReleaseImpl) UpdateStatus(release *aci.Release) (result *aci.Release, err error) { + result = &aci.Release{} + err = c.r.Put(). + Namespace(c.ns). + Resource("releases"). + Name(release.Name). + SubResource("status"). + Body(release). + Do(). + Into(result) + return +} diff --git a/cmd/tiller/tiller.go b/cmd/tiller/tiller.go index 72388d307..8ab31f7c6 100644 --- a/cmd/tiller/tiller.go +++ b/cmd/tiller/tiller.go @@ -27,6 +27,7 @@ import ( "github.com/spf13/cobra" + rcs "k8s.io/helm/client/clientset" "k8s.io/helm/pkg/kube" "k8s.io/helm/pkg/proto/hapi/services" "k8s.io/helm/pkg/storage" @@ -39,6 +40,7 @@ import ( const ( storageMemory = "memory" storageConfigMap = "configmap" + storageInlineTPR = "inline-tpr" ) // rootServer is the root gRPC server. @@ -90,7 +92,13 @@ func main() { } func start(c *cobra.Command, args []string) { - clientset, err := kube.New(nil).ClientSet() + kc := kube.New(nil) + clientcfg, err := kc.ClientConfig() + if err != nil { + fmt.Fprintf(os.Stderr, "Cannot initialize Kubernetes connection: %s\n", err) + os.Exit(1) + } + clientset, err := kc.ClientSet() if err != nil { fmt.Fprintf(os.Stderr, "Cannot initialize Kubernetes connection: %s\n", err) os.Exit(1) @@ -101,6 +109,9 @@ func start(c *cobra.Command, args []string) { env.Releases = storage.Init(driver.NewMemory()) case storageConfigMap: env.Releases = storage.Init(driver.NewConfigMaps(clientset.Core().ConfigMaps(namespace()))) + case storageInlineTPR: + cs := rcs.NewExtensionsForConfigOrDie(clientcfg) + env.Releases = storage.Init(driver.NewReleases(cs.Release(namespace()))) } lstn, err := net.Listen("tcp", grpcAddr) diff --git a/pkg/storage/driver/releases.go b/pkg/storage/driver/releases.go new file mode 100644 index 000000000..459b5732c --- /dev/null +++ b/pkg/storage/driver/releases.go @@ -0,0 +1,287 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package driver // import "k8s.io/helm/pkg/storage/driver" + +import ( + "fmt" + rapi "k8s.io/helm/api" + "strconv" + "strings" + "time" + + google_protobuf "github.com/golang/protobuf/ptypes/timestamp" + "k8s.io/kubernetes/pkg/api" + kberrs "k8s.io/kubernetes/pkg/api/errors" + kblabels "k8s.io/kubernetes/pkg/labels" + "k8s.io/kubernetes/pkg/util/validation" + + "k8s.io/helm/client/clientset" + rspb "k8s.io/helm/pkg/proto/hapi/release" + "k8s.io/kubernetes/pkg/api/unversioned" +) + +var _ Driver = (*Releases)(nil) + +// ReleasesDriverName is the string name of the driver. +const ReleasesDriverName = "helm.sh/Release" + +// Releases is a wrapper around an implementation of a kubernetes +// ReleasesInterface. +type Releases struct { + impl clientset.ReleaseInterface +} + +// NewReleases initializes a new Releases wrapping an implmenetation of +// the kubernetes ReleasesInterface. +func NewReleases(impl clientset.ReleaseInterface) *Releases { + return &Releases{impl: impl} +} + +// Name returns the name of the driver. +func (releases *Releases) Name() string { + return ReleasesDriverName +} + +// Get fetches the release named by key. The corresponding release is returned +// or error if not found. +func (releases *Releases) Get(key string) (*rspb.Release, error) { + // fetch the release holding the release named by key + obj, err := releases.impl.Get(key) + if err != nil { + if kberrs.IsNotFound(err) { + return nil, ErrReleaseNotFound + } + + logerrf(err, "get: failed to get %q", key) + return nil, err + } + // found the release, decode the base64 data string + r, err := decodeRelease(obj.Spec.Data.Inline) + if err != nil { + logerrf(err, "get: failed to decode data %q", key) + return nil, err + } + // return the release object + return r, nil +} + +// List fetches all releases and returns the list releases such +// that filter(release) == true. An error is returned if the +// release fails to retrieve the releases. +func (releases *Releases) List(filter func(*rspb.Release) bool) ([]*rspb.Release, error) { + lsel := kblabels.Set{"OWNER": "TILLER"}.AsSelector() + opts := api.ListOptions{LabelSelector: lsel} + + list, err := releases.impl.List(opts) + if err != nil { + logerrf(err, "list: failed to list") + return nil, err + } + + var results []*rspb.Release + + // iterate over the releases object list + // and decode each release + for _, item := range list.Items { + rls, err := decodeRelease(item.Spec.Data.Inline) + if err != nil { + logerrf(err, "list: failed to decode release: %v", item) + continue + } + if filter(rls) { + results = append(results, rls) + } + } + return results, nil +} + +// Query fetches all releases that match the provided map of labels. +// An error is returned if the release fails to retrieve the releases. +func (releases *Releases) Query(labels map[string]string) ([]*rspb.Release, error) { + ls := kblabels.Set{} + for k, v := range labels { + if errs := validation.IsValidLabelValue(v); len(errs) != 0 { + return nil, fmt.Errorf("invalid label value: %q: %s", v, strings.Join(errs, "; ")) + } + ls[k] = v + } + + opts := api.ListOptions{LabelSelector: ls.AsSelector()} + + list, err := releases.impl.List(opts) + if err != nil { + logerrf(err, "query: failed to query with labels") + return nil, err + } + + if len(list.Items) == 0 { + return nil, ErrReleaseNotFound + } + + var results []*rspb.Release + for _, item := range list.Items { + rls, err := decodeRelease(item.Spec.Data.Inline) + if err != nil { + logerrf(err, "query: failed to decode release: %s", err) + continue + } + results = append(results, rls) + } + return results, nil +} + +// Create creates a new Release holding the release. If the +// Release already exists, ErrReleaseExists is returned. +func (releases *Releases) Create(key string, rls *rspb.Release) error { + // set labels for releases object meta data + var lbs labels + + lbs.init() + lbs.set("CREATED_AT", strconv.Itoa(int(time.Now().Unix()))) + + // create a new release to hold the release + obj, err := newReleasesObject(key, rls, lbs) + if err != nil { + logerrf(err, "create: failed to encode release %q", rls.Name) + return err + } + // push the release object out into the kubiverse + if _, err := releases.impl.Create(obj); err != nil { + if kberrs.IsAlreadyExists(err) { + return ErrReleaseExists + } + + logerrf(err, "create: failed to create") + return err + } + return nil +} + +// Update updates the Release holding the release. If not found +// the Release is created to hold the release. +func (releases *Releases) Update(key string, rls *rspb.Release) error { + // set labels for releases object meta data + var lbs labels + + lbs.init() + lbs.set("MODIFIED_AT", strconv.Itoa(int(time.Now().Unix()))) + + // create a new release object to hold the release + obj, err := newReleasesObject(key, rls, lbs) + if err != nil { + logerrf(err, "update: failed to encode release %q", rls.Name) + return err + } + // push the release object out into the kubiverse + _, err = releases.impl.Update(obj) + if err != nil { + logerrf(err, "update: failed to update") + return err + } + return nil +} + +// Delete deletes the Release holding the release named by key. +func (releases *Releases) Delete(key string) (rls *rspb.Release, err error) { + // fetch the release to check existence + if rls, err = releases.Get(key); err != nil { + if kberrs.IsNotFound(err) { + return nil, ErrReleaseNotFound + } + + logerrf(err, "delete: failed to get release %q", key) + return nil, err + } + // delete the release + if err = releases.impl.Delete(key); err != nil { + return rls, err + } + return rls, nil +} + +// newReleasesObject constructs a kubernetes Release object +// to store a release. Each release data entry is the base64 +// encoded string of a release's binary protobuf encoding. +// +// The following labels are used within each release: +// +// "MODIFIED_AT" - timestamp indicating when this release was last modified. (set in Update) +// "CREATED_AT" - timestamp indicating when this release was created. (set in Create) +// "VERSION" - version of the release. +// "STATUS" - status of the release (see proto/hapi/release.status.pb.go for variants) +// "OWNER" - owner of the release, currently "TILLER". +// "NAME" - name of the release. +// +func newReleasesObject(key string, rls *rspb.Release, lbs labels) (*rapi.Release, error) { + const owner = "TILLER" + + // encode the release + s, err := encodeRelease(rls) + if err != nil { + return nil, err + } + + if lbs == nil { + lbs.init() + } + + // apply labels + lbs.set("NAME", rls.Name) + lbs.set("OWNER", owner) + lbs.set("STATUS", rspb.Status_Code_name[int32(rls.Info.Status.Code)]) + lbs.set("VERSION", strconv.Itoa(int(rls.Version))) + + // create and return release object + r := &rapi.Release{ + ObjectMeta: api.ObjectMeta{ + Name: key, + Labels: lbs.toMap(), + }, + Spec: rapi.ReleaseSpec{ + Config: rls.Config, + Version: rls.Version, + Data: rapi.ReleaseData{ + Inline: s, + }, + }, + Status: rapi.ReleaseStatus{ + FirstDeployed: toKubeTime(rls.Info.FirstDeployed), + LastDeployed: toKubeTime(rls.Info.LastDeployed), + Deleted: toKubeTime(rls.Info.Deleted), + }, + } + if rls.Info != nil { + r.Spec.Description = rls.Info.Description + if rls.Info.Status != nil { + r.Status.Code = rls.Info.Status.Code.String() + r.Status.Resources = rls.Info.Status.Resources + r.Status.Notes = rls.Info.Status.Notes + } + } + if rls.Chart != nil { + r.Spec.ChartMetadata = rls.Chart.Metadata + } + return r, nil +} + +func toKubeTime(pbt *google_protobuf.Timestamp) unversioned.Time { + var t unversioned.Time + if pbt != nil { + t = unversioned.NewTime(time.Unix(pbt.Seconds, int64(pbt.Nanos))) + } + return t +} diff --git a/pkg/storage/driver/releases_test.go b/pkg/storage/driver/releases_test.go new file mode 100644 index 000000000..4e51424a3 --- /dev/null +++ b/pkg/storage/driver/releases_test.go @@ -0,0 +1,208 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package driver + +import ( + "encoding/base64" + "reflect" + "testing" + + "github.com/gogo/protobuf/proto" + + _ "k8s.io/helm/api/install" + "k8s.io/helm/client/clientset/fake" + rspb "k8s.io/helm/pkg/proto/hapi/release" + "k8s.io/kubernetes/pkg/runtime" +) + +// newTestFixture initializes a FakeReleaseInterface. +// ConfigMaps are created for each release provided. +func newTestFixtureReleases(t *testing.T, releases ...*rspb.Release) *Releases { + return NewReleases(fake.NewFakeExtensionClient(initFakeTPRs(t, releases...)...).Releases("default")) +} + +// initFakeTPRs initializes the FakeReleaseInterface with the set of releases. +func initFakeTPRs(t *testing.T, releases ...*rspb.Release) []runtime.Object { + var objects []runtime.Object + for _, rls := range releases { + objkey := testKey(rls.Name, rls.Version) + + r, err := newReleasesObject(objkey, rls, nil) + if err != nil { + t.Fatalf("Failed to create configmap: %s", err) + } + var obj runtime.Object = r + objects = append(objects, obj) + } + return objects +} + +func TestReleaseName(t *testing.T) { + c := newTestFixtureReleases(t) + if c.Name() != ReleasesDriverName { + t.Errorf("Expected name to be %q, got %q", ReleasesDriverName, c.Name()) + } +} + +func TestReleaseGet(t *testing.T) { + vers := int32(1) + name := "smug-pigeon" + namespace := "default" + key := testKey(name, vers) + rel := releaseStub(name, vers, namespace, rspb.Status_DEPLOYED) + + releases := newTestFixtureReleases(t, []*rspb.Release{rel}...) + + // get release with key + got, err := releases.Get(key) + if err != nil { + t.Fatalf("Failed to get release: %s", err) + } + // compare fetched release with original + if !reflect.DeepEqual(rel, got) { + t.Errorf("Expected {%q}, got {%q}", rel, got) + } +} + +func TestUNcompressedReleaseGet(t *testing.T) { + vers := int32(1) + name := "smug-pigeon" + namespace := "default" + key := testKey(name, vers) + rel := releaseStub(name, vers, namespace, rspb.Status_DEPLOYED) + + // Create a test fixture which contains an uncompressed release + cfgmap, err := newReleasesObject(key, rel, nil) + if err != nil { + t.Fatalf("Failed to create configmap: %s", err) + } + b, err := proto.Marshal(rel) + if err != nil { + t.Fatalf("Failed to marshal release: %s", err) + } + cfgmap.Spec.Data.Inline = base64.StdEncoding.EncodeToString(b) + releases := NewReleases(fake.NewFakeExtensionClient(initFakeTPRs(t, rel)...).Releases("test")) + + // get release with key + got, err := releases.Get(key) + if err != nil { + t.Fatalf("Failed to get release: %s", err) + } + // compare fetched release with original + if !reflect.DeepEqual(rel, got) { + t.Errorf("Expected {%q}, got {%q}", rel, got) + } +} + +func TestReleaseList(t *testing.T) { + releases := newTestFixtureReleases(t, []*rspb.Release{ + releaseStub("key-1", 1, "default", rspb.Status_DELETED), + releaseStub("key-2", 1, "default", rspb.Status_DELETED), + releaseStub("key-3", 1, "default", rspb.Status_DEPLOYED), + releaseStub("key-4", 1, "default", rspb.Status_DEPLOYED), + releaseStub("key-5", 1, "default", rspb.Status_SUPERSEDED), + releaseStub("key-6", 1, "default", rspb.Status_SUPERSEDED), + }...) + + // list all deleted releases + del, err := releases.List(func(rel *rspb.Release) bool { + return rel.Info.Status.Code == rspb.Status_DELETED + }) + // check + if err != nil { + t.Errorf("Failed to list deleted: %s", err) + } + if len(del) != 2 { + t.Errorf("Expected 2 deleted, got %d:\n%v\n", len(del), del) + } + + // list all deployed releases + dpl, err := releases.List(func(rel *rspb.Release) bool { + return rel.Info.Status.Code == rspb.Status_DEPLOYED + }) + // check + if err != nil { + t.Errorf("Failed to list deployed: %s", err) + } + if len(dpl) != 2 { + t.Errorf("Expected 2 deployed, got %d", len(dpl)) + } + + // list all superseded releases + ssd, err := releases.List(func(rel *rspb.Release) bool { + return rel.Info.Status.Code == rspb.Status_SUPERSEDED + }) + // check + if err != nil { + t.Errorf("Failed to list superseded: %s", err) + } + if len(ssd) != 2 { + t.Errorf("Expected 2 superseded, got %d", len(ssd)) + } +} + +func TestReleaseCreate(t *testing.T) { + releases := newTestFixtureReleases(t) + + vers := int32(1) + name := "smug-pigeon" + namespace := "default" + key := testKey(name, vers) + rel := releaseStub(name, vers, namespace, rspb.Status_DEPLOYED) + + // store the release in a configmap + if err := releases.Create(key, rel); err != nil { + t.Fatalf("Failed to create release with key %q: %s", key, err) + } + + // get the release back + got, err := releases.Get(key) + if err != nil { + t.Fatalf("Failed to get release with key %q: %s", key, err) + } + + // compare created release with original + if !reflect.DeepEqual(rel, got) { + t.Errorf("Expected {%q}, got {%q}", rel, got) + } +} + +func TestReleaseUpdate(t *testing.T) { + vers := int32(1) + name := "smug-pigeon" + namespace := "default" + key := testKey(name, vers) + rel := releaseStub(name, vers, namespace, rspb.Status_DEPLOYED) + + releases := newTestFixtureReleases(t, []*rspb.Release{rel}...) + + // modify release status code + rel.Info.Status.Code = rspb.Status_SUPERSEDED + + // perform the update + if err := releases.Update(key, rel); err != nil { + t.Fatalf("Failed to update release: %s", err) + } + + // fetch the updated release + got, err := releases.Get(key) + if err != nil { + t.Fatalf("Failed to get release with key %q: %s", key, err) + } + + // check release has actually been updated by comparing modified fields + if rel.Info.Status.Code != got.Info.Status.Code { + t.Errorf("Expected status %s, got status %s", rel.Info.Status.Code, got.Info.Status.Code) + } +} From 408f304965f1f1615f3680f97178c53d10ccab41 Mon Sep 17 00:00:00 2001 From: tamal Date: Tue, 21 Mar 2017 12:31:51 -0700 Subject: [PATCH 02/12] Remove appscode/log --- client/clientset/fake/client.go | 1 - 1 file changed, 1 deletion(-) diff --git a/client/clientset/fake/client.go b/client/clientset/fake/client.go index 831f68632..83f74a841 100644 --- a/client/clientset/fake/client.go +++ b/client/clientset/fake/client.go @@ -1,7 +1,6 @@ package fake import ( - _ "github.com/appscode/log" "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5/fake" "k8s.io/kubernetes/pkg/runtime" ) From 28eb45c43e3fbfb21c19e4d707ebb3f9623f8fab Mon Sep 17 00:00:00 2001 From: Tamal Saha Date: Tue, 21 Mar 2017 14:46:43 -0700 Subject: [PATCH 03/12] Store release data in cloud storage buckets. (#14) --- api/types.go | 14 +--- cmd/tiller/tiller.go | 117 ++++++++++++++++++++++++++- glide.yaml | 12 +++ pkg/storage/driver/releases.go | 139 +++++++++++++++++++++++++++++++-- 4 files changed, 258 insertions(+), 24 deletions(-) diff --git a/api/types.go b/api/types.go index a22b56db6..968d5cd87 100644 --- a/api/types.go +++ b/api/types.go @@ -34,19 +34,7 @@ type ReleaseSpec struct { // This is modelled like Volume in Pods, which allows specifying a chart // inline (like today) or pulling a chart object from a (potentially private) // chart registry similar to pulling a Docker image. - Data ReleaseData `json:"data,omitempty"` -} - -//------------------------------------------------------------------------------------------- -// Chart represents a chart that is installed in a Release. -// The ChartSource represents the location and type of a chart to install. -// This is modelled like Volume in Pods, which allows specifying a chart -// inline (like today) or pulling a chart object from a (potentially private) chart registry similar to pulling a Docker image. -// +optional -type ReleaseData struct { - // Inline charts are what is done today with Helm cli. Release request - // contains the chart definition in the release spec, sent by Helm cli. - Inline string `json:"inline,omitempty"` + Data string `json:"data,omitempty"` } type ReleaseStatus struct { diff --git a/cmd/tiller/tiller.go b/cmd/tiller/tiller.go index 8ab31f7c6..bcbb71abc 100644 --- a/cmd/tiller/tiller.go +++ b/cmd/tiller/tiller.go @@ -25,6 +25,11 @@ import ( "os" "strings" + "github.com/graymeta/stow" + "github.com/graymeta/stow/azure" + gcs "github.com/graymeta/stow/google" + "github.com/graymeta/stow/s3" + "github.com/graymeta/stow/swift" "github.com/spf13/cobra" rcs "k8s.io/helm/client/clientset" @@ -38,9 +43,10 @@ import ( ) const ( - storageMemory = "memory" - storageConfigMap = "configmap" - storageInlineTPR = "inline-tpr" + storageMemory = "memory" + storageConfigMap = "configmap" + storageInlineTPR = "inline-tpr" + storageObjectStoreTPR = "object-store-tpr" ) // rootServer is the root gRPC server. @@ -59,6 +65,24 @@ var ( traceAddr = ":44136" enableTracing = false store = storageConfigMap + + storageProvider string + s3ConfigAccessKeyID string + s3ConfigEndpoint string + s3ConfigRegion string + s3ConfigSecretKey string + gcsConfigJSONKeyPath string + gcsConfigProjectId string + gcsConfigScopes string + azureConfigAccount string + azureConfigKey string + swiftConfigKey string + swiftConfigTenantAuthURL string + swiftConfigTenantName string + swiftConfigUsername string + + container string + storagePrefix string ) const globalUsage = `The Kubernetes Helm server. @@ -85,6 +109,28 @@ func main() { p.StringVar(&store, "storage", storageConfigMap, "storage driver to use. One of 'configmap' or 'memory'") p.BoolVar(&enableTracing, "trace", false, "enable rpc tracing") + p.StringVar(&storageProvider, "provider", "", "Cloud storage provider") + + p.StringVar(&s3ConfigAccessKeyID, s3.Kind+"."+s3.ConfigAccessKeyID, "", "S3 config access key id") + p.StringVar(&s3ConfigEndpoint, s3.Kind+"."+s3.ConfigEndpoint, "", "S3 config endpoint") + p.StringVar(&s3ConfigRegion, s3.Kind+"."+s3.ConfigRegion, "", "S3 config region") + p.StringVar(&s3ConfigSecretKey, s3.Kind+"."+s3.ConfigSecretKey, "", "S3 config secret key") + + p.StringVar(&gcsConfigJSONKeyPath, gcs.Kind+".json_key_path", "", "GCS config json key path") + p.StringVar(&gcsConfigProjectId, gcs.Kind+"."+gcs.ConfigProjectId, "", "GCS config project id") + p.StringVar(&gcsConfigScopes, gcs.Kind+"."+gcs.ConfigScopes, "", "GCS config scopes") + + p.StringVar(&azureConfigAccount, azure.Kind+"."+azure.ConfigAccount, "", "Azure config account") + p.StringVar(&azureConfigKey, azure.Kind+"."+azure.ConfigKey, "", "Azure config key") + + p.StringVar(&swiftConfigKey, swift.Kind+"."+swift.ConfigKey, "", "Swift config key") + p.StringVar(&swiftConfigTenantAuthURL, swift.Kind+"."+swift.ConfigTenantAuthURL, "", "Swift teanant auth url") + p.StringVar(&swiftConfigTenantName, swift.Kind+"."+swift.ConfigTenantName, "", "Swift tenant name") + p.StringVar(&swiftConfigUsername, swift.Kind+"."+swift.ConfigUsername, "", "Swift username") + + p.StringVar(&container, "container", "", "Name of container") + p.StringVar(&storagePrefix, "storage-prefix", "tiller", "Prefix to container key where release data is stored") + if err := rootCommand.Execute(); err != nil { fmt.Fprint(os.Stderr, err) os.Exit(1) @@ -112,6 +158,71 @@ func start(c *cobra.Command, args []string) { case storageInlineTPR: cs := rcs.NewExtensionsForConfigOrDie(clientcfg) env.Releases = storage.Init(driver.NewReleases(cs.Release(namespace()))) + case storageObjectStoreTPR: + stowCfg := stow.ConfigMap{} + switch storageProvider { + case s3.Kind: + if s3ConfigAccessKeyID != "" { + stowCfg[s3.ConfigAccessKeyID] = s3ConfigAccessKeyID + } + if s3ConfigEndpoint != "" { + stowCfg[s3.ConfigEndpoint] = s3ConfigEndpoint + } + if s3ConfigRegion != "" { + stowCfg[s3.ConfigRegion] = s3ConfigRegion + } + if s3ConfigSecretKey != "" { + stowCfg[s3.ConfigSecretKey] = s3ConfigSecretKey + } + case gcs.Kind: + if gcsConfigJSONKeyPath != "" { + jsonKey, err := ioutil.ReadFile(gcsConfigJSONKeyPath) + fmt.Fprintf(os.Stderr, "Cannot read json key file: %s\n", err) + os.Exit(1) + stowCfg[gcs.ConfigJSON] = string(jsonKey) + } + if gcsConfigProjectId != "" { + stowCfg[gcs.ConfigProjectId] = gcsConfigProjectId + } + if gcsConfigScopes != "" { + stowCfg[gcs.ConfigScopes] = gcsConfigScopes + } + case azure.Kind: + if azureConfigAccount != "" { + stowCfg[azure.ConfigAccount] = azureConfigAccount + } + if azureConfigKey != "" { + stowCfg[azure.ConfigKey] = azureConfigKey + } + case swift.Kind: + if swiftConfigKey != "" { + stowCfg[swift.ConfigKey] = swiftConfigKey + } + if swiftConfigTenantAuthURL != "" { + stowCfg[swift.ConfigTenantAuthURL] = swiftConfigTenantAuthURL + } + if swiftConfigTenantName != "" { + stowCfg[swift.ConfigTenantName] = swiftConfigTenantName + } + if swiftConfigUsername != "" { + stowCfg[swift.ConfigUsername] = swiftConfigUsername + } + default: + fmt.Fprintf(os.Stderr, "Unknown provider: %s\n", storageProvider) + os.Exit(1) + } + loc, err := stow.Dial(storageProvider, stowCfg) + if err != nil { + fmt.Fprintf(os.Stderr, "Cannot connect to object store: %s\n", err) + os.Exit(1) + } + c, err := loc.Container(container) + if err != nil { + fmt.Fprintf(os.Stderr, "Cannot find container: %s\n", err) + os.Exit(1) + } + cs := rcs.NewExtensionsForConfigOrDie(clientcfg) + env.Releases = storage.Init(driver.NewObjectStoreReleases(cs.Release(namespace()), c, storagePrefix)) } lstn, err := net.Listen("tcp", grpcAddr) diff --git a/glide.yaml b/glide.yaml index f8a7af4f4..eba8902f1 100644 --- a/glide.yaml +++ b/glide.yaml @@ -69,3 +69,15 @@ import: version: ~0.1.0 - package: github.com/naoina/go-stringutil version: ~0.1.0 +- package: github.com/graymeta/stow + repo: https://github.com/appscode/stow.git + version: master + vcs: git +- package: github.com/aws/aws-sdk-go + version: v1.7.8 +- package: google.golang.org/api/storage/v1 + version: master +- package: github.com/Azure/azure-sdk-for-go + vcs: git + version: v7.0.1-beta +- package: github.com/ncw/swift diff --git a/pkg/storage/driver/releases.go b/pkg/storage/driver/releases.go index 459b5732c..960b87b0e 100644 --- a/pkg/storage/driver/releases.go +++ b/pkg/storage/driver/releases.go @@ -17,8 +17,9 @@ limitations under the License. package driver // import "k8s.io/helm/pkg/storage/driver" import ( + "bytes" "fmt" - rapi "k8s.io/helm/api" + "io" "strconv" "strings" "time" @@ -29,6 +30,8 @@ import ( kblabels "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/util/validation" + "github.com/graymeta/stow" + rapi "k8s.io/helm/api" "k8s.io/helm/client/clientset" rspb "k8s.io/helm/pkg/proto/hapi/release" "k8s.io/kubernetes/pkg/api/unversioned" @@ -42,7 +45,9 @@ const ReleasesDriverName = "helm.sh/Release" // Releases is a wrapper around an implementation of a kubernetes // ReleasesInterface. type Releases struct { - impl clientset.ReleaseInterface + impl clientset.ReleaseInterface + container stow.Container + prefix string } // NewReleases initializes a new Releases wrapping an implmenetation of @@ -51,6 +56,14 @@ func NewReleases(impl clientset.ReleaseInterface) *Releases { return &Releases{impl: impl} } +func NewObjectStoreReleases(impl clientset.ReleaseInterface, c stow.Container, prefix string) *Releases { + p := prefix + if prefix == "" { + p = "tiller" + } + return &Releases{impl: impl, container: c, prefix: p} +} + // Name returns the name of the driver. func (releases *Releases) Name() string { return ReleasesDriverName @@ -69,8 +82,13 @@ func (releases *Releases) Get(key string) (*rspb.Release, error) { logerrf(err, "get: failed to get %q", key) return nil, err } + // found the release, decode the base64 data string - r, err := decodeRelease(obj.Spec.Data.Inline) + data, err := releases.getReleaseData(obj) + if err != nil { + return nil, err + } + r, err := decodeRelease(data) if err != nil { logerrf(err, "get: failed to decode data %q", key) return nil, err @@ -97,7 +115,11 @@ func (releases *Releases) List(filter func(*rspb.Release) bool) ([]*rspb.Release // iterate over the releases object list // and decode each release for _, item := range list.Items { - rls, err := decodeRelease(item.Spec.Data.Inline) + data, err := releases.getReleaseData(&item) + if err != nil { + return nil, err + } + rls, err := decodeRelease(data) if err != nil { logerrf(err, "list: failed to decode release: %v", item) continue @@ -134,7 +156,11 @@ func (releases *Releases) Query(labels map[string]string) ([]*rspb.Release, erro var results []*rspb.Release for _, item := range list.Items { - rls, err := decodeRelease(item.Spec.Data.Inline) + data, err := releases.getReleaseData(&item) + if err != nil { + return nil, err + } + rls, err := decodeRelease(data) if err != nil { logerrf(err, "query: failed to decode release: %s", err) continue @@ -159,6 +185,12 @@ func (releases *Releases) Create(key string, rls *rspb.Release) error { logerrf(err, "create: failed to encode release %q", rls.Name) return err } + // push the release object data to object store if configured + err = releases.writeReleaseData(obj) + if err != nil { + logerrf(err, "create: failed to encode release %q", rls.Name) + return err + } // push the release object out into the kubiverse if _, err := releases.impl.Create(obj); err != nil { if kberrs.IsAlreadyExists(err) { @@ -186,6 +218,12 @@ func (releases *Releases) Update(key string, rls *rspb.Release) error { logerrf(err, "update: failed to encode release %q", rls.Name) return err } + // push the release object data to object store if configured + err = releases.writeReleaseData(obj) + if err != nil { + logerrf(err, "create: failed to encode release %q", rls.Name) + return err + } // push the release object out into the kubiverse _, err = releases.impl.Update(obj) if err != nil { @@ -207,12 +245,99 @@ func (releases *Releases) Delete(key string) (rls *rspb.Release, err error) { return nil, err } // delete the release + if err = releases.deleteReleaseData(rls); err != nil { + return rls, err + } if err = releases.impl.Delete(key); err != nil { return rls, err } return rls, nil } +func (releases *Releases) itemIDFromTPR(rls *rapi.Release) string { + return fmt.Sprintf("%v/releases/%v/versions%v", releases.prefix, rls.Name, rls.Spec.Version) +} + +func (releases *Releases) itemIDFromProto(rls *rspb.Release) string { + return fmt.Sprintf("%v/releases/%v/versions%v", releases.prefix, rls.Name, rls.Version) +} + +func (releases *Releases) deleteReleaseData(rls *rspb.Release) error { + if releases.container != nil { + return releases.container.RemoveItem(releases.itemIDFromProto(rls)) + } + return nil +} + +func (releases *Releases) writeReleaseData(rls *rapi.Release) error { + if releases.container != nil { + b := bytes.NewBufferString(rls.Spec.Data) + sz := len(rls.Spec.Data) + rls.Spec.Data = "" + _, err := releases.container.Put(releases.itemIDFromTPR(rls), b, int64(sz), nil) + if err != nil { + return err + } + } + return nil +} + +func (releases *Releases) getReleaseData(rls *rapi.Release) (string, error) { + if rls.Spec.Data != "" { + return rls.Spec.Data, nil + } else if releases.container != nil { + item, err := releases.container.Item(releases.itemIDFromTPR(rls)) + if err != nil { + return "", err + } + + f, err := item.Open() + if err != nil { + return "", err + } + defer f.Close() + // It's a good but not certain bet that FileInfo will tell us exactly how much to + // read, so let's try it but be prepared for the answer to be wrong. + var n int64 + // Don't preallocate a huge buffer, just in case. + if size, err := item.Size(); err != nil && size < 1e9 { + n = size + } + // As initial capacity for readAll, use n + a little extra in case Size is zero, + // and to avoid another allocation after Read has filled the buffer. The readAll + // call will read into its allocated internal buffer cheaply. If the size was + // wrong, we'll either waste some space off the end or reallocate as needed, but + // in the overwhelmingly common case we'll get it just right. + b, err := readAll(f, n+bytes.MinRead) + if err != nil { + return "", err + } + return string(b), nil + } + return "", fmt.Errorf("Missing release data for %v", rls.Name) +} + +// readAll reads from r until an error or EOF and returns the data it read +// from the internal buffer allocated with a specified capacity. +func readAll(r io.Reader, capacity int64) (b []byte, err error) { + buf := bytes.NewBuffer(make([]byte, 0, capacity)) + // If the buffer overflows, we will get bytes.ErrTooLarge. + // Return that as an error. Any other panic remains. + defer func() { + e := recover() + if e == nil { + return + } + if panicErr, ok := e.(error); ok && panicErr == bytes.ErrTooLarge { + err = panicErr + } else { + panic(e) + } + }() + _, err = buf.ReadFrom(r) + return buf.Bytes(), err +} + // newReleasesObject constructs a kubernetes Release object // to store a release. Each release data entry is the base64 // encoded string of a release's binary protobuf encoding. @@ -254,9 +379,7 @@ func newReleasesObject(key string, rls *rspb.Release, lbs labels) (*rapi.Release Spec: rapi.ReleaseSpec{ Config: rls.Config, Version: rls.Version, - Data: rapi.ReleaseData{ - Inline: s, - }, + Data: s, }, Status: rapi.ReleaseStatus{ FirstDeployed: toKubeTime(rls.Info.FirstDeployed), From 705aa009bb58265230b798c2a7c4ed947f9b50da Mon Sep 17 00:00:00 2001 From: tamal Date: Tue, 21 Mar 2017 14:59:48 -0700 Subject: [PATCH 04/12] Ensure TPR is created. --- cmd/tiller/tiller.go | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/cmd/tiller/tiller.go b/cmd/tiller/tiller.go index bcbb71abc..c7d92933e 100644 --- a/cmd/tiller/tiller.go +++ b/cmd/tiller/tiller.go @@ -31,7 +31,10 @@ import ( "github.com/graymeta/stow/s3" "github.com/graymeta/stow/swift" "github.com/spf13/cobra" + kberrs "k8s.io/kubernetes/pkg/api/errors" + "k8s.io/kubernetes/pkg/apis/extensions" + rapi "k8s.io/helm/api" rcs "k8s.io/helm/client/clientset" "k8s.io/helm/pkg/kube" "k8s.io/helm/pkg/proto/hapi/services" @@ -40,6 +43,9 @@ import ( "k8s.io/helm/pkg/tiller" "k8s.io/helm/pkg/tiller/environment" "k8s.io/helm/pkg/version" + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" ) const ( @@ -156,9 +162,11 @@ func start(c *cobra.Command, args []string) { case storageConfigMap: env.Releases = storage.Init(driver.NewConfigMaps(clientset.Core().ConfigMaps(namespace()))) case storageInlineTPR: + ensureResource(clientset) cs := rcs.NewExtensionsForConfigOrDie(clientcfg) env.Releases = storage.Init(driver.NewReleases(cs.Release(namespace()))) case storageObjectStoreTPR: + ensureResource(clientset) stowCfg := stow.ConfigMap{} switch storageProvider { case s3.Kind: @@ -281,3 +289,28 @@ func namespace() string { return environment.DefaultTillerNamespace } + +func ensureResource(clientset *internalclientset.Clientset) { + _, err := clientset.Extensions().ThirdPartyResources().Get("release." + rapi.V1beta1SchemeGroupVersion.Group) + if kberrs.IsNotFound(err) { + tpr := &extensions.ThirdPartyResource{ + TypeMeta: unversioned.TypeMeta{ + APIVersion: "extensions/v1beta1", + Kind: "ThirdPartyResource", + }, + ObjectMeta: api.ObjectMeta{ + Name: "release." + rapi.V1beta1SchemeGroupVersion.Group, + }, + Versions: []extensions.APIVersion{ + { + Name: rapi.V1beta1SchemeGroupVersion.Version, + }, + }, + } + _, err := clientset.Extensions().ThirdPartyResources().Create(tpr) + if err != nil { + fmt.Fprintf(os.Stderr, "Failed to create third party resource: %s\n", err) + os.Exit(1) + } + } +} From 4cdf63f10ccdb2cb3344f78ebc1ee41f63e69fd1 Mon Sep 17 00:00:00 2001 From: tamal Date: Tue, 21 Mar 2017 15:27:02 -0700 Subject: [PATCH 05/12] Change api version to v1alpha1 --- api/extensions/helm.yaml | 4 ++-- api/install/install.go | 4 ++-- api/register_v1beta1.go | 10 +++++----- client/clientset/codec.go | 4 ++-- client/clientset/extensions.go | 8 ++++---- client/clientset/fake/release.go | 2 +- cmd/tiller/tiller.go | 8 ++++---- 7 files changed, 20 insertions(+), 20 deletions(-) diff --git a/api/extensions/helm.yaml b/api/extensions/helm.yaml index 1479a2295..d9d9f7f2d 100644 --- a/api/extensions/helm.yaml +++ b/api/extensions/helm.yaml @@ -1,7 +1,7 @@ metadata: name: release.helm.sh -apiVersion: extensions/v1beta1 +apiVersion: extensions/v1alpha1 kind: ThirdPartyResource description: "A specification of Helm release" versions: - - name: v1beta1 + - name: v1alpha1 diff --git a/api/install/install.go b/api/install/install.go index e083d0c82..4673a87c6 100644 --- a/api/install/install.go +++ b/api/install/install.go @@ -10,13 +10,13 @@ func init() { if err := announced.NewGroupMetaFactory( &announced.GroupMetaFactoryArgs{ GroupName: aci.GroupName, - VersionPreferenceOrder: []string{aci.V1beta1SchemeGroupVersion.Version}, + VersionPreferenceOrder: []string{aci.V1alpha1SchemeGroupVersion.Version}, ImportPrefix: "k8s.io/helm/api", RootScopedKinds: sets.NewString("ThirdPartyResource"), AddInternalObjectsToScheme: aci.AddToScheme, }, announced.VersionToSchemeFunc{ - aci.V1beta1SchemeGroupVersion.Version: aci.V1betaAddToScheme, + aci.V1alpha1SchemeGroupVersion.Version: aci.V1betaAddToScheme, }, ).Announce().RegisterAndEnable(); err != nil { panic(err) diff --git a/api/register_v1beta1.go b/api/register_v1beta1.go index 1f5c98929..321c66d67 100644 --- a/api/register_v1beta1.go +++ b/api/register_v1beta1.go @@ -8,22 +8,22 @@ import ( ) // SchemeGroupVersion is group version used to register these objects -var V1beta1SchemeGroupVersion = schema.GroupVersion{Group: GroupName, Version: "v1beta1"} +var V1alpha1SchemeGroupVersion = schema.GroupVersion{Group: GroupName, Version: "v1alpha1"} var ( - V1beta1SchemeBuilder = runtime.NewSchemeBuilder(v1addKnownTypes) - V1betaAddToScheme = V1beta1SchemeBuilder.AddToScheme + V1alpha1SchemeBuilder = runtime.NewSchemeBuilder(v1addKnownTypes) + V1betaAddToScheme = V1alpha1SchemeBuilder.AddToScheme ) // Adds the list of known types to api.Scheme. func v1addKnownTypes(scheme *runtime.Scheme) error { - scheme.AddKnownTypes(V1beta1SchemeGroupVersion, + scheme.AddKnownTypes(V1alpha1SchemeGroupVersion, &Release{}, &ReleaseList{}, &v1.ListOptions{}, &v1.DeleteOptions{}, ) - versionedwatch.AddToGroupVersion(scheme, V1beta1SchemeGroupVersion) + versionedwatch.AddToGroupVersion(scheme, V1alpha1SchemeGroupVersion) return nil } diff --git a/client/clientset/codec.go b/client/clientset/codec.go index 989c544a4..0dd5056f6 100644 --- a/client/clientset/codec.go +++ b/client/clientset/codec.go @@ -145,8 +145,8 @@ func (c *extendedCodec) EncodeParameters(obj runtime.Object, to schema.GroupVers func setDefaultVersionKind(obj runtime.Object) { // Check the values can are In type Extended Ingress defaultGVK := schema.GroupVersionKind{ - Group: aci.V1beta1SchemeGroupVersion.Group, - Version: aci.V1beta1SchemeGroupVersion.Version, + Group: aci.V1alpha1SchemeGroupVersion.Group, + Version: aci.V1alpha1SchemeGroupVersion.Version, } fullyQualifiedKind := reflect.ValueOf(obj).Type().String() diff --git a/client/clientset/extensions.go b/client/clientset/extensions.go index 9e76ebdf9..b5cb20953 100644 --- a/client/clientset/extensions.go +++ b/client/clientset/extensions.go @@ -56,19 +56,19 @@ func NewExtensionsForConfigOrDie(c *rest.Config) *ExtensionsClient { return client } -// New creates a new ExtensionsV1beta1Client for the given RESTClient. +// New creates a new ExtensionsV1alpha1Client for the given RESTClient. func NewNewExtensions(c rest.Interface) *ExtensionsClient { return &ExtensionsClient{c} } func setExtensionsDefaults(config *rest.Config) error { - gv, err := schema.ParseGroupVersion("helm.sh/v1beta1") + gv, err := schema.ParseGroupVersion("helm.sh/v1alpha1") if err != nil { return err } - // if helm.sh/v1beta1 is not enabled, return an error + // if helm.sh/v1alpha1 is not enabled, return an error if !registered.IsEnabledVersion(gv) { - return fmt.Errorf("helm.sh/v1beta1 is not enabled") + return fmt.Errorf("helm.sh/v1alpha1 is not enabled") } config.APIPath = defaultAPIPath if config.UserAgent == "" { diff --git a/client/clientset/fake/release.go b/client/clientset/fake/release.go index c89e8eb1d..439d11a7b 100644 --- a/client/clientset/fake/release.go +++ b/client/clientset/fake/release.go @@ -14,7 +14,7 @@ type FakeRelease struct { ns string } -var certResource = schema.GroupVersionResource{Group: "helm.sh", Version: "v1beta1", Resource: "releases"} +var certResource = schema.GroupVersionResource{Group: "helm.sh", Version: "v1alpha1", Resource: "releases"} // Get returns the Release by name. func (mock *FakeRelease) Get(name string) (*aci.Release, error) { diff --git a/cmd/tiller/tiller.go b/cmd/tiller/tiller.go index c7d92933e..a8ab51d5c 100644 --- a/cmd/tiller/tiller.go +++ b/cmd/tiller/tiller.go @@ -291,19 +291,19 @@ func namespace() string { } func ensureResource(clientset *internalclientset.Clientset) { - _, err := clientset.Extensions().ThirdPartyResources().Get("release." + rapi.V1beta1SchemeGroupVersion.Group) + _, err := clientset.Extensions().ThirdPartyResources().Get("release." + rapi.V1alpha1SchemeGroupVersion.Group) if kberrs.IsNotFound(err) { tpr := &extensions.ThirdPartyResource{ TypeMeta: unversioned.TypeMeta{ - APIVersion: "extensions/v1beta1", + APIVersion: "extensions/v1alpha1", Kind: "ThirdPartyResource", }, ObjectMeta: api.ObjectMeta{ - Name: "release." + rapi.V1beta1SchemeGroupVersion.Group, + Name: "release." + rapi.V1alpha1SchemeGroupVersion.Group, }, Versions: []extensions.APIVersion{ { - Name: rapi.V1beta1SchemeGroupVersion.Version, + Name: rapi.V1alpha1SchemeGroupVersion.Version, }, }, } From 6f65233cdbaf90cfc5d31f4fb9317a46f17c5ded Mon Sep 17 00:00:00 2001 From: tamal Date: Tue, 21 Mar 2017 22:01:11 -0700 Subject: [PATCH 06/12] Fix unit tests. --- api/register_v1beta1.go | 2 +- client/clientset/fake/extensions.go | 3 ++- client/clientset/fake/release.go | 21 +++++++++++++++------ pkg/storage/driver/releases.go | 4 ++++ pkg/storage/driver/releases_test.go | 9 +++++---- 5 files changed, 27 insertions(+), 12 deletions(-) diff --git a/api/register_v1beta1.go b/api/register_v1beta1.go index 321c66d67..404b3e93e 100644 --- a/api/register_v1beta1.go +++ b/api/register_v1beta1.go @@ -12,7 +12,7 @@ var V1alpha1SchemeGroupVersion = schema.GroupVersion{Group: GroupName, Version: var ( V1alpha1SchemeBuilder = runtime.NewSchemeBuilder(v1addKnownTypes) - V1betaAddToScheme = V1alpha1SchemeBuilder.AddToScheme + V1betaAddToScheme = V1alpha1SchemeBuilder.AddToScheme ) // Adds the list of known types to api.Scheme. diff --git a/client/clientset/fake/extensions.go b/client/clientset/fake/extensions.go index 209162f93..4f0d49d91 100644 --- a/client/clientset/fake/extensions.go +++ b/client/clientset/fake/extensions.go @@ -1,12 +1,13 @@ package fake import ( - "k8s.io/helm/client/clientset" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/apimachinery/registered" testing "k8s.io/kubernetes/pkg/client/testing/core" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/watch" + + "k8s.io/helm/client/clientset" ) type FakeExtensionClient struct { diff --git a/client/clientset/fake/release.go b/client/clientset/fake/release.go index 439d11a7b..bfbe8ff59 100644 --- a/client/clientset/fake/release.go +++ b/client/clientset/fake/release.go @@ -50,9 +50,12 @@ func (mock *FakeRelease) List(opts api.ListOptions) (*aci.ReleaseList, error) { } // Create creates a new Release. -func (mock *FakeRelease) Create(svc *aci.Release) (*aci.Release, error) { +func (mock *FakeRelease) Create(r *aci.Release) (*aci.Release, error) { + if r != nil { + r.Namespace = mock.ns + } obj, err := mock.Fake. - Invokes(testing.NewCreateAction(certResource, mock.ns, svc), &aci.Release{}) + Invokes(testing.NewCreateAction(certResource, mock.ns, r), &aci.Release{}) if obj == nil { return nil, err @@ -61,9 +64,12 @@ func (mock *FakeRelease) Create(svc *aci.Release) (*aci.Release, error) { } // Update updates a Release. -func (mock *FakeRelease) Update(svc *aci.Release) (*aci.Release, error) { +func (mock *FakeRelease) Update(r *aci.Release) (*aci.Release, error) { + if r != nil { + r.Namespace = mock.ns + } obj, err := mock.Fake. - Invokes(testing.NewUpdateAction(certResource, mock.ns, svc), &aci.Release{}) + Invokes(testing.NewUpdateAction(certResource, mock.ns, r), &aci.Release{}) if obj == nil { return nil, err @@ -79,9 +85,12 @@ func (mock *FakeRelease) Delete(name string) error { return err } -func (mock *FakeRelease) UpdateStatus(srv *aci.Release) (*aci.Release, error) { +func (mock *FakeRelease) UpdateStatus(r *aci.Release) (*aci.Release, error) { + if r != nil { + r.Namespace = mock.ns + } obj, err := mock.Fake. - Invokes(testing.NewUpdateSubresourceAction(certResource, "status", mock.ns, srv), &aci.Release{}) + Invokes(testing.NewUpdateSubresourceAction(certResource, "status", mock.ns, r), &aci.Release{}) if obj == nil { return nil, err diff --git a/pkg/storage/driver/releases.go b/pkg/storage/driver/releases.go index 960b87b0e..9bf5ebb5d 100644 --- a/pkg/storage/driver/releases.go +++ b/pkg/storage/driver/releases.go @@ -372,6 +372,10 @@ func newReleasesObject(key string, rls *rspb.Release, lbs labels) (*rapi.Release // create and return release object r := &rapi.Release{ + TypeMeta: unversioned.TypeMeta{ + Kind: "Release", + APIVersion: "helm.sh/v1alpha1", + }, ObjectMeta: api.ObjectMeta{ Name: key, Labels: lbs.toMap(), diff --git a/pkg/storage/driver/releases_test.go b/pkg/storage/driver/releases_test.go index 4e51424a3..1ebc05847 100644 --- a/pkg/storage/driver/releases_test.go +++ b/pkg/storage/driver/releases_test.go @@ -19,11 +19,11 @@ import ( "testing" "github.com/gogo/protobuf/proto" + "k8s.io/kubernetes/pkg/runtime" _ "k8s.io/helm/api/install" "k8s.io/helm/client/clientset/fake" rspb "k8s.io/helm/pkg/proto/hapi/release" - "k8s.io/kubernetes/pkg/runtime" ) // newTestFixture initializes a FakeReleaseInterface. @@ -42,6 +42,7 @@ func initFakeTPRs(t *testing.T, releases ...*rspb.Release) []runtime.Object { if err != nil { t.Fatalf("Failed to create configmap: %s", err) } + r.Namespace = "default" var obj runtime.Object = r objects = append(objects, obj) } @@ -83,7 +84,7 @@ func TestUNcompressedReleaseGet(t *testing.T) { rel := releaseStub(name, vers, namespace, rspb.Status_DEPLOYED) // Create a test fixture which contains an uncompressed release - cfgmap, err := newReleasesObject(key, rel, nil) + r, err := newReleasesObject(key, rel, nil) if err != nil { t.Fatalf("Failed to create configmap: %s", err) } @@ -91,8 +92,8 @@ func TestUNcompressedReleaseGet(t *testing.T) { if err != nil { t.Fatalf("Failed to marshal release: %s", err) } - cfgmap.Spec.Data.Inline = base64.StdEncoding.EncodeToString(b) - releases := NewReleases(fake.NewFakeExtensionClient(initFakeTPRs(t, rel)...).Releases("test")) + r.Spec.Data = base64.StdEncoding.EncodeToString(b) + releases := NewReleases(fake.NewFakeExtensionClient(initFakeTPRs(t, rel)...).Releases("default")) // get release with key got, err := releases.Get(key) From 41f4a70965720d195c4c9fad9fbfa5dbd1a80236 Mon Sep 17 00:00:00 2001 From: tamal Date: Wed, 22 Mar 2017 09:59:57 -0700 Subject: [PATCH 07/12] Convert release key into TPR safe key --- client/clientset/codec.go | 5 +++-- pkg/storage/driver/releases.go | 26 ++++++++++++++++++++------ 2 files changed, 23 insertions(+), 8 deletions(-) diff --git a/client/clientset/codec.go b/client/clientset/codec.go index 0dd5056f6..fd4e12afa 100644 --- a/client/clientset/codec.go +++ b/client/clientset/codec.go @@ -3,17 +3,18 @@ package clientset import ( "encoding/json" "io" + "log" "net/url" "reflect" "strings" "github.com/ghodss/yaml" - aci "k8s.io/helm/api" "k8s.io/kubernetes/pkg/api" schema "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/runtime" kubejson "k8s.io/kubernetes/pkg/runtime/serializer/json" - "log" + + aci "k8s.io/helm/api" ) // TODO(@sadlil): Find a better way to replace ExtendedCodec to encode and decode objects. diff --git a/pkg/storage/driver/releases.go b/pkg/storage/driver/releases.go index 9bf5ebb5d..86b92a58e 100644 --- a/pkg/storage/driver/releases.go +++ b/pkg/storage/driver/releases.go @@ -20,21 +20,22 @@ import ( "bytes" "fmt" "io" + "regexp" "strconv" "strings" "time" google_protobuf "github.com/golang/protobuf/ptypes/timestamp" + "github.com/graymeta/stow" "k8s.io/kubernetes/pkg/api" kberrs "k8s.io/kubernetes/pkg/api/errors" + "k8s.io/kubernetes/pkg/api/unversioned" kblabels "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/util/validation" - "github.com/graymeta/stow" rapi "k8s.io/helm/api" "k8s.io/helm/client/clientset" rspb "k8s.io/helm/pkg/proto/hapi/release" - "k8s.io/kubernetes/pkg/api/unversioned" ) var _ Driver = (*Releases)(nil) @@ -73,7 +74,7 @@ func (releases *Releases) Name() string { // or error if not found. func (releases *Releases) Get(key string) (*rspb.Release, error) { // fetch the release holding the release named by key - obj, err := releases.impl.Get(key) + obj, err := releases.impl.Get(toTPRSafeKey(key)) if err != nil { if kberrs.IsNotFound(err) { return nil, ErrReleaseNotFound @@ -180,7 +181,7 @@ func (releases *Releases) Create(key string, rls *rspb.Release) error { lbs.set("CREATED_AT", strconv.Itoa(int(time.Now().Unix()))) // create a new release to hold the release - obj, err := newReleasesObject(key, rls, lbs) + obj, err := newReleasesObject(toTPRSafeKey(key), rls, lbs) if err != nil { logerrf(err, "create: failed to encode release %q", rls.Name) return err @@ -213,7 +214,7 @@ func (releases *Releases) Update(key string, rls *rspb.Release) error { lbs.set("MODIFIED_AT", strconv.Itoa(int(time.Now().Unix()))) // create a new release object to hold the release - obj, err := newReleasesObject(key, rls, lbs) + obj, err := newReleasesObject(toTPRSafeKey(key), rls, lbs) if err != nil { logerrf(err, "update: failed to encode release %q", rls.Name) return err @@ -236,7 +237,7 @@ func (releases *Releases) Update(key string, rls *rspb.Release) error { // Delete deletes the Release holding the release named by key. func (releases *Releases) Delete(key string) (rls *rspb.Release, err error) { // fetch the release to check existence - if rls, err = releases.Get(key); err != nil { + if rls, err = releases.Get(toTPRSafeKey(key)); err != nil { if kberrs.IsNotFound(err) { return nil, ErrReleaseNotFound } @@ -317,6 +318,19 @@ func (releases *Releases) getReleaseData(rls *rapi.Release) (string, error) { return "", fmt.Errorf("Missing release data for %v", rls.Name) } +var ( + protoRegex = regexp.MustCompile(`^[a-z0-9][-a-z0-9]*.v[0-9]+$`) +) + +func toTPRSafeKey(key string) string { + if protoRegex.MatchString(key) { + i := strings.LastIndex(key, ".v") + return key[:i] + "-" + key[i+1:] + } else { + return key + } +} + // readAll reads from r until an error or EOF and returns the data it read // from the internal buffer allocated with a specified capacity. func readAll(r io.Reader, capacity int64) (b []byte, err error) { From 87af54909824303920edf839a1b5e2869a7d0623 Mon Sep 17 00:00:00 2001 From: tamal Date: Wed, 22 Mar 2017 10:21:05 -0700 Subject: [PATCH 08/12] Load storage flags from env --- cmd/tiller/tiller.go | 33 ++++++++++++++------------------- 1 file changed, 14 insertions(+), 19 deletions(-) diff --git a/cmd/tiller/tiller.go b/cmd/tiller/tiller.go index a8ab51d5c..550f95ade 100644 --- a/cmd/tiller/tiller.go +++ b/cmd/tiller/tiller.go @@ -79,7 +79,6 @@ var ( s3ConfigSecretKey string gcsConfigJSONKeyPath string gcsConfigProjectId string - gcsConfigScopes string azureConfigAccount string azureConfigKey string swiftConfigKey string @@ -115,26 +114,25 @@ func main() { p.StringVar(&store, "storage", storageConfigMap, "storage driver to use. One of 'configmap' or 'memory'") p.BoolVar(&enableTracing, "trace", false, "enable rpc tracing") - p.StringVar(&storageProvider, "provider", "", "Cloud storage provider") + p.StringVar(&storageProvider, "storage-provider", os.Getenv("STORAGE_PROVIDER"), "Cloud storage provider") - p.StringVar(&s3ConfigAccessKeyID, s3.Kind+"."+s3.ConfigAccessKeyID, "", "S3 config access key id") - p.StringVar(&s3ConfigEndpoint, s3.Kind+"."+s3.ConfigEndpoint, "", "S3 config endpoint") - p.StringVar(&s3ConfigRegion, s3.Kind+"."+s3.ConfigRegion, "", "S3 config region") - p.StringVar(&s3ConfigSecretKey, s3.Kind+"."+s3.ConfigSecretKey, "", "S3 config secret key") + p.StringVar(&s3ConfigAccessKeyID, s3.Kind+"."+s3.ConfigAccessKeyID, os.Getenv("S3_ACCESS_KEY_ID"), "S3 config access key id") + p.StringVar(&s3ConfigEndpoint, s3.Kind+"."+s3.ConfigEndpoint, os.Getenv("S3_ENDPOINT"), "S3 config endpoint") + p.StringVar(&s3ConfigRegion, s3.Kind+"."+s3.ConfigRegion, os.Getenv("S3_REGION"), "S3 config region") + p.StringVar(&s3ConfigSecretKey, s3.Kind+"."+s3.ConfigSecretKey, os.Getenv("S3_SECRET_KEY"), "S3 config secret key") - p.StringVar(&gcsConfigJSONKeyPath, gcs.Kind+".json_key_path", "", "GCS config json key path") - p.StringVar(&gcsConfigProjectId, gcs.Kind+"."+gcs.ConfigProjectId, "", "GCS config project id") - p.StringVar(&gcsConfigScopes, gcs.Kind+"."+gcs.ConfigScopes, "", "GCS config scopes") + p.StringVar(&gcsConfigJSONKeyPath, gcs.Kind+".json_key_path", os.Getenv("GOOGLE_JSON_KEY_PATH"), "GCS config json key path") + p.StringVar(&gcsConfigProjectId, gcs.Kind+"."+gcs.ConfigProjectId, os.Getenv("GOOGLE_PROJECT_ID"), "GCS config project id") - p.StringVar(&azureConfigAccount, azure.Kind+"."+azure.ConfigAccount, "", "Azure config account") - p.StringVar(&azureConfigKey, azure.Kind+"."+azure.ConfigKey, "", "Azure config key") + p.StringVar(&azureConfigAccount, azure.Kind+"."+azure.ConfigAccount, os.Getenv("AZURE_ACCOUNT"), "Azure config account") + p.StringVar(&azureConfigKey, azure.Kind+"."+azure.ConfigKey, os.Getenv("AZURE_KEY"), "Azure config key") - p.StringVar(&swiftConfigKey, swift.Kind+"."+swift.ConfigKey, "", "Swift config key") - p.StringVar(&swiftConfigTenantAuthURL, swift.Kind+"."+swift.ConfigTenantAuthURL, "", "Swift teanant auth url") - p.StringVar(&swiftConfigTenantName, swift.Kind+"."+swift.ConfigTenantName, "", "Swift tenant name") - p.StringVar(&swiftConfigUsername, swift.Kind+"."+swift.ConfigUsername, "", "Swift username") + p.StringVar(&swiftConfigKey, swift.Kind+"."+swift.ConfigKey, os.Getenv("SWIFT_KEY"), "Swift config key") + p.StringVar(&swiftConfigTenantAuthURL, swift.Kind+"."+swift.ConfigTenantAuthURL, os.Getenv("SWIFT_TENANT_AUTH_URL"), "Swift teanant auth url") + p.StringVar(&swiftConfigTenantName, swift.Kind+"."+swift.ConfigTenantName, os.Getenv("SWIFT_TENANT_NAME"), "Swift tenant name") + p.StringVar(&swiftConfigUsername, swift.Kind+"."+swift.ConfigUsername, os.Getenv("SWIFT_USERNAME"), "Swift username") - p.StringVar(&container, "container", "", "Name of container") + p.StringVar(&container, "storage-container", os.Getenv("STORAGE_CONTAINER"), "Name of container") p.StringVar(&storagePrefix, "storage-prefix", "tiller", "Prefix to container key where release data is stored") if err := rootCommand.Execute(); err != nil { @@ -192,9 +190,6 @@ func start(c *cobra.Command, args []string) { if gcsConfigProjectId != "" { stowCfg[gcs.ConfigProjectId] = gcsConfigProjectId } - if gcsConfigScopes != "" { - stowCfg[gcs.ConfigScopes] = gcsConfigScopes - } case azure.Kind: if azureConfigAccount != "" { stowCfg[azure.ConfigAccount] = azureConfigAccount From f6bfbfe51bf507ac59b5614b31af2cae0d895d2e Mon Sep 17 00:00:00 2001 From: tamal Date: Wed, 22 Mar 2017 10:26:44 -0700 Subject: [PATCH 09/12] Make status times nullable. --- api/types.go | 6 +++--- pkg/storage/driver/releases.go | 8 ++++---- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/api/types.go b/api/types.go index 968d5cd87..1e3a4babf 100644 --- a/api/types.go +++ b/api/types.go @@ -43,10 +43,10 @@ type ReleaseStatus struct { Resources string `json:"resources,omitempty"` // Contains the rendered templates/NOTES.txt if available Notes string `json:"notes,omitempty"` - FirstDeployed unversioned.Time `json:"first_deployed,omitempty"` - LastDeployed unversioned.Time `json:"last_deployed,omitempty"` + FirstDeployed *unversioned.Time `json:"first_deployed,omitempty"` + LastDeployed *unversioned.Time `json:"last_deployed,omitempty"` // Deleted tracks when this object was deleted. - Deleted unversioned.Time `json:"deleted,omitempty"` + Deleted *unversioned.Time `json:"deleted,omitempty"` } type ReleaseList struct { diff --git a/pkg/storage/driver/releases.go b/pkg/storage/driver/releases.go index 86b92a58e..15af61420 100644 --- a/pkg/storage/driver/releases.go +++ b/pkg/storage/driver/releases.go @@ -419,10 +419,10 @@ func newReleasesObject(key string, rls *rspb.Release, lbs labels) (*rapi.Release return r, nil } -func toKubeTime(pbt *google_protobuf.Timestamp) unversioned.Time { - var t unversioned.Time +func toKubeTime(pbt *google_protobuf.Timestamp) *unversioned.Time { if pbt != nil { - t = unversioned.NewTime(time.Unix(pbt.Seconds, int64(pbt.Nanos))) + t := unversioned.NewTime(time.Unix(pbt.Seconds, int64(pbt.Nanos))) + return &t } - return t + return nil } From b23db775aa03af2383581e449019e560cdd472b6 Mon Sep 17 00:00:00 2001 From: tamal Date: Wed, 22 Mar 2017 10:42:13 -0700 Subject: [PATCH 10/12] Fix bucket key --- cmd/tiller/tiller.go | 14 ++++++++------ pkg/storage/driver/releases.go | 4 ++-- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/cmd/tiller/tiller.go b/cmd/tiller/tiller.go index 550f95ade..321abdeca 100644 --- a/cmd/tiller/tiller.go +++ b/cmd/tiller/tiller.go @@ -183,8 +183,10 @@ func start(c *cobra.Command, args []string) { case gcs.Kind: if gcsConfigJSONKeyPath != "" { jsonKey, err := ioutil.ReadFile(gcsConfigJSONKeyPath) - fmt.Fprintf(os.Stderr, "Cannot read json key file: %s\n", err) - os.Exit(1) + if err != nil { + fmt.Fprintf(os.Stderr, "Cannot read json key file: %v\n", err) + os.Exit(1) + } stowCfg[gcs.ConfigJSON] = string(jsonKey) } if gcsConfigProjectId != "" { @@ -211,17 +213,17 @@ func start(c *cobra.Command, args []string) { stowCfg[swift.ConfigUsername] = swiftConfigUsername } default: - fmt.Fprintf(os.Stderr, "Unknown provider: %s\n", storageProvider) + fmt.Fprintf(os.Stderr, "Unknown provider: %v\n", storageProvider) os.Exit(1) } loc, err := stow.Dial(storageProvider, stowCfg) if err != nil { - fmt.Fprintf(os.Stderr, "Cannot connect to object store: %s\n", err) + fmt.Fprintf(os.Stderr, "Cannot connect to object store: %v\n", err) os.Exit(1) } c, err := loc.Container(container) if err != nil { - fmt.Fprintf(os.Stderr, "Cannot find container: %s\n", err) + fmt.Fprintf(os.Stderr, "Cannot find container: %v\n", err) os.Exit(1) } cs := rcs.NewExtensionsForConfigOrDie(clientcfg) @@ -230,7 +232,7 @@ func start(c *cobra.Command, args []string) { lstn, err := net.Listen("tcp", grpcAddr) if err != nil { - fmt.Fprintf(os.Stderr, "Server died: %s\n", err) + fmt.Fprintf(os.Stderr, "Server died: %v\n", err) os.Exit(1) } diff --git a/pkg/storage/driver/releases.go b/pkg/storage/driver/releases.go index 15af61420..a9320e901 100644 --- a/pkg/storage/driver/releases.go +++ b/pkg/storage/driver/releases.go @@ -256,11 +256,11 @@ func (releases *Releases) Delete(key string) (rls *rspb.Release, err error) { } func (releases *Releases) itemIDFromTPR(rls *rapi.Release) string { - return fmt.Sprintf("%v/releases/%v/versions%v", releases.prefix, rls.Name, rls.Spec.Version) + return fmt.Sprintf("%v/releases/%v", releases.prefix, rls.Name) } func (releases *Releases) itemIDFromProto(rls *rspb.Release) string { - return fmt.Sprintf("%v/releases/%v/versions%v", releases.prefix, rls.Name, rls.Version) + return fmt.Sprintf("%v/releases/%v", releases.prefix, toTPRSafeKey(rls.Name)) } func (releases *Releases) deleteReleaseData(rls *rspb.Release) error { From bfb0772e858d08898490fb382bf939ac00119e9b Mon Sep 17 00:00:00 2001 From: tamal Date: Wed, 22 Mar 2017 10:54:57 -0700 Subject: [PATCH 11/12] Store config data in maps directly --- api/types.go | 2 +- pkg/storage/driver/releases.go | 7 ++++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/api/types.go b/api/types.go index 1e3a4babf..6a5d524fb 100644 --- a/api/types.go +++ b/api/types.go @@ -22,7 +22,7 @@ type ReleaseSpec struct { ChartMetadata *hapi_chart.Metadata `json:"chartMetadata,omitempty"` // Config is the set of extra Values added to the chart. // These values override the default values inside of the chart. - Config *hapi_chart.Config `json:"config,omitempty"` + Config map[string]string `json:"config,omitempty"` // Version is an int32 which represents the version of the release. Version int32 `json:"version,omitempty"` diff --git a/pkg/storage/driver/releases.go b/pkg/storage/driver/releases.go index a9320e901..861dc7dec 100644 --- a/pkg/storage/driver/releases.go +++ b/pkg/storage/driver/releases.go @@ -395,7 +395,7 @@ func newReleasesObject(key string, rls *rspb.Release, lbs labels) (*rapi.Release Labels: lbs.toMap(), }, Spec: rapi.ReleaseSpec{ - Config: rls.Config, + Config: map[string]string{}, Version: rls.Version, Data: s, }, @@ -416,6 +416,11 @@ func newReleasesObject(key string, rls *rspb.Release, lbs labels) (*rapi.Release if rls.Chart != nil { r.Spec.ChartMetadata = rls.Chart.Metadata } + if rls.Config != nil { + for k, v := range rls.Config.Values { + r.Spec.Config[k] = v.Value + } + } return r, nil } From a03686a91ade5cfef76dc64201601c0173b378b3 Mon Sep 17 00:00:00 2001 From: tamal Date: Wed, 22 Mar 2017 11:11:04 -0700 Subject: [PATCH 12/12] Add todo --- pkg/storage/driver/releases.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/storage/driver/releases.go b/pkg/storage/driver/releases.go index 861dc7dec..8514dfb2d 100644 --- a/pkg/storage/driver/releases.go +++ b/pkg/storage/driver/releases.go @@ -368,6 +368,7 @@ func readAll(r io.Reader, capacity int64) (b []byte, err error) { func newReleasesObject(key string, rls *rspb.Release, lbs labels) (*rapi.Release, error) { const owner = "TILLER" + // TODO(tamal): Just store the proto bytes directly in cloud bucket. // encode the release s, err := encodeRelease(rls) if err != nil {