|
|
|
@ -9,6 +9,8 @@ import (
|
|
|
|
|
"go.etcd.io/etcd/client/v3/naming/resolver"
|
|
|
|
|
"google.golang.org/grpc"
|
|
|
|
|
gresolver "google.golang.org/grpc/resolver"
|
|
|
|
|
"strings"
|
|
|
|
|
"sync"
|
|
|
|
|
"time"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
@ -26,6 +28,9 @@ type SvcDiscoveryRegistryImpl struct {
|
|
|
|
|
rpcRegisterTarget string
|
|
|
|
|
|
|
|
|
|
rootDirectory string
|
|
|
|
|
|
|
|
|
|
mu sync.RWMutex
|
|
|
|
|
connMap map[string][]*grpc.ClientConn
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// NewSvcDiscoveryRegistry creates a new service discovery registry implementation
|
|
|
|
@ -51,11 +56,15 @@ func NewSvcDiscoveryRegistry(rootDirectory string, endpoints []string, options .
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
return &SvcDiscoveryRegistryImpl{
|
|
|
|
|
|
|
|
|
|
s := &SvcDiscoveryRegistryImpl{
|
|
|
|
|
client: client,
|
|
|
|
|
resolver: r,
|
|
|
|
|
rootDirectory: rootDirectory,
|
|
|
|
|
}, nil
|
|
|
|
|
connMap: make(map[string][]*grpc.ClientConn),
|
|
|
|
|
}
|
|
|
|
|
go s.watchServiceChanges()
|
|
|
|
|
return s, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// WithDialTimeout sets a custom dial timeout for the etcd client
|
|
|
|
@ -87,31 +96,11 @@ func (r *SvcDiscoveryRegistryImpl) GetUserIdHashGatewayHost(ctx context.Context,
|
|
|
|
|
|
|
|
|
|
// GetConns returns gRPC client connections for a given service name
|
|
|
|
|
func (r *SvcDiscoveryRegistryImpl) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]*grpc.ClientConn, error) {
|
|
|
|
|
var conns []*grpc.ClientConn
|
|
|
|
|
// Construct the full key for the service
|
|
|
|
|
fullServiceKey := fmt.Sprintf("%s/%s", r.rootDirectory, serviceName)
|
|
|
|
|
|
|
|
|
|
// List all endpoints for the service
|
|
|
|
|
resp, err := r.client.Get(ctx, fullServiceKey, clientv3.WithPrefix())
|
|
|
|
|
if err != nil {
|
|
|
|
|
fmt.Println("GetConns get ", fullServiceKey, err.Error())
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for _, kv := range resp.Kvs {
|
|
|
|
|
endpoint := string(kv.Key[len(fullServiceKey)+1:]) // Extract the endpoint address
|
|
|
|
|
//target := fmt.Sprintf("etcd://%s/%s/%s", r.rootDirectory, serviceName, endpoint)
|
|
|
|
|
target := endpoint
|
|
|
|
|
conn, err := grpc.DialContext(ctx, target, append(append(r.dialOptions, opts...), grpc.WithResolvers(r.resolver))...)
|
|
|
|
|
if err != nil {
|
|
|
|
|
fmt.Println("DialContext ", target, err.Error())
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
conns = append(conns, conn)
|
|
|
|
|
fmt.Println("GetConns detail ", *conn)
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
return conns, nil
|
|
|
|
|
r.mu.RLock()
|
|
|
|
|
defer r.mu.RUnlock()
|
|
|
|
|
return r.connMap[fullServiceKey], nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// GetConn returns a single gRPC client connection for a given service name
|
|
|
|
@ -123,8 +112,6 @@ func (r *SvcDiscoveryRegistryImpl) GetConn(ctx context.Context, serviceName stri
|
|
|
|
|
// GetSelfConnTarget returns the connection target for the current service
|
|
|
|
|
func (r *SvcDiscoveryRegistryImpl) GetSelfConnTarget() string {
|
|
|
|
|
return r.rpcRegisterTarget
|
|
|
|
|
// return fmt.Sprintf("etcd:///%s", r.serviceKey)
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// AddOption appends gRPC dial options to the existing options
|
|
|
|
@ -163,7 +150,6 @@ func (r *SvcDiscoveryRegistryImpl) Register(serviceName, host string, port int,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
go r.keepAliveLease(r.leaseID)
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -184,12 +170,63 @@ func (r *SvcDiscoveryRegistryImpl) keepAliveLease(leaseID clientv3.LeaseID) {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// watchServiceChanges watches for changes in the service directory
|
|
|
|
|
func (r *SvcDiscoveryRegistryImpl) watchServiceChanges() {
|
|
|
|
|
watchChan := r.client.Watch(context.Background(), r.rootDirectory, clientv3.WithPrefix())
|
|
|
|
|
for watchResp := range watchChan {
|
|
|
|
|
for _, event := range watchResp.Events {
|
|
|
|
|
prefix, _ := r.splitEndpoint(string(event.Kv.Key))
|
|
|
|
|
fmt.Printf("Change detected for prefix: %s\n", prefix)
|
|
|
|
|
r.refreshConnMap(prefix)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// refreshConnMap fetches the latest endpoints and updates the local map
|
|
|
|
|
func (r *SvcDiscoveryRegistryImpl) refreshConnMap(prefix string) {
|
|
|
|
|
r.mu.Lock()
|
|
|
|
|
defer r.mu.Unlock()
|
|
|
|
|
|
|
|
|
|
fullPrefix := fmt.Sprintf("%s/", prefix)
|
|
|
|
|
resp, err := r.client.Get(context.Background(), fullPrefix, clientv3.WithPrefix())
|
|
|
|
|
if err != nil {
|
|
|
|
|
fmt.Printf("Failed to get endpoints: %v\n", err)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Update the connMap with new connections
|
|
|
|
|
for _, kv := range resp.Kvs {
|
|
|
|
|
_, addr := r.splitEndpoint(string(kv.Key))
|
|
|
|
|
conn, err := grpc.DialContext(context.Background(), addr, append(r.dialOptions, grpc.WithResolvers(r.resolver))...)
|
|
|
|
|
if err != nil {
|
|
|
|
|
fmt.Printf("Failed to dial new endpoint: %v\n", err)
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
r.connMap[prefix] = append(r.connMap[prefix], conn)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// splitEndpoint splits the endpoint string into prefix and address
|
|
|
|
|
func (r *SvcDiscoveryRegistryImpl) splitEndpoint(input string) (string, string) {
|
|
|
|
|
lastSlashIndex := strings.LastIndex(input, "/")
|
|
|
|
|
if lastSlashIndex != -1 {
|
|
|
|
|
part1 := input[:lastSlashIndex]
|
|
|
|
|
part2 := input[lastSlashIndex+1:]
|
|
|
|
|
return part1, part2
|
|
|
|
|
}
|
|
|
|
|
return input, ""
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// UnRegister removes the service endpoint from etcd
|
|
|
|
|
func (r *SvcDiscoveryRegistryImpl) UnRegister() error {
|
|
|
|
|
if r.endpointMgr == nil {
|
|
|
|
|
return fmt.Errorf("endpoint manager is not initialized")
|
|
|
|
|
}
|
|
|
|
|
return r.endpointMgr.DeleteEndpoint(context.TODO(), r.serviceKey)
|
|
|
|
|
err := r.endpointMgr.DeleteEndpoint(context.TODO(), r.serviceKey)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Close closes the etcd client connection
|
|
|
|
@ -197,16 +234,17 @@ func (r *SvcDiscoveryRegistryImpl) Close() {
|
|
|
|
|
if r.client != nil {
|
|
|
|
|
_ = r.client.Close()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
r.mu.Lock()
|
|
|
|
|
defer r.mu.Unlock()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Check verifies if etcd is running by checking the existence of the root node and optionally creates it with a lease
|
|
|
|
|
func Check(ctx context.Context, etcdServers []string, etcdRoot string, createIfNotExist bool, options ...ZkOption) error {
|
|
|
|
|
// Configure the etcd client with default settings
|
|
|
|
|
cfg := clientv3.Config{
|
|
|
|
|
Endpoints: etcdServers,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Apply provided options to the config
|
|
|
|
|
for _, opt := range options {
|
|
|
|
|
opt(&cfg)
|
|
|
|
|
}
|
|
|
|
@ -217,7 +255,6 @@ func Check(ctx context.Context, etcdServers []string, etcdRoot string, createIfN
|
|
|
|
|
}
|
|
|
|
|
defer client.Close()
|
|
|
|
|
|
|
|
|
|
// Determine timeout for context
|
|
|
|
|
var opCtx context.Context
|
|
|
|
|
var cancel context.CancelFunc
|
|
|
|
|
if cfg.DialTimeout != 0 {
|
|
|
|
@ -227,26 +264,22 @@ func Check(ctx context.Context, etcdServers []string, etcdRoot string, createIfN
|
|
|
|
|
}
|
|
|
|
|
defer cancel()
|
|
|
|
|
|
|
|
|
|
// Check if the root node exists
|
|
|
|
|
resp, err := client.Get(opCtx, etcdRoot)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return errors.Wrap(err, "failed to get the root node from etcd")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// If root node does not exist and createIfNotExist is true, create the root node with a lease
|
|
|
|
|
if len(resp.Kvs) == 0 {
|
|
|
|
|
if createIfNotExist {
|
|
|
|
|
var leaseTTL int64 = 10
|
|
|
|
|
var leaseResp *clientv3.LeaseGrantResponse
|
|
|
|
|
if leaseTTL > 0 {
|
|
|
|
|
// Create a lease
|
|
|
|
|
leaseResp, err = client.Grant(opCtx, leaseTTL)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return errors.Wrap(err, "failed to create lease in etcd")
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Put the key with the lease
|
|
|
|
|
putOpts := []clientv3.OpOption{}
|
|
|
|
|
if leaseResp != nil {
|
|
|
|
|
putOpts = append(putOpts, clientv3.WithLease(leaseResp.ID))
|
|
|
|
|