Retry setup connection until times out (Fixes #3379)

Signed-off-by: Alex Johnson <ajohnson@bombora.com>
pull/3490/head
Alex Johnson 8 years ago
parent c5780f38b1
commit b56d82addf

@ -23,16 +23,17 @@ import (
"fmt" "fmt"
"io" "io"
"os" "os"
"time"
"github.com/spf13/cobra" "github.com/spf13/cobra"
apierrors "k8s.io/apimachinery/pkg/api/errors" apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/client-go/kubernetes"
"k8s.io/apimachinery/pkg/util/yaml" "k8s.io/apimachinery/pkg/util/yaml"
"k8s.io/client-go/kubernetes"
"k8s.io/helm/cmd/helm/installer" "k8s.io/helm/cmd/helm/installer"
"k8s.io/helm/pkg/getter" "k8s.io/helm/pkg/getter"
"k8s.io/helm/pkg/helm" "k8s.io/helm/pkg/helm"
"k8s.io/helm/pkg/helm/helmpath" "k8s.io/helm/pkg/helm/helmpath"
"k8s.io/helm/pkg/helm/portforwarder"
"k8s.io/helm/pkg/repo" "k8s.io/helm/pkg/repo"
) )
@ -87,6 +88,7 @@ type initCmd struct {
serviceAccount string serviceAccount string
maxHistory int maxHistory int
wait bool wait bool
timeout int
} }
func newInitCmd(out io.Writer) *cobra.Command { func newInitCmd(out io.Writer) *cobra.Command {
@ -102,10 +104,11 @@ func newInitCmd(out io.Writer) *cobra.Command {
} }
i.namespace = settings.TillerNamespace i.namespace = settings.TillerNamespace
i.home = settings.Home i.home = settings.Home
i.client = ensureHelmClient(i.client)
return i.run() return i.run()
}, },
PostRunE: func(cmd *cobra.Command, args []string) error {
return i.ensureTillerRunning()
},
} }
f := cmd.Flags() f := cmd.Flags()
@ -117,6 +120,7 @@ func newInitCmd(out io.Writer) *cobra.Command {
f.BoolVar(&i.dryRun, "dry-run", false, "do not install local or remote") f.BoolVar(&i.dryRun, "dry-run", false, "do not install local or remote")
f.BoolVar(&i.skipRefresh, "skip-refresh", false, "do not refresh (download) the local repository cache") f.BoolVar(&i.skipRefresh, "skip-refresh", false, "do not refresh (download) the local repository cache")
f.BoolVar(&i.wait, "wait", false, "block until Tiller is running and ready to receive requests") f.BoolVar(&i.wait, "wait", false, "block until Tiller is running and ready to receive requests")
f.IntVar(&i.timeout, "timeout", 10, "number of seconds to wait for tiller before timing out")
f.BoolVar(&tlsEnable, "tiller-tls", false, "install Tiller with TLS enabled") f.BoolVar(&tlsEnable, "tiller-tls", false, "install Tiller with TLS enabled")
f.BoolVar(&tlsVerify, "tiller-tls-verify", false, "install Tiller with TLS enabled and to verify remote certificates") f.BoolVar(&tlsVerify, "tiller-tls-verify", false, "install Tiller with TLS enabled and to verify remote certificates")
@ -298,18 +302,12 @@ func (i *initCmd) run() error {
if err := installer.Upgrade(i.kubeClient, &i.opts); err != nil { if err := installer.Upgrade(i.kubeClient, &i.opts); err != nil {
return fmt.Errorf("error when upgrading: %s", err) return fmt.Errorf("error when upgrading: %s", err)
} }
if err := i.ping(); err != nil {
return err
}
fmt.Fprintln(i.out, "\nTiller (the Helm server-side component) has been upgraded to the current version.") fmt.Fprintln(i.out, "\nTiller (the Helm server-side component) has been upgraded to the current version.")
} else { } else {
fmt.Fprintln(i.out, "Warning: Tiller is already installed in the cluster.\n"+ fmt.Fprintln(i.out, "Warning: Tiller is already installed in the cluster.\n"+
"(Use --client-only to suppress this message, or --upgrade to upgrade Tiller to the current version.)") "(Use --client-only to suppress this message, or --upgrade to upgrade Tiller to the current version.)")
} }
} else { } else {
if err := i.ping(); err != nil {
return err
}
fmt.Fprintln(i.out, "\nTiller (the Helm server-side component) has been installed into your Kubernetes Cluster.") fmt.Fprintln(i.out, "\nTiller (the Helm server-side component) has been installed into your Kubernetes Cluster.")
} }
} else { } else {
@ -320,11 +318,27 @@ func (i *initCmd) run() error {
return nil return nil
} }
func (i *initCmd) ping() error { func (i *initCmd) ensureTillerRunning() error {
if i.wait { if !i.wait {
if err := i.client.PingTiller(); err != nil { return nil
return fmt.Errorf("could not ping Tiller: %s", err) }
var retryCount int
retry:
config, client, err := getKubeClient(settings.KubeContext)
if err != nil {
return err
}
_, err = portforwarder.New(settings.TillerNamespace, client, config)
if err != nil {
if err == portforwarder.ErrTillerNotFound && retryCount <= i.timeout {
retryCount++
time.Sleep(time.Second)
goto retry
} }
return err
} }
return nil return nil

@ -51,6 +51,7 @@ helm init
--tiller-tls-cert string path to TLS certificate file to install with Tiller --tiller-tls-cert string path to TLS certificate file to install with Tiller
--tiller-tls-key string path to TLS key file to install with Tiller --tiller-tls-key string path to TLS key file to install with Tiller
--tiller-tls-verify install Tiller with TLS enabled and to verify remote certificates --tiller-tls-verify install Tiller with TLS enabled and to verify remote certificates
--timeout int number of seconds to wait for tiller before timing out (default 10)
--tls-ca-cert string path to CA root certificate --tls-ca-cert string path to CA root certificate
--upgrade upgrade if Tiller is already installed --upgrade upgrade if Tiller is already installed
--wait block until Tiller is running and ready to receive requests --wait block until Tiller is running and ready to receive requests
@ -69,4 +70,4 @@ helm init
### SEE ALSO ### SEE ALSO
* [helm](helm.md) - The Helm package manager for Kubernetes. * [helm](helm.md) - The Helm package manager for Kubernetes.
###### Auto generated by spf13/cobra on 25-Jan-2018 ###### Auto generated by spf13/cobra on 9-Feb-2018

@ -293,12 +293,6 @@ func (h *Client) RunReleaseTest(rlsName string, opts ...ReleaseTestOption) (<-ch
return h.test(ctx, req) return h.test(ctx, req)
} }
// PingTiller pings the Tiller pod and ensure's that it is up and running
func (h *Client) PingTiller() error {
ctx := NewContext()
return h.ping(ctx)
}
// connect returns a gRPC connection to Tiller or error. The gRPC dial options // connect returns a gRPC connection to Tiller or error. The gRPC dial options
// are constructed here. // are constructed here.
func (h *Client) connect(ctx context.Context) (conn *grpc.ClientConn, err error) { func (h *Client) connect(ctx context.Context) (conn *grpc.ClientConn, err error) {
@ -473,15 +467,3 @@ func (h *Client) test(ctx context.Context, req *rls.TestReleaseRequest) (<-chan
return ch, errc return ch, errc
} }
// Executes tiller.Ping RPC.
func (h *Client) ping(ctx context.Context) error {
c, err := h.connect(ctx)
if err != nil {
return err
}
defer c.Close()
rlc := rls.NewReleaseServiceClient(c)
return rlc.PingTiller(ctx)
}

@ -184,11 +184,6 @@ func (c *FakeClient) RunReleaseTest(rlsName string, opts ...ReleaseTestOption) (
return results, errc return results, errc
} }
// PingTiller pings the Tiller pod and ensure's that it is up and running
func (c *FakeClient) PingTiller() error {
return nil
}
// MockHookTemplate is the hook template used for all mock release objects. // MockHookTemplate is the hook template used for all mock release objects.
var MockHookTemplate = `apiVersion: v1 var MockHookTemplate = `apiVersion: v1
kind: Job kind: Job

@ -35,5 +35,4 @@ type Interface interface {
ReleaseHistory(rlsName string, opts ...HistoryOption) (*rls.GetHistoryResponse, error) ReleaseHistory(rlsName string, opts ...HistoryOption) (*rls.GetHistoryResponse, error)
GetVersion(opts ...VersionOption) (*rls.GetVersionResponse, error) GetVersion(opts ...VersionOption) (*rls.GetVersionResponse, error)
RunReleaseTest(rlsName string, opts ...ReleaseTestOption) (<-chan *rls.TestReleaseResponse, <-chan error) RunReleaseTest(rlsName string, opts ...ReleaseTestOption) (<-chan *rls.TestReleaseResponse, <-chan error)
PingTiller() error
} }

