Add --rudder-address flag to tiller server

Before this change it was possible to use only local rudder. As a consequence
it was required to change tiller-deploy definition generated by helm init.

This change adds --rudder-address flag to tiller server, by default it will be
set to 127.0.0.1:10001. Rudder address wil be ignore if --experimental-release
won't be added. Also tiller will crash if rudder server is provided without experimental release.
In my opinion this is a sane way to preserve previous behaviour.

I included refactoring of rudder grpc client as it is safe to reuse same connection
with multiple concurrent users and it is nice cleanup.
pull/2835/head
Dmitry Shulyak 8 years ago
parent e4c5802cbc
commit bd3c9e34d5

@ -29,6 +29,7 @@ import (
"github.com/spf13/pflag" "github.com/spf13/pflag"
"k8s.io/helm/pkg/kube" "k8s.io/helm/pkg/kube"
rudderAPI "k8s.io/helm/pkg/proto/hapi/rudder" rudderAPI "k8s.io/helm/pkg/proto/hapi/rudder"
"k8s.io/helm/pkg/rudder"
"k8s.io/helm/pkg/tiller" "k8s.io/helm/pkg/tiller"
"k8s.io/helm/pkg/version" "k8s.io/helm/pkg/version"
) )
@ -41,8 +42,8 @@ type options struct {
} }
func (opts *options) registerFlags() { func (opts *options) registerFlags() {
pflag.StringVarP(&opts.listen, "listen", "l", "127.0.0.1:10001", pflag.StringVarP(&opts.listen, "listen", "l", rudder.RudderDefaultAddress,
"Socket for rudder grpc server (default: 127.0.0.1:10001).") fmt.Sprintf("Socket for rudder grpc server (default: %s).", rudder.RudderDefaultAddress))
} }
func (opts *options) parseFlags() { func (opts *options) parseFlags() {

@ -35,6 +35,7 @@ import (
"k8s.io/helm/pkg/kube" "k8s.io/helm/pkg/kube"
"k8s.io/helm/pkg/proto/hapi/services" "k8s.io/helm/pkg/proto/hapi/services"
"k8s.io/helm/pkg/rudder"
"k8s.io/helm/pkg/storage" "k8s.io/helm/pkg/storage"
"k8s.io/helm/pkg/storage/driver" "k8s.io/helm/pkg/storage/driver"
"k8s.io/helm/pkg/tiller" "k8s.io/helm/pkg/tiller"
@ -70,6 +71,9 @@ var (
enableTracing = flag.Bool("trace", false, "enable rpc tracing") enableTracing = flag.Bool("trace", false, "enable rpc tracing")
store = flag.String("storage", storageConfigMap, "storage driver to use. One of 'configmap' or 'memory'") store = flag.String("storage", storageConfigMap, "storage driver to use. One of 'configmap' or 'memory'")
remoteReleaseModules = flag.Bool("experimental-release", false, "enable experimental release modules") remoteReleaseModules = flag.Bool("experimental-release", false, "enable experimental release modules")
rudderAddress = flag.String("rudder-address", rudder.RudderDefaultAddress,
fmt.Sprintf("rudder address:port (default: %s). ignored if experimental-release is omitted",
rudder.RudderDefaultAddress))
tlsEnable = flag.Bool("tls", tlsEnableEnvVarDefault(), "enable TLS") tlsEnable = flag.Bool("tls", tlsEnableEnvVarDefault(), "enable TLS")
tlsVerify = flag.Bool("tls-verify", tlsVerifyEnvVarDefault(), "enable TLS and verify remote certificate") tlsVerify = flag.Bool("tls-verify", tlsVerifyEnvVarDefault(), "enable TLS and verify remote certificate")
keyFile = flag.String("tls-key", tlsDefaultsFromEnv("tls-key"), "path to TLS private key file") keyFile = flag.String("tls-key", tlsDefaultsFromEnv("tls-key"), "path to TLS private key file")
@ -102,6 +106,12 @@ func main() {
} }
func start() { func start() {
if *rudderAddress != rudder.RudderDefaultAddress && !*remoteReleaseModules {
log.Fatal(
"Rudder address will be ignored without --experimental-release flag.",
"Add --experimental-release or remove --rudder-address.",
)
}
clientset, err := kube.New(nil).ClientSet() clientset, err := kube.New(nil).ClientSet()
if err != nil { if err != nil {
@ -163,7 +173,15 @@ func start() {
srvErrCh := make(chan error) srvErrCh := make(chan error)
probeErrCh := make(chan error) probeErrCh := make(chan error)
go func() { go func() {
svc := tiller.NewReleaseServer(env, clientset, *remoteReleaseModules) var addressToUse string
if *remoteReleaseModules {
addressToUse = *rudderAddress
}
svc, err := tiller.NewReleaseServer(env, clientset, addressToUse)
if err != nil {
srvErrCh <- err
return
}
svc.Log = newLogger("tiller").Printf svc.Log = newLogger("tiller").Printf
services.RegisterReleaseServiceServer(rootServer, svc) services.RegisterReleaseServiceServer(rootServer, svc)
if err := rootServer.Serve(lstn); err != nil { if err := rootServer.Serve(lstn); err != nil {

@ -18,6 +18,7 @@ package rudder // import "k8s.io/helm/pkg/rudder"
import ( import (
"fmt" "fmt"
"time"
"golang.org/x/net/context" "golang.org/x/net/context"
"google.golang.org/grpc" "google.golang.org/grpc"
@ -25,67 +26,51 @@ import (
rudderAPI "k8s.io/helm/pkg/proto/hapi/rudder" rudderAPI "k8s.io/helm/pkg/proto/hapi/rudder"
) )
// GrpcPort specifies port on which rudder will spawn a server // RudderDefaultAddress will be used if none is provided with command lines argumnets.
const ( const (
GrpcPort = 10001 RudderDefaultAddress = "127.0.0.1:10001"
) )
var grpcAddr = fmt.Sprintf("127.0.0.1:%d", GrpcPort) // NewClient creates new instance of Client.
func NewClient(address string) (*Client, error) {
// InstallRelease calls Rudder InstallRelease method which should create provided release
func InstallRelease(rel *rudderAPI.InstallReleaseRequest) (*rudderAPI.InstallReleaseResponse, error) {
//TODO(mkwiek): parametrize this //TODO(mkwiek): parametrize this
conn, err := grpc.Dial(grpcAddr, grpc.WithInsecure()) conn, err := grpc.Dial(
address,
grpc.WithInsecure(),
grpc.WithBlock(),
grpc.WithTimeout(3*time.Second))
if err != nil { if err != nil {
return nil, err return nil, fmt.Errorf("error establishing connection with rudder using address %s: %v", address, err)
} }
return &Client{client: rudderAPI.NewReleaseModuleServiceClient(conn)}, nil
}
// Client wraps rudder grpc client.
type Client struct {
client rudderAPI.ReleaseModuleServiceClient
}
defer conn.Close() // InstallRelease calls Rudder InstallRelease method which should create provided release
client := rudderAPI.NewReleaseModuleServiceClient(conn) func (c *Client) InstallRelease(rel *rudderAPI.InstallReleaseRequest) (*rudderAPI.InstallReleaseResponse, error) {
return client.InstallRelease(context.Background(), rel) return c.client.InstallRelease(context.Background(), rel)
} }
// UpgradeRelease calls Rudder UpgradeRelease method which should perform update // UpgradeRelease calls Rudder UpgradeRelease method which should perform update
func UpgradeRelease(req *rudderAPI.UpgradeReleaseRequest) (*rudderAPI.UpgradeReleaseResponse, error) { func (c *Client) UpgradeRelease(req *rudderAPI.UpgradeReleaseRequest) (*rudderAPI.UpgradeReleaseResponse, error) {
conn, err := grpc.Dial(grpcAddr, grpc.WithInsecure()) return c.client.UpgradeRelease(context.Background(), req)
if err != nil {
return nil, err
}
defer conn.Close()
client := rudderAPI.NewReleaseModuleServiceClient(conn)
return client.UpgradeRelease(context.Background(), req)
} }
// RollbackRelease calls Rudder RollbackRelease method which should perform update // RollbackRelease calls Rudder RollbackRelease method which should perform update
func RollbackRelease(req *rudderAPI.RollbackReleaseRequest) (*rudderAPI.RollbackReleaseResponse, error) { func (c *Client) RollbackRelease(req *rudderAPI.RollbackReleaseRequest) (*rudderAPI.RollbackReleaseResponse, error) {
conn, err := grpc.Dial(grpcAddr, grpc.WithInsecure()) return c.client.RollbackRelease(context.Background(), req)
if err != nil {
return nil, err
}
defer conn.Close()
client := rudderAPI.NewReleaseModuleServiceClient(conn)
return client.RollbackRelease(context.Background(), req)
} }
// ReleaseStatus calls Rudder ReleaseStatus method which should perform update // ReleaseStatus calls Rudder ReleaseStatus method which should perform update
func ReleaseStatus(req *rudderAPI.ReleaseStatusRequest) (*rudderAPI.ReleaseStatusResponse, error) { func (c *Client) ReleaseStatus(req *rudderAPI.ReleaseStatusRequest) (*rudderAPI.ReleaseStatusResponse, error) {
conn, err := grpc.Dial(grpcAddr, grpc.WithInsecure()) return c.client.ReleaseStatus(context.Background(), req)
if err != nil {
return nil, err
}
defer conn.Close()
client := rudderAPI.NewReleaseModuleServiceClient(conn)
return client.ReleaseStatus(context.Background(), req)
} }
// DeleteRelease calls Rudder DeleteRelease method which should uninstall provided release // DeleteRelease calls Rudder DeleteRelease method which should uninstall provided release
func DeleteRelease(rel *rudderAPI.DeleteReleaseRequest) (*rudderAPI.DeleteReleaseResponse, error) { func (c *Client) DeleteRelease(rel *rudderAPI.DeleteReleaseRequest) (*rudderAPI.DeleteReleaseResponse, error) {
conn, err := grpc.Dial(grpcAddr, grpc.WithInsecure()) return c.client.DeleteRelease(context.Background(), rel)
if err != nil {
return nil, err
}
defer conn.Close()
client := rudderAPI.NewReleaseModuleServiceClient(conn)
return client.DeleteRelease(context.Background(), rel)
} }

@ -84,12 +84,23 @@ func (m *LocalReleaseModule) Delete(rel *release.Release, req *services.Uninstal
} }
// RemoteReleaseModule is a ReleaseModule which calls Rudder service to operate on a release // RemoteReleaseModule is a ReleaseModule which calls Rudder service to operate on a release
type RemoteReleaseModule struct{} type RemoteReleaseModule struct {
c *rudder.Client
}
// NewRemoteReleaseModule creates new instance of RemoteReleaseModule and returns it as ReleaseModule interface.
func NewRemoteReleaseModule(address string) (ReleaseModule, error) {
client, err := rudder.NewClient(address)
if err != nil {
return nil, err
}
return &RemoteReleaseModule{c: client}, nil
}
// Create calls rudder.InstallRelease // Create calls rudder.InstallRelease
func (m *RemoteReleaseModule) Create(r *release.Release, req *services.InstallReleaseRequest, env *environment.Environment) error { func (m *RemoteReleaseModule) Create(r *release.Release, req *services.InstallReleaseRequest, env *environment.Environment) error {
request := &rudderAPI.InstallReleaseRequest{Release: r} request := &rudderAPI.InstallReleaseRequest{Release: r}
_, err := rudder.InstallRelease(request) _, err := m.c.InstallRelease(request)
return err return err
} }
@ -103,7 +114,7 @@ func (m *RemoteReleaseModule) Update(current, target *release.Release, req *serv
Wait: req.Wait, Wait: req.Wait,
Force: req.Force, Force: req.Force,
} }
_, err := rudder.UpgradeRelease(upgrade) _, err := m.c.UpgradeRelease(upgrade)
return err return err
} }
@ -116,14 +127,14 @@ func (m *RemoteReleaseModule) Rollback(current, target *release.Release, req *se
Timeout: req.Timeout, Timeout: req.Timeout,
Wait: req.Wait, Wait: req.Wait,
} }
_, err := rudder.RollbackRelease(rollback) _, err := m.c.RollbackRelease(rollback)
return err return err
} }
// Status returns status retrieved from rudder.ReleaseStatus // Status returns status retrieved from rudder.ReleaseStatus
func (m *RemoteReleaseModule) Status(r *release.Release, req *services.GetReleaseStatusRequest, env *environment.Environment) (string, error) { func (m *RemoteReleaseModule) Status(r *release.Release, req *services.GetReleaseStatusRequest, env *environment.Environment) (string, error) {
statusRequest := &rudderAPI.ReleaseStatusRequest{Release: r} statusRequest := &rudderAPI.ReleaseStatusRequest{Release: r}
resp, err := rudder.ReleaseStatus(statusRequest) resp, err := m.c.ReleaseStatus(statusRequest)
if resp == nil { if resp == nil {
return "", err return "", err
} }
@ -133,7 +144,7 @@ func (m *RemoteReleaseModule) Status(r *release.Release, req *services.GetReleas
// Delete calls rudder.DeleteRelease // Delete calls rudder.DeleteRelease
func (m *RemoteReleaseModule) Delete(r *release.Release, req *services.UninstallReleaseRequest, env *environment.Environment) (string, []error) { func (m *RemoteReleaseModule) Delete(r *release.Release, req *services.UninstallReleaseRequest, env *environment.Environment) (string, []error) {
deleteRequest := &rudderAPI.DeleteReleaseRequest{Release: r} deleteRequest := &rudderAPI.DeleteReleaseRequest{Release: r}
resp, err := rudder.DeleteRelease(deleteRequest) resp, err := m.c.DeleteRelease(deleteRequest)
errs := make([]error, 0) errs := make([]error, 0)
result := "" result := ""

@ -86,10 +86,15 @@ type ReleaseServer struct {
} }
// NewReleaseServer creates a new release server. // NewReleaseServer creates a new release server.
func NewReleaseServer(env *environment.Environment, clientset internalclientset.Interface, useRemote bool) *ReleaseServer { func NewReleaseServer(env *environment.Environment, clientset internalclientset.Interface, rudderAddress string) (
*ReleaseServer, error) {
var releaseModule ReleaseModule var releaseModule ReleaseModule
if useRemote { var err error
releaseModule = &RemoteReleaseModule{} if len(rudderAddress) != 0 {
releaseModule, err = NewRemoteReleaseModule(rudderAddress)
if err != nil {
return nil, err
}
} else { } else {
releaseModule = &LocalReleaseModule{ releaseModule = &LocalReleaseModule{
clientset: clientset, clientset: clientset,
@ -101,7 +106,7 @@ func NewReleaseServer(env *environment.Environment, clientset internalclientset.
clientset: clientset, clientset: clientset,
ReleaseModule: releaseModule, ReleaseModule: releaseModule,
Log: func(_ string, _ ...interface{}) {}, Log: func(_ string, _ ...interface{}) {},
} }, nil
} }
// reuseValues copies values from the current release to a new release if the // reuseValues copies values from the current release to a new release if the

Loading…
Cancel
Save