Merge branch 'ver3' of github.com:OpenIMSDK/Open-IM-Server into ver3

pull/458/head
wangchuxiao 2 years ago
commit dea358ee22

@ -247,7 +247,7 @@ func (p *Pusher) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.MsgData,
for _, v := range conns { for _, v := range conns {
msgClient := msggateway.NewMsgGatewayClient(v) msgClient := msggateway.NewMsgGatewayClient(v)
reply, err := msgClient.SuperGroupOnlineBatchPushOneMsg(ctx, &msggateway.OnlineBatchPushOneMsgReq{MsgData: msg, PushToUserIDs: pushToUserIDs}) reply, err := msgClient.SuperGroupOnlineBatchPushOneMsg(ctx, &msggateway.OnlineBatchPushOneMsgReq{MsgData: msg, PushToUserIDs: pushToUserIDs})
v.Close() p.discov.CloseConn(v)
if err != nil { if err != nil {
continue continue
} }

@ -109,7 +109,7 @@ func (s *authServer) forceKickOff(ctx context.Context, userID string, platformID
client := msggateway.NewMsgGatewayClient(v) client := msggateway.NewMsgGatewayClient(v)
kickReq := &msggateway.KickUserOfflineReq{KickUserIDList: []string{userID}, PlatformID: platformID} kickReq := &msggateway.KickUserOfflineReq{KickUserIDList: []string{userID}, PlatformID: platformID}
_, err := client.KickUserOffline(ctx, kickReq) _, err := client.KickUserOffline(ctx, kickReq)
v.Close() s.RegisterCenter.CloseConn(v)
return utils.Wrap(err, "") return utils.Wrap(err, "")
} }
return errs.ErrInternalServer.Wrap() return errs.ErrInternalServer.Wrap()

@ -8,10 +8,10 @@ import (
) )
type Conn interface { type Conn interface {
GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]*grpc.ClientConn, error) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]grpc.ClientConnInterface, error)
GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (grpc.ClientConnInterface, error)
AddOption(opts ...grpc.DialOption) AddOption(opts ...grpc.DialOption)
CloseConn(conn *grpc.ClientConn) CloseConn(conn grpc.ClientConnInterface)
// do not use this method for call rpc // do not use this method for call rpc
GetClientLocalConns() map[string][]resolver.Address GetClientLocalConns() map[string][]resolver.Address
} }

@ -0,0 +1,47 @@
package zookeeper
import (
"context"
"fmt"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
"google.golang.org/grpc"
"strings"
)
func newClientConnInterface(cc grpc.ClientConnInterface) grpc.ClientConnInterface {
return &clientConnInterface{cc: cc}
}
type clientConnInterface struct {
cc grpc.ClientConnInterface
}
func (c *clientConnInterface) callOptionToString(opts []grpc.CallOption) string {
arr := make([]string, 0, len(opts)+1)
arr = append(arr, fmt.Sprintf("opts len: %d", len(opts)))
for i, opt := range opts {
arr = append(arr, fmt.Sprintf("[%d:%T]", i, opt))
}
return strings.Join(arr, ", ")
}
func (c *clientConnInterface) Invoke(ctx context.Context, method string, args interface{}, reply interface{}, opts ...grpc.CallOption) error {
log.ZDebug(ctx, "grpc.ClientConnInterface.Invoke in", "method", method, "args", args, "reply", reply, "opts", c.callOptionToString(opts))
if err := c.cc.Invoke(ctx, method, args, reply, opts...); err != nil {
log.ZError(ctx, "grpc.ClientConnInterface.Invoke error", err, "method", method, "args", args, "reply", reply)
return err
}
log.ZDebug(ctx, "grpc.ClientConnInterface.Invoke success", "method", method, "args", args, "reply", reply)
return nil
}
func (c *clientConnInterface) NewStream(ctx context.Context, desc *grpc.StreamDesc, method string, opts ...grpc.CallOption) (grpc.ClientStream, error) {
log.ZDebug(ctx, "grpc.ClientConnInterface.NewStream in", "desc", desc, "method", method, "opts", c.callOptionToString(opts))
cs, err := c.cc.NewStream(ctx, desc, method, opts...)
if err != nil {
log.ZError(ctx, "grpc.ClientConnInterface.NewStream error", err, "desc", desc, "method", method, "opts", len(opts))
return nil, err
}
log.ZDebug(ctx, "grpc.ClientConnInterface.NewStream success", "desc", desc, "method", method, "opts", len(opts))
return cs, nil
}

