diff --git a/go.mod b/go.mod index 2ec3e20b2..640b4a824 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible // indirect github.com/mitchellh/mapstructure v1.5.0 github.com/openimsdk/protocol v0.0.65 - github.com/openimsdk/tools v0.0.49-alpha.9 + github.com/openimsdk/tools v0.0.49-alpha.10 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 github.com/stretchr/testify v1.9.0 @@ -40,7 +40,6 @@ require ( github.com/shirou/gopsutil v3.21.11+incompatible github.com/spf13/viper v1.18.2 github.com/stathat/consistent v1.0.0 - go.etcd.io/etcd/client/v3 v3.5.13 go.uber.org/automaxprocs v1.5.3 golang.org/x/sync v0.6.0 ) @@ -142,6 +141,7 @@ require ( github.com/yusufpapurcu/wmi v1.2.4 // indirect go.etcd.io/etcd/api/v3 v3.5.13 // indirect go.etcd.io/etcd/client/pkg/v3 v3.5.13 // indirect + go.etcd.io/etcd/client/v3 v3.5.13 // indirect go.opencensus.io v0.24.0 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.47.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.47.0 // indirect diff --git a/go.sum b/go.sum index edecead94..241bb96aa 100644 --- a/go.sum +++ b/go.sum @@ -288,8 +288,8 @@ github.com/openimsdk/gomake v0.0.13 h1:xLDe/moqgWpRoptHzI4packAWzs4C16b+sVY+txNJ github.com/openimsdk/gomake v0.0.13/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= github.com/openimsdk/protocol v0.0.65 h1:SPT9qyUsFRTTKSKb/FjpS+xr6sxz/Kbnu+su1bxYagc= github.com/openimsdk/protocol v0.0.65/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= -github.com/openimsdk/tools v0.0.49-alpha.9 h1:yoa3GS6t0d1mRv/S86niFBGDgSjy2EWWwBI5NAH1Kgk= -github.com/openimsdk/tools v0.0.49-alpha.9/go.mod h1:g7mkHXYUPi0/8aAX8VPMHpnb3hqdV69Jph+bXOGvvNM= +github.com/openimsdk/tools v0.0.49-alpha.10 h1:vcfMUGBClD3TsjsTd/Wb0R2WcpjCo8hupMqreb0dXFA= +github.com/openimsdk/tools v0.0.49-alpha.10/go.mod h1:g7mkHXYUPi0/8aAX8VPMHpnb3hqdV69Jph+bXOGvvNM= github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4= github.com/pelletier/go-toml/v2 v2.1.0/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc= github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= diff --git a/pkg/common/discoveryregister/discoveryregister.go b/pkg/common/discoveryregister/discoveryregister.go index 1085ec1ea..f37fc9c10 100644 --- a/pkg/common/discoveryregister/discoveryregister.go +++ b/pkg/common/discoveryregister/discoveryregister.go @@ -16,9 +16,9 @@ package discoveryregister import ( "github.com/openimsdk/open-im-server/v3/pkg/common/config" - getcd "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister/etcd" "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister/kubernetes" "github.com/openimsdk/tools/discovery" + getcd "github.com/openimsdk/tools/discovery/etcd" "github.com/openimsdk/tools/discovery/zookeeper" "github.com/openimsdk/tools/errs" "time" diff --git a/pkg/common/discoveryregister/etcd/etcd.go b/pkg/common/discoveryregister/etcd/etcd.go deleted file mode 100644 index dd8806378..000000000 --- a/pkg/common/discoveryregister/etcd/etcd.go +++ /dev/null @@ -1,289 +0,0 @@ -package etcd - -import ( - "context" - "fmt" - "github.com/pkg/errors" - clientv3 "go.etcd.io/etcd/client/v3" - "go.etcd.io/etcd/client/v3/naming/endpoints" - "go.etcd.io/etcd/client/v3/naming/resolver" - "google.golang.org/grpc" - gresolver "google.golang.org/grpc/resolver" - "strings" - "sync" - "time" -) - -// ZkOption defines a function type for modifying clientv3.Config -type ZkOption func(*clientv3.Config) - -// SvcDiscoveryRegistryImpl implementation -type SvcDiscoveryRegistryImpl struct { - client *clientv3.Client - resolver gresolver.Builder - dialOptions []grpc.DialOption - serviceKey string - endpointMgr endpoints.Manager - leaseID clientv3.LeaseID - rpcRegisterTarget string - - rootDirectory string - - mu sync.RWMutex - connMap map[string][]*grpc.ClientConn -} - -// NewSvcDiscoveryRegistry creates a new service discovery registry implementation -func NewSvcDiscoveryRegistry(rootDirectory string, endpoints []string, options ...ZkOption) (*SvcDiscoveryRegistryImpl, error) { - cfg := clientv3.Config{ - Endpoints: endpoints, - DialTimeout: 5 * time.Second, - // Increase keep-alive queue capacity and message size - PermitWithoutStream: true, - MaxCallSendMsgSize: 10 * 1024 * 1024, // 10 MB - } - - // Apply provided options to the config - for _, opt := range options { - opt(&cfg) - } - - client, err := clientv3.New(cfg) - if err != nil { - return nil, err - } - r, err := resolver.NewBuilder(client) - if err != nil { - return nil, err - } - - s := &SvcDiscoveryRegistryImpl{ - client: client, - resolver: r, - rootDirectory: rootDirectory, - connMap: make(map[string][]*grpc.ClientConn), - } - go s.watchServiceChanges() - return s, nil -} - -// WithDialTimeout sets a custom dial timeout for the etcd client -func WithDialTimeout(timeout time.Duration) ZkOption { - return func(cfg *clientv3.Config) { - cfg.DialTimeout = timeout - } -} - -// WithMaxCallSendMsgSize sets a custom max call send message size for the etcd client -func WithMaxCallSendMsgSize(size int) ZkOption { - return func(cfg *clientv3.Config) { - cfg.MaxCallSendMsgSize = size - } -} - -// WithUsernameAndPassword sets a username and password for the etcd client -func WithUsernameAndPassword(username, password string) ZkOption { - return func(cfg *clientv3.Config) { - cfg.Username = username - cfg.Password = password - } -} - -// GetUserIdHashGatewayHost returns the gateway host for a given user ID hash -func (r *SvcDiscoveryRegistryImpl) GetUserIdHashGatewayHost(ctx context.Context, userId string) (string, error) { - return "", nil -} - -// GetConns returns gRPC client connections for a given service name -func (r *SvcDiscoveryRegistryImpl) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]*grpc.ClientConn, error) { - fullServiceKey := fmt.Sprintf("%s/%s", r.rootDirectory, serviceName) - r.mu.RLock() - defer r.mu.RUnlock() - return r.connMap[fullServiceKey], nil -} - -// GetConn returns a single gRPC client connection for a given service name -func (r *SvcDiscoveryRegistryImpl) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { - target := fmt.Sprintf("etcd:///%s/%s", r.rootDirectory, serviceName) - return grpc.DialContext(ctx, target, append(append(r.dialOptions, opts...), grpc.WithResolvers(r.resolver))...) -} - -// GetSelfConnTarget returns the connection target for the current service -func (r *SvcDiscoveryRegistryImpl) GetSelfConnTarget() string { - return r.rpcRegisterTarget -} - -// AddOption appends gRPC dial options to the existing options -func (r *SvcDiscoveryRegistryImpl) AddOption(opts ...grpc.DialOption) { - r.dialOptions = append(r.dialOptions, opts...) -} - -// CloseConn closes a given gRPC client connection -func (r *SvcDiscoveryRegistryImpl) CloseConn(conn *grpc.ClientConn) { - conn.Close() -} - -// Register registers a new service endpoint with etcd -func (r *SvcDiscoveryRegistryImpl) Register(serviceName, host string, port int, opts ...grpc.DialOption) error { - r.serviceKey = fmt.Sprintf("%s/%s/%s:%d", r.rootDirectory, serviceName, host, port) - em, err := endpoints.NewManager(r.client, r.rootDirectory+"/"+serviceName) - if err != nil { - return err - } - r.endpointMgr = em - - leaseResp, err := r.client.Grant(context.Background(), 30) // - if err != nil { - return err - } - r.leaseID = leaseResp.ID - - r.rpcRegisterTarget = fmt.Sprintf("%s:%d", host, port) - endpoint := endpoints.Endpoint{Addr: r.rpcRegisterTarget} - - err = em.AddEndpoint(context.TODO(), r.serviceKey, endpoint, clientv3.WithLease(leaseResp.ID)) - if err != nil { - return err - } - - go r.keepAliveLease(r.leaseID) - return nil -} - -// keepAliveLease maintains the lease alive by sending keep-alive requests -func (r *SvcDiscoveryRegistryImpl) keepAliveLease(leaseID clientv3.LeaseID) { - ch, err := r.client.KeepAlive(context.Background(), leaseID) - if err != nil { - return - } - for ka := range ch { - if ka != nil { - } else { - return - } - } -} - -// watchServiceChanges watches for changes in the service directory -func (r *SvcDiscoveryRegistryImpl) watchServiceChanges() { - watchChan := r.client.Watch(context.Background(), r.rootDirectory, clientv3.WithPrefix()) - for watchResp := range watchChan { - updatedPrefixes := make(map[string]struct{}) // Create a set to track updated prefixes - - for _, event := range watchResp.Events { - prefix, _ := r.splitEndpoint(string(event.Kv.Key)) - if _, alreadyUpdated := updatedPrefixes[prefix]; !alreadyUpdated { - updatedPrefixes[prefix] = struct{}{} // Mark this prefix as updated - r.refreshConnMap(prefix) - } - } - } -} - -// refreshConnMap fetches the latest endpoints and updates the local map -func (r *SvcDiscoveryRegistryImpl) refreshConnMap(prefix string) { - r.mu.Lock() - defer r.mu.Unlock() - - fullPrefix := fmt.Sprintf("%s/", prefix) - resp, err := r.client.Get(context.Background(), fullPrefix, clientv3.WithPrefix()) - if err != nil { - return - } - r.connMap[prefix] = []*grpc.ClientConn{} // Update the connMap with new connections - for _, kv := range resp.Kvs { - _, addr := r.splitEndpoint(string(kv.Key)) - conn, err := grpc.DialContext(context.Background(), addr, append(r.dialOptions, grpc.WithResolvers(r.resolver))...) - if err != nil { - continue - } - r.connMap[prefix] = append(r.connMap[prefix], conn) - } -} - -// splitEndpoint splits the endpoint string into prefix and address -func (r *SvcDiscoveryRegistryImpl) splitEndpoint(input string) (string, string) { - lastSlashIndex := strings.LastIndex(input, "/") - if lastSlashIndex != -1 { - part1 := input[:lastSlashIndex] - part2 := input[lastSlashIndex+1:] - return part1, part2 - } - return input, "" -} - -// UnRegister removes the service endpoint from etcd -func (r *SvcDiscoveryRegistryImpl) UnRegister() error { - if r.endpointMgr == nil { - return fmt.Errorf("endpoint manager is not initialized") - } - err := r.endpointMgr.DeleteEndpoint(context.TODO(), r.serviceKey) - if err != nil { - return err - } - return nil -} - -// Close closes the etcd client connection -func (r *SvcDiscoveryRegistryImpl) Close() { - if r.client != nil { - _ = r.client.Close() - } - - r.mu.Lock() - defer r.mu.Unlock() -} - -// Check verifies if etcd is running by checking the existence of the root node and optionally creates it with a lease -func Check(ctx context.Context, etcdServers []string, etcdRoot string, createIfNotExist bool, options ...ZkOption) error { - cfg := clientv3.Config{ - Endpoints: etcdServers, - } - for _, opt := range options { - opt(&cfg) - } - client, err := clientv3.New(cfg) - if err != nil { - return errors.Wrap(err, "failed to connect to etcd") - } - defer client.Close() - - var opCtx context.Context - var cancel context.CancelFunc - if cfg.DialTimeout != 0 { - opCtx, cancel = context.WithTimeout(ctx, cfg.DialTimeout) - } else { - opCtx, cancel = context.WithTimeout(ctx, 10*time.Second) - } - defer cancel() - - resp, err := client.Get(opCtx, etcdRoot) - if err != nil { - return errors.Wrap(err, "failed to get the root node from etcd") - } - - if len(resp.Kvs) == 0 { - if createIfNotExist { - var leaseTTL int64 = 10 - var leaseResp *clientv3.LeaseGrantResponse - if leaseTTL > 0 { - leaseResp, err = client.Grant(opCtx, leaseTTL) - if err != nil { - return errors.Wrap(err, "failed to create lease in etcd") - } - } - putOpts := []clientv3.OpOption{} - if leaseResp != nil { - putOpts = append(putOpts, clientv3.WithLease(leaseResp.ID)) - } - - _, err := client.Put(opCtx, etcdRoot, "", putOpts...) - if err != nil { - return errors.Wrap(err, "failed to create the root node in etcd") - } - } else { - return fmt.Errorf("root node %s does not exist in etcd", etcdRoot) - } - } - return nil -}