pull/458/head
withchao 2 years ago
parent d4b67f18bf
commit c6f41c6b99

@ -8,10 +8,10 @@ import (
)
type Conn interface {
GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]*grpc.ClientConn, error)
GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error)
GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]grpc.ClientConnInterface, error)
GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (grpc.ClientConnInterface, error)
AddOption(opts ...grpc.DialOption)
CloseConn(conn *grpc.ClientConn)
CloseConn(conn grpc.ClientConnInterface)
// do not use this method for call rpc
GetClientLocalConns() map[string][]resolver.Address
}

@ -0,0 +1,47 @@
package zookeeper
import (
"context"
"fmt"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
"google.golang.org/grpc"
"strings"
)
func newClientConnInterface(cc grpc.ClientConnInterface) grpc.ClientConnInterface {
return &clientConnInterface{cc: cc}
}
type clientConnInterface struct {
cc grpc.ClientConnInterface
}
func (c *clientConnInterface) callOptionToString(opts []grpc.CallOption) string {
arr := make([]string, 0, len(opts)+1)
arr = append(arr, fmt.Sprintf("opts len: %d", len(opts)))
for i, opt := range opts {
arr = append(arr, fmt.Sprintf("[%d:%T]", i, opt))
}
return strings.Join(arr, ", ")
}
func (c *clientConnInterface) Invoke(ctx context.Context, method string, args interface{}, reply interface{}, opts ...grpc.CallOption) error {
log.ZDebug(ctx, "grpc.ClientConnInterface.Invoke in", "method", method, "args", args, "reply", reply, "opts", c.callOptionToString(opts))
if err := c.cc.Invoke(ctx, method, args, reply, opts...); err != nil {
log.ZError(ctx, "grpc.ClientConnInterface.Invoke error", err, "method", method, "args", args, "reply", reply)
return err
}
log.ZDebug(ctx, "grpc.ClientConnInterface.Invoke success", "method", method, "args", args, "reply", reply)
return nil
}
func (c *clientConnInterface) NewStream(ctx context.Context, desc *grpc.StreamDesc, method string, opts ...grpc.CallOption) (grpc.ClientStream, error) {
log.ZDebug(ctx, "grpc.ClientConnInterface.NewStream in", "desc", desc, "method", method, "opts", c.callOptionToString(opts))
cs, err := c.cc.NewStream(ctx, desc, method, opts...)
if err != nil {
log.ZError(ctx, "grpc.ClientConnInterface.NewStream error", err, "desc", desc, "method", method, "opts", len(opts))
return nil, err
}
log.ZDebug(ctx, "grpc.ClientConnInterface.NewStream success", "desc", desc, "method", method, "opts", len(opts))
return cs, nil
}

@ -71,7 +71,7 @@ func (s *ZkClient) GetConnsRemote(serviceName string) (conns []resolver.Address,
return conns, nil
}
func (s *ZkClient) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]*grpc.ClientConn, error) {
func (s *ZkClient) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]grpc.ClientConnInterface, error) {
s.logger.Printf("get conns from client, serviceName: %s", serviceName)
s.lock.Lock()
defer s.lock.Unlock()
@ -86,24 +86,28 @@ func (s *ZkClient) GetConns(ctx context.Context, serviceName string, opts ...grp
}
s.localConns[serviceName] = conns
}
var ret []*grpc.ClientConn
var ret []grpc.ClientConnInterface
s.logger.Printf("get conns from zk success, serviceName: %s", serviceName)
for _, conn := range conns {
c, err := grpc.DialContext(ctx, conn.Addr, append(s.options, opts...)...)
cc, err := grpc.DialContext(ctx, conn.Addr, append(s.options, opts...)...)
if err != nil {
return nil, errors.Wrap(err, fmt.Sprintf("conns dialContext error, conn: %s", conn.Addr))
}
ret = append(ret, c)
ret = append(ret, newClientConnInterface(cc))
}
s.logger.Printf("dial ctx success, serviceName: %s", serviceName)
return ret, nil
}
func (s *ZkClient) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
func (s *ZkClient) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (grpc.ClientConnInterface, error) {
newOpts := append(s.options, grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, s.balancerName)))
return grpc.DialContext(ctx, fmt.Sprintf("%s:///%s", s.scheme, serviceName), append(newOpts, opts...)...)
cc, err := grpc.DialContext(ctx, fmt.Sprintf("%s:///%s", s.scheme, serviceName), append(newOpts, opts...)...)
if err != nil {
return nil, err
}
return newClientConnInterface(cc), nil
}
func (s *ZkClient) CloseConn(conn *grpc.ClientConn) {
conn.Close()
func (s *ZkClient) CloseConn(conn grpc.ClientConnInterface) {
//conn.Close()
}

Loading…
Cancel
Save