@ -31,6 +31,8 @@ import (
var ( var (
tillerPodLabels labels.Set = labels.Set{"app": "helm", "name": "tiller"} tillerPodLabels labels.Set = labels.Set{"app": "helm", "name": "tiller"}
// ErrTillerNotFound thrown when a running tiller instance is not found
ErrTillerNotFound = fmt.Errorf("could not find a ready tiller pod")
) )
// New creates a new and initialized tunnel. // New creates a new and initialized tunnel.
@ -67,5 +69,5 @@ func getFirstRunningPod(client corev1.PodsGetter, namespace string, selector lab
return &p, nil return &p, nil
} }
} }
return nil, fmt.Errorf("could not find a ready tiller pod") return nil, ErrTillerNotFound
} }

@ -949,8 +949,6 @@ type ReleaseServiceClient interface {
GetHistory(ctx context.Context, in *GetHistoryRequest, opts ...grpc.CallOption) (*GetHistoryResponse, error) GetHistory(ctx context.Context, in *GetHistoryRequest, opts ...grpc.CallOption) (*GetHistoryResponse, error)
// RunReleaseTest executes the tests defined of a named release // RunReleaseTest executes the tests defined of a named release
RunReleaseTest(ctx context.Context, in *TestReleaseRequest, opts ...grpc.CallOption) (ReleaseService_RunReleaseTestClient, error) RunReleaseTest(ctx context.Context, in *TestReleaseRequest, opts ...grpc.CallOption) (ReleaseService_RunReleaseTestClient, error)
// PingTiller sends a test/ping signal to Tiller to ensure that it's up
PingTiller(ctx context.Context) error
} }
type releaseServiceClient struct { type releaseServiceClient struct {
@ -1080,14 +1078,6 @@ func (c *releaseServiceClient) RunReleaseTest(ctx context.Context, in *TestRelea
return x, nil return x, nil
} }
func (c *releaseServiceClient) PingTiller(ctx context.Context) error {
err := grpc.Invoke(ctx, "/hapi.services.tiller.ReleaseService/PingTiller", "Ping", nil, c.cc, grpc.FailFast(false))
if err != nil {
return err
}
return nil
}
type ReleaseService_RunReleaseTestClient interface { type ReleaseService_RunReleaseTestClient interface {
Recv() (*TestReleaseResponse, error) Recv() (*TestReleaseResponse, error)
grpc.ClientStream grpc.ClientStream
@ -1310,10 +1300,6 @@ func _ReleaseService_RunReleaseTest_Handler(srv interface{}, stream grpc.ServerS
return srv.(ReleaseServiceServer).RunReleaseTest(m, &releaseServiceRunReleaseTestServer{stream}) return srv.(ReleaseServiceServer).RunReleaseTest(m, &releaseServiceRunReleaseTestServer{stream})
} }
func _ReleaseService_Ping_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
return "Pong", nil
}
type ReleaseService_RunReleaseTestServer interface { type ReleaseService_RunReleaseTestServer interface {
Send(*TestReleaseResponse) error Send(*TestReleaseResponse) error
grpc.ServerStream grpc.ServerStream
@ -1363,10 +1349,6 @@ var _ReleaseService_serviceDesc = grpc.ServiceDesc{
MethodName: "GetHistory", MethodName: "GetHistory",
Handler: _ReleaseService_GetHistory_Handler, Handler: _ReleaseService_GetHistory_Handler,
}, },
{
MethodName: "PingTiller",
Handler: _ReleaseService_Ping_Handler,
},
}, },
Streams: []grpc.StreamDesc{ Streams: []grpc.StreamDesc{
{ {

@ -26,7 +26,7 @@ var (
// Increment major number for new feature additions and behavioral changes. // Increment major number for new feature additions and behavioral changes.
// Increment minor number for bug fixes and performance enhancements. // Increment minor number for bug fixes and performance enhancements.
// Increment patch number for critical fixes to existing releases. // Increment patch number for critical fixes to existing releases.
Version = "v2.8" Version = "v2.8.1"
// BuildMetadata is extra build time data // BuildMetadata is extra build time data
BuildMetadata = "unreleased" BuildMetadata = "unreleased"

Loading…
Cancel
Save