diff --git a/pkg/common/discoveryregister/etcd/etcd.go b/pkg/common/discoveryregister/etcd/etcd.go index 2c4385f68..7c2bcff66 100644 --- a/pkg/common/discoveryregister/etcd/etcd.go +++ b/pkg/common/discoveryregister/etcd/etcd.go @@ -9,6 +9,8 @@ import ( "go.etcd.io/etcd/client/v3/naming/resolver" "google.golang.org/grpc" gresolver "google.golang.org/grpc/resolver" + "strings" + "sync" "time" ) @@ -26,6 +28,9 @@ type SvcDiscoveryRegistryImpl struct { rpcRegisterTarget string rootDirectory string + + mu sync.RWMutex + connMap map[string][]*grpc.ClientConn } // NewSvcDiscoveryRegistry creates a new service discovery registry implementation @@ -51,11 +56,15 @@ func NewSvcDiscoveryRegistry(rootDirectory string, endpoints []string, options . if err != nil { return nil, err } - return &SvcDiscoveryRegistryImpl{ + + s := &SvcDiscoveryRegistryImpl{ client: client, resolver: r, rootDirectory: rootDirectory, - }, nil + connMap: make(map[string][]*grpc.ClientConn), + } + go s.watchServiceChanges() + return s, nil } // WithDialTimeout sets a custom dial timeout for the etcd client @@ -87,31 +96,11 @@ func (r *SvcDiscoveryRegistryImpl) GetUserIdHashGatewayHost(ctx context.Context, // GetConns returns gRPC client connections for a given service name func (r *SvcDiscoveryRegistryImpl) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]*grpc.ClientConn, error) { - var conns []*grpc.ClientConn - // Construct the full key for the service fullServiceKey := fmt.Sprintf("%s/%s", r.rootDirectory, serviceName) - // List all endpoints for the service - resp, err := r.client.Get(ctx, fullServiceKey, clientv3.WithPrefix()) - if err != nil { - fmt.Println("GetConns get ", fullServiceKey, err.Error()) - return nil, err - } - - for _, kv := range resp.Kvs { - endpoint := string(kv.Key[len(fullServiceKey)+1:]) // Extract the endpoint address - //target := fmt.Sprintf("etcd://%s/%s/%s", r.rootDirectory, serviceName, endpoint) - target := endpoint - conn, err := grpc.DialContext(ctx, target, append(append(r.dialOptions, opts...), grpc.WithResolvers(r.resolver))...) - if err != nil { - fmt.Println("DialContext ", target, err.Error()) - return nil, err - } - conns = append(conns, conn) - fmt.Println("GetConns detail ", *conn) - - } - return conns, nil + r.mu.RLock() + defer r.mu.RUnlock() + return r.connMap[fullServiceKey], nil } // GetConn returns a single gRPC client connection for a given service name @@ -123,8 +112,6 @@ func (r *SvcDiscoveryRegistryImpl) GetConn(ctx context.Context, serviceName stri // GetSelfConnTarget returns the connection target for the current service func (r *SvcDiscoveryRegistryImpl) GetSelfConnTarget() string { return r.rpcRegisterTarget - // return fmt.Sprintf("etcd:///%s", r.serviceKey) - } // AddOption appends gRPC dial options to the existing options @@ -163,7 +150,6 @@ func (r *SvcDiscoveryRegistryImpl) Register(serviceName, host string, port int, } go r.keepAliveLease(r.leaseID) - return nil } @@ -184,12 +170,63 @@ func (r *SvcDiscoveryRegistryImpl) keepAliveLease(leaseID clientv3.LeaseID) { } } +// watchServiceChanges watches for changes in the service directory +func (r *SvcDiscoveryRegistryImpl) watchServiceChanges() { + watchChan := r.client.Watch(context.Background(), r.rootDirectory, clientv3.WithPrefix()) + for watchResp := range watchChan { + for _, event := range watchResp.Events { + prefix, _ := r.splitEndpoint(string(event.Kv.Key)) + fmt.Printf("Change detected for prefix: %s\n", prefix) + r.refreshConnMap(prefix) + } + } +} + +// refreshConnMap fetches the latest endpoints and updates the local map +func (r *SvcDiscoveryRegistryImpl) refreshConnMap(prefix string) { + r.mu.Lock() + defer r.mu.Unlock() + + fullPrefix := fmt.Sprintf("%s/", prefix) + resp, err := r.client.Get(context.Background(), fullPrefix, clientv3.WithPrefix()) + if err != nil { + fmt.Printf("Failed to get endpoints: %v\n", err) + return + } + + // Update the connMap with new connections + for _, kv := range resp.Kvs { + _, addr := r.splitEndpoint(string(kv.Key)) + conn, err := grpc.DialContext(context.Background(), addr, append(r.dialOptions, grpc.WithResolvers(r.resolver))...) + if err != nil { + fmt.Printf("Failed to dial new endpoint: %v\n", err) + continue + } + r.connMap[prefix] = append(r.connMap[prefix], conn) + } +} + +// splitEndpoint splits the endpoint string into prefix and address +func (r *SvcDiscoveryRegistryImpl) splitEndpoint(input string) (string, string) { + lastSlashIndex := strings.LastIndex(input, "/") + if lastSlashIndex != -1 { + part1 := input[:lastSlashIndex] + part2 := input[lastSlashIndex+1:] + return part1, part2 + } + return input, "" +} + // UnRegister removes the service endpoint from etcd func (r *SvcDiscoveryRegistryImpl) UnRegister() error { if r.endpointMgr == nil { return fmt.Errorf("endpoint manager is not initialized") } - return r.endpointMgr.DeleteEndpoint(context.TODO(), r.serviceKey) + err := r.endpointMgr.DeleteEndpoint(context.TODO(), r.serviceKey) + if err != nil { + return err + } + return nil } // Close closes the etcd client connection @@ -197,16 +234,17 @@ func (r *SvcDiscoveryRegistryImpl) Close() { if r.client != nil { _ = r.client.Close() } + + r.mu.Lock() + defer r.mu.Unlock() } // Check verifies if etcd is running by checking the existence of the root node and optionally creates it with a lease func Check(ctx context.Context, etcdServers []string, etcdRoot string, createIfNotExist bool, options ...ZkOption) error { - // Configure the etcd client with default settings cfg := clientv3.Config{ Endpoints: etcdServers, } - // Apply provided options to the config for _, opt := range options { opt(&cfg) } @@ -217,7 +255,6 @@ func Check(ctx context.Context, etcdServers []string, etcdRoot string, createIfN } defer client.Close() - // Determine timeout for context var opCtx context.Context var cancel context.CancelFunc if cfg.DialTimeout != 0 { @@ -227,26 +264,22 @@ func Check(ctx context.Context, etcdServers []string, etcdRoot string, createIfN } defer cancel() - // Check if the root node exists resp, err := client.Get(opCtx, etcdRoot) if err != nil { return errors.Wrap(err, "failed to get the root node from etcd") } - // If root node does not exist and createIfNotExist is true, create the root node with a lease if len(resp.Kvs) == 0 { if createIfNotExist { var leaseTTL int64 = 10 var leaseResp *clientv3.LeaseGrantResponse if leaseTTL > 0 { - // Create a lease leaseResp, err = client.Grant(opCtx, leaseTTL) if err != nil { return errors.Wrap(err, "failed to create lease in etcd") } } - // Put the key with the lease putOpts := []clientv3.OpOption{} if leaseResp != nil { putOpts = append(putOpts, clientv3.WithLease(leaseResp.ID))