diff --git a/pkg/discoveryregistry/discovery_register.go b/pkg/discoveryregistry/discovery_register.go index e9e706ab4..4214ed49b 100644 --- a/pkg/discoveryregistry/discovery_register.go +++ b/pkg/discoveryregistry/discovery_register.go @@ -8,10 +8,10 @@ import ( ) type Conn interface { - GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]*grpc.ClientConn, error) - GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) + GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]grpc.ClientConnInterface, error) + GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (grpc.ClientConnInterface, error) AddOption(opts ...grpc.DialOption) - CloseConn(conn *grpc.ClientConn) + CloseConn(conn grpc.ClientConnInterface) // do not use this method for call rpc GetClientLocalConns() map[string][]resolver.Address } diff --git a/pkg/discoveryregistry/zookeeper/debug.go b/pkg/discoveryregistry/zookeeper/debug.go new file mode 100644 index 000000000..acfae7cc5 --- /dev/null +++ b/pkg/discoveryregistry/zookeeper/debug.go @@ -0,0 +1,47 @@ +package zookeeper + +import ( + "context" + "fmt" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/log" + "google.golang.org/grpc" + "strings" +) + +func newClientConnInterface(cc grpc.ClientConnInterface) grpc.ClientConnInterface { + return &clientConnInterface{cc: cc} +} + +type clientConnInterface struct { + cc grpc.ClientConnInterface +} + +func (c *clientConnInterface) callOptionToString(opts []grpc.CallOption) string { + arr := make([]string, 0, len(opts)+1) + arr = append(arr, fmt.Sprintf("opts len: %d", len(opts))) + for i, opt := range opts { + arr = append(arr, fmt.Sprintf("[%d:%T]", i, opt)) + } + return strings.Join(arr, ", ") +} + +func (c *clientConnInterface) Invoke(ctx context.Context, method string, args interface{}, reply interface{}, opts ...grpc.CallOption) error { + log.ZDebug(ctx, "grpc.ClientConnInterface.Invoke in", "method", method, "args", args, "reply", reply, "opts", c.callOptionToString(opts)) + if err := c.cc.Invoke(ctx, method, args, reply, opts...); err != nil { + log.ZError(ctx, "grpc.ClientConnInterface.Invoke error", err, "method", method, "args", args, "reply", reply) + return err + } + log.ZDebug(ctx, "grpc.ClientConnInterface.Invoke success", "method", method, "args", args, "reply", reply) + return nil +} + +func (c *clientConnInterface) NewStream(ctx context.Context, desc *grpc.StreamDesc, method string, opts ...grpc.CallOption) (grpc.ClientStream, error) { + log.ZDebug(ctx, "grpc.ClientConnInterface.NewStream in", "desc", desc, "method", method, "opts", c.callOptionToString(opts)) + cs, err := c.cc.NewStream(ctx, desc, method, opts...) + if err != nil { + log.ZError(ctx, "grpc.ClientConnInterface.NewStream error", err, "desc", desc, "method", method, "opts", len(opts)) + return nil, err + } + log.ZDebug(ctx, "grpc.ClientConnInterface.NewStream success", "desc", desc, "method", method, "opts", len(opts)) + return cs, nil +} diff --git a/pkg/discoveryregistry/zookeeper/discover.go b/pkg/discoveryregistry/zookeeper/discover.go index 610c125e8..8a7c114fa 100644 --- a/pkg/discoveryregistry/zookeeper/discover.go +++ b/pkg/discoveryregistry/zookeeper/discover.go @@ -71,7 +71,7 @@ func (s *ZkClient) GetConnsRemote(serviceName string) (conns []resolver.Address, return conns, nil } -func (s *ZkClient) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]*grpc.ClientConn, error) { +func (s *ZkClient) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]grpc.ClientConnInterface, error) { s.logger.Printf("get conns from client, serviceName: %s", serviceName) s.lock.Lock() defer s.lock.Unlock() @@ -86,24 +86,28 @@ func (s *ZkClient) GetConns(ctx context.Context, serviceName string, opts ...grp } s.localConns[serviceName] = conns } - var ret []*grpc.ClientConn + var ret []grpc.ClientConnInterface s.logger.Printf("get conns from zk success, serviceName: %s", serviceName) for _, conn := range conns { - c, err := grpc.DialContext(ctx, conn.Addr, append(s.options, opts...)...) + cc, err := grpc.DialContext(ctx, conn.Addr, append(s.options, opts...)...) if err != nil { return nil, errors.Wrap(err, fmt.Sprintf("conns dialContext error, conn: %s", conn.Addr)) } - ret = append(ret, c) + ret = append(ret, newClientConnInterface(cc)) } s.logger.Printf("dial ctx success, serviceName: %s", serviceName) return ret, nil } -func (s *ZkClient) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { +func (s *ZkClient) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (grpc.ClientConnInterface, error) { newOpts := append(s.options, grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, s.balancerName))) - return grpc.DialContext(ctx, fmt.Sprintf("%s:///%s", s.scheme, serviceName), append(newOpts, opts...)...) + cc, err := grpc.DialContext(ctx, fmt.Sprintf("%s:///%s", s.scheme, serviceName), append(newOpts, opts...)...) + if err != nil { + return nil, err + } + return newClientConnInterface(cc), nil } -func (s *ZkClient) CloseConn(conn *grpc.ClientConn) { - conn.Close() +func (s *ZkClient) CloseConn(conn grpc.ClientConnInterface) { + //conn.Close() }