Store release data in cloud storage buckets. (#14)

pull/2160/head
Tamal Saha 9 years ago committed by GitHub
parent 408f304965
commit 28eb45c43e

@ -34,19 +34,7 @@ type ReleaseSpec struct {
// This is modelled like Volume in Pods, which allows specifying a chart
// inline (like today) or pulling a chart object from a (potentially private)
// chart registry similar to pulling a Docker image.
Data ReleaseData `json:"data,omitempty"`
}
//-------------------------------------------------------------------------------------------
// Chart represents a chart that is installed in a Release.
// The ChartSource represents the location and type of a chart to install.
// This is modelled like Volume in Pods, which allows specifying a chart
// inline (like today) or pulling a chart object from a (potentially private) chart registry similar to pulling a Docker image.
// +optional
type ReleaseData struct {
// Inline charts are what is done today with Helm cli. Release request
// contains the chart definition in the release spec, sent by Helm cli.
Inline string `json:"inline,omitempty"`
Data string `json:"data,omitempty"`
}
type ReleaseStatus struct {

@ -25,6 +25,11 @@ import (
"os"
"strings"
"github.com/graymeta/stow"
"github.com/graymeta/stow/azure"
gcs "github.com/graymeta/stow/google"
"github.com/graymeta/stow/s3"
"github.com/graymeta/stow/swift"
"github.com/spf13/cobra"
rcs "k8s.io/helm/client/clientset"
@ -41,6 +46,7 @@ const (
storageMemory = "memory"
storageConfigMap = "configmap"
storageInlineTPR = "inline-tpr"
storageObjectStoreTPR = "object-store-tpr"
)
// rootServer is the root gRPC server.
@ -59,6 +65,24 @@ var (
traceAddr = ":44136"
enableTracing = false
store = storageConfigMap
storageProvider string
s3ConfigAccessKeyID string
s3ConfigEndpoint string
s3ConfigRegion string
s3ConfigSecretKey string
gcsConfigJSONKeyPath string
gcsConfigProjectId string
gcsConfigScopes string
azureConfigAccount string
azureConfigKey string
swiftConfigKey string
swiftConfigTenantAuthURL string
swiftConfigTenantName string
swiftConfigUsername string
container string
storagePrefix string
)
const globalUsage = `The Kubernetes Helm server.
@ -85,6 +109,28 @@ func main() {
p.StringVar(&store, "storage", storageConfigMap, "storage driver to use. One of 'configmap' or 'memory'")
p.BoolVar(&enableTracing, "trace", false, "enable rpc tracing")
p.StringVar(&storageProvider, "provider", "", "Cloud storage provider")
p.StringVar(&s3ConfigAccessKeyID, s3.Kind+"."+s3.ConfigAccessKeyID, "", "S3 config access key id")
p.StringVar(&s3ConfigEndpoint, s3.Kind+"."+s3.ConfigEndpoint, "", "S3 config endpoint")
p.StringVar(&s3ConfigRegion, s3.Kind+"."+s3.ConfigRegion, "", "S3 config region")
p.StringVar(&s3ConfigSecretKey, s3.Kind+"."+s3.ConfigSecretKey, "", "S3 config secret key")
p.StringVar(&gcsConfigJSONKeyPath, gcs.Kind+".json_key_path", "", "GCS config json key path")
p.StringVar(&gcsConfigProjectId, gcs.Kind+"."+gcs.ConfigProjectId, "", "GCS config project id")
p.StringVar(&gcsConfigScopes, gcs.Kind+"."+gcs.ConfigScopes, "", "GCS config scopes")
p.StringVar(&azureConfigAccount, azure.Kind+"."+azure.ConfigAccount, "", "Azure config account")
p.StringVar(&azureConfigKey, azure.Kind+"."+azure.ConfigKey, "", "Azure config key")
p.StringVar(&swiftConfigKey, swift.Kind+"."+swift.ConfigKey, "", "Swift config key")
p.StringVar(&swiftConfigTenantAuthURL, swift.Kind+"."+swift.ConfigTenantAuthURL, "", "Swift teanant auth url")
p.StringVar(&swiftConfigTenantName, swift.Kind+"."+swift.ConfigTenantName, "", "Swift tenant name")
p.StringVar(&swiftConfigUsername, swift.Kind+"."+swift.ConfigUsername, "", "Swift username")
p.StringVar(&container, "container", "", "Name of container")
p.StringVar(&storagePrefix, "storage-prefix", "tiller", "Prefix to container key where release data is stored")
if err := rootCommand.Execute(); err != nil {
fmt.Fprint(os.Stderr, err)
os.Exit(1)
@ -112,6 +158,71 @@ func start(c *cobra.Command, args []string) {
case storageInlineTPR:
cs := rcs.NewExtensionsForConfigOrDie(clientcfg)
env.Releases = storage.Init(driver.NewReleases(cs.Release(namespace())))
case storageObjectStoreTPR:
stowCfg := stow.ConfigMap{}
switch storageProvider {
case s3.Kind:
if s3ConfigAccessKeyID != "" {
stowCfg[s3.ConfigAccessKeyID] = s3ConfigAccessKeyID
}
if s3ConfigEndpoint != "" {
stowCfg[s3.ConfigEndpoint] = s3ConfigEndpoint
}
if s3ConfigRegion != "" {
stowCfg[s3.ConfigRegion] = s3ConfigRegion
}
if s3ConfigSecretKey != "" {
stowCfg[s3.ConfigSecretKey] = s3ConfigSecretKey
}
case gcs.Kind:
if gcsConfigJSONKeyPath != "" {
jsonKey, err := ioutil.ReadFile(gcsConfigJSONKeyPath)
fmt.Fprintf(os.Stderr, "Cannot read json key file: %s\n", err)
os.Exit(1)
stowCfg[gcs.ConfigJSON] = string(jsonKey)
}
if gcsConfigProjectId != "" {
stowCfg[gcs.ConfigProjectId] = gcsConfigProjectId
}
if gcsConfigScopes != "" {
stowCfg[gcs.ConfigScopes] = gcsConfigScopes
}
case azure.Kind:
if azureConfigAccount != "" {
stowCfg[azure.ConfigAccount] = azureConfigAccount
}
if azureConfigKey != "" {
stowCfg[azure.ConfigKey] = azureConfigKey
}
case swift.Kind:
if swiftConfigKey != "" {
stowCfg[swift.ConfigKey] = swiftConfigKey
}
if swiftConfigTenantAuthURL != "" {
stowCfg[swift.ConfigTenantAuthURL] = swiftConfigTenantAuthURL
}
if swiftConfigTenantName != "" {
stowCfg[swift.ConfigTenantName] = swiftConfigTenantName
}
if swiftConfigUsername != "" {
stowCfg[swift.ConfigUsername] = swiftConfigUsername
}
default:
fmt.Fprintf(os.Stderr, "Unknown provider: %s\n", storageProvider)
os.Exit(1)
}
loc, err := stow.Dial(storageProvider, stowCfg)
if err != nil {
fmt.Fprintf(os.Stderr, "Cannot connect to object store: %s\n", err)
os.Exit(1)
}
c, err := loc.Container(container)
if err != nil {
fmt.Fprintf(os.Stderr, "Cannot find container: %s\n", err)
os.Exit(1)
}
cs := rcs.NewExtensionsForConfigOrDie(clientcfg)
env.Releases = storage.Init(driver.NewObjectStoreReleases(cs.Release(namespace()), c, storagePrefix))
}
lstn, err := net.Listen("tcp", grpcAddr)

@ -69,3 +69,15 @@ import:
version: ~0.1.0
- package: github.com/naoina/go-stringutil
version: ~0.1.0
- package: github.com/graymeta/stow
repo: https://github.com/appscode/stow.git
version: master
vcs: git
- package: github.com/aws/aws-sdk-go
version: v1.7.8
- package: google.golang.org/api/storage/v1
version: master
- package: github.com/Azure/azure-sdk-for-go
vcs: git
version: v7.0.1-beta
- package: github.com/ncw/swift

@ -17,8 +17,9 @@ limitations under the License.
package driver // import "k8s.io/helm/pkg/storage/driver"
import (
"bytes"
"fmt"
rapi "k8s.io/helm/api"
"io"
"strconv"
"strings"
"time"
@ -29,6 +30,8 @@ import (
kblabels "k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/util/validation"
"github.com/graymeta/stow"
rapi "k8s.io/helm/api"
"k8s.io/helm/client/clientset"
rspb "k8s.io/helm/pkg/proto/hapi/release"
"k8s.io/kubernetes/pkg/api/unversioned"
@ -43,6 +46,8 @@ const ReleasesDriverName = "helm.sh/Release"
// ReleasesInterface.
type Releases struct {
impl clientset.ReleaseInterface
container stow.Container
prefix string
}
// NewReleases initializes a new Releases wrapping an implmenetation of
@ -51,6 +56,14 @@ func NewReleases(impl clientset.ReleaseInterface) *Releases {
return &Releases{impl: impl}
}
func NewObjectStoreReleases(impl clientset.ReleaseInterface, c stow.Container, prefix string) *Releases {
p := prefix
if prefix == "" {
p = "tiller"
}
return &Releases{impl: impl, container: c, prefix: p}
}
// Name returns the name of the driver.
func (releases *Releases) Name() string {
return ReleasesDriverName
@ -69,8 +82,13 @@ func (releases *Releases) Get(key string) (*rspb.Release, error) {
logerrf(err, "get: failed to get %q", key)
return nil, err
}
// found the release, decode the base64 data string
r, err := decodeRelease(obj.Spec.Data.Inline)
data, err := releases.getReleaseData(obj)
if err != nil {
return nil, err
}
r, err := decodeRelease(data)
if err != nil {
logerrf(err, "get: failed to decode data %q", key)
return nil, err
@ -97,7 +115,11 @@ func (releases *Releases) List(filter func(*rspb.Release) bool) ([]*rspb.Release
// iterate over the releases object list
// and decode each release
for _, item := range list.Items {
rls, err := decodeRelease(item.Spec.Data.Inline)
data, err := releases.getReleaseData(&item)
if err != nil {
return nil, err
}
rls, err := decodeRelease(data)
if err != nil {
logerrf(err, "list: failed to decode release: %v", item)
continue
@ -134,7 +156,11 @@ func (releases *Releases) Query(labels map[string]string) ([]*rspb.Release, erro
var results []*rspb.Release
for _, item := range list.Items {
rls, err := decodeRelease(item.Spec.Data.Inline)
data, err := releases.getReleaseData(&item)
if err != nil {
return nil, err
}
rls, err := decodeRelease(data)
if err != nil {
logerrf(err, "query: failed to decode release: %s", err)
continue
@ -159,6 +185,12 @@ func (releases *Releases) Create(key string, rls *rspb.Release) error {
logerrf(err, "create: failed to encode release %q", rls.Name)
return err
}
// push the release object data to object store if configured
err = releases.writeReleaseData(obj)
if err != nil {
logerrf(err, "create: failed to encode release %q", rls.Name)
return err
}
// push the release object out into the kubiverse
if _, err := releases.impl.Create(obj); err != nil {
if kberrs.IsAlreadyExists(err) {
@ -186,6 +218,12 @@ func (releases *Releases) Update(key string, rls *rspb.Release) error {
logerrf(err, "update: failed to encode release %q", rls.Name)
return err
}
// push the release object data to object store if configured
err = releases.writeReleaseData(obj)
if err != nil {
logerrf(err, "create: failed to encode release %q", rls.Name)
return err
}
// push the release object out into the kubiverse
_, err = releases.impl.Update(obj)
if err != nil {
@ -207,12 +245,99 @@ func (releases *Releases) Delete(key string) (rls *rspb.Release, err error) {
return nil, err
}
// delete the release
if err = releases.deleteReleaseData(rls); err != nil {
return rls, err
}
if err = releases.impl.Delete(key); err != nil {
return rls, err
}
return rls, nil
}
func (releases *Releases) itemIDFromTPR(rls *rapi.Release) string {
return fmt.Sprintf("%v/releases/%v/versions%v", releases.prefix, rls.Name, rls.Spec.Version)
}
func (releases *Releases) itemIDFromProto(rls *rspb.Release) string {
return fmt.Sprintf("%v/releases/%v/versions%v", releases.prefix, rls.Name, rls.Version)
}
func (releases *Releases) deleteReleaseData(rls *rspb.Release) error {
if releases.container != nil {
return releases.container.RemoveItem(releases.itemIDFromProto(rls))
}
return nil
}
func (releases *Releases) writeReleaseData(rls *rapi.Release) error {
if releases.container != nil {
b := bytes.NewBufferString(rls.Spec.Data)
sz := len(rls.Spec.Data)
rls.Spec.Data = ""
_, err := releases.container.Put(releases.itemIDFromTPR(rls), b, int64(sz), nil)
if err != nil {
return err
}
}
return nil
}
func (releases *Releases) getReleaseData(rls *rapi.Release) (string, error) {
if rls.Spec.Data != "" {
return rls.Spec.Data, nil
} else if releases.container != nil {
item, err := releases.container.Item(releases.itemIDFromTPR(rls))
if err != nil {
return "", err
}
f, err := item.Open()
if err != nil {
return "", err
}
defer f.Close()
// It's a good but not certain bet that FileInfo will tell us exactly how much to
// read, so let's try it but be prepared for the answer to be wrong.
var n int64
// Don't preallocate a huge buffer, just in case.
if size, err := item.Size(); err != nil && size < 1e9 {
n = size
}
// As initial capacity for readAll, use n + a little extra in case Size is zero,
// and to avoid another allocation after Read has filled the buffer. The readAll
// call will read into its allocated internal buffer cheaply. If the size was
// wrong, we'll either waste some space off the end or reallocate as needed, but
// in the overwhelmingly common case we'll get it just right.
b, err := readAll(f, n+bytes.MinRead)
if err != nil {
return "", err
}
return string(b), nil
}
return "", fmt.Errorf("Missing release data for %v", rls.Name)
}
// readAll reads from r until an error or EOF and returns the data it read
// from the internal buffer allocated with a specified capacity.
func readAll(r io.Reader, capacity int64) (b []byte, err error) {
buf := bytes.NewBuffer(make([]byte, 0, capacity))
// If the buffer overflows, we will get bytes.ErrTooLarge.
// Return that as an error. Any other panic remains.
defer func() {
e := recover()
if e == nil {
return
}
if panicErr, ok := e.(error); ok && panicErr == bytes.ErrTooLarge {
err = panicErr
} else {
panic(e)
}
}()
_, err = buf.ReadFrom(r)
return buf.Bytes(), err
}
// newReleasesObject constructs a kubernetes Release object
// to store a release. Each release data entry is the base64
// encoded string of a release's binary protobuf encoding.
@ -254,9 +379,7 @@ func newReleasesObject(key string, rls *rspb.Release, lbs labels) (*rapi.Release
Spec: rapi.ReleaseSpec{
Config: rls.Config,
Version: rls.Version,
Data: rapi.ReleaseData{
Inline: s,
},
Data: s,
},
Status: rapi.ReleaseStatus{
FirstDeployed: toKubeTime(rls.Info.FirstDeployed),

Loading…
Cancel
Save