diff --git a/cmd/tiller/tiller.go b/cmd/tiller/tiller.go index e5f3a33f3..6abd35ec9 100644 --- a/cmd/tiller/tiller.go +++ b/cmd/tiller/tiller.go @@ -29,6 +29,7 @@ import ( goprom "github.com/grpc-ecosystem/go-grpc-prometheus" "github.com/spf13/cobra" + "github.com/spf13/pflag" "google.golang.org/grpc" "google.golang.org/grpc/credentials" @@ -69,6 +70,8 @@ var rootServer *grpc.Server // Any changes to env should be done before rootServer.Serve() is called. var env = environment.New() +var logger *log.Logger + var ( grpcAddr = ":44134" probeAddr = ":44135" @@ -93,64 +96,83 @@ Tiller is the server for Helm. It provides in-cluster resource management. By default, Tiller listens for gRPC connections on port 44134. ` -var rootCommand = &cobra.Command{ - Use: "tiller", - Short: "The Kubernetes Helm server.", - Long: globalUsage, - Run: start, +func addFlags(flags *pflag.FlagSet) { + flags.StringVarP(&grpcAddr, "listen", "l", ":44134", "address:port to listen on") + flags.StringVar(&store, "storage", storageConfigMap, "storage driver to use. One of 'configmap' or 'memory'") + flags.BoolVar(&enableTracing, "trace", false, "enable rpc tracing") + flags.BoolVar(&remoteReleaseModules, "experimental-release", false, "enable experimental release modules") + + flags.BoolVar(&tlsEnable, "tls", tlsEnableEnvVarDefault(), "enable TLS") + flags.BoolVar(&tlsVerify, "tls-verify", tlsVerifyEnvVarDefault(), "enable TLS and verify remote certificate") + flags.StringVar(&keyFile, "tls-key", tlsDefaultsFromEnv("tls-key"), "path to TLS private key file") + flags.StringVar(&certFile, "tls-cert", tlsDefaultsFromEnv("tls-cert"), "path to TLS certificate file") + flags.StringVar(&caCertFile, "tls-ca-cert", tlsDefaultsFromEnv("tls-ca-cert"), "trust certificates signed by this CA") } -func init() { - log.SetFlags(log.Flags() | log.Lshortfile) +func initLog() { + if enableTracing { + log.SetFlags(log.Lshortfile) + } + logger = newLogger("main") } func main() { - p := rootCommand.PersistentFlags() - p.StringVarP(&grpcAddr, "listen", "l", ":44134", "address:port to listen on") - p.StringVar(&store, "storage", storageConfigMap, "storage driver to use. One of 'configmap' or 'memory'") - p.BoolVar(&enableTracing, "trace", false, "enable rpc tracing") - p.BoolVar(&remoteReleaseModules, "experimental-release", false, "enable experimental release modules") - - p.BoolVar(&tlsEnable, "tls", tlsEnableEnvVarDefault(), "enable TLS") - p.BoolVar(&tlsVerify, "tls-verify", tlsVerifyEnvVarDefault(), "enable TLS and verify remote certificate") - p.StringVar(&keyFile, "tls-key", tlsDefaultsFromEnv("tls-key"), "path to TLS private key file") - p.StringVar(&certFile, "tls-cert", tlsDefaultsFromEnv("tls-cert"), "path to TLS certificate file") - p.StringVar(&caCertFile, "tls-ca-cert", tlsDefaultsFromEnv("tls-ca-cert"), "trust certificates signed by this CA") - - if err := rootCommand.Execute(); err != nil { - fmt.Fprint(os.Stderr, err) - os.Exit(1) + root := &cobra.Command{ + Use: "tiller", + Short: "The Kubernetes Helm server.", + Long: globalUsage, + Run: start, + PreRun: func(_ *cobra.Command, _ []string) { + initLog() + }, + } + addFlags(root.Flags()) + + if err := root.Execute(); err != nil { + logger.Fatal(err) } } +func newLogger(prefix string) *log.Logger { + if len(prefix) > 0 { + prefix = fmt.Sprintf("[%s] ", prefix) + } + return log.New(os.Stderr, prefix, log.Flags()) +} + func start(c *cobra.Command, args []string) { clientset, err := kube.New(nil).ClientSet() if err != nil { - fmt.Fprintf(os.Stderr, "Cannot initialize Kubernetes connection: %s\n", err) - os.Exit(1) + logger.Fatalf("Cannot initialize Kubernetes connection: %s", err) } switch store { case storageMemory: env.Releases = storage.Init(driver.NewMemory()) case storageConfigMap: - env.Releases = storage.Init(driver.NewConfigMaps(clientset.Core().ConfigMaps(namespace()))) + cfgmaps := driver.NewConfigMaps(clientset.Core().ConfigMaps(namespace())) + cfgmaps.Log = newLogger("storage/driver").Printf + + env.Releases = storage.Init(cfgmaps) + env.Releases.Log = newLogger("storage").Printf } + kubeClient := kube.New(nil) + kubeClient.Log = newLogger("kube").Printf + env.KubeClient = kubeClient + if tlsEnable || tlsVerify { opts := tlsutil.Options{CertFile: certFile, KeyFile: keyFile} if tlsVerify { opts.CaCertFile = caCertFile } - } var opts []grpc.ServerOption if tlsEnable || tlsVerify { cfg, err := tlsutil.ServerConfig(tlsOptions()) if err != nil { - fmt.Fprintf(os.Stderr, "Could not create server TLS configuration: %v\n", err) - os.Exit(1) + logger.Fatalf("Could not create server TLS configuration: %v", err) } opts = append(opts, grpc.Creds(credentials.NewTLS(cfg))) } @@ -159,14 +181,13 @@ func start(c *cobra.Command, args []string) { lstn, err := net.Listen("tcp", grpcAddr) if err != nil { - fmt.Fprintf(os.Stderr, "Server died: %s\n", err) - os.Exit(1) + logger.Fatalf("Server died: %s", err) } - fmt.Printf("Starting Tiller %s (tls=%t)\n", version.GetVersion(), tlsEnable || tlsVerify) - fmt.Printf("GRPC listening on %s\n", grpcAddr) - fmt.Printf("Probes listening on %s\n", probeAddr) - fmt.Printf("Storage driver is %s\n", env.Releases.Name()) + logger.Printf("Starting Tiller %s (tls=%t)", version.GetVersion(), tlsEnable || tlsVerify) + logger.Printf("GRPC listening on %s", grpcAddr) + logger.Printf("Probes listening on %s", probeAddr) + logger.Printf("Storage driver is %s", env.Releases.Name()) if enableTracing { startTracing(traceAddr) @@ -176,6 +197,7 @@ func start(c *cobra.Command, args []string) { probeErrCh := make(chan error) go func() { svc := tiller.NewReleaseServer(env, clientset, remoteReleaseModules) + svc.Log = newLogger("tiller").Printf services.RegisterReleaseServiceServer(rootServer, svc) if err := rootServer.Serve(lstn); err != nil { srvErrCh <- err @@ -196,10 +218,9 @@ func start(c *cobra.Command, args []string) { select { case err := <-srvErrCh: - fmt.Fprintf(os.Stderr, "Server died: %s\n", err) - os.Exit(1) + logger.Fatalf("Server died: %s", err) case err := <-probeErrCh: - fmt.Fprintf(os.Stderr, "Probes server died: %s\n", err) + logger.Printf("Probes server died: %s", err) } } diff --git a/cmd/tiller/trace.go b/cmd/tiller/trace.go index b9e0583f2..71d7e8f72 100644 --- a/cmd/tiller/trace.go +++ b/cmd/tiller/trace.go @@ -17,8 +17,6 @@ limitations under the License. package main // import "k8s.io/helm/cmd/tiller" import ( - "fmt" - "log" "net/http" _ "net/http/pprof" @@ -27,7 +25,7 @@ import ( ) func startTracing(addr string) { - fmt.Printf("Tracing server is listening on %s\n", addr) + logger.Printf("Tracing server is listening on %s\n", addr) grpc.EnableTracing = true http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { @@ -41,7 +39,7 @@ func startTracing(addr string) { go func() { if err := http.ListenAndServe(addr, nil); err != nil { - log.Printf("tracing error: %s", err) + logger.Printf("tracing error: %s", err) } }() } diff --git a/pkg/kube/client.go b/pkg/kube/client.go index 96db3d9c7..515b34418 100644 --- a/pkg/kube/client.go +++ b/pkg/kube/client.go @@ -59,6 +59,8 @@ type Client struct { cmdutil.Factory // SchemaCacheDir is the path for loading cached schema. SchemaCacheDir string + + Log func(string, ...interface{}) } // New create a new Client @@ -66,6 +68,7 @@ func New(config clientcmd.ClientConfig) *Client { return &Client{ Factory: cmdutil.NewFactory(config), SchemaCacheDir: clientcmd.RecommendedSchemaFile, + Log: func(_ string, _ ...interface{}) {}, } } @@ -99,7 +102,7 @@ func (c *Client) Create(namespace string, reader io.Reader, timeout int64, shoul func (c *Client) newBuilder(namespace string, reader io.Reader) *resource.Result { schema, err := c.Validator(true, c.SchemaCacheDir) if err != nil { - log.Printf("warning: failed to load schema: %s", err) + c.Log("warning: failed to load schema: %s", err) } return c.NewBuilder(). ContinueOnError(). @@ -115,12 +118,12 @@ func (c *Client) newBuilder(namespace string, reader io.Reader) *resource.Result func (c *Client) BuildUnstructured(namespace string, reader io.Reader) (Result, error) { schema, err := c.Validator(true, c.SchemaCacheDir) if err != nil { - log.Printf("warning: failed to load schema: %s", err) + c.Log("warning: failed to load schema: %s", err) } mapper, typer, err := c.UnstructuredObject() if err != nil { - log.Printf("failed to load mapper: %s", err) + c.Log("failed to load mapper: %s", err) return nil, err } var result Result @@ -155,9 +158,9 @@ func (c *Client) Get(namespace string, reader io.Reader) (string, error) { } missing := []string{} err = perform(infos, func(info *resource.Info) error { - log.Printf("Doing get for %s: %q", info.Mapping.GroupVersionKind.Kind, info.Name) + c.Log("Doing get for %s: %q", info.Mapping.GroupVersionKind.Kind, info.Name) if err := info.Get(); err != nil { - log.Printf("WARNING: Failed Get for resource %q: %s", info.Name, err) + c.Log("WARNING: Failed Get for resource %q: %s", info.Name, err) missing = append(missing, fmt.Sprintf("%v\t\t%s", info.Mapping.Resource, info.Name)) return nil } @@ -185,7 +188,7 @@ func (c *Client) Get(namespace string, reader io.Reader) (string, error) { } for _, o := range ot { if err := p.PrintObj(o, buf); err != nil { - log.Printf("failed to print object type %s, object: %q :\n %v", t, o, err) + c.Log("failed to print object type %s, object: %q :\n %v", t, o, err) return "", err } } @@ -238,7 +241,7 @@ func (c *Client) Update(namespace string, originalReader, targetReader io.Reader } kind := info.Mapping.GroupVersionKind.Kind - log.Printf("Created a new %s called %q\n", kind, info.Name) + c.Log("Created a new %s called %q\n", kind, info.Name) return nil } @@ -248,7 +251,7 @@ func (c *Client) Update(namespace string, originalReader, targetReader io.Reader } if err := updateResource(c, info, originalInfo.Object, recreate); err != nil { - log.Printf("error updating the resource %q:\n\t %v", info.Name, err) + c.Log("error updating the resource %q:\n\t %v", info.Name, err) updateErrors = append(updateErrors, err.Error()) } @@ -263,9 +266,9 @@ func (c *Client) Update(namespace string, originalReader, targetReader io.Reader } for _, info := range original.Difference(target) { - log.Printf("Deleting %q in %s...", info.Name, info.Namespace) + c.Log("Deleting %q in %s...", info.Name, info.Namespace) if err := deleteResource(c, info); err != nil { - log.Printf("Failed to delete %q, err: %s", info.Name, err) + c.Log("Failed to delete %q, err: %s", info.Name, err) } } if shouldWait { @@ -283,23 +286,23 @@ func (c *Client) Delete(namespace string, reader io.Reader) error { return err } return perform(infos, func(info *resource.Info) error { - log.Printf("Starting delete for %q %s", info.Name, info.Mapping.GroupVersionKind.Kind) + c.Log("Starting delete for %q %s", info.Name, info.Mapping.GroupVersionKind.Kind) err := deleteResource(c, info) - return skipIfNotFound(err) + return c.skipIfNotFound(err) }) } -func skipIfNotFound(err error) error { +func (c *Client) skipIfNotFound(err error) error { if errors.IsNotFound(err) { - log.Printf("%v", err) + c.Log("%v", err) return nil } return err } -func watchTimeout(t time.Duration) ResourceActorFunc { +func (c *Client) watchTimeout(t time.Duration) ResourceActorFunc { return func(info *resource.Info) error { - return watchUntilReady(t, info) + return c.watchUntilReady(t, info) } } @@ -322,7 +325,7 @@ func (c *Client) WatchUntilReady(namespace string, reader io.Reader, timeout int } // For jobs, there's also the option to do poll c.Jobs(namespace).Get(): // https://github.com/adamreese/kubernetes/blob/master/test/e2e/job.go#L291-L300 - return perform(infos, watchTimeout(time.Duration(timeout)*time.Second)) + return perform(infos, c.watchTimeout(time.Duration(timeout)*time.Second)) } func perform(infos Result, fn ResourceActorFunc) error { @@ -355,7 +358,7 @@ func deleteResource(c *Client, info *resource.Info) error { } return err } - log.Printf("Using reaper for deleting %q", info.Name) + c.Log("Using reaper for deleting %q", info.Name) return reaper.Stop(info.Namespace, info.Name, 0, nil) } @@ -395,7 +398,7 @@ func updateResource(c *Client, target *resource.Info, currentObj runtime.Object, return fmt.Errorf("failed to create patch: %s", err) } if patch == nil { - log.Printf("Looks like there are no changes for %s %q", target.Mapping.GroupVersionKind.Kind, target.Name) + c.Log("Looks like there are no changes for %s %q", target.Mapping.GroupVersionKind.Kind, target.Name) // This needs to happen to make sure that tiller has the latest info from the API // Otherwise there will be no labels and other functions that use labels will panic if err := target.Get(); err != nil { @@ -445,7 +448,7 @@ func updateResource(c *Client, target *resource.Info, currentObj runtime.Object, // Restart pods for _, pod := range pods.Items { - log.Printf("Restarting pod: %v/%v", pod.Namespace, pod.Name) + c.Log("Restarting pod: %v/%v", pod.Namespace, pod.Name) // Delete each pod for get them restarted with changed spec. if err := client.Core().Pods(pod.Namespace).Delete(pod.Name, metav1.NewPreconditionDeleteOptions(string(pod.UID))); err != nil { @@ -474,14 +477,14 @@ func getSelectorFromObject(obj runtime.Object) (map[string]string, error) { } } -func watchUntilReady(timeout time.Duration, info *resource.Info) error { +func (c *Client) watchUntilReady(timeout time.Duration, info *resource.Info) error { w, err := resource.NewHelper(info.Client, info.Mapping).WatchSingle(info.Namespace, info.Name, info.ResourceVersion) if err != nil { return err } kind := info.Mapping.GroupVersionKind.Kind - log.Printf("Watching for changes to %s %s with timeout of %v", kind, info.Name, timeout) + c.Log("Watching for changes to %s %s with timeout of %v", kind, info.Name, timeout) // What we watch for depends on the Kind. // - For a Job, we watch for completion. @@ -496,17 +499,17 @@ func watchUntilReady(timeout time.Duration, info *resource.Info) error { // we get. We care mostly about jobs, where what we want to see is // the status go into a good state. For other types, like ReplicaSet // we don't really do anything to support these as hooks. - log.Printf("Add/Modify event for %s: %v", info.Name, e.Type) + c.Log("Add/Modify event for %s: %v", info.Name, e.Type) if kind == "Job" { - return waitForJob(e, info.Name) + return c.waitForJob(e, info.Name) } return true, nil case watch.Deleted: - log.Printf("Deleted event for %s", info.Name) + c.Log("Deleted event for %s", info.Name) return true, nil case watch.Error: // Handle error and return with an error. - log.Printf("Error event for %s", info.Name) + c.Log("Error event for %s", info.Name) return true, fmt.Errorf("Failed to deploy %s", info.Name) default: return false, nil @@ -529,7 +532,7 @@ func (c *Client) AsVersionedObject(obj runtime.Object) (runtime.Object, error) { // waitForJob is a helper that waits for a job to complete. // // This operates on an event returned from a watcher. -func waitForJob(e watch.Event, name string) (bool, error) { +func (c *Client) waitForJob(e watch.Event, name string) (bool, error) { o, ok := e.Object.(*batchinternal.Job) if !ok { return true, fmt.Errorf("Expected %s to be a *batch.Job, got %T", name, e.Object) @@ -543,7 +546,7 @@ func waitForJob(e watch.Event, name string) (bool, error) { } } - log.Printf("%s: Jobs active: %d, jobs failed: %d, jobs succeeded: %d", name, o.Status.Active, o.Status.Failed, o.Status.Succeeded) + c.Log("%s: Jobs active: %d, jobs failed: %d, jobs succeeded: %d", name, o.Status.Active, o.Status.Failed, o.Status.Succeeded) return false, nil } @@ -574,7 +577,7 @@ func (c *Client) WaitAndGetCompletedPodPhase(namespace string, reader io.Reader, return api.PodUnknown, fmt.Errorf("%s is not a Pod", info.Name) } - if err := watchPodUntilComplete(timeout, info); err != nil { + if err := c.watchPodUntilComplete(timeout, info); err != nil { return api.PodUnknown, err } @@ -586,13 +589,13 @@ func (c *Client) WaitAndGetCompletedPodPhase(namespace string, reader io.Reader, return status, nil } -func watchPodUntilComplete(timeout time.Duration, info *resource.Info) error { +func (c *Client) watchPodUntilComplete(timeout time.Duration, info *resource.Info) error { w, err := resource.NewHelper(info.Client, info.Mapping).WatchSingle(info.Namespace, info.Name, info.ResourceVersion) if err != nil { return err } - log.Printf("Watching pod %s for completion with timeout of %v", info.Name, timeout) + c.Log("Watching pod %s for completion with timeout of %v", info.Name, timeout) _, err = watch.Until(timeout, w, func(e watch.Event) (bool, error) { return conditions.PodCompleted(e) }) diff --git a/pkg/kube/client_test.go b/pkg/kube/client_test.go index 1f2032bb2..26d38f4b3 100644 --- a/pkg/kube/client_test.go +++ b/pkg/kube/client_test.go @@ -138,6 +138,12 @@ func encodeAndMarshalEvent(e *watch.Event) ([]byte, error) { return json.Marshal(encodedEvent) } +func newTestClient(f cmdutil.Factory) *Client { + c := New(nil) + c.Factory = f + return c +} + func TestUpdate(t *testing.T) { listA := newPodList("starfish", "otter", "squid") listB := newPodList("starfish", "otter", "dolphin") @@ -186,7 +192,7 @@ func TestUpdate(t *testing.T) { reaper := &fakeReaper{} rf := &fakeReaperFactory{Factory: f, reaper: reaper} - c := &Client{Factory: rf} + c := newTestClient(rf) if err := c.Update(api.NamespaceDefault, objBody(codec, &listA), objBody(codec, &listB), false, 0, false); err != nil { t.Fatal(err) } @@ -251,7 +257,7 @@ func TestBuild(t *testing.T) { for _, tt := range tests { f, tf, _, _ := cmdtesting.NewAPIFactory() - c := &Client{Factory: f} + c := newTestClient(f) if tt.swaggerFile != "" { data, err := ioutil.ReadFile(tt.swaggerFile) if err != nil { @@ -320,7 +326,7 @@ func TestGet(t *testing.T) { } }), } - c := &Client{Factory: f} + c := newTestClient(f) // Test Success data := strings.NewReader("kind: Pod\napiVersion: v1\nmetadata:\n name: otter") @@ -380,7 +386,7 @@ func TestPerform(t *testing.T) { } f, tf, _, _ := cmdtesting.NewAPIFactory() - c := &Client{Factory: f} + c := newTestClient(f) if tt.swaggerFile != "" { data, err := ioutil.ReadFile(tt.swaggerFile) if err != nil { @@ -464,7 +470,7 @@ func TestWaitAndGetCompletedPodPhase(t *testing.T) { }), } - c := &Client{Factory: f} + c := newTestClient(f) phase, err := c.WaitAndGetCompletedPodPhase("test", objBody(codec, &testPodList), 1*time.Second) if (err != nil) != tt.err { diff --git a/pkg/storage/driver/cfgmaps.go b/pkg/storage/driver/cfgmaps.go index d7d48a032..1c30951f3 100644 --- a/pkg/storage/driver/cfgmaps.go +++ b/pkg/storage/driver/cfgmaps.go @@ -22,7 +22,6 @@ import ( "encoding/base64" "fmt" "io/ioutil" - "log" "strconv" "strings" "time" @@ -51,12 +50,16 @@ var magicGzip = []byte{0x1f, 0x8b, 0x08} // ConfigMapsInterface. type ConfigMaps struct { impl internalversion.ConfigMapInterface + Log func(string, ...interface{}) } // NewConfigMaps initializes a new ConfigMaps wrapping an implmenetation of // the kubernetes ConfigMapsInterface. func NewConfigMaps(impl internalversion.ConfigMapInterface) *ConfigMaps { - return &ConfigMaps{impl: impl} + return &ConfigMaps{ + impl: impl, + Log: func(_ string, _ ...interface{}) {}, + } } // Name returns the name of the driver. @@ -74,13 +77,13 @@ func (cfgmaps *ConfigMaps) Get(key string) (*rspb.Release, error) { return nil, ErrReleaseNotFound(key) } - logerrf(err, "get: failed to get %q", key) + cfgmaps.Log("get: failed to get %q: %s", key, err) return nil, err } // found the configmap, decode the base64 data string r, err := decodeRelease(obj.Data["release"]) if err != nil { - logerrf(err, "get: failed to decode data %q", key) + cfgmaps.Log("get: failed to decode data %q: %s", key, err) return nil, err } // return the release object @@ -96,7 +99,7 @@ func (cfgmaps *ConfigMaps) List(filter func(*rspb.Release) bool) ([]*rspb.Releas list, err := cfgmaps.impl.List(opts) if err != nil { - logerrf(err, "list: failed to list") + cfgmaps.Log("list: failed to list: %s", err) return nil, err } @@ -107,7 +110,7 @@ func (cfgmaps *ConfigMaps) List(filter func(*rspb.Release) bool) ([]*rspb.Releas for _, item := range list.Items { rls, err := decodeRelease(item.Data["release"]) if err != nil { - logerrf(err, "list: failed to decode release: %v", item) + cfgmaps.Log("list: failed to decode release: %v: %s", item, err) continue } if filter(rls) { @@ -132,7 +135,7 @@ func (cfgmaps *ConfigMaps) Query(labels map[string]string) ([]*rspb.Release, err list, err := cfgmaps.impl.List(opts) if err != nil { - logerrf(err, "query: failed to query with labels") + cfgmaps.Log("query: failed to query with labels: %s", err) return nil, err } @@ -144,7 +147,7 @@ func (cfgmaps *ConfigMaps) Query(labels map[string]string) ([]*rspb.Release, err for _, item := range list.Items { rls, err := decodeRelease(item.Data["release"]) if err != nil { - logerrf(err, "query: failed to decode release: %s", err) + cfgmaps.Log("query: failed to decode release: %s", err) continue } results = append(results, rls) @@ -164,7 +167,7 @@ func (cfgmaps *ConfigMaps) Create(key string, rls *rspb.Release) error { // create a new configmap to hold the release obj, err := newConfigMapsObject(key, rls, lbs) if err != nil { - logerrf(err, "create: failed to encode release %q", rls.Name) + cfgmaps.Log("create: failed to encode release %q: %s", rls.Name, err) return err } // push the configmap object out into the kubiverse @@ -173,7 +176,7 @@ func (cfgmaps *ConfigMaps) Create(key string, rls *rspb.Release) error { return ErrReleaseExists(rls.Name) } - logerrf(err, "create: failed to create") + cfgmaps.Log("create: failed to create: %s", err) return err } return nil @@ -191,13 +194,13 @@ func (cfgmaps *ConfigMaps) Update(key string, rls *rspb.Release) error { // create a new configmap object to hold the release obj, err := newConfigMapsObject(key, rls, lbs) if err != nil { - logerrf(err, "update: failed to encode release %q", rls.Name) + cfgmaps.Log("update: failed to encode release %q: %s", rls.Name, err) return err } // push the configmap object out into the kubiverse _, err = cfgmaps.impl.Update(obj) if err != nil { - logerrf(err, "update: failed to update") + cfgmaps.Log("update: failed to update: %s", err) return err } return nil @@ -211,7 +214,7 @@ func (cfgmaps *ConfigMaps) Delete(key string) (rls *rspb.Release, err error) { return nil, ErrReleaseExists(rls.Name) } - logerrf(err, "delete: failed to get release %q", key) + cfgmaps.Log("delete: failed to get release %q: %s", key, err) return nil, err } // delete the release @@ -316,8 +319,3 @@ func decodeRelease(data string) (*rspb.Release, error) { } return &rls, nil } - -// logerrf wraps an error with a formatted string (used for debugging) -func logerrf(err error, format string, args ...interface{}) { - log.Printf("configmaps: %s: %s\n", fmt.Sprintf(format, args...), err) -} diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index 46bd21d4a..48a41fd12 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -18,7 +18,6 @@ package storage // import "k8s.io/helm/pkg/storage" import ( "fmt" - "log" "sync" rspb "k8s.io/helm/pkg/proto/hapi/release" @@ -34,13 +33,15 @@ type Storage struct { releaseLocks map[string]*sync.Mutex // releaseLocksLock is a mutex for accessing releaseLocks releaseLocksLock *sync.Mutex + + Log func(string, ...interface{}) } // Get retrieves the release from storage. An error is returned // if the storage driver failed to fetch the release, or the // release identified by the key, version pair does not exist. func (s *Storage) Get(name string, version int32) (*rspb.Release, error) { - log.Printf("Getting release %q (v%d) from storage\n", name, version) + s.Log("Getting release %q", makeKey(name, version)) return s.Driver.Get(makeKey(name, version)) } @@ -48,7 +49,7 @@ func (s *Storage) Get(name string, version int32) (*rspb.Release, error) { // error is returned if the storage driver failed to store the // release, or a release with identical an key already exists. func (s *Storage) Create(rls *rspb.Release) error { - log.Printf("Create release %q (v%d) in storage\n", rls.Name, rls.Version) + s.Log("Creating release %q", makeKey(rls.Name, rls.Version)) return s.Driver.Create(makeKey(rls.Name, rls.Version), rls) } @@ -56,7 +57,7 @@ func (s *Storage) Create(rls *rspb.Release) error { // storage backend fails to update the release or if the release // does not exist. func (s *Storage) Update(rls *rspb.Release) error { - log.Printf("Updating %q (v%d) in storage\n", rls.Name, rls.Version) + s.Log("Updating release %q", makeKey(rls.Name, rls.Version)) return s.Driver.Update(makeKey(rls.Name, rls.Version), rls) } @@ -64,21 +65,21 @@ func (s *Storage) Update(rls *rspb.Release) error { // the storage backend fails to delete the release or if the release // does not exist. func (s *Storage) Delete(name string, version int32) (*rspb.Release, error) { - log.Printf("Deleting release %q (v%d) from storage\n", name, version) + s.Log("Deleting release %q", makeKey(name, version)) return s.Driver.Delete(makeKey(name, version)) } // ListReleases returns all releases from storage. An error is returned if the // storage backend fails to retrieve the releases. func (s *Storage) ListReleases() ([]*rspb.Release, error) { - log.Println("Listing all releases in storage") + s.Log("Listing all releases in storage") return s.Driver.List(func(_ *rspb.Release) bool { return true }) } // ListDeleted returns all releases with Status == DELETED. An error is returned // if the storage backend fails to retrieve the releases. func (s *Storage) ListDeleted() ([]*rspb.Release, error) { - log.Println("List deleted releases in storage") + s.Log("Listing deleted releases in storage") return s.Driver.List(func(rls *rspb.Release) bool { return relutil.StatusFilter(rspb.Status_DELETED).Check(rls) }) @@ -87,7 +88,7 @@ func (s *Storage) ListDeleted() ([]*rspb.Release, error) { // ListDeployed returns all releases with Status == DEPLOYED. An error is returned // if the storage backend fails to retrieve the releases. func (s *Storage) ListDeployed() ([]*rspb.Release, error) { - log.Println("Listing all deployed releases in storage") + s.Log("Listing all deployed releases in storage") return s.Driver.List(func(rls *rspb.Release) bool { return relutil.StatusFilter(rspb.Status_DEPLOYED).Check(rls) }) @@ -97,7 +98,7 @@ func (s *Storage) ListDeployed() ([]*rspb.Release, error) { // (filter0 && filter1 && ... && filterN), i.e. a Release is included in the results // if and only if all filters return true. func (s *Storage) ListFilterAll(fns ...relutil.FilterFunc) ([]*rspb.Release, error) { - log.Println("Listing all releases with filter") + s.Log("Listing all releases with filter") return s.Driver.List(func(rls *rspb.Release) bool { return relutil.All(fns...).Check(rls) }) @@ -107,7 +108,7 @@ func (s *Storage) ListFilterAll(fns ...relutil.FilterFunc) ([]*rspb.Release, err // (filter0 || filter1 || ... || filterN), i.e. a Release is included in the results // if at least one of the filters returns true. func (s *Storage) ListFilterAny(fns ...relutil.FilterFunc) ([]*rspb.Release, error) { - log.Println("Listing any releases with filter") + s.Log("Listing any releases with filter") return s.Driver.List(func(rls *rspb.Release) bool { return relutil.Any(fns...).Check(rls) }) @@ -116,7 +117,7 @@ func (s *Storage) ListFilterAny(fns ...relutil.FilterFunc) ([]*rspb.Release, err // Deployed returns the deployed release with the provided release name, or // returns ErrReleaseNotFound if not found. func (s *Storage) Deployed(name string) (*rspb.Release, error) { - log.Printf("Getting deployed release from '%s' history\n", name) + s.Log("Getting deployed release from %q history", name) ls, err := s.Driver.Query(map[string]string{ "NAME": name, @@ -127,7 +128,7 @@ func (s *Storage) Deployed(name string) (*rspb.Release, error) { case err != nil: return nil, err case len(ls) == 0: - return nil, fmt.Errorf("'%s' has no deployed releases", name) + return nil, fmt.Errorf("%q has no deployed releases", name) default: return ls[0], nil } @@ -136,17 +137,14 @@ func (s *Storage) Deployed(name string) (*rspb.Release, error) { // History returns the revision history for the release with the provided name, or // returns ErrReleaseNotFound if no such release name exists. func (s *Storage) History(name string) ([]*rspb.Release, error) { - log.Printf("Getting release history for '%s'\n", name) + s.Log("Getting release history for %q", name) - l, err := s.Driver.Query(map[string]string{"NAME": name, "OWNER": "TILLER"}) - if err != nil { - return nil, err - } - return l, nil + return s.Driver.Query(map[string]string{"NAME": name, "OWNER": "TILLER"}) } // Last fetches the last revision of the named release. func (s *Storage) Last(name string) (*rspb.Release, error) { + s.Log("Getting last revision of %q", name) h, err := s.History(name) if err != nil { return nil, err @@ -180,7 +178,7 @@ func (s *Storage) LockRelease(name string) error { } } if !found { - return fmt.Errorf("Unable to lock release %s: release not found", name) + return fmt.Errorf("Unable to lock release %q: release not found", name) } lock = &sync.Mutex{} @@ -222,5 +220,6 @@ func Init(d driver.Driver) *Storage { Driver: d, releaseLocks: make(map[string]*sync.Mutex), releaseLocksLock: &sync.Mutex{}, + Log: func(_ string, _ ...interface{}) {}, } } diff --git a/pkg/tiller/environment/environment.go b/pkg/tiller/environment/environment.go index 26516474b..27727f1ff 100644 --- a/pkg/tiller/environment/environment.go +++ b/pkg/tiller/environment/environment.go @@ -235,6 +235,5 @@ func New() *Environment { return &Environment{ EngineYard: ey, Releases: storage.Init(driver.NewMemory()), - KubeClient: kube.New(nil), } } diff --git a/pkg/tiller/release_server.go b/pkg/tiller/release_server.go index 33d3423ae..363f2e692 100644 --- a/pkg/tiller/release_server.go +++ b/pkg/tiller/release_server.go @@ -20,7 +20,6 @@ import ( "bytes" "errors" "fmt" - "log" "path" "regexp" "strings" @@ -84,6 +83,7 @@ type ReleaseServer struct { ReleaseModule env *environment.Environment clientset internalclientset.Interface + Log func(string, ...interface{}) } // NewReleaseServer creates a new release server. @@ -101,6 +101,7 @@ func NewReleaseServer(env *environment.Environment, clientset internalclientset. env: env, clientset: clientset, ReleaseModule: releaseModule, + Log: func(_ string, _ ...interface{}) {}, } } @@ -268,7 +269,7 @@ func (s *ReleaseServer) GetReleaseStatus(c ctx.Context, req *services.GetRelease // Skip errors if this is already deleted or failed. return statusResp, nil } else if err != nil { - log.Printf("warning: Get for %s failed: %v", rel.Name, err) + s.Log("warning: Get for %s failed: %v", rel.Name, err) return nil, err } rel.Info.Status.Resources = resp @@ -321,7 +322,7 @@ func (s *ReleaseServer) performUpdate(originalRelease, updatedRelease *release.R res := &services.UpdateReleaseResponse{Release: updatedRelease} if req.DryRun { - log.Printf("Dry run for %s", updatedRelease.Name) + s.Log("Dry run for %s", updatedRelease.Name) res.Release.Info.Description = "Dry run complete" return res, nil } @@ -334,7 +335,7 @@ func (s *ReleaseServer) performUpdate(originalRelease, updatedRelease *release.R } if err := s.ReleaseModule.Update(originalRelease, updatedRelease, req, s.env); err != nil { msg := fmt.Sprintf("Upgrade %q failed: %s", updatedRelease.Name, err) - log.Printf("warning: %s", msg) + s.Log("warning: %s", msg) originalRelease.Info.Status.Code = release.Status_SUPERSEDED updatedRelease.Info.Status.Code = release.Status_FAILED updatedRelease.Info.Description = msg @@ -370,19 +371,19 @@ func (s *ReleaseServer) performUpdate(originalRelease, updatedRelease *release.R func (s *ReleaseServer) reuseValues(req *services.UpdateReleaseRequest, current *release.Release) error { if req.ResetValues { // If ResetValues is set, we comletely ignore current.Config. - log.Print("Reset values to the chart's original version.") + s.Log("Reset values to the chart's original version.") return nil } // If the ReuseValues flag is set, we always copy the old values over the new config's values. if req.ReuseValues { - log.Print("Reusing the old release's values") + s.Log("Reusing the old release's values") // We have to regenerate the old coalesced values: oldVals, err := chartutil.CoalesceValues(current.Chart, current.Config) if err != nil { err := fmt.Errorf("failed to rebuild old values: %s", err) - log.Print(err) + s.Log("%s", err) return err } nv, err := oldVals.YAML() @@ -399,7 +400,7 @@ func (s *ReleaseServer) reuseValues(req *services.UpdateReleaseRequest, current current.Config != nil && current.Config.Raw != "" && current.Config.Raw != "{}\n" { - log.Printf("Copying values from %s (v%d) to new release.", current.Name, current.Version) + s.Log("Copying values from %s (v%d) to new release.", current.Name, current.Version) req.Values = current.Config } return nil @@ -508,7 +509,7 @@ func (s *ReleaseServer) performRollback(currentRelease, targetRelease *release.R res := &services.RollbackReleaseResponse{Release: targetRelease} if req.DryRun { - log.Printf("Dry run for %s", targetRelease.Name) + s.Log("Dry run for %s", targetRelease.Name) return res, nil } @@ -521,7 +522,7 @@ func (s *ReleaseServer) performRollback(currentRelease, targetRelease *release.R if err := s.ReleaseModule.Rollback(currentRelease, targetRelease, req, s.env); err != nil { msg := fmt.Sprintf("Rollback %q failed: %s", targetRelease.Name, err) - log.Printf("warning: %s", msg) + s.Log("warning: %s", msg) currentRelease.Info.Status.Code = release.Status_SUPERSEDED targetRelease.Info.Status.Code = release.Status_FAILED targetRelease.Info.Description = msg @@ -565,7 +566,7 @@ func (s *ReleaseServer) prepareRollback(req *services.RollbackReleaseRequest) (* rbv = crls.Version - 1 } - log.Printf("rolling back %s (current: v%d, target: v%d)", req.Name, crls.Version, rbv) + s.Log("rolling back %s (current: v%d, target: v%d)", req.Name, crls.Version, rbv) prls, err := s.env.Releases.Get(req.Name, rbv) if err != nil { @@ -617,7 +618,7 @@ func (s *ReleaseServer) uniqName(start string, reuse bool) (string, error) { if st := rel.Info.Status.Code; reuse && (st == release.Status_DELETED || st == release.Status_FAILED) { // Allowe re-use of names if the previous release is marked deleted. - log.Printf("reusing name %q", start) + s.Log("reusing name %q", start) return start, nil } else if reuse { return "", errors.New("cannot re-use a name that is still in use") @@ -636,9 +637,9 @@ func (s *ReleaseServer) uniqName(start string, reuse bool) (string, error) { if _, err := s.env.Releases.Get(name, 1); strings.Contains(err.Error(), "not found") { return name, nil } - log.Printf("info: Name %q is taken. Searching again.", name) + s.Log("info: Name %q is taken. Searching again.", name) } - log.Printf("warning: No available release names found after %d tries", maxTries) + s.Log("warning: No available release names found after %d tries", maxTries) return "ERROR", errors.New("no available release name found") } @@ -648,7 +649,7 @@ func (s *ReleaseServer) engine(ch *chart.Chart) environment.Engine { if r, ok := s.env.EngineYard.Get(ch.Metadata.Engine); ok { renderer = r } else { - log.Printf("warning: %s requested non-existent template engine %s", ch.Metadata.Name, ch.Metadata.Engine) + s.Log("warning: %s requested non-existent template engine %s", ch.Metadata.Name, ch.Metadata.Engine) } } return renderer @@ -658,7 +659,7 @@ func (s *ReleaseServer) engine(ch *chart.Chart) environment.Engine { func (s *ReleaseServer) InstallRelease(c ctx.Context, req *services.InstallReleaseRequest) (*services.InstallReleaseResponse, error) { rel, err := s.prepareRelease(req) if err != nil { - log.Printf("Failed install prepare step: %s", err) + s.Log("Failed install prepare step: %s", err) res := &services.InstallReleaseResponse{Release: rel} // On dry run, append the manifest contents to a failed release. This is @@ -671,7 +672,7 @@ func (s *ReleaseServer) InstallRelease(c ctx.Context, req *services.InstallRelea res, err := s.performRelease(rel, req) if err != nil { - log.Printf("Failed install perform step: %s", err) + s.Log("Failed install perform step: %s", err) } return res, err } @@ -854,10 +855,10 @@ func (s *ReleaseServer) renderResources(ch *chart.Chart, values chartutil.Values func (s *ReleaseServer) recordRelease(r *release.Release, reuse bool) { if reuse { if err := s.env.Releases.Update(r); err != nil { - log.Printf("warning: Failed to update release %q: %s", r.Name, err) + s.Log("warning: Failed to update release %q: %s", r.Name, err) } } else if err := s.env.Releases.Create(r); err != nil { - log.Printf("warning: Failed to record release %q: %s", r.Name, err) + s.Log("warning: Failed to record release %q: %s", r.Name, err) } } @@ -866,7 +867,7 @@ func (s *ReleaseServer) performRelease(r *release.Release, req *services.Install res := &services.InstallReleaseResponse{Release: r} if req.DryRun { - log.Printf("Dry run for %s", r.Name) + s.Log("Dry run for %s", r.Name) res.Release.Info.Description = "Dry run complete" return res, nil } @@ -901,7 +902,7 @@ func (s *ReleaseServer) performRelease(r *release.Release, req *services.Install } if err := s.ReleaseModule.Update(old, r, updateReq, s.env); err != nil { msg := fmt.Sprintf("Release replace %q failed: %s", r.Name, err) - log.Printf("warning: %s", msg) + s.Log("warning: %s", msg) old.Info.Status.Code = release.Status_SUPERSEDED r.Info.Status.Code = release.Status_FAILED r.Info.Description = msg @@ -915,7 +916,7 @@ func (s *ReleaseServer) performRelease(r *release.Release, req *services.Install // regular manifests if err := s.ReleaseModule.Create(r, req, s.env); err != nil { msg := fmt.Sprintf("Release %q failed: %s", r.Name, err) - log.Printf("warning: %s", msg) + s.Log("warning: %s", msg) r.Info.Status.Code = release.Status_FAILED r.Info.Description = msg s.recordRelease(r, false) @@ -927,7 +928,7 @@ func (s *ReleaseServer) performRelease(r *release.Release, req *services.Install if !req.DisableHooks { if err := s.execHook(r.Hooks, r.Name, r.Namespace, hooks.PostInstall, req.Timeout); err != nil { msg := fmt.Sprintf("Release %q failed post-install: %s", r.Name, err) - log.Printf("warning: %s", msg) + s.Log("warning: %s", msg) r.Info.Status.Code = release.Status_FAILED r.Info.Description = msg s.recordRelease(r, false) @@ -956,7 +957,7 @@ func (s *ReleaseServer) execHook(hs []*release.Hook, name, namespace, hook strin return fmt.Errorf("unknown hook %q", hook) } - log.Printf("Executing %s hooks for %s", hook, name) + s.Log("Executing %s hooks for %s", hook, name) executingHooks := []*release.Hook{} for _, h := range hs { for _, e := range h.Events { @@ -972,20 +973,20 @@ func (s *ReleaseServer) execHook(hs []*release.Hook, name, namespace, hook strin b := bytes.NewBufferString(h.Manifest) if err := kubeCli.Create(namespace, b, timeout, false); err != nil { - log.Printf("warning: Release %q %s %s failed: %s", name, hook, h.Path, err) + s.Log("warning: Release %q %s %s failed: %s", name, hook, h.Path, err) return err } // No way to rewind a bytes.Buffer()? b.Reset() b.WriteString(h.Manifest) if err := kubeCli.WatchUntilReady(namespace, b, timeout, false); err != nil { - log.Printf("warning: Release %q %s %s could not complete: %s", name, hook, h.Path, err) + s.Log("warning: Release %q %s %s could not complete: %s", name, hook, h.Path, err) return err } h.LastRun = timeconv.Now() } - log.Printf("Hooks complete for %s %s", hook, name) + s.Log("Hooks complete for %s %s", hook, name) return nil } @@ -1007,7 +1008,7 @@ func (s *ReleaseServer) UninstallRelease(c ctx.Context, req *services.UninstallR defer s.env.Releases.UnlockRelease(req.Name) if !ValidName.MatchString(req.Name) { - log.Printf("uninstall: Release not found: %s", req.Name) + s.Log("uninstall: Release not found: %s", req.Name) return nil, errMissingRelease } @@ -1017,7 +1018,7 @@ func (s *ReleaseServer) UninstallRelease(c ctx.Context, req *services.UninstallR rels, err := s.env.Releases.History(req.Name) if err != nil { - log.Printf("uninstall: Release not loaded: %s", req.Name) + s.Log("uninstall: Release not loaded: %s", req.Name) return nil, err } if len(rels) < 1 { @@ -1032,7 +1033,7 @@ func (s *ReleaseServer) UninstallRelease(c ctx.Context, req *services.UninstallR if rel.Info.Status.Code == release.Status_DELETED { if req.Purge { if err := s.purgeReleases(rels...); err != nil { - log.Printf("uninstall: Failed to purge the release: %s", err) + s.Log("uninstall: Failed to purge the release: %s", err) return nil, err } return &services.UninstallReleaseResponse{Release: rel}, nil @@ -1040,7 +1041,7 @@ func (s *ReleaseServer) UninstallRelease(c ctx.Context, req *services.UninstallR return nil, fmt.Errorf("the release named %q is already deleted", req.Name) } - log.Printf("uninstall: Deleting %s", req.Name) + s.Log("uninstall: Deleting %s", req.Name) rel.Info.Status.Code = release.Status_DELETING rel.Info.Deleted = timeconv.Now() rel.Info.Description = "Deletion in progress (or silently failed)" @@ -1055,7 +1056,7 @@ func (s *ReleaseServer) UninstallRelease(c ctx.Context, req *services.UninstallR // From here on out, the release is currently considered to be in Status_DELETING // state. if err := s.env.Releases.Update(rel); err != nil { - log.Printf("uninstall: Failed to store updated release: %s", err) + s.Log("uninstall: Failed to store updated release: %s", err) } kept, errs := s.ReleaseModule.Delete(rel, req, s.env) @@ -1063,7 +1064,7 @@ func (s *ReleaseServer) UninstallRelease(c ctx.Context, req *services.UninstallR es := make([]string, 0, len(errs)) for _, e := range errs { - log.Printf("error: %v", e) + s.Log("error: %v", e) es = append(es, e.Error()) } @@ -1079,13 +1080,13 @@ func (s *ReleaseServer) UninstallRelease(c ctx.Context, req *services.UninstallR if req.Purge { err := s.purgeReleases(rels...) if err != nil { - log.Printf("uninstall: Failed to purge the release: %s", err) + s.Log("uninstall: Failed to purge the release: %s", err) } return res, err } if err := s.env.Releases.Update(rel); err != nil { - log.Printf("uninstall: Failed to store updated release: %s", err) + s.Log("uninstall: Failed to store updated release: %s", err) } if len(es) > 0 { @@ -1122,12 +1123,12 @@ func (s *ReleaseServer) RunReleaseTest(req *services.TestReleaseRequest, stream tSuite, err := reltesting.NewTestSuite(rel) if err != nil { - log.Printf("Error creating test suite for %s", rel.Name) + s.Log("Error creating test suite for %s", rel.Name) return err } if err := tSuite.Run(testEnv); err != nil { - log.Printf("Error running test suite for %s", rel.Name) + s.Log("Error running test suite for %s", rel.Name) return err } @@ -1142,7 +1143,7 @@ func (s *ReleaseServer) RunReleaseTest(req *services.TestReleaseRequest, stream } if err := s.env.Releases.Update(rel); err != nil { - log.Printf("test: Failed to store updated release: %s", err) + s.Log("test: Failed to store updated release: %s", err) } return nil diff --git a/pkg/tiller/release_server_test.go b/pkg/tiller/release_server_test.go index fd09fa47b..8a5934b79 100644 --- a/pkg/tiller/release_server_test.go +++ b/pkg/tiller/release_server_test.go @@ -105,6 +105,7 @@ func rsFixture() *ReleaseServer { }, env: MockEnvironment(), clientset: clientset, + Log: func(_ string, _ ...interface{}) {}, } }