From dd4477d9b79308e19f7b91d2c53f0a35b412e287 Mon Sep 17 00:00:00 2001 From: Adam Reese Date: Mon, 9 May 2016 10:09:20 -0700 Subject: [PATCH] feat(kube): create tunnel for client --- pkg/kube/client.go | 14 +++-- pkg/kube/tunnel.go | 134 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 142 insertions(+), 6 deletions(-) create mode 100644 pkg/kube/tunnel.go diff --git a/pkg/kube/client.go b/pkg/kube/client.go index 0078bf12b..a14816faf 100644 --- a/pkg/kube/client.go +++ b/pkg/kube/client.go @@ -28,22 +28,24 @@ type ResourceActorFunc func(*resource.Info) error // // Namespace will set the namespace func (c *Client) Create(namespace string, reader io.Reader) error { - f := cmdutil.NewFactory(c.config) - return perform(f, namespace, reader, createResource) + return perform(c, namespace, reader, createResource) } // Delete deletes kubernetes resources from an io.reader // // Namespace will set the namespace func (c *Client) Delete(namespace string, reader io.Reader) error { - f := cmdutil.NewFactory(c.config) - return perform(f, namespace, reader, deleteResource) + return perform(c, namespace, reader, deleteResource) +} + +func (c *Client) factory() *cmdutil.Factory { + return cmdutil.NewFactory(c.config) } const includeThirdPartyAPIs = false -func perform(f *cmdutil.Factory, namespace string, reader io.Reader, fn ResourceActorFunc) error { - r := f.NewBuilder(includeThirdPartyAPIs). +func perform(c *Client, namespace string, reader io.Reader, fn ResourceActorFunc) error { + r := c.factory().NewBuilder(includeThirdPartyAPIs). ContinueOnError(). NamespaceParam(namespace). RequireNamespace(). diff --git a/pkg/kube/tunnel.go b/pkg/kube/tunnel.go new file mode 100644 index 000000000..508a63892 --- /dev/null +++ b/pkg/kube/tunnel.go @@ -0,0 +1,134 @@ +package kube + +import ( + "fmt" + "net/url" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/client/unversioned" + "k8s.io/kubernetes/pkg/client/unversioned/portforward" + "k8s.io/kubernetes/pkg/client/unversioned/remotecommand" +) + +type tunnel interface { + Close() +} + +type Tunnel struct { + name string + stopCh chan struct{} +} + +func (t *Tunnel) Close() { + close(t.stopCh) +} + +func newTunnel(c *Client, namespace, podName string) (*Tunnel, error) { + client, err := c.factory().Client() + if err != nil { + return nil, err + } + + if err := isPodRunning(client, namespace, podName); err != nil { + return nil, err + } + + config, err := c.factory().ClientConfig() + if err != nil { + return nil, err + } + + req := client.RESTClient.Post(). + Resource("pods"). + Namespace(namespace). + Name(podName). + SubResource("portforward") + + stopCh := make(chan struct{}, 1) + + ports := []string{"44134"} + + dialer, err := remotecommand.NewExecutor(config, "POST", req.URL()) + if err != nil { + return nil, err + } + + fw, err := portforward.New(dialer, ports, stopCh) + if err != nil { + return nil, err + } + + go func() { + err = fw.ForwardPorts() + if err != nil { + fmt.Printf("Failed to forward ports %v on pod %s: %v\n", ports, podName, err) + } + }() + + return &Tunnel{stopCh: stopCh}, nil +} + +func isPodRunning(client *unversioned.Client, namespace, podName string) error { + pod, err := client.Pods(namespace).Get(podName) + if err != nil { + return err + } + + if pod.Status.Phase != api.PodRunning { + return fmt.Errorf("Unable to execute command because pod is not running. Current status=%v", pod.Status.Phase) + } + return nil +} + +type portForwarder interface { + ForwardPorts(url *url.URL, config *restclient.Config, ports []string, stopChan <-chan struct{}) error +} + +type defaultPortForwarder struct{} + +func (f *defaultPortForwarder) ForwardPorts(url *url.URL, config *restclient.Config, ports []string, stopChan <-chan struct{}) error { + dialer, err := remotecommand.NewExecutor(config, "POST", url) + if err != nil { + return err + } + fw, err := portforward.New(dialer, ports, stopChan) + if err != nil { + return err + } + return fw.ForwardPorts() +} + +func RunPortForward(f *cmdutil.Factory, namespace, podName string, args []string, fw portForwarder) error { + + client, err := f.Client() + if err != nil { + return err + } + + if err := isPodRunning(client, namespace, podName); err != nil { + return nil, err + } + + config, err := f.ClientConfig() + if err != nil { + return err + } + + signals := make(chan os.Signal, 1) + signal.Notify(signals, os.Interrupt) + defer signal.Stop(signals) + + stopCh := make(chan struct{}, 1) + go func() { + <-signals + close(stopCh) + }() + + req := client.RESTClient.Post(). + Resource("pods"). + Namespace(namespace). + Name(pod.Name). + SubResource("portforward") + + return fw.ForwardPorts(req.URL(), config, args, stopCh) +}