@ -3,6 +3,7 @@ package zookeeper
import ( import (
"context" "context"
"fmt" "fmt"
"io"
"strings" "strings"
"sync" "sync"
@ -71,7 +72,7 @@ func (s *ZkClient) GetConnsRemote(serviceName string) (conns []resolver.Address,
return conns, nil return conns, nil
} }
func (s *ZkClient) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]*grpc.ClientConn, error) { func (s *ZkClient) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]grpc.ClientConnInterface, error) {
s.logger.Printf("get conns from client, serviceName: %s", serviceName) s.logger.Printf("get conns from client, serviceName: %s", serviceName)
s.lock.Lock() s.lock.Lock()
defer s.lock.Unlock() defer s.lock.Unlock()
@ -86,24 +87,30 @@ func (s *ZkClient) GetConns(ctx context.Context, serviceName string, opts ...grp
} }
s.localConns[serviceName] = conns s.localConns[serviceName] = conns
} }
var ret []*grpc.ClientConn var ret []grpc.ClientConnInterface
s.logger.Printf("get conns from zk success, serviceName: %s", serviceName) s.logger.Printf("get conns from zk success, serviceName: %s", serviceName)
for _, conn := range conns { for _, conn := range conns {
c, err := grpc.DialContext(ctx, conn.Addr, append(s.options, opts...)...) cc, err := grpc.DialContext(ctx, conn.Addr, append(s.options, opts...)...)
if err != nil { if err != nil {
return nil, errors.Wrap(err, fmt.Sprintf("conns dialContext error, conn: %s", conn.Addr)) return nil, errors.Wrap(err, fmt.Sprintf("conns dialContext error, conn: %s", conn.Addr))
} }
ret = append(ret, c) ret = append(ret, newClientConnInterface(cc))
} }
s.logger.Printf("dial ctx success, serviceName: %s", serviceName) s.logger.Printf("dial ctx success, serviceName: %s", serviceName)
return ret, nil return ret, nil
} }
func (s *ZkClient) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { func (s *ZkClient) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (grpc.ClientConnInterface, error) {
newOpts := append(s.options, grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, s.balancerName))) newOpts := append(s.options, grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, s.balancerName)))
return grpc.DialContext(ctx, fmt.Sprintf("%s:///%s", s.scheme, serviceName), append(newOpts, opts...)...) cc, err := grpc.DialContext(ctx, fmt.Sprintf("%s:///%s", s.scheme, serviceName), append(newOpts, opts...)...)
if err != nil {
return nil, err
}
return newClientConnInterface(cc), nil
} }
func (s *ZkClient) CloseConn(conn *grpc.ClientConn) { func (s *ZkClient) CloseConn(conn grpc.ClientConnInterface) {
conn.Close() if closer, ok := conn.(io.Closer); ok {
closer.Close()
}
} }

@ -19,7 +19,7 @@ func NewAuth(discov discoveryregistry.SvcDiscoveryRegistry) *Auth {
} }
type Auth struct { type Auth struct {
conn *grpc.ClientConn conn grpc.ClientConnInterface
Client auth.AuthClient Client auth.AuthClient
discov discoveryregistry.SvcDiscoveryRegistry discov discoveryregistry.SvcDiscoveryRegistry
} }

@ -14,7 +14,7 @@ import (
type Conversation struct { type Conversation struct {
Client conversation.ConversationClient Client conversation.ConversationClient
conn *grpc.ClientConn conn grpc.ClientConnInterface
discov discoveryregistry.SvcDiscoveryRegistry discov discoveryregistry.SvcDiscoveryRegistry
} }

@ -11,7 +11,7 @@ import (
) )
type Friend struct { type Friend struct {
conn *grpc.ClientConn conn grpc.ClientConnInterface
Client friend.FriendClient Client friend.FriendClient
discov discoveryregistry.SvcDiscoveryRegistry discov discoveryregistry.SvcDiscoveryRegistry
} }

@ -15,7 +15,7 @@ import (
) )
type Group struct { type Group struct {
conn *grpc.ClientConn conn grpc.ClientConnInterface
Client group.GroupClient Client group.GroupClient
discov discoveryregistry.SvcDiscoveryRegistry discov discoveryregistry.SvcDiscoveryRegistry
} }

@ -104,7 +104,7 @@ func newSessionTypeConf() map[int32]int32 {
} }
type Message struct { type Message struct {
conn *grpc.ClientConn conn grpc.ClientConnInterface
Client msg.MsgClient Client msg.MsgClient
discov discoveryregistry.SvcDiscoveryRegistry discov discoveryregistry.SvcDiscoveryRegistry
} }

@ -10,7 +10,7 @@ import (
) )
type Push struct { type Push struct {
conn *grpc.ClientConn conn grpc.ClientConnInterface
Client push.PushMsgServiceClient Client push.PushMsgServiceClient
discov discoveryregistry.SvcDiscoveryRegistry discov discoveryregistry.SvcDiscoveryRegistry
} }

@ -10,7 +10,7 @@ import (
) )
type Third struct { type Third struct {
conn *grpc.ClientConn conn grpc.ClientConnInterface
Client third.ThirdClient Client third.ThirdClient
discov discoveryregistry.SvcDiscoveryRegistry discov discoveryregistry.SvcDiscoveryRegistry
} }

@ -15,7 +15,7 @@ import (
) )
type User struct { type User struct {
conn *grpc.ClientConn conn grpc.ClientConnInterface
Client user.UserClient Client user.UserClient
discov discoveryregistry.SvcDiscoveryRegistry discov discoveryregistry.SvcDiscoveryRegistry
} }

Loading…
Cancel
Save