feat(helm): use port forwarding for tiller connection

pull/722/head
Adam Reese 10 years ago
parent cf03809eb1
commit b481c5920f

@ -8,6 +8,7 @@ import (
"github.com/kubernetes/helm/pkg/helm"
"github.com/kubernetes/helm/pkg/proto/hapi/release"
"github.com/kubernetes/helm/pkg/tiller"
"github.com/kubernetes/helm/pkg/timeconv"
)
@ -19,11 +20,6 @@ path to a chart directory or the name of a
chart in the current working directory.
`
const (
hostEnvVar = "TILLER_HOST"
defaultHost = ":44134"
)
// install flags & args
var (
// installArg is the name or relative path of the chart to install
@ -48,6 +44,11 @@ func runInstall(cmd *cobra.Command, args []string) error {
installArg = args[0]
setupInstallEnv()
pf, err := newTillerPortForwarder()
defer pf.Close()
helm.Config.ServAddr = fmt.Sprintf(":%d", pf.Local)
res, err := helm.InstallRelease(installArg, installDryRun)
if err != nil {
return prettyError(err)
@ -79,8 +80,8 @@ func setupInstallEnv() {
//
// bug: except that if the host flag happens to set the host to the same
// value as the defaultHost, the env var will be used instead.
if tillerHost == defaultHost {
host := os.Getenv(hostEnvVar)
if tillerHost == tiller.DefaultHost {
host := os.Getenv(tiller.HostEnvVar)
if host != "" {
tillerHost = host
}
@ -90,7 +91,7 @@ func setupInstallEnv() {
}
func init() {
installCmd.Flags().StringVar(&tillerHost, "host", defaultHost, "address of tiller server")
installCmd.Flags().StringVar(&tillerHost, "host", tiller.DefaultHost, "address of tiller server")
installCmd.Flags().BoolVar(&installDryRun, "dry-run", false, "simulate an install")
RootCommand.AddCommand(installCmd)

@ -45,6 +45,11 @@ func listCmd(cmd *cobra.Command, args []string) error {
fmt.Println("TODO: Implement filter.")
}
pf, err := newTillerPortForwarder()
defer pf.Close()
helm.Config.ServAddr = fmt.Sprintf(":%d", pf.Local)
res, err := helm.ListReleases(listMax, listOffset)
if err != nil {
return prettyError(err)

@ -0,0 +1,36 @@
package main
import (
"fmt"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/labels"
"github.com/kubernetes/helm/pkg/kube"
)
func newTillerPortForwarder() (*kube.Tunnel, error) {
podName, err := getTillerPodName("helm")
if err != nil {
return nil, err
}
return kube.New(nil).ForwardPort("helm", podName, 44134)
}
func getTillerPodName(namespace string) (string, error) {
client, err := kube.New(nil).Client()
if err != nil {
return "", err
}
selector := labels.Set{"app": "helm", "name": "tiller"}.AsSelector()
options := api.ListOptions{LabelSelector: selector}
pods, err := client.Pods(namespace).List(options)
if err != nil {
return "", err
}
if len(pods.Items) < 1 {
return "", fmt.Errorf("I could not find tiller")
}
return pods.Items[0].ObjectMeta.GetName(), nil
}

66
glide.lock generated

@ -1,8 +1,8 @@
hash: 43d0eadd3ed4b3563b4a6a49d3bd6d083bcb8fbe9d201dfef763b90c1677738c
updated: 2016-05-09T10:30:30.592692551-07:00
hash: 63afc29bc3d5032c6c2829b7bf9ffba43555ad5a3faf72a14ef2a86155c0514c
updated: 2016-05-11T14:15:54.42108644-07:00
imports:
- name: bitbucket.org/ww/goautoneg
version: 75cd24fc2f2c
version: 75cd24fc2f2c2a2088577d12123ddee5f54e0675
- name: github.com/aokoli/goutils
version: 9c37978a95bd5c709a15883b6242714ea6709e64
- name: github.com/beorn7/perks
@ -245,13 +245,13 @@ imports:
- pkg/client/unversioned/remotecommand
- pkg/kubectl/cmd/util
- pkg/kubectl/resource
- pkg/labels
- pkg/api/meta
- pkg/api/resource
- pkg/api/unversioned
- pkg/auth/user
- pkg/conversion
- pkg/fields
- pkg/labels
- pkg/runtime
- pkg/runtime/serializer
- pkg/types
@ -259,6 +259,7 @@ imports:
- pkg/util/intstr
- pkg/util/rand
- pkg/util/sets
- pkg/util/validation
- pkg/api/errors
- pkg/api/v1
- pkg/api/validation
@ -271,34 +272,23 @@ imports:
- pkg/version
- pkg/watch
- pkg/watch/json
- pkg/api/install
- pkg/apimachinery/registered
- pkg/apis/apps
- pkg/apis/apps/install
- pkg/apis/authorization/install
- pkg/apis/autoscaling
- pkg/apis/autoscaling/install
- pkg/apis/batch
- pkg/apis/batch/install
- pkg/apis/componentconfig/install
- pkg/apis/extensions
- pkg/apis/extensions/install
- pkg/apis/metrics/install
- pkg/client/typed/discovery
- pkg/util/wait
- plugin/pkg/client/auth
- pkg/client/unversioned/auth
- pkg/client/unversioned/clientcmd/api/latest
- pkg/util/errors
- pkg/util/homedir
- pkg/util/validation
- pkg/kubelet/server/portforward
- pkg/util/httpstream
- pkg/util/runtime
- pkg/kubelet/server/remotecommand
- pkg/util/httpstream/spdy
- pkg/apimachinery
- pkg/apimachinery/registered
- pkg/apis/apps
- pkg/apis/autoscaling
- pkg/apis/batch
- pkg/apis/extensions
- pkg/apis/metrics
- pkg/client/typed/discovery
- pkg/client/unversioned/adapters/internalclientset
- pkg/kubectl
- pkg/registry/thirdpartyresourcedata
@ -312,6 +302,7 @@ imports:
- pkg/runtime/serializer/protobuf
- pkg/runtime/serializer/recognizer
- pkg/runtime/serializer/versioning
- pkg/util/wait
- pkg/util/validation/field
- pkg/util/parsers
- pkg/watch/versioned
@ -321,37 +312,44 @@ imports:
- pkg/api/util
- pkg/capabilities
- pkg/util/integer
- pkg/apis/apps/v1alpha1
- pkg/apis/authorization
- pkg/apis/authorization/v1beta1
- pkg/apis/autoscaling/v1
- pkg/apis/batch/v1
- pkg/apis/componentconfig
- pkg/apis/componentconfig/v1alpha1
- pkg/apis/extensions/v1beta1
- pkg/apis/metrics/v1alpha1
- plugin/pkg/client/auth/gcp
- pkg/client/unversioned/clientcmd/api/v1
- pkg/httplog
- pkg/util/wsstream
- third_party/golang/netutil
- pkg/api/install
- pkg/apis/apps/install
- pkg/apis/authorization/install
- pkg/apis/autoscaling/install
- pkg/apis/batch/install
- pkg/apis/componentconfig/install
- pkg/apis/extensions/install
- pkg/apis/metrics/install
- plugin/pkg/client/auth
- pkg/client/clientset_generated/internalclientset
- pkg/client/clientset_generated/internalclientset/typed/core/unversioned
- pkg/client/clientset_generated/internalclientset/typed/extensions/unversioned
- pkg/apis/batch/v1
- pkg/credentialprovider
- pkg/fieldpath
- pkg/kubelet/qos/util
- pkg/util/deployment
- pkg/util/jsonpath
- pkg/api/rest
- pkg/apis/extensions/v1beta1
- pkg/apis/extensions/validation
- pkg/registry/generic
- pkg/util/framer
- third_party/forked/json
- pkg/util/hash
- pkg/util/net/sets
- pkg/kubelet/qos
- pkg/master/ports
- pkg/apis/apps/v1alpha1
- pkg/apis/authorization
- pkg/apis/authorization/v1beta1
- pkg/apis/autoscaling/v1
- pkg/apis/componentconfig
- pkg/apis/componentconfig/v1alpha1
- pkg/apis/metrics/v1alpha1
- plugin/pkg/client/auth/gcp
- pkg/client/clientset_generated/internalclientset/typed/batch/unversioned
- pkg/controller
- pkg/util/labels
@ -360,6 +358,8 @@ imports:
- pkg/api/unversioned/validation
- pkg/controller/podautoscaler
- pkg/storage
- pkg/kubelet/qos
- pkg/master/ports
- pkg/client/cache
- pkg/client/record
- pkg/controller/framework

@ -34,4 +34,5 @@ import:
- pkg/client/unversioned/remotecommand
- pkg/kubectl/cmd/util
- pkg/kubectl/resource
- pkg/labels
- package: github.com/gosuri/uitable

@ -11,13 +11,13 @@ import (
// Client represents a client capable of communicating with the Kubernetes API.
type Client struct {
config clientcmd.ClientConfig
*cmdutil.Factory
}
// New create a new Client
func New(config clientcmd.ClientConfig) *Client {
return &Client{
config: config,
Factory: cmdutil.NewFactory(config),
}
}
@ -38,14 +38,10 @@ func (c *Client) Delete(namespace string, reader io.Reader) error {
return perform(c, namespace, reader, deleteResource)
}
func (c *Client) factory() *cmdutil.Factory {
return cmdutil.NewFactory(c.config)
}
const includeThirdPartyAPIs = false
func perform(c *Client, namespace string, reader io.Reader, fn ResourceActorFunc) error {
r := c.factory().NewBuilder(includeThirdPartyAPIs).
r := c.NewBuilder(includeThirdPartyAPIs).
ContinueOnError().
NamespaceParam(namespace).
RequireNamespace().

@ -7,7 +7,6 @@ import (
"k8s.io/kubernetes/pkg/api/meta"
"k8s.io/kubernetes/pkg/client/unversioned/fake"
cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
"k8s.io/kubernetes/pkg/kubectl/resource"
)
@ -46,12 +45,12 @@ func TestPerform(t *testing.T) {
return nil
}
f := cmdutil.NewFactory(nil)
f.ClientForMapping = func(mapping *meta.RESTMapping) (resource.RESTClient, error) {
c := New(nil)
c.ClientForMapping = func(mapping *meta.RESTMapping) (resource.RESTClient, error) {
return &fake.RESTClient{}, nil
}
err := perform(f, tt.namespace, tt.reader, fn)
err := perform(c, tt.namespace, tt.reader, fn)
if (err != nil) != tt.err {
t.Errorf("%q. expected error: %v, got %v", tt.name, tt.err, err)
}

@ -2,136 +2,89 @@ package kube
import (
"fmt"
"net/url"
"os"
"os/signal"
"net"
"strconv"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/client/unversioned/portforward"
"k8s.io/kubernetes/pkg/client/unversioned/remotecommand"
)
type tunnel interface {
Close()
}
type Tunnel struct {
name string
stopCh chan struct{}
Local int
Remote int
stopChan chan struct{}
}
// Close disconnects a tunnel connection
func (t *Tunnel) Close() {
close(t.stopCh)
close(t.stopChan)
}
func newTunnel(c *Client, namespace, podName string) (*Tunnel, error) {
client, err := c.factory().Client()
// ForwardPort opens a tunnel to a kubernetes pod
func (c *Client) ForwardPort(namespace, podName string, remote int) (*Tunnel, error) {
client, err := c.Client()
if err != nil {
return nil, err
}
if err := isPodRunning(client, namespace, podName); err != nil {
return nil, err
}
config, err := c.factory().ClientConfig()
config, err := c.ClientConfig()
if err != nil {
return nil, err
}
req := client.RESTClient.Post().
// http://192.168.64.94:8080/api/v1/namespaces/helm/pods/tiller-rc-9itlq/portforward
u := client.RESTClient.Post().
Resource("pods").
Namespace(namespace).
Name(podName).
SubResource("portforward")
SubResource("portforward").URL()
stopCh := make(chan struct{}, 1)
ports := []string{"44134"}
dialer, err := remotecommand.NewExecutor(config, "POST", req.URL())
dialer, err := remotecommand.NewExecutor(config, "POST", u)
if err != nil {
return nil, err
}
fw, err := portforward.New(dialer, ports, stopCh)
local, err := getAvailablePort()
if err != nil {
return nil, err
}
go func() {
err = fw.ForwardPorts()
if err != nil {
fmt.Printf("Failed to forward ports %v on pod %s: %v\n", ports, podName, err)
t := &Tunnel{
Local: local,
Remote: remote,
stopChan: make(chan struct{}, 1),
}
}()
return &Tunnel{stopCh: stopCh}, nil
}
ports := []string{fmt.Sprintf("%d:%d", local, remote)}
func isPodRunning(client *unversioned.Client, namespace, podName string) error {
pod, err := client.Pods(namespace).Get(podName)
pf, err := portforward.New(dialer, ports, t.stopChan)
if err != nil {
return err
return nil, err
}
if pod.Status.Phase != api.PodRunning {
return fmt.Errorf("Unable to execute command because pod is not running. Current status=%v", pod.Status.Phase)
go func() {
if err := pf.ForwardPorts(); err != nil {
fmt.Printf("Error forwarding ports: %v\n", err)
}
return nil
}
type portForwarder interface {
ForwardPorts(url *url.URL, config *restclient.Config, ports []string, stopChan <-chan struct{}) error
}
}()
type defaultPortForwarder struct{}
// wait for listeners to start
<-pf.Ready
func (f *defaultPortForwarder) ForwardPorts(url *url.URL, config *restclient.Config, ports []string, stopChan <-chan struct{}) error {
dialer, err := remotecommand.NewExecutor(config, "POST", url)
if err != nil {
return err
}
fw, err := portforward.New(dialer, ports, stopChan)
if err != nil {
return err
}
return fw.ForwardPorts()
return t, nil
}
func RunPortForward(f *cmdutil.Factory, namespace, podName string, args []string, fw portForwarder) error {
client, err := f.Client()
func getAvailablePort() (int, error) {
l, err := net.Listen("tcp", ":0")
if err != nil {
return err
}
if err := isPodRunning(client, namespace, podName); err != nil {
return nil, err
return 0, err
}
defer l.Close()
config, err := f.ClientConfig()
_, p, err := net.SplitHostPort(l.Addr().String())
port, err := strconv.Atoi(p)
if err != nil {
return err
return 0, err
}
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
defer signal.Stop(signals)
stopCh := make(chan struct{}, 1)
go func() {
<-signals
close(stopCh)
}()
req := client.RESTClient.Post().
Resource("pods").
Namespace(namespace).
Name(pod.Name).
SubResource("portforward")
return fw.ForwardPorts(req.URL(), config, args, stopCh)
return port, err
}

@ -0,0 +1,15 @@
package kube
import (
"testing"
)
func TestAvailablePort(t *testing.T) {
port, err := getAvailablePort()
if err != nil {
t.Fatal(err)
}
if port < 1 {
t.Fatalf("generated port should be > 1, got %d", port)
}
}

@ -0,0 +1,6 @@
package tiller
const (
HostEnvVar = "TILLER_HOST"
DefaultHost = ":44134"
)
Loading…
Cancel
Save