diff --git a/cmd/rudder/rudder.go b/cmd/rudder/rudder.go index 30ece3998..f869cfecd 100644 --- a/cmd/rudder/rudder.go +++ b/cmd/rudder/rudder.go @@ -29,6 +29,7 @@ import ( "github.com/spf13/pflag" "k8s.io/helm/pkg/kube" rudderAPI "k8s.io/helm/pkg/proto/hapi/rudder" + "k8s.io/helm/pkg/rudder" "k8s.io/helm/pkg/tiller" "k8s.io/helm/pkg/version" ) @@ -41,8 +42,8 @@ type options struct { } func (opts *options) registerFlags() { - pflag.StringVarP(&opts.listen, "listen", "l", "127.0.0.1:10001", - "Socket for rudder grpc server (default: 127.0.0.1:10001).") + pflag.StringVarP(&opts.listen, "listen", "l", rudder.RudderDefaultAddress, + fmt.Sprintf("Socket for rudder grpc server (default: %s).", rudder.RudderDefaultAddress)) } func (opts *options) parseFlags() { diff --git a/cmd/tiller/tiller.go b/cmd/tiller/tiller.go index 2a4cf066e..85bd7943f 100644 --- a/cmd/tiller/tiller.go +++ b/cmd/tiller/tiller.go @@ -35,6 +35,7 @@ import ( "k8s.io/helm/pkg/kube" "k8s.io/helm/pkg/proto/hapi/services" + "k8s.io/helm/pkg/rudder" "k8s.io/helm/pkg/storage" "k8s.io/helm/pkg/storage/driver" "k8s.io/helm/pkg/tiller" @@ -70,12 +71,15 @@ var ( enableTracing = flag.Bool("trace", false, "enable rpc tracing") store = flag.String("storage", storageConfigMap, "storage driver to use. One of 'configmap' or 'memory'") remoteReleaseModules = flag.Bool("experimental-release", false, "enable experimental release modules") - tlsEnable = flag.Bool("tls", tlsEnableEnvVarDefault(), "enable TLS") - tlsVerify = flag.Bool("tls-verify", tlsVerifyEnvVarDefault(), "enable TLS and verify remote certificate") - keyFile = flag.String("tls-key", tlsDefaultsFromEnv("tls-key"), "path to TLS private key file") - certFile = flag.String("tls-cert", tlsDefaultsFromEnv("tls-cert"), "path to TLS certificate file") - caCertFile = flag.String("tls-ca-cert", tlsDefaultsFromEnv("tls-ca-cert"), "trust certificates signed by this CA") - maxHistory = flag.Int("history-max", historyMaxFromEnv(), "maximum number of releases kept in release history, with 0 meaning no limit") + rudderAddress = flag.String("rudder-address", rudder.RudderDefaultAddress, + fmt.Sprintf("rudder address:port (default: %s). ignored if experimental-release is omitted", + rudder.RudderDefaultAddress)) + tlsEnable = flag.Bool("tls", tlsEnableEnvVarDefault(), "enable TLS") + tlsVerify = flag.Bool("tls-verify", tlsVerifyEnvVarDefault(), "enable TLS and verify remote certificate") + keyFile = flag.String("tls-key", tlsDefaultsFromEnv("tls-key"), "path to TLS private key file") + certFile = flag.String("tls-cert", tlsDefaultsFromEnv("tls-cert"), "path to TLS certificate file") + caCertFile = flag.String("tls-ca-cert", tlsDefaultsFromEnv("tls-ca-cert"), "trust certificates signed by this CA") + maxHistory = flag.Int("history-max", historyMaxFromEnv(), "maximum number of releases kept in release history, with 0 meaning no limit") // rootServer is the root gRPC server. // @@ -102,6 +106,12 @@ func main() { } func start() { + if *rudderAddress != rudder.RudderDefaultAddress && !*remoteReleaseModules { + log.Fatal( + "Rudder address will be ignored without --experimental-release flag.", + "Add --experimental-release or remove --rudder-address.", + ) + } clientset, err := kube.New(nil).ClientSet() if err != nil { @@ -163,7 +173,15 @@ func start() { srvErrCh := make(chan error) probeErrCh := make(chan error) go func() { - svc := tiller.NewReleaseServer(env, clientset, *remoteReleaseModules) + var addressToUse string + if *remoteReleaseModules { + addressToUse = *rudderAddress + } + svc, err := tiller.NewReleaseServer(env, clientset, addressToUse) + if err != nil { + srvErrCh <- err + return + } svc.Log = newLogger("tiller").Printf services.RegisterReleaseServiceServer(rootServer, svc) if err := rootServer.Serve(lstn); err != nil { diff --git a/pkg/rudder/client.go b/pkg/rudder/client.go index 219bb010a..327883df3 100644 --- a/pkg/rudder/client.go +++ b/pkg/rudder/client.go @@ -18,6 +18,7 @@ package rudder // import "k8s.io/helm/pkg/rudder" import ( "fmt" + "time" "golang.org/x/net/context" "google.golang.org/grpc" @@ -25,67 +26,51 @@ import ( rudderAPI "k8s.io/helm/pkg/proto/hapi/rudder" ) -// GrpcPort specifies port on which rudder will spawn a server +// RudderDefaultAddress will be used if none is provided with command lines argumnets. const ( - GrpcPort = 10001 + RudderDefaultAddress = "127.0.0.1:10001" ) -var grpcAddr = fmt.Sprintf("127.0.0.1:%d", GrpcPort) - -// InstallRelease calls Rudder InstallRelease method which should create provided release -func InstallRelease(rel *rudderAPI.InstallReleaseRequest) (*rudderAPI.InstallReleaseResponse, error) { +// NewClient creates new instance of Client. +func NewClient(address string) (*Client, error) { //TODO(mkwiek): parametrize this - conn, err := grpc.Dial(grpcAddr, grpc.WithInsecure()) + conn, err := grpc.Dial( + address, + grpc.WithInsecure(), + grpc.WithBlock(), + grpc.WithTimeout(3*time.Second)) if err != nil { - return nil, err + return nil, fmt.Errorf("error establishing connection with rudder using address %s: %v", address, err) } + return &Client{client: rudderAPI.NewReleaseModuleServiceClient(conn)}, nil +} + +// Client wraps rudder grpc client. +type Client struct { + client rudderAPI.ReleaseModuleServiceClient +} - defer conn.Close() - client := rudderAPI.NewReleaseModuleServiceClient(conn) - return client.InstallRelease(context.Background(), rel) +// InstallRelease calls Rudder InstallRelease method which should create provided release +func (c *Client) InstallRelease(rel *rudderAPI.InstallReleaseRequest) (*rudderAPI.InstallReleaseResponse, error) { + return c.client.InstallRelease(context.Background(), rel) } // UpgradeRelease calls Rudder UpgradeRelease method which should perform update -func UpgradeRelease(req *rudderAPI.UpgradeReleaseRequest) (*rudderAPI.UpgradeReleaseResponse, error) { - conn, err := grpc.Dial(grpcAddr, grpc.WithInsecure()) - if err != nil { - return nil, err - } - defer conn.Close() - client := rudderAPI.NewReleaseModuleServiceClient(conn) - return client.UpgradeRelease(context.Background(), req) +func (c *Client) UpgradeRelease(req *rudderAPI.UpgradeReleaseRequest) (*rudderAPI.UpgradeReleaseResponse, error) { + return c.client.UpgradeRelease(context.Background(), req) } // RollbackRelease calls Rudder RollbackRelease method which should perform update -func RollbackRelease(req *rudderAPI.RollbackReleaseRequest) (*rudderAPI.RollbackReleaseResponse, error) { - conn, err := grpc.Dial(grpcAddr, grpc.WithInsecure()) - if err != nil { - return nil, err - } - defer conn.Close() - client := rudderAPI.NewReleaseModuleServiceClient(conn) - return client.RollbackRelease(context.Background(), req) +func (c *Client) RollbackRelease(req *rudderAPI.RollbackReleaseRequest) (*rudderAPI.RollbackReleaseResponse, error) { + return c.client.RollbackRelease(context.Background(), req) } // ReleaseStatus calls Rudder ReleaseStatus method which should perform update -func ReleaseStatus(req *rudderAPI.ReleaseStatusRequest) (*rudderAPI.ReleaseStatusResponse, error) { - conn, err := grpc.Dial(grpcAddr, grpc.WithInsecure()) - if err != nil { - return nil, err - } - defer conn.Close() - client := rudderAPI.NewReleaseModuleServiceClient(conn) - return client.ReleaseStatus(context.Background(), req) +func (c *Client) ReleaseStatus(req *rudderAPI.ReleaseStatusRequest) (*rudderAPI.ReleaseStatusResponse, error) { + return c.client.ReleaseStatus(context.Background(), req) } // DeleteRelease calls Rudder DeleteRelease method which should uninstall provided release -func DeleteRelease(rel *rudderAPI.DeleteReleaseRequest) (*rudderAPI.DeleteReleaseResponse, error) { - conn, err := grpc.Dial(grpcAddr, grpc.WithInsecure()) - if err != nil { - return nil, err - } - - defer conn.Close() - client := rudderAPI.NewReleaseModuleServiceClient(conn) - return client.DeleteRelease(context.Background(), rel) +func (c *Client) DeleteRelease(rel *rudderAPI.DeleteReleaseRequest) (*rudderAPI.DeleteReleaseResponse, error) { + return c.client.DeleteRelease(context.Background(), rel) } diff --git a/pkg/tiller/release_modules.go b/pkg/tiller/release_modules.go index 73b95e60c..af19d32b6 100644 --- a/pkg/tiller/release_modules.go +++ b/pkg/tiller/release_modules.go @@ -84,12 +84,23 @@ func (m *LocalReleaseModule) Delete(rel *release.Release, req *services.Uninstal } // RemoteReleaseModule is a ReleaseModule which calls Rudder service to operate on a release -type RemoteReleaseModule struct{} +type RemoteReleaseModule struct { + c *rudder.Client +} + +// NewRemoteReleaseModule creates new instance of RemoteReleaseModule and returns it as ReleaseModule interface. +func NewRemoteReleaseModule(address string) (ReleaseModule, error) { + client, err := rudder.NewClient(address) + if err != nil { + return nil, err + } + return &RemoteReleaseModule{c: client}, nil +} // Create calls rudder.InstallRelease func (m *RemoteReleaseModule) Create(r *release.Release, req *services.InstallReleaseRequest, env *environment.Environment) error { request := &rudderAPI.InstallReleaseRequest{Release: r} - _, err := rudder.InstallRelease(request) + _, err := m.c.InstallRelease(request) return err } @@ -103,7 +114,7 @@ func (m *RemoteReleaseModule) Update(current, target *release.Release, req *serv Wait: req.Wait, Force: req.Force, } - _, err := rudder.UpgradeRelease(upgrade) + _, err := m.c.UpgradeRelease(upgrade) return err } @@ -116,14 +127,14 @@ func (m *RemoteReleaseModule) Rollback(current, target *release.Release, req *se Timeout: req.Timeout, Wait: req.Wait, } - _, err := rudder.RollbackRelease(rollback) + _, err := m.c.RollbackRelease(rollback) return err } // Status returns status retrieved from rudder.ReleaseStatus func (m *RemoteReleaseModule) Status(r *release.Release, req *services.GetReleaseStatusRequest, env *environment.Environment) (string, error) { statusRequest := &rudderAPI.ReleaseStatusRequest{Release: r} - resp, err := rudder.ReleaseStatus(statusRequest) + resp, err := m.c.ReleaseStatus(statusRequest) if resp == nil { return "", err } @@ -133,7 +144,7 @@ func (m *RemoteReleaseModule) Status(r *release.Release, req *services.GetReleas // Delete calls rudder.DeleteRelease func (m *RemoteReleaseModule) Delete(r *release.Release, req *services.UninstallReleaseRequest, env *environment.Environment) (string, []error) { deleteRequest := &rudderAPI.DeleteReleaseRequest{Release: r} - resp, err := rudder.DeleteRelease(deleteRequest) + resp, err := m.c.DeleteRelease(deleteRequest) errs := make([]error, 0) result := "" diff --git a/pkg/tiller/release_server.go b/pkg/tiller/release_server.go index db7c0b568..3aea083e4 100644 --- a/pkg/tiller/release_server.go +++ b/pkg/tiller/release_server.go @@ -86,10 +86,15 @@ type ReleaseServer struct { } // NewReleaseServer creates a new release server. -func NewReleaseServer(env *environment.Environment, clientset internalclientset.Interface, useRemote bool) *ReleaseServer { +func NewReleaseServer(env *environment.Environment, clientset internalclientset.Interface, rudderAddress string) ( + *ReleaseServer, error) { var releaseModule ReleaseModule - if useRemote { - releaseModule = &RemoteReleaseModule{} + var err error + if len(rudderAddress) != 0 { + releaseModule, err = NewRemoteReleaseModule(rudderAddress) + if err != nil { + return nil, err + } } else { releaseModule = &LocalReleaseModule{ clientset: clientset, @@ -101,7 +106,7 @@ func NewReleaseServer(env *environment.Environment, clientset internalclientset. clientset: clientset, ReleaseModule: releaseModule, Log: func(_ string, _ ...interface{}) {}, - } + }, nil } // reuseValues copies values from the current release to a new release if the