diff --git a/internal/push/onlinepusher.go b/internal/push/onlinepusher.go index ddc500806..9c9189d37 100644 --- a/internal/push/onlinepusher.go +++ b/internal/push/onlinepusher.go @@ -2,7 +2,6 @@ package push import ( "context" - "fmt" "github.com/openimsdk/protocol/msggateway" "github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/tools/discovery" @@ -82,8 +81,6 @@ func (d *DefaultAllNode) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.M // Online push message for _, conn := range conns { - fmt.Println(ctx, "get gateway conn detail ", "conn ", *conn) - conn := conn // loop var safe wg.Go(func() error { msgClient := msggateway.NewMsgGatewayClient(conn) diff --git a/pkg/common/discoveryregister/etcd/etcd.go b/pkg/common/discoveryregister/etcd/etcd.go index 247161f9b..dd8806378 100644 --- a/pkg/common/discoveryregister/etcd/etcd.go +++ b/pkg/common/discoveryregister/etcd/etcd.go @@ -97,10 +97,8 @@ func (r *SvcDiscoveryRegistryImpl) GetUserIdHashGatewayHost(ctx context.Context, // GetConns returns gRPC client connections for a given service name func (r *SvcDiscoveryRegistryImpl) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]*grpc.ClientConn, error) { fullServiceKey := fmt.Sprintf("%s/%s", r.rootDirectory, serviceName) - r.mu.RLock() defer r.mu.RUnlock() - fmt.Printf("all conns ", serviceName, r.connMap[fullServiceKey]) return r.connMap[fullServiceKey], nil } @@ -122,9 +120,7 @@ func (r *SvcDiscoveryRegistryImpl) AddOption(opts ...grpc.DialOption) { // CloseConn closes a given gRPC client connection func (r *SvcDiscoveryRegistryImpl) CloseConn(conn *grpc.ClientConn) { - if err := conn.Close(); err != nil { - fmt.Printf("Failed to close connection: %v\n", err) - } + conn.Close() } // Register registers a new service endpoint with etcd @@ -136,7 +132,7 @@ func (r *SvcDiscoveryRegistryImpl) Register(serviceName, host string, port int, } r.endpointMgr = em - leaseResp, err := r.client.Grant(context.Background(), 60) // Increase TTL time + leaseResp, err := r.client.Grant(context.Background(), 30) // if err != nil { return err } @@ -158,14 +154,11 @@ func (r *SvcDiscoveryRegistryImpl) Register(serviceName, host string, port int, func (r *SvcDiscoveryRegistryImpl) keepAliveLease(leaseID clientv3.LeaseID) { ch, err := r.client.KeepAlive(context.Background(), leaseID) if err != nil { - fmt.Printf("Failed to keep lease alive: %v\n", err) return } - for ka := range ch { if ka != nil { } else { - fmt.Printf("Lease keep-alive response channel closed\n") return } } @@ -175,10 +168,14 @@ func (r *SvcDiscoveryRegistryImpl) keepAliveLease(leaseID clientv3.LeaseID) { func (r *SvcDiscoveryRegistryImpl) watchServiceChanges() { watchChan := r.client.Watch(context.Background(), r.rootDirectory, clientv3.WithPrefix()) for watchResp := range watchChan { + updatedPrefixes := make(map[string]struct{}) // Create a set to track updated prefixes + for _, event := range watchResp.Events { prefix, _ := r.splitEndpoint(string(event.Kv.Key)) - fmt.Printf("Change detected for prefix: %s\n", prefix) - r.refreshConnMap(prefix) + if _, alreadyUpdated := updatedPrefixes[prefix]; !alreadyUpdated { + updatedPrefixes[prefix] = struct{}{} // Mark this prefix as updated + r.refreshConnMap(prefix) + } } } } @@ -191,7 +188,6 @@ func (r *SvcDiscoveryRegistryImpl) refreshConnMap(prefix string) { fullPrefix := fmt.Sprintf("%s/", prefix) resp, err := r.client.Get(context.Background(), fullPrefix, clientv3.WithPrefix()) if err != nil { - fmt.Printf("Failed to get endpoints: %v\n", err) return } r.connMap[prefix] = []*grpc.ClientConn{} // Update the connMap with new connections @@ -199,7 +195,6 @@ func (r *SvcDiscoveryRegistryImpl) refreshConnMap(prefix string) { _, addr := r.splitEndpoint(string(kv.Key)) conn, err := grpc.DialContext(context.Background(), addr, append(r.dialOptions, grpc.WithResolvers(r.resolver))...) if err != nil { - fmt.Printf("Failed to dial new endpoint: %v\n", err) continue } r.connMap[prefix] = append(r.connMap[prefix], conn) @@ -244,11 +239,9 @@ func Check(ctx context.Context, etcdServers []string, etcdRoot string, createIfN cfg := clientv3.Config{ Endpoints: etcdServers, } - for _, opt := range options { opt(&cfg) } - client, err := clientv3.New(cfg) if err != nil { return errors.Wrap(err, "failed to connect to etcd") @@ -279,7 +272,6 @@ func Check(ctx context.Context, etcdServers []string, etcdRoot string, createIfN return errors.Wrap(err, "failed to create lease in etcd") } } - putOpts := []clientv3.OpOption{} if leaseResp != nil { putOpts = append(putOpts, clientv3.WithLease(leaseResp.ID)) @@ -289,13 +281,9 @@ func Check(ctx context.Context, etcdServers []string, etcdRoot string, createIfN if err != nil { return errors.Wrap(err, "failed to create the root node in etcd") } - fmt.Printf("Root node %s did not exist, but has been created.\n", etcdRoot) } else { return fmt.Errorf("root node %s does not exist in etcd", etcdRoot) } - } else { - fmt.Printf("Etcd is running and the root node %s exists.\n", etcdRoot) } - return nil }