From 28eb45c43e3fbfb21c19e4d707ebb3f9623f8fab Mon Sep 17 00:00:00 2001 From: Tamal Saha Date: Tue, 21 Mar 2017 14:46:43 -0700 Subject: [PATCH] Store release data in cloud storage buckets. (#14) --- api/types.go | 14 +--- cmd/tiller/tiller.go | 117 ++++++++++++++++++++++++++- glide.yaml | 12 +++ pkg/storage/driver/releases.go | 139 +++++++++++++++++++++++++++++++-- 4 files changed, 258 insertions(+), 24 deletions(-) diff --git a/api/types.go b/api/types.go index a22b56db6..968d5cd87 100644 --- a/api/types.go +++ b/api/types.go @@ -34,19 +34,7 @@ type ReleaseSpec struct { // This is modelled like Volume in Pods, which allows specifying a chart // inline (like today) or pulling a chart object from a (potentially private) // chart registry similar to pulling a Docker image. - Data ReleaseData `json:"data,omitempty"` -} - -//------------------------------------------------------------------------------------------- -// Chart represents a chart that is installed in a Release. -// The ChartSource represents the location and type of a chart to install. -// This is modelled like Volume in Pods, which allows specifying a chart -// inline (like today) or pulling a chart object from a (potentially private) chart registry similar to pulling a Docker image. -// +optional -type ReleaseData struct { - // Inline charts are what is done today with Helm cli. Release request - // contains the chart definition in the release spec, sent by Helm cli. - Inline string `json:"inline,omitempty"` + Data string `json:"data,omitempty"` } type ReleaseStatus struct { diff --git a/cmd/tiller/tiller.go b/cmd/tiller/tiller.go index 8ab31f7c6..bcbb71abc 100644 --- a/cmd/tiller/tiller.go +++ b/cmd/tiller/tiller.go @@ -25,6 +25,11 @@ import ( "os" "strings" + "github.com/graymeta/stow" + "github.com/graymeta/stow/azure" + gcs "github.com/graymeta/stow/google" + "github.com/graymeta/stow/s3" + "github.com/graymeta/stow/swift" "github.com/spf13/cobra" rcs "k8s.io/helm/client/clientset" @@ -38,9 +43,10 @@ import ( ) const ( - storageMemory = "memory" - storageConfigMap = "configmap" - storageInlineTPR = "inline-tpr" + storageMemory = "memory" + storageConfigMap = "configmap" + storageInlineTPR = "inline-tpr" + storageObjectStoreTPR = "object-store-tpr" ) // rootServer is the root gRPC server. @@ -59,6 +65,24 @@ var ( traceAddr = ":44136" enableTracing = false store = storageConfigMap + + storageProvider string + s3ConfigAccessKeyID string + s3ConfigEndpoint string + s3ConfigRegion string + s3ConfigSecretKey string + gcsConfigJSONKeyPath string + gcsConfigProjectId string + gcsConfigScopes string + azureConfigAccount string + azureConfigKey string + swiftConfigKey string + swiftConfigTenantAuthURL string + swiftConfigTenantName string + swiftConfigUsername string + + container string + storagePrefix string ) const globalUsage = `The Kubernetes Helm server. @@ -85,6 +109,28 @@ func main() { p.StringVar(&store, "storage", storageConfigMap, "storage driver to use. One of 'configmap' or 'memory'") p.BoolVar(&enableTracing, "trace", false, "enable rpc tracing") + p.StringVar(&storageProvider, "provider", "", "Cloud storage provider") + + p.StringVar(&s3ConfigAccessKeyID, s3.Kind+"."+s3.ConfigAccessKeyID, "", "S3 config access key id") + p.StringVar(&s3ConfigEndpoint, s3.Kind+"."+s3.ConfigEndpoint, "", "S3 config endpoint") + p.StringVar(&s3ConfigRegion, s3.Kind+"."+s3.ConfigRegion, "", "S3 config region") + p.StringVar(&s3ConfigSecretKey, s3.Kind+"."+s3.ConfigSecretKey, "", "S3 config secret key") + + p.StringVar(&gcsConfigJSONKeyPath, gcs.Kind+".json_key_path", "", "GCS config json key path") + p.StringVar(&gcsConfigProjectId, gcs.Kind+"."+gcs.ConfigProjectId, "", "GCS config project id") + p.StringVar(&gcsConfigScopes, gcs.Kind+"."+gcs.ConfigScopes, "", "GCS config scopes") + + p.StringVar(&azureConfigAccount, azure.Kind+"."+azure.ConfigAccount, "", "Azure config account") + p.StringVar(&azureConfigKey, azure.Kind+"."+azure.ConfigKey, "", "Azure config key") + + p.StringVar(&swiftConfigKey, swift.Kind+"."+swift.ConfigKey, "", "Swift config key") + p.StringVar(&swiftConfigTenantAuthURL, swift.Kind+"."+swift.ConfigTenantAuthURL, "", "Swift teanant auth url") + p.StringVar(&swiftConfigTenantName, swift.Kind+"."+swift.ConfigTenantName, "", "Swift tenant name") + p.StringVar(&swiftConfigUsername, swift.Kind+"."+swift.ConfigUsername, "", "Swift username") + + p.StringVar(&container, "container", "", "Name of container") + p.StringVar(&storagePrefix, "storage-prefix", "tiller", "Prefix to container key where release data is stored") + if err := rootCommand.Execute(); err != nil { fmt.Fprint(os.Stderr, err) os.Exit(1) @@ -112,6 +158,71 @@ func start(c *cobra.Command, args []string) { case storageInlineTPR: cs := rcs.NewExtensionsForConfigOrDie(clientcfg) env.Releases = storage.Init(driver.NewReleases(cs.Release(namespace()))) + case storageObjectStoreTPR: + stowCfg := stow.ConfigMap{} + switch storageProvider { + case s3.Kind: + if s3ConfigAccessKeyID != "" { + stowCfg[s3.ConfigAccessKeyID] = s3ConfigAccessKeyID + } + if s3ConfigEndpoint != "" { + stowCfg[s3.ConfigEndpoint] = s3ConfigEndpoint + } + if s3ConfigRegion != "" { + stowCfg[s3.ConfigRegion] = s3ConfigRegion + } + if s3ConfigSecretKey != "" { + stowCfg[s3.ConfigSecretKey] = s3ConfigSecretKey + } + case gcs.Kind: + if gcsConfigJSONKeyPath != "" { + jsonKey, err := ioutil.ReadFile(gcsConfigJSONKeyPath) + fmt.Fprintf(os.Stderr, "Cannot read json key file: %s\n", err) + os.Exit(1) + stowCfg[gcs.ConfigJSON] = string(jsonKey) + } + if gcsConfigProjectId != "" { + stowCfg[gcs.ConfigProjectId] = gcsConfigProjectId + } + if gcsConfigScopes != "" { + stowCfg[gcs.ConfigScopes] = gcsConfigScopes + } + case azure.Kind: + if azureConfigAccount != "" { + stowCfg[azure.ConfigAccount] = azureConfigAccount + } + if azureConfigKey != "" { + stowCfg[azure.ConfigKey] = azureConfigKey + } + case swift.Kind: + if swiftConfigKey != "" { + stowCfg[swift.ConfigKey] = swiftConfigKey + } + if swiftConfigTenantAuthURL != "" { + stowCfg[swift.ConfigTenantAuthURL] = swiftConfigTenantAuthURL + } + if swiftConfigTenantName != "" { + stowCfg[swift.ConfigTenantName] = swiftConfigTenantName + } + if swiftConfigUsername != "" { + stowCfg[swift.ConfigUsername] = swiftConfigUsername + } + default: + fmt.Fprintf(os.Stderr, "Unknown provider: %s\n", storageProvider) + os.Exit(1) + } + loc, err := stow.Dial(storageProvider, stowCfg) + if err != nil { + fmt.Fprintf(os.Stderr, "Cannot connect to object store: %s\n", err) + os.Exit(1) + } + c, err := loc.Container(container) + if err != nil { + fmt.Fprintf(os.Stderr, "Cannot find container: %s\n", err) + os.Exit(1) + } + cs := rcs.NewExtensionsForConfigOrDie(clientcfg) + env.Releases = storage.Init(driver.NewObjectStoreReleases(cs.Release(namespace()), c, storagePrefix)) } lstn, err := net.Listen("tcp", grpcAddr) diff --git a/glide.yaml b/glide.yaml index f8a7af4f4..eba8902f1 100644 --- a/glide.yaml +++ b/glide.yaml @@ -69,3 +69,15 @@ import: version: ~0.1.0 - package: github.com/naoina/go-stringutil version: ~0.1.0 +- package: github.com/graymeta/stow + repo: https://github.com/appscode/stow.git + version: master + vcs: git +- package: github.com/aws/aws-sdk-go + version: v1.7.8 +- package: google.golang.org/api/storage/v1 + version: master +- package: github.com/Azure/azure-sdk-for-go + vcs: git + version: v7.0.1-beta +- package: github.com/ncw/swift diff --git a/pkg/storage/driver/releases.go b/pkg/storage/driver/releases.go index 459b5732c..960b87b0e 100644 --- a/pkg/storage/driver/releases.go +++ b/pkg/storage/driver/releases.go @@ -17,8 +17,9 @@ limitations under the License. package driver // import "k8s.io/helm/pkg/storage/driver" import ( + "bytes" "fmt" - rapi "k8s.io/helm/api" + "io" "strconv" "strings" "time" @@ -29,6 +30,8 @@ import ( kblabels "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/util/validation" + "github.com/graymeta/stow" + rapi "k8s.io/helm/api" "k8s.io/helm/client/clientset" rspb "k8s.io/helm/pkg/proto/hapi/release" "k8s.io/kubernetes/pkg/api/unversioned" @@ -42,7 +45,9 @@ const ReleasesDriverName = "helm.sh/Release" // Releases is a wrapper around an implementation of a kubernetes // ReleasesInterface. type Releases struct { - impl clientset.ReleaseInterface + impl clientset.ReleaseInterface + container stow.Container + prefix string } // NewReleases initializes a new Releases wrapping an implmenetation of @@ -51,6 +56,14 @@ func NewReleases(impl clientset.ReleaseInterface) *Releases { return &Releases{impl: impl} } +func NewObjectStoreReleases(impl clientset.ReleaseInterface, c stow.Container, prefix string) *Releases { + p := prefix + if prefix == "" { + p = "tiller" + } + return &Releases{impl: impl, container: c, prefix: p} +} + // Name returns the name of the driver. func (releases *Releases) Name() string { return ReleasesDriverName @@ -69,8 +82,13 @@ func (releases *Releases) Get(key string) (*rspb.Release, error) { logerrf(err, "get: failed to get %q", key) return nil, err } + // found the release, decode the base64 data string - r, err := decodeRelease(obj.Spec.Data.Inline) + data, err := releases.getReleaseData(obj) + if err != nil { + return nil, err + } + r, err := decodeRelease(data) if err != nil { logerrf(err, "get: failed to decode data %q", key) return nil, err @@ -97,7 +115,11 @@ func (releases *Releases) List(filter func(*rspb.Release) bool) ([]*rspb.Release // iterate over the releases object list // and decode each release for _, item := range list.Items { - rls, err := decodeRelease(item.Spec.Data.Inline) + data, err := releases.getReleaseData(&item) + if err != nil { + return nil, err + } + rls, err := decodeRelease(data) if err != nil { logerrf(err, "list: failed to decode release: %v", item) continue @@ -134,7 +156,11 @@ func (releases *Releases) Query(labels map[string]string) ([]*rspb.Release, erro var results []*rspb.Release for _, item := range list.Items { - rls, err := decodeRelease(item.Spec.Data.Inline) + data, err := releases.getReleaseData(&item) + if err != nil { + return nil, err + } + rls, err := decodeRelease(data) if err != nil { logerrf(err, "query: failed to decode release: %s", err) continue @@ -159,6 +185,12 @@ func (releases *Releases) Create(key string, rls *rspb.Release) error { logerrf(err, "create: failed to encode release %q", rls.Name) return err } + // push the release object data to object store if configured + err = releases.writeReleaseData(obj) + if err != nil { + logerrf(err, "create: failed to encode release %q", rls.Name) + return err + } // push the release object out into the kubiverse if _, err := releases.impl.Create(obj); err != nil { if kberrs.IsAlreadyExists(err) { @@ -186,6 +218,12 @@ func (releases *Releases) Update(key string, rls *rspb.Release) error { logerrf(err, "update: failed to encode release %q", rls.Name) return err } + // push the release object data to object store if configured + err = releases.writeReleaseData(obj) + if err != nil { + logerrf(err, "create: failed to encode release %q", rls.Name) + return err + } // push the release object out into the kubiverse _, err = releases.impl.Update(obj) if err != nil { @@ -207,12 +245,99 @@ func (releases *Releases) Delete(key string) (rls *rspb.Release, err error) { return nil, err } // delete the release + if err = releases.deleteReleaseData(rls); err != nil { + return rls, err + } if err = releases.impl.Delete(key); err != nil { return rls, err } return rls, nil } +func (releases *Releases) itemIDFromTPR(rls *rapi.Release) string { + return fmt.Sprintf("%v/releases/%v/versions%v", releases.prefix, rls.Name, rls.Spec.Version) +} + +func (releases *Releases) itemIDFromProto(rls *rspb.Release) string { + return fmt.Sprintf("%v/releases/%v/versions%v", releases.prefix, rls.Name, rls.Version) +} + +func (releases *Releases) deleteReleaseData(rls *rspb.Release) error { + if releases.container != nil { + return releases.container.RemoveItem(releases.itemIDFromProto(rls)) + } + return nil +} + +func (releases *Releases) writeReleaseData(rls *rapi.Release) error { + if releases.container != nil { + b := bytes.NewBufferString(rls.Spec.Data) + sz := len(rls.Spec.Data) + rls.Spec.Data = "" + _, err := releases.container.Put(releases.itemIDFromTPR(rls), b, int64(sz), nil) + if err != nil { + return err + } + } + return nil +} + +func (releases *Releases) getReleaseData(rls *rapi.Release) (string, error) { + if rls.Spec.Data != "" { + return rls.Spec.Data, nil + } else if releases.container != nil { + item, err := releases.container.Item(releases.itemIDFromTPR(rls)) + if err != nil { + return "", err + } + + f, err := item.Open() + if err != nil { + return "", err + } + defer f.Close() + // It's a good but not certain bet that FileInfo will tell us exactly how much to + // read, so let's try it but be prepared for the answer to be wrong. + var n int64 + // Don't preallocate a huge buffer, just in case. + if size, err := item.Size(); err != nil && size < 1e9 { + n = size + } + // As initial capacity for readAll, use n + a little extra in case Size is zero, + // and to avoid another allocation after Read has filled the buffer. The readAll + // call will read into its allocated internal buffer cheaply. If the size was + // wrong, we'll either waste some space off the end or reallocate as needed, but + // in the overwhelmingly common case we'll get it just right. + b, err := readAll(f, n+bytes.MinRead) + if err != nil { + return "", err + } + return string(b), nil + } + return "", fmt.Errorf("Missing release data for %v", rls.Name) +} + +// readAll reads from r until an error or EOF and returns the data it read +// from the internal buffer allocated with a specified capacity. +func readAll(r io.Reader, capacity int64) (b []byte, err error) { + buf := bytes.NewBuffer(make([]byte, 0, capacity)) + // If the buffer overflows, we will get bytes.ErrTooLarge. + // Return that as an error. Any other panic remains. + defer func() { + e := recover() + if e == nil { + return + } + if panicErr, ok := e.(error); ok && panicErr == bytes.ErrTooLarge { + err = panicErr + } else { + panic(e) + } + }() + _, err = buf.ReadFrom(r) + return buf.Bytes(), err +} + // newReleasesObject constructs a kubernetes Release object // to store a release. Each release data entry is the base64 // encoded string of a release's binary protobuf encoding. @@ -254,9 +379,7 @@ func newReleasesObject(key string, rls *rspb.Release, lbs labels) (*rapi.Release Spec: rapi.ReleaseSpec{ Config: rls.Config, Version: rls.Version, - Data: rapi.ReleaseData{ - Inline: s, - }, + Data: s, }, Status: rapi.ReleaseStatus{ FirstDeployed: toKubeTime(rls.Info.FirstDeployed),