feat(*): add generic logging interface (#2394)

Adds a generic logging interface to importable packages.
Branch: pull/2469/head
Author: Adam Reese (committed via GitHub)
Parent: 4d5bffecbd
Commit: 15254e4c5c
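The pattern, in miniature: each importable package gains an exported Log field of type func(string, ...interface{}) that defaults to a no-op, and the tiller binary injects a real logger at startup. The sketch below is illustrative (the Widget type is hypothetical, not part of this commit); only the field shape and the (*log.Logger).Printf injection come from the diff.

package main

import (
	"log"
	"os"
)

// Widget stands in for any importable component in this commit
// (kube.Client, storage.Storage, driver.ConfigMaps, tiller.ReleaseServer).
type Widget struct {
	Log func(string, ...interface{})
}

// NewWidget installs a no-op logger so callers that never inject
// one do not crash on a nil function call.
func NewWidget() *Widget {
	return &Widget{Log: func(_ string, _ ...interface{}) {}}
}

func (w *Widget) Do() {
	w.Log("doing work") // silent unless a logger was injected
}

func main() {
	w := NewWidget()
	// (*log.Logger).Printf is a method value with exactly the
	// signature func(string, ...interface{}), so it slots right in.
	w.Log = log.New(os.Stderr, "[widget] ", log.Flags()).Printf
	w.Do()
}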

@@ -29,6 +29,7 @@ import (
goprom "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
@@ -69,6 +70,8 @@ var rootServer *grpc.Server
// Any changes to env should be done before rootServer.Serve() is called.
var env = environment.New()
var logger *log.Logger
var (
grpcAddr = ":44134"
probeAddr = ":44135"
@@ -93,64 +96,83 @@ Tiller is the server for Helm. It provides in-cluster resource management.
By default, Tiller listens for gRPC connections on port 44134.
`
var rootCommand = &cobra.Command{
Use: "tiller",
Short: "The Kubernetes Helm server.",
Long: globalUsage,
Run: start,
func addFlags(flags *pflag.FlagSet) {
flags.StringVarP(&grpcAddr, "listen", "l", ":44134", "address:port to listen on")
flags.StringVar(&store, "storage", storageConfigMap, "storage driver to use. One of 'configmap' or 'memory'")
flags.BoolVar(&enableTracing, "trace", false, "enable rpc tracing")
flags.BoolVar(&remoteReleaseModules, "experimental-release", false, "enable experimental release modules")
flags.BoolVar(&tlsEnable, "tls", tlsEnableEnvVarDefault(), "enable TLS")
flags.BoolVar(&tlsVerify, "tls-verify", tlsVerifyEnvVarDefault(), "enable TLS and verify remote certificate")
flags.StringVar(&keyFile, "tls-key", tlsDefaultsFromEnv("tls-key"), "path to TLS private key file")
flags.StringVar(&certFile, "tls-cert", tlsDefaultsFromEnv("tls-cert"), "path to TLS certificate file")
flags.StringVar(&caCertFile, "tls-ca-cert", tlsDefaultsFromEnv("tls-ca-cert"), "trust certificates signed by this CA")
}
func init() {
log.SetFlags(log.Flags() | log.Lshortfile)
func initLog() {
if enableTracing {
log.SetFlags(log.Lshortfile)
}
logger = newLogger("main")
}
func main() {
p := rootCommand.PersistentFlags()
p.StringVarP(&grpcAddr, "listen", "l", ":44134", "address:port to listen on")
p.StringVar(&store, "storage", storageConfigMap, "storage driver to use. One of 'configmap' or 'memory'")
p.BoolVar(&enableTracing, "trace", false, "enable rpc tracing")
p.BoolVar(&remoteReleaseModules, "experimental-release", false, "enable experimental release modules")
p.BoolVar(&tlsEnable, "tls", tlsEnableEnvVarDefault(), "enable TLS")
p.BoolVar(&tlsVerify, "tls-verify", tlsVerifyEnvVarDefault(), "enable TLS and verify remote certificate")
p.StringVar(&keyFile, "tls-key", tlsDefaultsFromEnv("tls-key"), "path to TLS private key file")
p.StringVar(&certFile, "tls-cert", tlsDefaultsFromEnv("tls-cert"), "path to TLS certificate file")
p.StringVar(&caCertFile, "tls-ca-cert", tlsDefaultsFromEnv("tls-ca-cert"), "trust certificates signed by this CA")
if err := rootCommand.Execute(); err != nil {
fmt.Fprint(os.Stderr, err)
os.Exit(1)
root := &cobra.Command{
Use: "tiller",
Short: "The Kubernetes Helm server.",
Long: globalUsage,
Run: start,
PreRun: func(_ *cobra.Command, _ []string) {
initLog()
},
}
addFlags(root.Flags())
if err := root.Execute(); err != nil {
logger.Fatal(err)
}
}
func newLogger(prefix string) *log.Logger {
if len(prefix) > 0 {
prefix = fmt.Sprintf("[%s] ", prefix)
}
return log.New(os.Stderr, prefix, log.Flags())
}
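Moving logger setup into a PreRun hook matters for ordering: cobra parses flags before PreRun fires, so initLog sees the final value of --trace, and every logger created afterwards inherits the adjusted log.Flags(). A standalone sketch of that ordering (the demo command is hypothetical):

package main

import (
	"fmt"

	"github.com/spf13/cobra"
)

func main() {
	var trace bool
	cmd := &cobra.Command{
		Use: "demo",
		// PreRun fires after flag parsing and before Run, so the
		// --trace value is already populated here.
		PreRun: func(_ *cobra.Command, _ []string) {
			fmt.Println("trace enabled:", trace)
		},
		Run: func(_ *cobra.Command, _ []string) {},
	}
	cmd.Flags().BoolVar(&trace, "trace", false, "enable rpc tracing")
	if err := cmd.Execute(); err != nil {
		fmt.Println(err)
	}
}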
func start(c *cobra.Command, args []string) {
clientset, err := kube.New(nil).ClientSet()
if err != nil {
fmt.Fprintf(os.Stderr, "Cannot initialize Kubernetes connection: %s\n", err)
os.Exit(1)
logger.Fatalf("Cannot initialize Kubernetes connection: %s", err)
}
switch store {
case storageMemory:
env.Releases = storage.Init(driver.NewMemory())
case storageConfigMap:
env.Releases = storage.Init(driver.NewConfigMaps(clientset.Core().ConfigMaps(namespace())))
cfgmaps := driver.NewConfigMaps(clientset.Core().ConfigMaps(namespace()))
cfgmaps.Log = newLogger("storage/driver").Printf
env.Releases = storage.Init(cfgmaps)
env.Releases.Log = newLogger("storage").Printf
}
kubeClient := kube.New(nil)
kubeClient.Log = newLogger("kube").Printf
env.KubeClient = kubeClient
if tlsEnable || tlsVerify {
opts := tlsutil.Options{CertFile: certFile, KeyFile: keyFile}
if tlsVerify {
opts.CaCertFile = caCertFile
}
}
var opts []grpc.ServerOption
if tlsEnable || tlsVerify {
cfg, err := tlsutil.ServerConfig(tlsOptions())
if err != nil {
fmt.Fprintf(os.Stderr, "Could not create server TLS configuration: %v\n", err)
os.Exit(1)
logger.Fatalf("Could not create server TLS configuration: %v", err)
}
opts = append(opts, grpc.Creds(credentials.NewTLS(cfg)))
}
@@ -159,14 +181,13 @@ func start(c *cobra.Command, args []string) {
lstn, err := net.Listen("tcp", grpcAddr)
if err != nil {
fmt.Fprintf(os.Stderr, "Server died: %s\n", err)
os.Exit(1)
logger.Fatalf("Server died: %s", err)
}
fmt.Printf("Starting Tiller %s (tls=%t)\n", version.GetVersion(), tlsEnable || tlsVerify)
fmt.Printf("GRPC listening on %s\n", grpcAddr)
fmt.Printf("Probes listening on %s\n", probeAddr)
fmt.Printf("Storage driver is %s\n", env.Releases.Name())
logger.Printf("Starting Tiller %s (tls=%t)", version.GetVersion(), tlsEnable || tlsVerify)
logger.Printf("GRPC listening on %s", grpcAddr)
logger.Printf("Probes listening on %s", probeAddr)
logger.Printf("Storage driver is %s", env.Releases.Name())
if enableTracing {
startTracing(traceAddr)
@@ -176,6 +197,7 @@ func start(c *cobra.Command, args []string) {
probeErrCh := make(chan error)
go func() {
svc := tiller.NewReleaseServer(env, clientset, remoteReleaseModules)
svc.Log = newLogger("tiller").Printf
services.RegisterReleaseServiceServer(rootServer, svc)
if err := rootServer.Serve(lstn); err != nil {
srvErrCh <- err
@@ -196,10 +218,9 @@ func start(c *cobra.Command, args []string) {
select {
case err := <-srvErrCh:
fmt.Fprintf(os.Stderr, "Server died: %s\n", err)
os.Exit(1)
logger.Fatalf("Server died: %s", err)
case err := <-probeErrCh:
fmt.Fprintf(os.Stderr, "Probes server died: %s\n", err)
logger.Printf("Probes server died: %s", err)
}
}

@@ -17,8 +17,6 @@ limitations under the License.
package main // import "k8s.io/helm/cmd/tiller"
import (
"fmt"
"log"
"net/http"
_ "net/http/pprof"
@@ -27,7 +25,7 @@ import (
)
func startTracing(addr string) {
fmt.Printf("Tracing server is listening on %s\n", addr)
logger.Printf("Tracing server is listening on %s", addr)
grpc.EnableTracing = true
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
@@ -41,7 +39,7 @@ func startTracing(addr string) {
go func() {
if err := http.ListenAndServe(addr, nil); err != nil {
log.Printf("tracing error: %s", err)
logger.Printf("tracing error: %s", err)
}
}()
}
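Side note on the trailing newlines dropped in these conversions: log.Logger appends a newline only when the formatted message lacks one, so an explicit \n in a Printf call to a logger is redundant rather than harmful. A quick check (the address is arbitrary):

package main

import (
	"log"
	"os"
)

func main() {
	l := log.New(os.Stdout, "[main] ", 0)
	// Both lines produce identical single-newline output:
	// the logger only appends '\n' when it is missing.
	l.Printf("listening on %s", "localhost:4000")
	l.Printf("listening on %s\n", "localhost:4000")
}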

@@ -59,6 +59,8 @@ type Client struct {
cmdutil.Factory
// SchemaCacheDir is the path for loading cached schema.
SchemaCacheDir string
Log func(string, ...interface{})
}
// New creates a new Client
@@ -66,6 +68,7 @@ func New(config clientcmd.ClientConfig) *Client {
return &Client{
Factory: cmdutil.NewFactory(config),
SchemaCacheDir: clientcmd.RecommendedSchemaFile,
Log: func(_ string, _ ...interface{}) {},
}
}
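The constructor is the only place the no-op default is installed; building the struct with a literal leaves Log nil, and calling a nil func panics. That is exactly why the tests further down switch from &Client{Factory: f} to a helper that goes through New. A minimal illustration (Widget is hypothetical):

package main

import "fmt"

type Widget struct {
	Log func(string, ...interface{})
}

func main() {
	w := &Widget{} // struct literal: Log stays nil
	defer func() {
		// Calling a nil function value panics; the constructor's
		// no-op default exists to rule this out.
		fmt.Println("recovered:", recover())
	}()
	w.Log("boom")
}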
@@ -99,7 +102,7 @@ func (c *Client) Create(namespace string, reader io.Reader, timeout int64, shoul
func (c *Client) newBuilder(namespace string, reader io.Reader) *resource.Result {
schema, err := c.Validator(true, c.SchemaCacheDir)
if err != nil {
log.Printf("warning: failed to load schema: %s", err)
c.Log("warning: failed to load schema: %s", err)
}
return c.NewBuilder().
ContinueOnError().
@@ -115,12 +118,12 @@ func (c *Client) newBuilder(namespace string, reader io.Reader) *resource.Result
func (c *Client) BuildUnstructured(namespace string, reader io.Reader) (Result, error) {
schema, err := c.Validator(true, c.SchemaCacheDir)
if err != nil {
log.Printf("warning: failed to load schema: %s", err)
c.Log("warning: failed to load schema: %s", err)
}
mapper, typer, err := c.UnstructuredObject()
if err != nil {
log.Printf("failed to load mapper: %s", err)
c.Log("failed to load mapper: %s", err)
return nil, err
}
var result Result
@@ -155,9 +158,9 @@ func (c *Client) Get(namespace string, reader io.Reader) (string, error) {
}
missing := []string{}
err = perform(infos, func(info *resource.Info) error {
log.Printf("Doing get for %s: %q", info.Mapping.GroupVersionKind.Kind, info.Name)
c.Log("Doing get for %s: %q", info.Mapping.GroupVersionKind.Kind, info.Name)
if err := info.Get(); err != nil {
log.Printf("WARNING: Failed Get for resource %q: %s", info.Name, err)
c.Log("WARNING: Failed Get for resource %q: %s", info.Name, err)
missing = append(missing, fmt.Sprintf("%v\t\t%s", info.Mapping.Resource, info.Name))
return nil
}
@@ -185,7 +188,7 @@ func (c *Client) Get(namespace string, reader io.Reader) (string, error) {
}
for _, o := range ot {
if err := p.PrintObj(o, buf); err != nil {
log.Printf("failed to print object type %s, object: %q :\n %v", t, o, err)
c.Log("failed to print object type %s, object: %q :\n %v", t, o, err)
return "", err
}
}
@@ -238,7 +241,7 @@ func (c *Client) Update(namespace string, originalReader, targetReader io.Reader
}
kind := info.Mapping.GroupVersionKind.Kind
log.Printf("Created a new %s called %q\n", kind, info.Name)
c.Log("Created a new %s called %q\n", kind, info.Name)
return nil
}
@@ -248,7 +251,7 @@ func (c *Client) Update(namespace string, originalReader, targetReader io.Reader
}
if err := updateResource(c, info, originalInfo.Object, recreate); err != nil {
log.Printf("error updating the resource %q:\n\t %v", info.Name, err)
c.Log("error updating the resource %q:\n\t %v", info.Name, err)
updateErrors = append(updateErrors, err.Error())
}
@@ -263,9 +266,9 @@ func (c *Client) Update(namespace string, originalReader, targetReader io.Reader
}
for _, info := range original.Difference(target) {
log.Printf("Deleting %q in %s...", info.Name, info.Namespace)
c.Log("Deleting %q in %s...", info.Name, info.Namespace)
if err := deleteResource(c, info); err != nil {
log.Printf("Failed to delete %q, err: %s", info.Name, err)
c.Log("Failed to delete %q, err: %s", info.Name, err)
}
}
if shouldWait {
@@ -283,23 +286,23 @@ func (c *Client) Delete(namespace string, reader io.Reader) error {
return err
}
return perform(infos, func(info *resource.Info) error {
log.Printf("Starting delete for %q %s", info.Name, info.Mapping.GroupVersionKind.Kind)
c.Log("Starting delete for %q %s", info.Name, info.Mapping.GroupVersionKind.Kind)
err := deleteResource(c, info)
return skipIfNotFound(err)
return c.skipIfNotFound(err)
})
}
func skipIfNotFound(err error) error {
func (c *Client) skipIfNotFound(err error) error {
if errors.IsNotFound(err) {
log.Printf("%v", err)
c.Log("%v", err)
return nil
}
return err
}
func watchTimeout(t time.Duration) ResourceActorFunc {
func (c *Client) watchTimeout(t time.Duration) ResourceActorFunc {
return func(info *resource.Info) error {
return watchUntilReady(t, info)
return c.watchUntilReady(t, info)
}
}
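skipIfNotFound and watchTimeout (and their callees below) become methods for one reason: a free function has no path to the instance's Log field. The same refactor in miniature, with hypothetical names and a stdlib sentinel standing in for errors.IsNotFound:

package main

import (
	"errors"
	"fmt"
)

var errNotFound = errors.New("not found")

type Client struct {
	Log func(string, ...interface{})
}

// Before: func skipIfNotFound(err error) error { log.Printf(...) ... }
// After: a method, so the injected c.Log is in scope.
func (c *Client) skipIfNotFound(err error) error {
	if errors.Is(err, errNotFound) {
		c.Log("%v", err) // log and absorb not-found errors
		return nil
	}
	return err
}

func main() {
	c := &Client{Log: func(f string, v ...interface{}) { fmt.Printf(f+"\n", v...) }}
	fmt.Println(c.skipIfNotFound(errNotFound)) // <nil>
}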
@@ -322,7 +325,7 @@ func (c *Client) WatchUntilReady(namespace string, reader io.Reader, timeout int
}
// For jobs, there's also the option to poll c.Jobs(namespace).Get():
// https://github.com/adamreese/kubernetes/blob/master/test/e2e/job.go#L291-L300
return perform(infos, watchTimeout(time.Duration(timeout)*time.Second))
return perform(infos, c.watchTimeout(time.Duration(timeout)*time.Second))
}
func perform(infos Result, fn ResourceActorFunc) error {
@@ -355,7 +358,7 @@ func deleteResource(c *Client, info *resource.Info) error {
}
return err
}
log.Printf("Using reaper for deleting %q", info.Name)
c.Log("Using reaper for deleting %q", info.Name)
return reaper.Stop(info.Namespace, info.Name, 0, nil)
}
@@ -395,7 +398,7 @@ func updateResource(c *Client, target *resource.Info, currentObj runtime.Object,
return fmt.Errorf("failed to create patch: %s", err)
}
if patch == nil {
log.Printf("Looks like there are no changes for %s %q", target.Mapping.GroupVersionKind.Kind, target.Name)
c.Log("Looks like there are no changes for %s %q", target.Mapping.GroupVersionKind.Kind, target.Name)
// This needs to happen to make sure that tiller has the latest info from the API
// Otherwise there will be no labels and other functions that use labels will panic
if err := target.Get(); err != nil {
@@ -445,7 +448,7 @@ func updateResource(c *Client, target *resource.Info, currentObj runtime.Object,
// Restart pods
for _, pod := range pods.Items {
log.Printf("Restarting pod: %v/%v", pod.Namespace, pod.Name)
c.Log("Restarting pod: %v/%v", pod.Namespace, pod.Name)
// Delete each pod to get it restarted with the changed spec.
if err := client.Core().Pods(pod.Namespace).Delete(pod.Name, metav1.NewPreconditionDeleteOptions(string(pod.UID))); err != nil {
@@ -474,14 +477,14 @@ func getSelectorFromObject(obj runtime.Object) (map[string]string, error) {
}
}
func watchUntilReady(timeout time.Duration, info *resource.Info) error {
func (c *Client) watchUntilReady(timeout time.Duration, info *resource.Info) error {
w, err := resource.NewHelper(info.Client, info.Mapping).WatchSingle(info.Namespace, info.Name, info.ResourceVersion)
if err != nil {
return err
}
kind := info.Mapping.GroupVersionKind.Kind
log.Printf("Watching for changes to %s %s with timeout of %v", kind, info.Name, timeout)
c.Log("Watching for changes to %s %s with timeout of %v", kind, info.Name, timeout)
// What we watch for depends on the Kind.
// - For a Job, we watch for completion.
@@ -496,17 +499,17 @@ func watchUntilReady(timeout time.Duration, info *resource.Info) error {
// we get. We care mostly about jobs, where what we want to see is
// the status go into a good state. For other types, like ReplicaSet
// we don't really do anything to support these as hooks.
log.Printf("Add/Modify event for %s: %v", info.Name, e.Type)
c.Log("Add/Modify event for %s: %v", info.Name, e.Type)
if kind == "Job" {
return waitForJob(e, info.Name)
return c.waitForJob(e, info.Name)
}
return true, nil
case watch.Deleted:
log.Printf("Deleted event for %s", info.Name)
c.Log("Deleted event for %s", info.Name)
return true, nil
case watch.Error:
// Handle error and return with an error.
log.Printf("Error event for %s", info.Name)
c.Log("Error event for %s", info.Name)
return true, fmt.Errorf("Failed to deploy %s", info.Name)
default:
return false, nil
@@ -529,7 +532,7 @@ func (c *Client) AsVersionedObject(obj runtime.Object) (runtime.Object, error) {
// waitForJob is a helper that waits for a job to complete.
//
// This operates on an event returned from a watcher.
func waitForJob(e watch.Event, name string) (bool, error) {
func (c *Client) waitForJob(e watch.Event, name string) (bool, error) {
o, ok := e.Object.(*batchinternal.Job)
if !ok {
return true, fmt.Errorf("Expected %s to be a *batch.Job, got %T", name, e.Object)
@@ -543,7 +546,7 @@ func waitForJob(e watch.Event, name string) (bool, error) {
}
}
log.Printf("%s: Jobs active: %d, jobs failed: %d, jobs succeeded: %d", name, o.Status.Active, o.Status.Failed, o.Status.Succeeded)
c.Log("%s: Jobs active: %d, jobs failed: %d, jobs succeeded: %d", name, o.Status.Active, o.Status.Failed, o.Status.Succeeded)
return false, nil
}
@@ -574,7 +577,7 @@ func (c *Client) WaitAndGetCompletedPodPhase(namespace string, reader io.Reader,
return api.PodUnknown, fmt.Errorf("%s is not a Pod", info.Name)
}
if err := watchPodUntilComplete(timeout, info); err != nil {
if err := c.watchPodUntilComplete(timeout, info); err != nil {
return api.PodUnknown, err
}
@@ -586,13 +589,13 @@ func (c *Client) WaitAndGetCompletedPodPhase(namespace string, reader io.Reader,
return status, nil
}
func watchPodUntilComplete(timeout time.Duration, info *resource.Info) error {
func (c *Client) watchPodUntilComplete(timeout time.Duration, info *resource.Info) error {
w, err := resource.NewHelper(info.Client, info.Mapping).WatchSingle(info.Namespace, info.Name, info.ResourceVersion)
if err != nil {
return err
}
log.Printf("Watching pod %s for completion with timeout of %v", info.Name, timeout)
c.Log("Watching pod %s for completion with timeout of %v", info.Name, timeout)
_, err = watch.Until(timeout, w, func(e watch.Event) (bool, error) {
return conditions.PodCompleted(e)
})

@@ -138,6 +138,12 @@ func encodeAndMarshalEvent(e *watch.Event) ([]byte, error) {
return json.Marshal(encodedEvent)
}
func newTestClient(f cmdutil.Factory) *Client {
c := New(nil)
c.Factory = f
return c
}
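newTestClient routes construction through New so every test client carries the no-op Log. If a test ever needs to assert on what was logged, the same field accepts a capturing closure; a sketch of that idea (not part of this commit):

package main

import "fmt"

type Client struct {
	Log func(string, ...interface{})
}

func main() {
	var captured []string
	c := &Client{Log: func(format string, args ...interface{}) {
		captured = append(captured, fmt.Sprintf(format, args...))
	}}
	c.Log("Doing get for %s: %q", "Pod", "otter")
	fmt.Println(captured) // a real test would assert on this slice
}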
func TestUpdate(t *testing.T) {
listA := newPodList("starfish", "otter", "squid")
listB := newPodList("starfish", "otter", "dolphin")
@@ -186,7 +192,7 @@ func TestUpdate(t *testing.T) {
reaper := &fakeReaper{}
rf := &fakeReaperFactory{Factory: f, reaper: reaper}
c := &Client{Factory: rf}
c := newTestClient(rf)
if err := c.Update(api.NamespaceDefault, objBody(codec, &listA), objBody(codec, &listB), false, 0, false); err != nil {
t.Fatal(err)
}
@@ -251,7 +257,7 @@ func TestBuild(t *testing.T) {
for _, tt := range tests {
f, tf, _, _ := cmdtesting.NewAPIFactory()
c := &Client{Factory: f}
c := newTestClient(f)
if tt.swaggerFile != "" {
data, err := ioutil.ReadFile(tt.swaggerFile)
if err != nil {
@@ -320,7 +326,7 @@ func TestGet(t *testing.T) {
}
}),
}
c := &Client{Factory: f}
c := newTestClient(f)
// Test Success
data := strings.NewReader("kind: Pod\napiVersion: v1\nmetadata:\n name: otter")
@@ -380,7 +386,7 @@ func TestPerform(t *testing.T) {
}
f, tf, _, _ := cmdtesting.NewAPIFactory()
c := &Client{Factory: f}
c := newTestClient(f)
if tt.swaggerFile != "" {
data, err := ioutil.ReadFile(tt.swaggerFile)
if err != nil {
@@ -464,7 +470,7 @@ func TestWaitAndGetCompletedPodPhase(t *testing.T) {
}),
}
c := &Client{Factory: f}
c := newTestClient(f)
phase, err := c.WaitAndGetCompletedPodPhase("test", objBody(codec, &testPodList), 1*time.Second)
if (err != nil) != tt.err {

@@ -22,7 +22,6 @@ import (
"encoding/base64"
"fmt"
"io/ioutil"
"log"
"strconv"
"strings"
"time"
@@ -51,12 +50,16 @@ var magicGzip = []byte{0x1f, 0x8b, 0x08}
// ConfigMapsInterface.
type ConfigMaps struct {
impl internalversion.ConfigMapInterface
Log func(string, ...interface{})
}
// NewConfigMaps initializes a new ConfigMaps wrapping an implementation of
// the kubernetes ConfigMapsInterface.
func NewConfigMaps(impl internalversion.ConfigMapInterface) *ConfigMaps {
return &ConfigMaps{impl: impl}
return &ConfigMaps{
impl: impl,
Log: func(_ string, _ ...interface{}) {},
}
}
// Name returns the name of the driver.
@@ -74,13 +77,13 @@ func (cfgmaps *ConfigMaps) Get(key string) (*rspb.Release, error) {
return nil, ErrReleaseNotFound(key)
}
logerrf(err, "get: failed to get %q", key)
cfgmaps.Log("get: failed to get %q: %s", key, err)
return nil, err
}
// found the configmap, decode the base64 data string
r, err := decodeRelease(obj.Data["release"])
if err != nil {
logerrf(err, "get: failed to decode data %q", key)
cfgmaps.Log("get: failed to decode data %q: %s", key, err)
return nil, err
}
// return the release object
@@ -96,7 +99,7 @@ func (cfgmaps *ConfigMaps) List(filter func(*rspb.Release) bool) ([]*rspb.Releas
list, err := cfgmaps.impl.List(opts)
if err != nil {
logerrf(err, "list: failed to list")
cfgmaps.Log("list: failed to list: %s", err)
return nil, err
}
@@ -107,7 +110,7 @@ func (cfgmaps *ConfigMaps) List(filter func(*rspb.Release) bool) ([]*rspb.Releas
for _, item := range list.Items {
rls, err := decodeRelease(item.Data["release"])
if err != nil {
logerrf(err, "list: failed to decode release: %v", item)
cfgmaps.Log("list: failed to decode release: %v: %s", item, err)
continue
}
if filter(rls) {
@@ -132,7 +135,7 @@ func (cfgmaps *ConfigMaps) Query(labels map[string]string) ([]*rspb.Release, err
list, err := cfgmaps.impl.List(opts)
if err != nil {
logerrf(err, "query: failed to query with labels")
cfgmaps.Log("query: failed to query with labels: %s", err)
return nil, err
}
@@ -144,7 +147,7 @@ func (cfgmaps *ConfigMaps) Query(labels map[string]string) ([]*rspb.Release, err
for _, item := range list.Items {
rls, err := decodeRelease(item.Data["release"])
if err != nil {
logerrf(err, "query: failed to decode release: %s", err)
cfgmaps.Log("query: failed to decode release: %s", err)
continue
}
results = append(results, rls)
@@ -164,7 +167,7 @@ func (cfgmaps *ConfigMaps) Create(key string, rls *rspb.Release) error {
// create a new configmap to hold the release
obj, err := newConfigMapsObject(key, rls, lbs)
if err != nil {
logerrf(err, "create: failed to encode release %q", rls.Name)
cfgmaps.Log("create: failed to encode release %q: %s", rls.Name, err)
return err
}
// push the configmap object out into the kubiverse
@@ -173,7 +176,7 @@ func (cfgmaps *ConfigMaps) Create(key string, rls *rspb.Release) error {
return ErrReleaseExists(rls.Name)
}
logerrf(err, "create: failed to create")
cfgmaps.Log("create: failed to create: %s", err)
return err
}
return nil
@@ -191,13 +194,13 @@ func (cfgmaps *ConfigMaps) Update(key string, rls *rspb.Release) error {
// create a new configmap object to hold the release
obj, err := newConfigMapsObject(key, rls, lbs)
if err != nil {
logerrf(err, "update: failed to encode release %q", rls.Name)
cfgmaps.Log("update: failed to encode release %q: %s", rls.Name, err)
return err
}
// push the configmap object out into the kubiverse
_, err = cfgmaps.impl.Update(obj)
if err != nil {
logerrf(err, "update: failed to update")
cfgmaps.Log("update: failed to update: %s", err)
return err
}
return nil
@@ -211,7 +214,7 @@ func (cfgmaps *ConfigMaps) Delete(key string) (rls *rspb.Release, err error) {
return nil, ErrReleaseExists(rls.Name)
}
logerrf(err, "delete: failed to get release %q", key)
cfgmaps.Log("delete: failed to get release %q: %s", key, err)
return nil, err
}
// delete the release
@@ -316,8 +319,3 @@ func decodeRelease(data string) (*rspb.Release, error) {
}
return &rls, nil
}
// logerrf wraps an error with a formatted string (used for debugging)
func logerrf(err error, format string, args ...interface{}) {
log.Printf("configmaps: %s: %s\n", fmt.Sprintf(format, args...), err)
}
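The deleted logerrf helper hardcoded a "configmaps:" prefix and tacked the error onto the end; after this change the prefix comes from whatever logger the caller injects (main uses newLogger("storage/driver")) and the error rides along as an ordinary %s argument. Roughly, the before/after output:

package main

import (
	"errors"
	"log"
	"os"
)

func main() {
	err := errors.New("connection refused")
	// Before: logerrf(err, "get: failed to get %q", key)
	//   => configmaps: get: failed to get "foo": connection refused
	// After: the prefix lives in the injected logger instead.
	l := log.New(os.Stderr, "[storage/driver] ", log.Flags())
	l.Printf("get: failed to get %q: %s", "foo", err)
	//   => [storage/driver] ... get: failed to get "foo": connection refused
}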

@@ -18,7 +18,6 @@ package storage // import "k8s.io/helm/pkg/storage"
import (
"fmt"
"log"
"sync"
rspb "k8s.io/helm/pkg/proto/hapi/release"
@@ -34,13 +33,15 @@ type Storage struct {
releaseLocks map[string]*sync.Mutex
// releaseLocksLock is a mutex for accessing releaseLocks
releaseLocksLock *sync.Mutex
Log func(string, ...interface{})
}
// Get retrieves the release from storage. An error is returned
// if the storage driver failed to fetch the release, or the
// release identified by the key, version pair does not exist.
func (s *Storage) Get(name string, version int32) (*rspb.Release, error) {
log.Printf("Getting release %q (v%d) from storage\n", name, version)
s.Log("Getting release %q", makeKey(name, version))
return s.Driver.Get(makeKey(name, version))
}
@@ -48,7 +49,7 @@ func (s *Storage) Get(name string, version int32) (*rspb.Release, error) {
// error is returned if the storage driver failed to store the
// release, or a release with an identical key already exists.
func (s *Storage) Create(rls *rspb.Release) error {
log.Printf("Create release %q (v%d) in storage\n", rls.Name, rls.Version)
s.Log("Creating release %q", makeKey(rls.Name, rls.Version))
return s.Driver.Create(makeKey(rls.Name, rls.Version), rls)
}
@@ -56,7 +57,7 @@ func (s *Storage) Create(rls *rspb.Release) error {
// storage backend fails to update the release or if the release
// does not exist.
func (s *Storage) Update(rls *rspb.Release) error {
log.Printf("Updating %q (v%d) in storage\n", rls.Name, rls.Version)
s.Log("Updating release %q", makeKey(rls.Name, rls.Version))
return s.Driver.Update(makeKey(rls.Name, rls.Version), rls)
}
@@ -64,21 +65,21 @@ func (s *Storage) Update(rls *rspb.Release) error {
// the storage backend fails to delete the release or if the release
// does not exist.
func (s *Storage) Delete(name string, version int32) (*rspb.Release, error) {
log.Printf("Deleting release %q (v%d) from storage\n", name, version)
s.Log("Deleting release %q", makeKey(name, version))
return s.Driver.Delete(makeKey(name, version))
}
// ListReleases returns all releases from storage. An error is returned if the
// storage backend fails to retrieve the releases.
func (s *Storage) ListReleases() ([]*rspb.Release, error) {
log.Println("Listing all releases in storage")
s.Log("Listing all releases in storage")
return s.Driver.List(func(_ *rspb.Release) bool { return true })
}
// ListDeleted returns all releases with Status == DELETED. An error is returned
// if the storage backend fails to retrieve the releases.
func (s *Storage) ListDeleted() ([]*rspb.Release, error) {
log.Println("List deleted releases in storage")
s.Log("Listing deleted releases in storage")
return s.Driver.List(func(rls *rspb.Release) bool {
return relutil.StatusFilter(rspb.Status_DELETED).Check(rls)
})
@@ -87,7 +88,7 @@ func (s *Storage) ListDeleted() ([]*rspb.Release, error) {
// ListDeployed returns all releases with Status == DEPLOYED. An error is returned
// if the storage backend fails to retrieve the releases.
func (s *Storage) ListDeployed() ([]*rspb.Release, error) {
log.Println("Listing all deployed releases in storage")
s.Log("Listing all deployed releases in storage")
return s.Driver.List(func(rls *rspb.Release) bool {
return relutil.StatusFilter(rspb.Status_DEPLOYED).Check(rls)
})
@@ -97,7 +98,7 @@ func (s *Storage) ListDeployed() ([]*rspb.Release, error) {
// (filter0 && filter1 && ... && filterN), i.e. a Release is included in the results
// if and only if all filters return true.
func (s *Storage) ListFilterAll(fns ...relutil.FilterFunc) ([]*rspb.Release, error) {
log.Println("Listing all releases with filter")
s.Log("Listing all releases with filter")
return s.Driver.List(func(rls *rspb.Release) bool {
return relutil.All(fns...).Check(rls)
})
@@ -107,7 +108,7 @@ func (s *Storage) ListFilterAll(fns ...relutil.FilterFunc) ([]*rspb.Release, err
// (filter0 || filter1 || ... || filterN), i.e. a Release is included in the results
// if at least one of the filters returns true.
func (s *Storage) ListFilterAny(fns ...relutil.FilterFunc) ([]*rspb.Release, error) {
log.Println("Listing any releases with filter")
s.Log("Listing any releases with filter")
return s.Driver.List(func(rls *rspb.Release) bool {
return relutil.Any(fns...).Check(rls)
})
@@ -116,7 +117,7 @@ func (s *Storage) ListFilterAny(fns ...relutil.FilterFunc) ([]*rspb.Release, err
// Deployed returns the deployed release with the provided release name, or
// returns ErrReleaseNotFound if not found.
func (s *Storage) Deployed(name string) (*rspb.Release, error) {
log.Printf("Getting deployed release from '%s' history\n", name)
s.Log("Getting deployed release from %q history", name)
ls, err := s.Driver.Query(map[string]string{
"NAME": name,
@@ -127,7 +128,7 @@ func (s *Storage) Deployed(name string) (*rspb.Release, error) {
case err != nil:
return nil, err
case len(ls) == 0:
return nil, fmt.Errorf("'%s' has no deployed releases", name)
return nil, fmt.Errorf("%q has no deployed releases", name)
default:
return ls[0], nil
}
@@ -136,17 +137,14 @@ func (s *Storage) Deployed(name string) (*rspb.Release, error) {
// History returns the revision history for the release with the provided name, or
// returns ErrReleaseNotFound if no such release name exists.
func (s *Storage) History(name string) ([]*rspb.Release, error) {
log.Printf("Getting release history for '%s'\n", name)
s.Log("Getting release history for %q", name)
l, err := s.Driver.Query(map[string]string{"NAME": name, "OWNER": "TILLER"})
if err != nil {
return nil, err
}
return l, nil
return s.Driver.Query(map[string]string{"NAME": name, "OWNER": "TILLER"})
}
// Last fetches the last revision of the named release.
func (s *Storage) Last(name string) (*rspb.Release, error) {
s.Log("Getting last revision of %q", name)
h, err := s.History(name)
if err != nil {
return nil, err
@@ -180,7 +178,7 @@ func (s *Storage) LockRelease(name string) error {
}
}
if !found {
return fmt.Errorf("Unable to lock release %s: release not found", name)
return fmt.Errorf("Unable to lock release %q: release not found", name)
}
lock = &sync.Mutex{}
@@ -222,5 +220,6 @@ func Init(d driver.Driver) *Storage {
Driver: d,
releaseLocks: make(map[string]*sync.Mutex),
releaseLocksLock: &sync.Mutex{},
Log: func(_ string, _ ...interface{}) {},
}
}
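The Storage log lines now print the composed driver key instead of separate name/version fields. Assuming makeKey joins them as name.vN (its shape elsewhere in pkg/storage), no information is lost:

package main

import "fmt"

// makeKey mirrors the pkg/storage helper that composes the driver
// key from a release name and version (assumed shape: "name.vN").
func makeKey(name string, version int32) string {
	return fmt.Sprintf("%s.v%d", name, version)
}

func main() {
	// Old log: Getting release "smiling-penguin" (v3) from storage
	// New log: Getting release "smiling-penguin.v3"
	fmt.Printf("Getting release %q\n", makeKey("smiling-penguin", 3))
}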

@@ -235,6 +235,5 @@ func New() *Environment {
return &Environment{
EngineYard: ey,
Releases: storage.Init(driver.NewMemory()),
KubeClient: kube.New(nil),
}
}

@@ -20,7 +20,6 @@ import (
"bytes"
"errors"
"fmt"
"log"
"path"
"regexp"
"strings"
@@ -84,6 +83,7 @@ type ReleaseServer struct {
ReleaseModule
env *environment.Environment
clientset internalclientset.Interface
Log func(string, ...interface{})
}
// NewReleaseServer creates a new release server.
@@ -101,6 +101,7 @@ func NewReleaseServer(env *environment.Environment, clientset internalclientset.
env: env,
clientset: clientset,
ReleaseModule: releaseModule,
Log: func(_ string, _ ...interface{}) {},
}
}
@@ -268,7 +269,7 @@ func (s *ReleaseServer) GetReleaseStatus(c ctx.Context, req *services.GetRelease
// Skip errors if this is already deleted or failed.
return statusResp, nil
} else if err != nil {
log.Printf("warning: Get for %s failed: %v", rel.Name, err)
s.Log("warning: Get for %s failed: %v", rel.Name, err)
return nil, err
}
rel.Info.Status.Resources = resp
@@ -321,7 +322,7 @@ func (s *ReleaseServer) performUpdate(originalRelease, updatedRelease *release.R
res := &services.UpdateReleaseResponse{Release: updatedRelease}
if req.DryRun {
log.Printf("Dry run for %s", updatedRelease.Name)
s.Log("Dry run for %s", updatedRelease.Name)
res.Release.Info.Description = "Dry run complete"
return res, nil
}
@@ -334,7 +335,7 @@ func (s *ReleaseServer) performUpdate(originalRelease, updatedRelease *release.R
}
if err := s.ReleaseModule.Update(originalRelease, updatedRelease, req, s.env); err != nil {
msg := fmt.Sprintf("Upgrade %q failed: %s", updatedRelease.Name, err)
log.Printf("warning: %s", msg)
s.Log("warning: %s", msg)
originalRelease.Info.Status.Code = release.Status_SUPERSEDED
updatedRelease.Info.Status.Code = release.Status_FAILED
updatedRelease.Info.Description = msg
@@ -370,19 +371,19 @@ func (s *ReleaseServer) performUpdate(originalRelease, updatedRelease *release.R
func (s *ReleaseServer) reuseValues(req *services.UpdateReleaseRequest, current *release.Release) error {
if req.ResetValues {
// If ResetValues is set, we completely ignore current.Config.
log.Print("Reset values to the chart's original version.")
s.Log("Reset values to the chart's original version.")
return nil
}
// If the ReuseValues flag is set, we always copy the old values over the new config's values.
if req.ReuseValues {
log.Print("Reusing the old release's values")
s.Log("Reusing the old release's values")
// We have to regenerate the old coalesced values:
oldVals, err := chartutil.CoalesceValues(current.Chart, current.Config)
if err != nil {
err := fmt.Errorf("failed to rebuild old values: %s", err)
log.Print(err)
s.Log("%s", err)
return err
}
nv, err := oldVals.YAML()
@@ -399,7 +400,7 @@ func (s *ReleaseServer) reuseValues(req *services.UpdateReleaseRequest, current
current.Config != nil &&
current.Config.Raw != "" &&
current.Config.Raw != "{}\n" {
log.Printf("Copying values from %s (v%d) to new release.", current.Name, current.Version)
s.Log("Copying values from %s (v%d) to new release.", current.Name, current.Version)
req.Values = current.Config
}
return nil
@@ -508,7 +509,7 @@ func (s *ReleaseServer) performRollback(currentRelease, targetRelease *release.R
res := &services.RollbackReleaseResponse{Release: targetRelease}
if req.DryRun {
log.Printf("Dry run for %s", targetRelease.Name)
s.Log("Dry run for %s", targetRelease.Name)
return res, nil
}
@@ -521,7 +522,7 @@ func (s *ReleaseServer) performRollback(currentRelease, targetRelease *release.R
if err := s.ReleaseModule.Rollback(currentRelease, targetRelease, req, s.env); err != nil {
msg := fmt.Sprintf("Rollback %q failed: %s", targetRelease.Name, err)
log.Printf("warning: %s", msg)
s.Log("warning: %s", msg)
currentRelease.Info.Status.Code = release.Status_SUPERSEDED
targetRelease.Info.Status.Code = release.Status_FAILED
targetRelease.Info.Description = msg
@@ -565,7 +566,7 @@ func (s *ReleaseServer) prepareRollback(req *services.RollbackReleaseRequest) (*
rbv = crls.Version - 1
}
log.Printf("rolling back %s (current: v%d, target: v%d)", req.Name, crls.Version, rbv)
s.Log("rolling back %s (current: v%d, target: v%d)", req.Name, crls.Version, rbv)
prls, err := s.env.Releases.Get(req.Name, rbv)
if err != nil {
@@ -617,7 +618,7 @@ func (s *ReleaseServer) uniqName(start string, reuse bool) (string, error) {
if st := rel.Info.Status.Code; reuse && (st == release.Status_DELETED || st == release.Status_FAILED) {
// Allow re-use of names if the previous release is marked deleted.
log.Printf("reusing name %q", start)
s.Log("reusing name %q", start)
return start, nil
} else if reuse {
return "", errors.New("cannot re-use a name that is still in use")
@@ -636,9 +637,9 @@ func (s *ReleaseServer) uniqName(start string, reuse bool) (string, error) {
if _, err := s.env.Releases.Get(name, 1); strings.Contains(err.Error(), "not found") {
return name, nil
}
log.Printf("info: Name %q is taken. Searching again.", name)
s.Log("info: Name %q is taken. Searching again.", name)
}
log.Printf("warning: No available release names found after %d tries", maxTries)
s.Log("warning: No available release names found after %d tries", maxTries)
return "ERROR", errors.New("no available release name found")
}
@@ -648,7 +649,7 @@ func (s *ReleaseServer) engine(ch *chart.Chart) environment.Engine {
if r, ok := s.env.EngineYard.Get(ch.Metadata.Engine); ok {
renderer = r
} else {
log.Printf("warning: %s requested non-existent template engine %s", ch.Metadata.Name, ch.Metadata.Engine)
s.Log("warning: %s requested non-existent template engine %s", ch.Metadata.Name, ch.Metadata.Engine)
}
}
return renderer
@@ -658,7 +659,7 @@ func (s *ReleaseServer) engine(ch *chart.Chart) environment.Engine {
func (s *ReleaseServer) InstallRelease(c ctx.Context, req *services.InstallReleaseRequest) (*services.InstallReleaseResponse, error) {
rel, err := s.prepareRelease(req)
if err != nil {
log.Printf("Failed install prepare step: %s", err)
s.Log("Failed install prepare step: %s", err)
res := &services.InstallReleaseResponse{Release: rel}
// On dry run, append the manifest contents to a failed release. This is
@@ -671,7 +672,7 @@ func (s *ReleaseServer) InstallRelease(c ctx.Context, req *services.InstallRelea
res, err := s.performRelease(rel, req)
if err != nil {
log.Printf("Failed install perform step: %s", err)
s.Log("Failed install perform step: %s", err)
}
return res, err
}
@@ -854,10 +855,10 @@ func (s *ReleaseServer) renderResources(ch *chart.Chart, values chartutil.Values
func (s *ReleaseServer) recordRelease(r *release.Release, reuse bool) {
if reuse {
if err := s.env.Releases.Update(r); err != nil {
log.Printf("warning: Failed to update release %q: %s", r.Name, err)
s.Log("warning: Failed to update release %q: %s", r.Name, err)
}
} else if err := s.env.Releases.Create(r); err != nil {
log.Printf("warning: Failed to record release %q: %s", r.Name, err)
s.Log("warning: Failed to record release %q: %s", r.Name, err)
}
}
@@ -866,7 +867,7 @@ func (s *ReleaseServer) performRelease(r *release.Release, req *services.Install
res := &services.InstallReleaseResponse{Release: r}
if req.DryRun {
log.Printf("Dry run for %s", r.Name)
s.Log("Dry run for %s", r.Name)
res.Release.Info.Description = "Dry run complete"
return res, nil
}
@@ -901,7 +902,7 @@ func (s *ReleaseServer) performRelease(r *release.Release, req *services.Install
}
if err := s.ReleaseModule.Update(old, r, updateReq, s.env); err != nil {
msg := fmt.Sprintf("Release replace %q failed: %s", r.Name, err)
log.Printf("warning: %s", msg)
s.Log("warning: %s", msg)
old.Info.Status.Code = release.Status_SUPERSEDED
r.Info.Status.Code = release.Status_FAILED
r.Info.Description = msg
@@ -915,7 +916,7 @@ func (s *ReleaseServer) performRelease(r *release.Release, req *services.Install
// regular manifests
if err := s.ReleaseModule.Create(r, req, s.env); err != nil {
msg := fmt.Sprintf("Release %q failed: %s", r.Name, err)
log.Printf("warning: %s", msg)
s.Log("warning: %s", msg)
r.Info.Status.Code = release.Status_FAILED
r.Info.Description = msg
s.recordRelease(r, false)
@@ -927,7 +928,7 @@ func (s *ReleaseServer) performRelease(r *release.Release, req *services.Install
if !req.DisableHooks {
if err := s.execHook(r.Hooks, r.Name, r.Namespace, hooks.PostInstall, req.Timeout); err != nil {
msg := fmt.Sprintf("Release %q failed post-install: %s", r.Name, err)
log.Printf("warning: %s", msg)
s.Log("warning: %s", msg)
r.Info.Status.Code = release.Status_FAILED
r.Info.Description = msg
s.recordRelease(r, false)
@@ -956,7 +957,7 @@ func (s *ReleaseServer) execHook(hs []*release.Hook, name, namespace, hook strin
return fmt.Errorf("unknown hook %q", hook)
}
log.Printf("Executing %s hooks for %s", hook, name)
s.Log("Executing %s hooks for %s", hook, name)
executingHooks := []*release.Hook{}
for _, h := range hs {
for _, e := range h.Events {
@@ -972,20 +973,20 @@ func (s *ReleaseServer) execHook(hs []*release.Hook, name, namespace, hook strin
b := bytes.NewBufferString(h.Manifest)
if err := kubeCli.Create(namespace, b, timeout, false); err != nil {
log.Printf("warning: Release %q %s %s failed: %s", name, hook, h.Path, err)
s.Log("warning: Release %q %s %s failed: %s", name, hook, h.Path, err)
return err
}
// No way to rewind a bytes.Buffer()?
b.Reset()
b.WriteString(h.Manifest)
if err := kubeCli.WatchUntilReady(namespace, b, timeout, false); err != nil {
log.Printf("warning: Release %q %s %s could not complete: %s", name, hook, h.Path, err)
s.Log("warning: Release %q %s %s could not complete: %s", name, hook, h.Path, err)
return err
}
h.LastRun = timeconv.Now()
}
log.Printf("Hooks complete for %s %s", hook, name)
s.Log("Hooks complete for %s %s", hook, name)
return nil
}
@@ -1007,7 +1008,7 @@ func (s *ReleaseServer) UninstallRelease(c ctx.Context, req *services.UninstallR
defer s.env.Releases.UnlockRelease(req.Name)
if !ValidName.MatchString(req.Name) {
log.Printf("uninstall: Release not found: %s", req.Name)
s.Log("uninstall: Release not found: %s", req.Name)
return nil, errMissingRelease
}
@@ -1017,7 +1018,7 @@ func (s *ReleaseServer) UninstallRelease(c ctx.Context, req *services.UninstallR
rels, err := s.env.Releases.History(req.Name)
if err != nil {
log.Printf("uninstall: Release not loaded: %s", req.Name)
s.Log("uninstall: Release not loaded: %s", req.Name)
return nil, err
}
if len(rels) < 1 {
@@ -1032,7 +1033,7 @@ func (s *ReleaseServer) UninstallRelease(c ctx.Context, req *services.UninstallR
if rel.Info.Status.Code == release.Status_DELETED {
if req.Purge {
if err := s.purgeReleases(rels...); err != nil {
log.Printf("uninstall: Failed to purge the release: %s", err)
s.Log("uninstall: Failed to purge the release: %s", err)
return nil, err
}
return &services.UninstallReleaseResponse{Release: rel}, nil
@@ -1040,7 +1041,7 @@ func (s *ReleaseServer) UninstallRelease(c ctx.Context, req *services.UninstallR
return nil, fmt.Errorf("the release named %q is already deleted", req.Name)
}
log.Printf("uninstall: Deleting %s", req.Name)
s.Log("uninstall: Deleting %s", req.Name)
rel.Info.Status.Code = release.Status_DELETING
rel.Info.Deleted = timeconv.Now()
rel.Info.Description = "Deletion in progress (or silently failed)"
@@ -1055,7 +1056,7 @@ func (s *ReleaseServer) UninstallRelease(c ctx.Context, req *services.UninstallR
// From here on out, the release is currently considered to be in Status_DELETING
// state.
if err := s.env.Releases.Update(rel); err != nil {
log.Printf("uninstall: Failed to store updated release: %s", err)
s.Log("uninstall: Failed to store updated release: %s", err)
}
kept, errs := s.ReleaseModule.Delete(rel, req, s.env)
@@ -1063,7 +1064,7 @@ func (s *ReleaseServer) UninstallRelease(c ctx.Context, req *services.UninstallR
es := make([]string, 0, len(errs))
for _, e := range errs {
log.Printf("error: %v", e)
s.Log("error: %v", e)
es = append(es, e.Error())
}
@@ -1079,13 +1080,13 @@ func (s *ReleaseServer) UninstallRelease(c ctx.Context, req *services.UninstallR
if req.Purge {
err := s.purgeReleases(rels...)
if err != nil {
log.Printf("uninstall: Failed to purge the release: %s", err)
s.Log("uninstall: Failed to purge the release: %s", err)
}
return res, err
}
if err := s.env.Releases.Update(rel); err != nil {
log.Printf("uninstall: Failed to store updated release: %s", err)
s.Log("uninstall: Failed to store updated release: %s", err)
}
if len(es) > 0 {
@@ -1122,12 +1123,12 @@ func (s *ReleaseServer) RunReleaseTest(req *services.TestReleaseRequest, stream
tSuite, err := reltesting.NewTestSuite(rel)
if err != nil {
log.Printf("Error creating test suite for %s", rel.Name)
s.Log("Error creating test suite for %s", rel.Name)
return err
}
if err := tSuite.Run(testEnv); err != nil {
log.Printf("Error running test suite for %s", rel.Name)
s.Log("Error running test suite for %s", rel.Name)
return err
}
@@ -1142,7 +1143,7 @@ func (s *ReleaseServer) RunReleaseTest(req *services.TestReleaseRequest, stream
}
if err := s.env.Releases.Update(rel); err != nil {
log.Printf("test: Failed to store updated release: %s", err)
s.Log("test: Failed to store updated release: %s", err)
}
return nil

@@ -105,6 +105,7 @@ func rsFixture() *ReleaseServer {
},
env: MockEnvironment(),
clientset: clientset,
Log: func(_ string, _ ...interface{}) {},
}
}
