pull/2835/merge
Dmitry Shulyak 8 years ago committed by GitHub
commit b746413038

@ -26,6 +26,7 @@ import (
"google.golang.org/grpc/grpclog"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"github.com/spf13/pflag"
"k8s.io/helm/pkg/kube"
rudderAPI "k8s.io/helm/pkg/proto/hapi/rudder"
"k8s.io/helm/pkg/rudder"
@ -36,24 +37,43 @@ import (
var kubeClient *kube.Client
var clientset internalclientset.Interface
type options struct {
listen string
}
func (opts *options) registerFlags() {
pflag.StringVarP(&opts.listen, "listen", "l", rudder.RudderDefaultAddress,
fmt.Sprintf("Socket for rudder grpc server (default: %s).", rudder.RudderDefaultAddress))
}
func (opts *options) parseFlags() {
pflag.Parse()
}
func (opts *options) regAndParseFlags() {
opts.registerFlags()
opts.parseFlags()
}
func main() {
opts := new(options)
opts.regAndParseFlags()
var err error
kubeClient = kube.New(nil)
clientset, err = kubeClient.ClientSet()
if err != nil {
grpclog.Fatalf("Cannot initialize Kubernetes connection: %s", err)
}
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", rudder.GrpcPort))
grpclog.Printf("Creating tcp socket on %s\n", opts.listen)
lis, err := net.Listen("tcp", opts.listen)
if err != nil {
grpclog.Fatalf("failed to listen: %v", err)
}
grpcServer := grpc.NewServer()
rudderAPI.RegisterReleaseModuleServiceServer(grpcServer, &ReleaseModuleServiceServer{})
grpclog.Print("Server starting")
grpclog.Printf("Starting server on %s\n", opts.listen)
grpcServer.Serve(lis)
grpclog.Print("Server started")
}
// ReleaseModuleServiceServer provides implementation for rudderAPI.ReleaseModuleServiceServer

@ -35,6 +35,7 @@ import (
"k8s.io/helm/pkg/kube"
"k8s.io/helm/pkg/proto/hapi/services"
"k8s.io/helm/pkg/rudder"
"k8s.io/helm/pkg/storage"
"k8s.io/helm/pkg/storage/driver"
"k8s.io/helm/pkg/tiller"
@ -71,12 +72,15 @@ var (
enableTracing = flag.Bool("trace", false, "enable rpc tracing")
store = flag.String("storage", storageConfigMap, "storage driver to use. One of 'configmap', 'memory', or 'secret'")
remoteReleaseModules = flag.Bool("experimental-release", false, "enable experimental release modules")
tlsEnable = flag.Bool("tls", tlsEnableEnvVarDefault(), "enable TLS")
tlsVerify = flag.Bool("tls-verify", tlsVerifyEnvVarDefault(), "enable TLS and verify remote certificate")
keyFile = flag.String("tls-key", tlsDefaultsFromEnv("tls-key"), "path to TLS private key file")
certFile = flag.String("tls-cert", tlsDefaultsFromEnv("tls-cert"), "path to TLS certificate file")
caCertFile = flag.String("tls-ca-cert", tlsDefaultsFromEnv("tls-ca-cert"), "trust certificates signed by this CA")
maxHistory = flag.Int("history-max", historyMaxFromEnv(), "maximum number of releases kept in release history, with 0 meaning no limit")
rudderAddress = flag.String("rudder-address", rudder.RudderDefaultAddress,
fmt.Sprintf("rudder address:port (default: %s). ignored if experimental-release is omitted",
rudder.RudderDefaultAddress))
tlsEnable = flag.Bool("tls", tlsEnableEnvVarDefault(), "enable TLS")
tlsVerify = flag.Bool("tls-verify", tlsVerifyEnvVarDefault(), "enable TLS and verify remote certificate")
keyFile = flag.String("tls-key", tlsDefaultsFromEnv("tls-key"), "path to TLS private key file")
certFile = flag.String("tls-cert", tlsDefaultsFromEnv("tls-cert"), "path to TLS certificate file")
caCertFile = flag.String("tls-ca-cert", tlsDefaultsFromEnv("tls-ca-cert"), "trust certificates signed by this CA")
maxHistory = flag.Int("history-max", historyMaxFromEnv(), "maximum number of releases kept in release history, with 0 meaning no limit")
// rootServer is the root gRPC server.
//
@ -103,6 +107,12 @@ func main() {
}
func start() {
if *rudderAddress != rudder.RudderDefaultAddress && !*remoteReleaseModules {
log.Fatal(
"Rudder address will be ignored without --experimental-release flag.",
"Add --experimental-release or remove --rudder-address.",
)
}
clientset, err := kube.New(nil).ClientSet()
if err != nil {
@ -170,7 +180,15 @@ func start() {
srvErrCh := make(chan error)
probeErrCh := make(chan error)
go func() {
svc := tiller.NewReleaseServer(env, clientset, *remoteReleaseModules)
var addressToUse string
if *remoteReleaseModules {
addressToUse = *rudderAddress
}
svc, err := tiller.NewReleaseServer(env, clientset, addressToUse)
if err != nil {
srvErrCh <- err
return
}
svc.Log = newLogger("tiller").Printf
services.RegisterReleaseServiceServer(rootServer, svc)
if err := rootServer.Serve(lstn); err != nil {

@ -18,6 +18,7 @@ package rudder // import "k8s.io/helm/pkg/rudder"
import (
"fmt"
"time"
"golang.org/x/net/context"
"google.golang.org/grpc"
@ -25,67 +26,51 @@ import (
rudderAPI "k8s.io/helm/pkg/proto/hapi/rudder"
)
// GrpcPort specifies port on which rudder will spawn a server
// RudderDefaultAddress will be used if none is provided with command lines argumnets.
const (
GrpcPort = 10001
RudderDefaultAddress = "127.0.0.1:10001"
)
var grpcAddr = fmt.Sprintf("127.0.0.1:%d", GrpcPort)
// InstallRelease calls Rudder InstallRelease method which should create provided release
func InstallRelease(rel *rudderAPI.InstallReleaseRequest) (*rudderAPI.InstallReleaseResponse, error) {
// NewClient creates new instance of Client.
func NewClient(address string) (*Client, error) {
//TODO(mkwiek): parametrize this
conn, err := grpc.Dial(grpcAddr, grpc.WithInsecure())
conn, err := grpc.Dial(
address,
grpc.WithInsecure(),
grpc.WithBlock(),
grpc.WithTimeout(3*time.Second))
if err != nil {
return nil, err
return nil, fmt.Errorf("error establishing connection with rudder using address %s: %v", address, err)
}
return &Client{client: rudderAPI.NewReleaseModuleServiceClient(conn)}, nil
}
// Client wraps rudder grpc client.
type Client struct {
client rudderAPI.ReleaseModuleServiceClient
}
defer conn.Close()
client := rudderAPI.NewReleaseModuleServiceClient(conn)
return client.InstallRelease(context.Background(), rel)
// InstallRelease calls Rudder InstallRelease method which should create provided release
func (c *Client) InstallRelease(rel *rudderAPI.InstallReleaseRequest) (*rudderAPI.InstallReleaseResponse, error) {
return c.client.InstallRelease(context.Background(), rel)
}
// UpgradeRelease calls Rudder UpgradeRelease method which should perform update
func UpgradeRelease(req *rudderAPI.UpgradeReleaseRequest) (*rudderAPI.UpgradeReleaseResponse, error) {
conn, err := grpc.Dial(grpcAddr, grpc.WithInsecure())
if err != nil {
return nil, err
}
defer conn.Close()
client := rudderAPI.NewReleaseModuleServiceClient(conn)
return client.UpgradeRelease(context.Background(), req)
func (c *Client) UpgradeRelease(req *rudderAPI.UpgradeReleaseRequest) (*rudderAPI.UpgradeReleaseResponse, error) {
return c.client.UpgradeRelease(context.Background(), req)
}
// RollbackRelease calls Rudder RollbackRelease method which should perform update
func RollbackRelease(req *rudderAPI.RollbackReleaseRequest) (*rudderAPI.RollbackReleaseResponse, error) {
conn, err := grpc.Dial(grpcAddr, grpc.WithInsecure())
if err != nil {
return nil, err
}
defer conn.Close()
client := rudderAPI.NewReleaseModuleServiceClient(conn)
return client.RollbackRelease(context.Background(), req)
func (c *Client) RollbackRelease(req *rudderAPI.RollbackReleaseRequest) (*rudderAPI.RollbackReleaseResponse, error) {
return c.client.RollbackRelease(context.Background(), req)
}
// ReleaseStatus calls Rudder ReleaseStatus method which should perform update
func ReleaseStatus(req *rudderAPI.ReleaseStatusRequest) (*rudderAPI.ReleaseStatusResponse, error) {
conn, err := grpc.Dial(grpcAddr, grpc.WithInsecure())
if err != nil {
return nil, err
}
defer conn.Close()
client := rudderAPI.NewReleaseModuleServiceClient(conn)
return client.ReleaseStatus(context.Background(), req)
func (c *Client) ReleaseStatus(req *rudderAPI.ReleaseStatusRequest) (*rudderAPI.ReleaseStatusResponse, error) {
return c.client.ReleaseStatus(context.Background(), req)
}
// DeleteRelease calls Rudder DeleteRelease method which should uninstall provided release
func DeleteRelease(rel *rudderAPI.DeleteReleaseRequest) (*rudderAPI.DeleteReleaseResponse, error) {
conn, err := grpc.Dial(grpcAddr, grpc.WithInsecure())
if err != nil {
return nil, err
}
defer conn.Close()
client := rudderAPI.NewReleaseModuleServiceClient(conn)
return client.DeleteRelease(context.Background(), rel)
func (c *Client) DeleteRelease(rel *rudderAPI.DeleteReleaseRequest) (*rudderAPI.DeleteReleaseResponse, error) {
return c.client.DeleteRelease(context.Background(), rel)
}

@ -84,12 +84,23 @@ func (m *LocalReleaseModule) Delete(rel *release.Release, req *services.Uninstal
}
// RemoteReleaseModule is a ReleaseModule which calls Rudder service to operate on a release
type RemoteReleaseModule struct{}
type RemoteReleaseModule struct {
c *rudder.Client
}
// NewRemoteReleaseModule creates new instance of RemoteReleaseModule and returns it as ReleaseModule interface.
func NewRemoteReleaseModule(address string) (ReleaseModule, error) {
client, err := rudder.NewClient(address)
if err != nil {
return nil, err
}
return &RemoteReleaseModule{c: client}, nil
}
// Create calls rudder.InstallRelease
func (m *RemoteReleaseModule) Create(r *release.Release, req *services.InstallReleaseRequest, env *environment.Environment) error {
request := &rudderAPI.InstallReleaseRequest{Release: r}
_, err := rudder.InstallRelease(request)
_, err := m.c.InstallRelease(request)
return err
}
@ -103,7 +114,7 @@ func (m *RemoteReleaseModule) Update(current, target *release.Release, req *serv
Wait: req.Wait,
Force: req.Force,
}
_, err := rudder.UpgradeRelease(upgrade)
_, err := m.c.UpgradeRelease(upgrade)
return err
}
@ -116,14 +127,14 @@ func (m *RemoteReleaseModule) Rollback(current, target *release.Release, req *se
Timeout: req.Timeout,
Wait: req.Wait,
}
_, err := rudder.RollbackRelease(rollback)
_, err := m.c.RollbackRelease(rollback)
return err
}
// Status returns status retrieved from rudder.ReleaseStatus
func (m *RemoteReleaseModule) Status(r *release.Release, req *services.GetReleaseStatusRequest, env *environment.Environment) (string, error) {
statusRequest := &rudderAPI.ReleaseStatusRequest{Release: r}
resp, err := rudder.ReleaseStatus(statusRequest)
resp, err := m.c.ReleaseStatus(statusRequest)
if resp == nil {
return "", err
}
@ -133,7 +144,7 @@ func (m *RemoteReleaseModule) Status(r *release.Release, req *services.GetReleas
// Delete calls rudder.DeleteRelease
func (m *RemoteReleaseModule) Delete(r *release.Release, req *services.UninstallReleaseRequest, env *environment.Environment) (string, []error) {
deleteRequest := &rudderAPI.DeleteReleaseRequest{Release: r}
resp, err := rudder.DeleteRelease(deleteRequest)
resp, err := m.c.DeleteRelease(deleteRequest)
errs := make([]error, 0)
result := ""

@ -87,10 +87,15 @@ type ReleaseServer struct {
}
// NewReleaseServer creates a new release server.
func NewReleaseServer(env *environment.Environment, clientset internalclientset.Interface, useRemote bool) *ReleaseServer {
func NewReleaseServer(env *environment.Environment, clientset internalclientset.Interface, rudderAddress string) (
*ReleaseServer, error) {
var releaseModule ReleaseModule
if useRemote {
releaseModule = &RemoteReleaseModule{}
var err error
if len(rudderAddress) != 0 {
releaseModule, err = NewRemoteReleaseModule(rudderAddress)
if err != nil {
return nil, err
}
} else {
releaseModule = &LocalReleaseModule{
clientset: clientset,
@ -102,7 +107,7 @@ func NewReleaseServer(env *environment.Environment, clientset internalclientset.
clientset: clientset,
ReleaseModule: releaseModule,
Log: func(_ string, _ ...interface{}) {},
}
}, nil
}
// reuseValues copies values from the current release to a new release if the

Loading…
Cancel
Save