diff --git a/internal/push/push_to_client.go b/internal/push/push_to_client.go index 4383c336a..7eead73ee 100644 --- a/internal/push/push_to_client.go +++ b/internal/push/push_to_client.go @@ -247,7 +247,7 @@ func (p *Pusher) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.MsgData, for _, v := range conns { msgClient := msggateway.NewMsgGatewayClient(v) reply, err := msgClient.SuperGroupOnlineBatchPushOneMsg(ctx, &msggateway.OnlineBatchPushOneMsgReq{MsgData: msg, PushToUserIDs: pushToUserIDs}) - v.Close() + p.discov.CloseConn(v) if err != nil { continue } diff --git a/internal/rpc/auth/auth.go b/internal/rpc/auth/auth.go index c4e4877bf..d0a6176f4 100644 --- a/internal/rpc/auth/auth.go +++ b/internal/rpc/auth/auth.go @@ -109,7 +109,7 @@ func (s *authServer) forceKickOff(ctx context.Context, userID string, platformID client := msggateway.NewMsgGatewayClient(v) kickReq := &msggateway.KickUserOfflineReq{KickUserIDList: []string{userID}, PlatformID: platformID} _, err := client.KickUserOffline(ctx, kickReq) - v.Close() + s.RegisterCenter.CloseConn(v) return utils.Wrap(err, "") } return errs.ErrInternalServer.Wrap() diff --git a/pkg/discoveryregistry/discovery_register.go b/pkg/discoveryregistry/discovery_register.go index e9e706ab4..4214ed49b 100644 --- a/pkg/discoveryregistry/discovery_register.go +++ b/pkg/discoveryregistry/discovery_register.go @@ -8,10 +8,10 @@ import ( ) type Conn interface { - GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]*grpc.ClientConn, error) - GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) + GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]grpc.ClientConnInterface, error) + GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (grpc.ClientConnInterface, error) AddOption(opts ...grpc.DialOption) - CloseConn(conn *grpc.ClientConn) + CloseConn(conn grpc.ClientConnInterface) // do not use this method for call rpc GetClientLocalConns() map[string][]resolver.Address } diff --git a/pkg/discoveryregistry/zookeeper/debug.go b/pkg/discoveryregistry/zookeeper/debug.go new file mode 100644 index 000000000..acfae7cc5 --- /dev/null +++ b/pkg/discoveryregistry/zookeeper/debug.go @@ -0,0 +1,47 @@ +package zookeeper + +import ( + "context" + "fmt" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/log" + "google.golang.org/grpc" + "strings" +) + +func newClientConnInterface(cc grpc.ClientConnInterface) grpc.ClientConnInterface { + return &clientConnInterface{cc: cc} +} + +type clientConnInterface struct { + cc grpc.ClientConnInterface +} + +func (c *clientConnInterface) callOptionToString(opts []grpc.CallOption) string { + arr := make([]string, 0, len(opts)+1) + arr = append(arr, fmt.Sprintf("opts len: %d", len(opts))) + for i, opt := range opts { + arr = append(arr, fmt.Sprintf("[%d:%T]", i, opt)) + } + return strings.Join(arr, ", ") +} + +func (c *clientConnInterface) Invoke(ctx context.Context, method string, args interface{}, reply interface{}, opts ...grpc.CallOption) error { + log.ZDebug(ctx, "grpc.ClientConnInterface.Invoke in", "method", method, "args", args, "reply", reply, "opts", c.callOptionToString(opts)) + if err := c.cc.Invoke(ctx, method, args, reply, opts...); err != nil { + log.ZError(ctx, "grpc.ClientConnInterface.Invoke error", err, "method", method, "args", args, "reply", reply) + return err + } + log.ZDebug(ctx, "grpc.ClientConnInterface.Invoke success", "method", method, "args", args, "reply", reply) + return nil +} + +func (c *clientConnInterface) NewStream(ctx context.Context, desc *grpc.StreamDesc, method string, opts ...grpc.CallOption) (grpc.ClientStream, error) { + log.ZDebug(ctx, "grpc.ClientConnInterface.NewStream in", "desc", desc, "method", method, "opts", c.callOptionToString(opts)) + cs, err := c.cc.NewStream(ctx, desc, method, opts...) + if err != nil { + log.ZError(ctx, "grpc.ClientConnInterface.NewStream error", err, "desc", desc, "method", method, "opts", len(opts)) + return nil, err + } + log.ZDebug(ctx, "grpc.ClientConnInterface.NewStream success", "desc", desc, "method", method, "opts", len(opts)) + return cs, nil +} diff --git a/pkg/discoveryregistry/zookeeper/discover.go b/pkg/discoveryregistry/zookeeper/discover.go index 610c125e8..6c47c1674 100644 --- a/pkg/discoveryregistry/zookeeper/discover.go +++ b/pkg/discoveryregistry/zookeeper/discover.go @@ -3,6 +3,7 @@ package zookeeper import ( "context" "fmt" + "io" "strings" "sync" @@ -71,7 +72,7 @@ func (s *ZkClient) GetConnsRemote(serviceName string) (conns []resolver.Address, return conns, nil } -func (s *ZkClient) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]*grpc.ClientConn, error) { +func (s *ZkClient) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]grpc.ClientConnInterface, error) { s.logger.Printf("get conns from client, serviceName: %s", serviceName) s.lock.Lock() defer s.lock.Unlock() @@ -86,24 +87,30 @@ func (s *ZkClient) GetConns(ctx context.Context, serviceName string, opts ...grp } s.localConns[serviceName] = conns } - var ret []*grpc.ClientConn + var ret []grpc.ClientConnInterface s.logger.Printf("get conns from zk success, serviceName: %s", serviceName) for _, conn := range conns { - c, err := grpc.DialContext(ctx, conn.Addr, append(s.options, opts...)...) + cc, err := grpc.DialContext(ctx, conn.Addr, append(s.options, opts...)...) if err != nil { return nil, errors.Wrap(err, fmt.Sprintf("conns dialContext error, conn: %s", conn.Addr)) } - ret = append(ret, c) + ret = append(ret, newClientConnInterface(cc)) } s.logger.Printf("dial ctx success, serviceName: %s", serviceName) return ret, nil } -func (s *ZkClient) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { +func (s *ZkClient) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (grpc.ClientConnInterface, error) { newOpts := append(s.options, grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, s.balancerName))) - return grpc.DialContext(ctx, fmt.Sprintf("%s:///%s", s.scheme, serviceName), append(newOpts, opts...)...) + cc, err := grpc.DialContext(ctx, fmt.Sprintf("%s:///%s", s.scheme, serviceName), append(newOpts, opts...)...) + if err != nil { + return nil, err + } + return newClientConnInterface(cc), nil } -func (s *ZkClient) CloseConn(conn *grpc.ClientConn) { - conn.Close() +func (s *ZkClient) CloseConn(conn grpc.ClientConnInterface) { + if closer, ok := conn.(io.Closer); ok { + closer.Close() + } } diff --git a/pkg/rpcclient/auth.go b/pkg/rpcclient/auth.go index fe6d8da4b..580e6bcee 100644 --- a/pkg/rpcclient/auth.go +++ b/pkg/rpcclient/auth.go @@ -19,7 +19,7 @@ func NewAuth(discov discoveryregistry.SvcDiscoveryRegistry) *Auth { } type Auth struct { - conn *grpc.ClientConn + conn grpc.ClientConnInterface Client auth.AuthClient discov discoveryregistry.SvcDiscoveryRegistry } diff --git a/pkg/rpcclient/conversation.go b/pkg/rpcclient/conversation.go index e9a7b1ea4..55ea5f829 100644 --- a/pkg/rpcclient/conversation.go +++ b/pkg/rpcclient/conversation.go @@ -14,7 +14,7 @@ import ( type Conversation struct { Client conversation.ConversationClient - conn *grpc.ClientConn + conn grpc.ClientConnInterface discov discoveryregistry.SvcDiscoveryRegistry } diff --git a/pkg/rpcclient/friend.go b/pkg/rpcclient/friend.go index 299c93b00..aa7ba0344 100644 --- a/pkg/rpcclient/friend.go +++ b/pkg/rpcclient/friend.go @@ -11,7 +11,7 @@ import ( ) type Friend struct { - conn *grpc.ClientConn + conn grpc.ClientConnInterface Client friend.FriendClient discov discoveryregistry.SvcDiscoveryRegistry } diff --git a/pkg/rpcclient/group.go b/pkg/rpcclient/group.go index 6f87e63cc..911887dda 100644 --- a/pkg/rpcclient/group.go +++ b/pkg/rpcclient/group.go @@ -15,7 +15,7 @@ import ( ) type Group struct { - conn *grpc.ClientConn + conn grpc.ClientConnInterface Client group.GroupClient discov discoveryregistry.SvcDiscoveryRegistry } diff --git a/pkg/rpcclient/msg.go b/pkg/rpcclient/msg.go index a592e104d..6097109ae 100644 --- a/pkg/rpcclient/msg.go +++ b/pkg/rpcclient/msg.go @@ -104,7 +104,7 @@ func newSessionTypeConf() map[int32]int32 { } type Message struct { - conn *grpc.ClientConn + conn grpc.ClientConnInterface Client msg.MsgClient discov discoveryregistry.SvcDiscoveryRegistry } diff --git a/pkg/rpcclient/push.go b/pkg/rpcclient/push.go index 3c9736cc2..8ac20b943 100644 --- a/pkg/rpcclient/push.go +++ b/pkg/rpcclient/push.go @@ -10,7 +10,7 @@ import ( ) type Push struct { - conn *grpc.ClientConn + conn grpc.ClientConnInterface Client push.PushMsgServiceClient discov discoveryregistry.SvcDiscoveryRegistry } diff --git a/pkg/rpcclient/third.go b/pkg/rpcclient/third.go index a59a5adb4..46c5e1b92 100644 --- a/pkg/rpcclient/third.go +++ b/pkg/rpcclient/third.go @@ -10,7 +10,7 @@ import ( ) type Third struct { - conn *grpc.ClientConn + conn grpc.ClientConnInterface Client third.ThirdClient discov discoveryregistry.SvcDiscoveryRegistry } diff --git a/pkg/rpcclient/user.go b/pkg/rpcclient/user.go index d717ba6f3..76fea81dd 100644 --- a/pkg/rpcclient/user.go +++ b/pkg/rpcclient/user.go @@ -15,7 +15,7 @@ import ( ) type User struct { - conn *grpc.ClientConn + conn grpc.ClientConnInterface Client user.UserClient discov discoveryregistry.SvcDiscoveryRegistry }