Add etcd as a service discovery mechanism

pull/2318/head
skiffer-git 6 months ago
parent a4d6b2a3b9
commit db906fcea0

@ -1,16 +1,13 @@
enable: "etcd"
etcd:
rootDirectory: openim
address: [ localhost:12379 ]
username: ''
password: ''
zookeeper:
schema: openim
address: [ localhost:12181 ]
username: ''
password: ''

@ -56,13 +56,14 @@ type MsgTransfer struct {
}
type Config struct {
MsgTransfer config.MsgTransfer
RedisConfig config.Redis
MongodbConfig config.Mongo
KafkaConfig config.Kafka
Share config.Share
WebhooksConfig config.Webhooks
Discovery config.Discovery
MsgTransfer config.MsgTransfer
RedisConfig config.Redis
MongodbConfig config.Mongo
KafkaConfig config.Kafka
ZookeeperConfig config.ZooKeeper
Share config.Share
WebhooksConfig config.Webhooks
Discovery config.Discovery
}
func Start(ctx context.Context, index int, config *Config) error {

@ -48,9 +48,10 @@ type conversationServer struct {
}
type Config struct {
RpcConfig config.Conversation
RedisConfig config.Redis
MongodbConfig config.Mongo
RpcConfig config.Conversation
RedisConfig config.Redis
MongodbConfig config.Mongo
// ZookeeperConfig config.ZooKeeper
NotificationConfig config.Notification
Share config.Share
LocalCacheConfig config.LocalCache

@ -24,33 +24,31 @@ import (
"time"
)
const (
zookeeperConst = "zookeeper"
kubenetesConst = "k8s"
directConst = "direct"
etcdConst = "etcd"
)
// NewDiscoveryRegister creates a new service discovery and registry client based on the provided environment type.
func NewDiscoveryRegister(zookeeperConfig *config.ZooKeeper, share *config.Share) (discovery.SvcDiscoveryRegistry, error) {
switch share.Env {
case zookeeperConst:
func NewDiscoveryRegister(discovery *config.Discovery, share *config.Share) (discovery.SvcDiscoveryRegistry, error) {
switch discovery.Enable {
case "zookeeper":
return zookeeper.NewZkClient(
zookeeperConfig.Address,
zookeeperConfig.Schema,
discovery.ZooKeeper.Address,
discovery.ZooKeeper.Schema,
zookeeper.WithFreq(time.Hour),
zookeeper.WithUserNameAndPassword(zookeeperConfig.Username, zookeeperConfig.Password),
zookeeper.WithUserNameAndPassword(discovery.ZooKeeper.Username, discovery.ZooKeeper.Password),
zookeeper.WithRoundRobin(),
zookeeper.WithTimeout(10),
)
case kubenetesConst:
case "k8s":
return kubernetes.NewK8sDiscoveryRegister(share.RpcRegisterName.MessageGateway)
case etcdConst:
return getcd.NewSvcDiscoveryRegistry("etcd", []string{"localhost:2379"})
case directConst:
case "etcd":
return getcd.NewSvcDiscoveryRegistry(
discovery.Etcd.RootDirectory,
discovery.Etcd.Address,
getcd.WithDialTimeout(10*time.Second),
getcd.WithMaxCallSendMsgSize(20*1024*1024),
getcd.WithUsernameAndPassword(discovery.Etcd.Username, discovery.Etcd.Password))
case "direct":
//return direct.NewConnDirect(config)
default:
return nil, errs.New("unsupported discovery type", "type", share.Env).Wrap()
return nil, errs.New("unsupported discovery type", "type", discovery.Enable).Wrap()
}
return nil, nil
}

@ -8,27 +8,38 @@ import (
"go.etcd.io/etcd/client/v3/naming/resolver"
"google.golang.org/grpc"
gresolver "google.golang.org/grpc/resolver"
"log"
"time"
)
// ZkOption defines a function type for modifying clientv3.Config
type ZkOption func(*clientv3.Config)
// SvcDiscoveryRegistryImpl implementation
type SvcDiscoveryRegistryImpl struct {
client *clientv3.Client
resolver gresolver.Builder
dialOptions []grpc.DialOption
serviceKey string
endpointMgr endpoints.Manager
leaseID clientv3.LeaseID
schema string
client *clientv3.Client
resolver gresolver.Builder
dialOptions []grpc.DialOption
serviceKey string
endpointMgr endpoints.Manager
leaseID clientv3.LeaseID
rootDirectory string
}
func NewSvcDiscoveryRegistry(schema string, endpoints []string) (*SvcDiscoveryRegistryImpl, error) {
// NewSvcDiscoveryRegistry creates a new service discovery registry implementation
func NewSvcDiscoveryRegistry(rootDirectory string, endpoints []string, options ...ZkOption) (*SvcDiscoveryRegistryImpl, error) {
cfg := clientv3.Config{
Endpoints: endpoints,
DialTimeout: 5 * time.Second,
// Increase keep-alive queue capacity and message size
PermitWithoutStream: true,
MaxCallSendMsgSize: 10 * 1024 * 1024, // 10 MB
}
// Apply provided options to the config
for _, opt := range options {
opt(&cfg)
}
client, err := clientv3.New(cfg)
if err != nil {
return nil, err
@ -38,17 +49,42 @@ func NewSvcDiscoveryRegistry(schema string, endpoints []string) (*SvcDiscoveryRe
return nil, err
}
return &SvcDiscoveryRegistryImpl{
client: client,
resolver: r,
schema: schema,
client: client,
resolver: r,
rootDirectory: rootDirectory,
}, nil
}
// WithDialTimeout sets a custom dial timeout for the etcd client
func WithDialTimeout(timeout time.Duration) ZkOption {
return func(cfg *clientv3.Config) {
cfg.DialTimeout = timeout
}
}
// WithMaxCallSendMsgSize sets a custom max call send message size for the etcd client
func WithMaxCallSendMsgSize(size int) ZkOption {
return func(cfg *clientv3.Config) {
cfg.MaxCallSendMsgSize = size
}
}
// WithUsernameAndPassword sets a username and password for the etcd client
func WithUsernameAndPassword(username, password string) ZkOption {
return func(cfg *clientv3.Config) {
cfg.Username = username
cfg.Password = password
}
}
// GetUserIdHashGatewayHost returns the gateway host for a given user ID hash
func (r *SvcDiscoveryRegistryImpl) GetUserIdHashGatewayHost(ctx context.Context, userId string) (string, error) {
return "", nil
}
// GetConns returns gRPC client connections for a given service name
func (r *SvcDiscoveryRegistryImpl) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]*grpc.ClientConn, error) {
target := fmt.Sprintf("%s:///%s", r.schema, serviceName)
target := fmt.Sprintf("etcd:///%s", serviceName)
conn, err := grpc.DialContext(ctx, target, append(append(r.dialOptions, opts...), grpc.WithResolvers(r.resolver))...)
if err != nil {
return nil, err
@ -56,34 +92,39 @@ func (r *SvcDiscoveryRegistryImpl) GetConns(ctx context.Context, serviceName str
return []*grpc.ClientConn{conn}, nil
}
// GetConn returns a single gRPC client connection for a given service name
func (r *SvcDiscoveryRegistryImpl) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
target := fmt.Sprintf("%s:///%s", r.schema, serviceName)
target := fmt.Sprintf("etcd:///%s", serviceName)
return grpc.DialContext(ctx, target, append(append(r.dialOptions, opts...), grpc.WithResolvers(r.resolver))...)
}
// GetSelfConnTarget returns the connection target for the current service
func (r *SvcDiscoveryRegistryImpl) GetSelfConnTarget() string {
return fmt.Sprintf("%s:///%s", r.schema, r.serviceKey)
return fmt.Sprintf("etcd:///%s", r.serviceKey)
}
// AddOption appends gRPC dial options to the existing options
func (r *SvcDiscoveryRegistryImpl) AddOption(opts ...grpc.DialOption) {
r.dialOptions = append(r.dialOptions, opts...)
}
// CloseConn closes a given gRPC client connection
func (r *SvcDiscoveryRegistryImpl) CloseConn(conn *grpc.ClientConn) {
if err := conn.Close(); err != nil {
log.Printf("Failed to close connection: %v", err)
fmt.Printf("Failed to close connection: %v\n", err)
}
}
// Register registers a new service endpoint with etcd
func (r *SvcDiscoveryRegistryImpl) Register(serviceName, host string, port int, opts ...grpc.DialOption) error {
r.serviceKey = fmt.Sprintf("%s/%s-%d", serviceName, host, port)
em, err := endpoints.NewManager(r.client, serviceName)
r.serviceKey = fmt.Sprintf("%s/%s/%s-%d", r.rootDirectory, serviceName, host, port)
em, err := endpoints.NewManager(r.client, r.rootDirectory+"/"+serviceName)
if err != nil {
return err
}
r.endpointMgr = em
leaseResp, err := r.client.Grant(context.Background(), 30)
leaseResp, err := r.client.Grant(context.Background(), 60) // Increase TTL time
if err != nil {
return err
}
@ -100,10 +141,11 @@ func (r *SvcDiscoveryRegistryImpl) Register(serviceName, host string, port int,
return nil
}
// keepAliveLease maintains the lease alive by sending keep-alive requests
func (r *SvcDiscoveryRegistryImpl) keepAliveLease(leaseID clientv3.LeaseID) {
ch, err := r.client.KeepAlive(context.Background(), leaseID)
if err != nil {
log.Printf("Failed to keep lease alive: %v", err)
fmt.Printf("Failed to keep lease alive: %v\n", err)
return
}
@ -111,12 +153,13 @@ func (r *SvcDiscoveryRegistryImpl) keepAliveLease(leaseID clientv3.LeaseID) {
if ka != nil {
fmt.Printf("Received lease keep-alive response: %v\n", ka)
} else {
fmt.Printf("Lease keep-alive response channel closed")
break
fmt.Printf("Lease keep-alive response channel closed\n")
return
}
}
}
// UnRegister removes the service endpoint from etcd
func (r *SvcDiscoveryRegistryImpl) UnRegister() error {
if r.endpointMgr == nil {
return fmt.Errorf("endpoint manager is not initialized")
@ -124,6 +167,7 @@ func (r *SvcDiscoveryRegistryImpl) UnRegister() error {
return r.endpointMgr.DeleteEndpoint(context.TODO(), r.serviceKey)
}
// Close closes the etcd client connection
func (r *SvcDiscoveryRegistryImpl) Close() {
if r.client != nil {
_ = r.client.Close()

Loading…
Cancel
Save