From b481c5920f7c843288abdb8a5514e85978510437 Mon Sep 17 00:00:00 2001 From: Adam Reese Date: Tue, 10 May 2016 10:37:43 -0700 Subject: [PATCH] feat(helm): use port forwarding for tiller connection --- cmd/helm/install.go | 17 +++--- cmd/helm/list.go | 5 ++ cmd/helm/port_forward.go | 36 +++++++++++ glide.lock | 66 ++++++++++---------- glide.yaml | 1 + pkg/kube/client.go | 10 +-- pkg/kube/client_test.go | 7 +-- pkg/kube/tunnel.go | 127 ++++++++++++--------------------------- pkg/kube/tunnel_test.go | 15 +++++ pkg/tiller/tiller.go | 6 ++ 10 files changed, 151 insertions(+), 139 deletions(-) create mode 100644 cmd/helm/port_forward.go create mode 100644 pkg/kube/tunnel_test.go create mode 100644 pkg/tiller/tiller.go diff --git a/cmd/helm/install.go b/cmd/helm/install.go index 7ba4689b3..85e2ea0ac 100644 --- a/cmd/helm/install.go +++ b/cmd/helm/install.go @@ -8,6 +8,7 @@ import ( "github.com/kubernetes/helm/pkg/helm" "github.com/kubernetes/helm/pkg/proto/hapi/release" + "github.com/kubernetes/helm/pkg/tiller" "github.com/kubernetes/helm/pkg/timeconv" ) @@ -19,11 +20,6 @@ path to a chart directory or the name of a chart in the current working directory. ` -const ( - hostEnvVar = "TILLER_HOST" - defaultHost = ":44134" -) - // install flags & args var ( // installArg is the name or relative path of the chart to install @@ -48,6 +44,11 @@ func runInstall(cmd *cobra.Command, args []string) error { installArg = args[0] setupInstallEnv() + pf, err := newTillerPortForwarder() + defer pf.Close() + + helm.Config.ServAddr = fmt.Sprintf(":%d", pf.Local) + res, err := helm.InstallRelease(installArg, installDryRun) if err != nil { return prettyError(err) @@ -79,8 +80,8 @@ func setupInstallEnv() { // // bug: except that if the host flag happens to set the host to the same // value as the defaultHost, the env var will be used instead. - if tillerHost == defaultHost { - host := os.Getenv(hostEnvVar) + if tillerHost == tiller.DefaultHost { + host := os.Getenv(tiller.HostEnvVar) if host != "" { tillerHost = host } @@ -90,7 +91,7 @@ func setupInstallEnv() { } func init() { - installCmd.Flags().StringVar(&tillerHost, "host", defaultHost, "address of tiller server") + installCmd.Flags().StringVar(&tillerHost, "host", tiller.DefaultHost, "address of tiller server") installCmd.Flags().BoolVar(&installDryRun, "dry-run", false, "simulate an install") RootCommand.AddCommand(installCmd) diff --git a/cmd/helm/list.go b/cmd/helm/list.go index 6df616161..6ef51dd46 100644 --- a/cmd/helm/list.go +++ b/cmd/helm/list.go @@ -45,6 +45,11 @@ func listCmd(cmd *cobra.Command, args []string) error { fmt.Println("TODO: Implement filter.") } + pf, err := newTillerPortForwarder() + defer pf.Close() + + helm.Config.ServAddr = fmt.Sprintf(":%d", pf.Local) + res, err := helm.ListReleases(listMax, listOffset) if err != nil { return prettyError(err) diff --git a/cmd/helm/port_forward.go b/cmd/helm/port_forward.go new file mode 100644 index 000000000..34e696e51 --- /dev/null +++ b/cmd/helm/port_forward.go @@ -0,0 +1,36 @@ +package main + +import ( + "fmt" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/labels" + + "github.com/kubernetes/helm/pkg/kube" +) + +func newTillerPortForwarder() (*kube.Tunnel, error) { + podName, err := getTillerPodName("helm") + if err != nil { + return nil, err + } + return kube.New(nil).ForwardPort("helm", podName, 44134) +} + +func getTillerPodName(namespace string) (string, error) { + client, err := kube.New(nil).Client() + if err != nil { + return "", err + } + + selector := labels.Set{"app": "helm", "name": "tiller"}.AsSelector() + options := api.ListOptions{LabelSelector: selector} + pods, err := client.Pods(namespace).List(options) + if err != nil { + return "", err + } + if len(pods.Items) < 1 { + return "", fmt.Errorf("I could not find tiller") + } + return pods.Items[0].ObjectMeta.GetName(), nil +} diff --git a/glide.lock b/glide.lock index d154ab7f5..64ad38803 100644 --- a/glide.lock +++ b/glide.lock @@ -1,8 +1,8 @@ -hash: 43d0eadd3ed4b3563b4a6a49d3bd6d083bcb8fbe9d201dfef763b90c1677738c -updated: 2016-05-09T10:30:30.592692551-07:00 +hash: 63afc29bc3d5032c6c2829b7bf9ffba43555ad5a3faf72a14ef2a86155c0514c +updated: 2016-05-11T14:15:54.42108644-07:00 imports: - name: bitbucket.org/ww/goautoneg - version: 75cd24fc2f2c + version: 75cd24fc2f2c2a2088577d12123ddee5f54e0675 - name: github.com/aokoli/goutils version: 9c37978a95bd5c709a15883b6242714ea6709e64 - name: github.com/beorn7/perks @@ -245,13 +245,13 @@ imports: - pkg/client/unversioned/remotecommand - pkg/kubectl/cmd/util - pkg/kubectl/resource + - pkg/labels - pkg/api/meta - pkg/api/resource - pkg/api/unversioned - pkg/auth/user - pkg/conversion - pkg/fields - - pkg/labels - pkg/runtime - pkg/runtime/serializer - pkg/types @@ -259,6 +259,7 @@ imports: - pkg/util/intstr - pkg/util/rand - pkg/util/sets + - pkg/util/validation - pkg/api/errors - pkg/api/v1 - pkg/api/validation @@ -271,34 +272,23 @@ imports: - pkg/version - pkg/watch - pkg/watch/json - - pkg/api/install - - pkg/apimachinery/registered - - pkg/apis/apps - - pkg/apis/apps/install - - pkg/apis/authorization/install - - pkg/apis/autoscaling - - pkg/apis/autoscaling/install - - pkg/apis/batch - - pkg/apis/batch/install - - pkg/apis/componentconfig/install - - pkg/apis/extensions - - pkg/apis/extensions/install - - pkg/apis/metrics/install - - pkg/client/typed/discovery - - pkg/util/wait - - plugin/pkg/client/auth - pkg/client/unversioned/auth - pkg/client/unversioned/clientcmd/api/latest - pkg/util/errors - pkg/util/homedir - - pkg/util/validation - pkg/kubelet/server/portforward - pkg/util/httpstream - pkg/util/runtime - pkg/kubelet/server/remotecommand - pkg/util/httpstream/spdy - pkg/apimachinery + - pkg/apimachinery/registered + - pkg/apis/apps + - pkg/apis/autoscaling + - pkg/apis/batch + - pkg/apis/extensions - pkg/apis/metrics + - pkg/client/typed/discovery - pkg/client/unversioned/adapters/internalclientset - pkg/kubectl - pkg/registry/thirdpartyresourcedata @@ -312,6 +302,7 @@ imports: - pkg/runtime/serializer/protobuf - pkg/runtime/serializer/recognizer - pkg/runtime/serializer/versioning + - pkg/util/wait - pkg/util/validation/field - pkg/util/parsers - pkg/watch/versioned @@ -321,37 +312,44 @@ imports: - pkg/api/util - pkg/capabilities - pkg/util/integer - - pkg/apis/apps/v1alpha1 - - pkg/apis/authorization - - pkg/apis/authorization/v1beta1 - - pkg/apis/autoscaling/v1 - - pkg/apis/batch/v1 - - pkg/apis/componentconfig - - pkg/apis/componentconfig/v1alpha1 - - pkg/apis/extensions/v1beta1 - - pkg/apis/metrics/v1alpha1 - - plugin/pkg/client/auth/gcp - pkg/client/unversioned/clientcmd/api/v1 - pkg/httplog - pkg/util/wsstream - third_party/golang/netutil + - pkg/api/install + - pkg/apis/apps/install + - pkg/apis/authorization/install + - pkg/apis/autoscaling/install + - pkg/apis/batch/install + - pkg/apis/componentconfig/install + - pkg/apis/extensions/install + - pkg/apis/metrics/install + - plugin/pkg/client/auth - pkg/client/clientset_generated/internalclientset - pkg/client/clientset_generated/internalclientset/typed/core/unversioned - pkg/client/clientset_generated/internalclientset/typed/extensions/unversioned + - pkg/apis/batch/v1 - pkg/credentialprovider - pkg/fieldpath - pkg/kubelet/qos/util - pkg/util/deployment - pkg/util/jsonpath - pkg/api/rest + - pkg/apis/extensions/v1beta1 - pkg/apis/extensions/validation - pkg/registry/generic - pkg/util/framer - third_party/forked/json - pkg/util/hash - pkg/util/net/sets - - pkg/kubelet/qos - - pkg/master/ports + - pkg/apis/apps/v1alpha1 + - pkg/apis/authorization + - pkg/apis/authorization/v1beta1 + - pkg/apis/autoscaling/v1 + - pkg/apis/componentconfig + - pkg/apis/componentconfig/v1alpha1 + - pkg/apis/metrics/v1alpha1 + - plugin/pkg/client/auth/gcp - pkg/client/clientset_generated/internalclientset/typed/batch/unversioned - pkg/controller - pkg/util/labels @@ -360,6 +358,8 @@ imports: - pkg/api/unversioned/validation - pkg/controller/podautoscaler - pkg/storage + - pkg/kubelet/qos + - pkg/master/ports - pkg/client/cache - pkg/client/record - pkg/controller/framework diff --git a/glide.yaml b/glide.yaml index 84cd74614..acc571a02 100644 --- a/glide.yaml +++ b/glide.yaml @@ -34,4 +34,5 @@ import: - pkg/client/unversioned/remotecommand - pkg/kubectl/cmd/util - pkg/kubectl/resource + - pkg/labels - package: github.com/gosuri/uitable diff --git a/pkg/kube/client.go b/pkg/kube/client.go index a14816faf..b6a22e9ec 100644 --- a/pkg/kube/client.go +++ b/pkg/kube/client.go @@ -11,13 +11,13 @@ import ( // Client represents a client capable of communicating with the Kubernetes API. type Client struct { - config clientcmd.ClientConfig + *cmdutil.Factory } // New create a new Client func New(config clientcmd.ClientConfig) *Client { return &Client{ - config: config, + Factory: cmdutil.NewFactory(config), } } @@ -38,14 +38,10 @@ func (c *Client) Delete(namespace string, reader io.Reader) error { return perform(c, namespace, reader, deleteResource) } -func (c *Client) factory() *cmdutil.Factory { - return cmdutil.NewFactory(c.config) -} - const includeThirdPartyAPIs = false func perform(c *Client, namespace string, reader io.Reader, fn ResourceActorFunc) error { - r := c.factory().NewBuilder(includeThirdPartyAPIs). + r := c.NewBuilder(includeThirdPartyAPIs). ContinueOnError(). NamespaceParam(namespace). RequireNamespace(). diff --git a/pkg/kube/client_test.go b/pkg/kube/client_test.go index c3c81b4a2..7357f584c 100644 --- a/pkg/kube/client_test.go +++ b/pkg/kube/client_test.go @@ -7,7 +7,6 @@ import ( "k8s.io/kubernetes/pkg/api/meta" "k8s.io/kubernetes/pkg/client/unversioned/fake" - cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util" "k8s.io/kubernetes/pkg/kubectl/resource" ) @@ -46,12 +45,12 @@ func TestPerform(t *testing.T) { return nil } - f := cmdutil.NewFactory(nil) - f.ClientForMapping = func(mapping *meta.RESTMapping) (resource.RESTClient, error) { + c := New(nil) + c.ClientForMapping = func(mapping *meta.RESTMapping) (resource.RESTClient, error) { return &fake.RESTClient{}, nil } - err := perform(f, tt.namespace, tt.reader, fn) + err := perform(c, tt.namespace, tt.reader, fn) if (err != nil) != tt.err { t.Errorf("%q. expected error: %v, got %v", tt.name, tt.err, err) } diff --git a/pkg/kube/tunnel.go b/pkg/kube/tunnel.go index 57325380f..db858cc25 100644 --- a/pkg/kube/tunnel.go +++ b/pkg/kube/tunnel.go @@ -2,136 +2,89 @@ package kube import ( "fmt" - "net/url" - "os" - "os/signal" + "net" + "strconv" - "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/client/restclient" - "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/client/unversioned/portforward" "k8s.io/kubernetes/pkg/client/unversioned/remotecommand" ) -type tunnel interface { - Close() -} - type Tunnel struct { - name string - stopCh chan struct{} + Local int + Remote int + stopChan chan struct{} } +// Close disconnects a tunnel connection func (t *Tunnel) Close() { - close(t.stopCh) + close(t.stopChan) } -func newTunnel(c *Client, namespace, podName string) (*Tunnel, error) { - client, err := c.factory().Client() +// ForwardPort opens a tunnel to a kubernetes pod +func (c *Client) ForwardPort(namespace, podName string, remote int) (*Tunnel, error) { + client, err := c.Client() if err != nil { return nil, err } - if err := isPodRunning(client, namespace, podName); err != nil { - return nil, err - } - - config, err := c.factory().ClientConfig() + config, err := c.ClientConfig() if err != nil { return nil, err } - req := client.RESTClient.Post(). + // http://192.168.64.94:8080/api/v1/namespaces/helm/pods/tiller-rc-9itlq/portforward + u := client.RESTClient.Post(). Resource("pods"). Namespace(namespace). Name(podName). - SubResource("portforward") + SubResource("portforward").URL() - stopCh := make(chan struct{}, 1) - - ports := []string{"44134"} - - dialer, err := remotecommand.NewExecutor(config, "POST", req.URL()) + dialer, err := remotecommand.NewExecutor(config, "POST", u) if err != nil { return nil, err } - fw, err := portforward.New(dialer, ports, stopCh) + local, err := getAvailablePort() if err != nil { return nil, err } - go func() { - err = fw.ForwardPorts() - if err != nil { - fmt.Printf("Failed to forward ports %v on pod %s: %v\n", ports, podName, err) - } - }() + t := &Tunnel{ + Local: local, + Remote: remote, + stopChan: make(chan struct{}, 1), + } - return &Tunnel{stopCh: stopCh}, nil -} + ports := []string{fmt.Sprintf("%d:%d", local, remote)} -func isPodRunning(client *unversioned.Client, namespace, podName string) error { - pod, err := client.Pods(namespace).Get(podName) + pf, err := portforward.New(dialer, ports, t.stopChan) if err != nil { - return err - } - - if pod.Status.Phase != api.PodRunning { - return fmt.Errorf("Unable to execute command because pod is not running. Current status=%v", pod.Status.Phase) + return nil, err } - return nil -} -type portForwarder interface { - ForwardPorts(url *url.URL, config *restclient.Config, ports []string, stopChan <-chan struct{}) error -} + go func() { + if err := pf.ForwardPorts(); err != nil { + fmt.Printf("Error forwarding ports: %v\n", err) + } + }() -type defaultPortForwarder struct{} + // wait for listeners to start + <-pf.Ready -func (f *defaultPortForwarder) ForwardPorts(url *url.URL, config *restclient.Config, ports []string, stopChan <-chan struct{}) error { - dialer, err := remotecommand.NewExecutor(config, "POST", url) - if err != nil { - return err - } - fw, err := portforward.New(dialer, ports, stopChan) - if err != nil { - return err - } - return fw.ForwardPorts() + return t, nil } -func RunPortForward(f *cmdutil.Factory, namespace, podName string, args []string, fw portForwarder) error { - - client, err := f.Client() +func getAvailablePort() (int, error) { + l, err := net.Listen("tcp", ":0") if err != nil { - return err - } - - if err := isPodRunning(client, namespace, podName); err != nil { - return nil, err + return 0, err } + defer l.Close() - config, err := f.ClientConfig() + _, p, err := net.SplitHostPort(l.Addr().String()) + port, err := strconv.Atoi(p) if err != nil { - return err + return 0, err } - - signals := make(chan os.Signal, 1) - signal.Notify(signals, os.Interrupt) - defer signal.Stop(signals) - - stopCh := make(chan struct{}, 1) - go func() { - <-signals - close(stopCh) - }() - - req := client.RESTClient.Post(). - Resource("pods"). - Namespace(namespace). - Name(pod.Name). - SubResource("portforward") - - return fw.ForwardPorts(req.URL(), config, args, stopCh) + return port, err } diff --git a/pkg/kube/tunnel_test.go b/pkg/kube/tunnel_test.go new file mode 100644 index 000000000..5833b1bc8 --- /dev/null +++ b/pkg/kube/tunnel_test.go @@ -0,0 +1,15 @@ +package kube + +import ( + "testing" +) + +func TestAvailablePort(t *testing.T) { + port, err := getAvailablePort() + if err != nil { + t.Fatal(err) + } + if port < 1 { + t.Fatalf("generated port should be > 1, got %d", port) + } +} diff --git a/pkg/tiller/tiller.go b/pkg/tiller/tiller.go new file mode 100644 index 000000000..f48f79ebd --- /dev/null +++ b/pkg/tiller/tiller.go @@ -0,0 +1,6 @@ +package tiller + +const ( + HostEnvVar = "TILLER_HOST" + DefaultHost = ":44134" +)