|
|
@ -22,11 +22,15 @@ import (
|
|
|
|
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
|
|
|
|
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
|
|
|
|
"github.com/openimsdk/protocol/constant"
|
|
|
|
"github.com/openimsdk/protocol/constant"
|
|
|
|
"github.com/openimsdk/protocol/msggateway"
|
|
|
|
"github.com/openimsdk/protocol/msggateway"
|
|
|
|
|
|
|
|
"github.com/openimsdk/protocol/sdkws"
|
|
|
|
"github.com/openimsdk/tools/discovery"
|
|
|
|
"github.com/openimsdk/tools/discovery"
|
|
|
|
"github.com/openimsdk/tools/errs"
|
|
|
|
"github.com/openimsdk/tools/errs"
|
|
|
|
"github.com/openimsdk/tools/log"
|
|
|
|
"github.com/openimsdk/tools/log"
|
|
|
|
"github.com/openimsdk/tools/mcontext"
|
|
|
|
"github.com/openimsdk/tools/mcontext"
|
|
|
|
|
|
|
|
"github.com/openimsdk/tools/mq/memamq"
|
|
|
|
|
|
|
|
"github.com/openimsdk/tools/utils/datautil"
|
|
|
|
"google.golang.org/grpc"
|
|
|
|
"google.golang.org/grpc"
|
|
|
|
|
|
|
|
"sync/atomic"
|
|
|
|
)
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
func (s *Server) InitServer(ctx context.Context, config *Config, disCov discovery.SvcDiscoveryRegistry, server *grpc.Server) error {
|
|
|
|
func (s *Server) InitServer(ctx context.Context, config *Config, disCov discovery.SvcDiscoveryRegistry, server *grpc.Server) error {
|
|
|
@ -57,6 +61,7 @@ type Server struct {
|
|
|
|
pushTerminal map[int]struct{}
|
|
|
|
pushTerminal map[int]struct{}
|
|
|
|
ready func(srv *Server) error
|
|
|
|
ready func(srv *Server) error
|
|
|
|
userRcp rpcclient.UserRpcClient
|
|
|
|
userRcp rpcclient.UserRpcClient
|
|
|
|
|
|
|
|
queue *memamq.MemoryQueue
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func (s *Server) SetLongConnServer(LongConnServer LongConnServer) {
|
|
|
|
func (s *Server) SetLongConnServer(LongConnServer LongConnServer) {
|
|
|
@ -70,6 +75,7 @@ func NewServer(rpcPort int, longConnServer LongConnServer, conf *Config, ready f
|
|
|
|
pushTerminal: make(map[int]struct{}),
|
|
|
|
pushTerminal: make(map[int]struct{}),
|
|
|
|
config: conf,
|
|
|
|
config: conf,
|
|
|
|
ready: ready,
|
|
|
|
ready: ready,
|
|
|
|
|
|
|
|
queue: memamq.NewMemoryQueue(512, 1024*16),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
s.pushTerminal[constant.IOSPlatformID] = struct{}{}
|
|
|
|
s.pushTerminal[constant.IOSPlatformID] = struct{}{}
|
|
|
|
s.pushTerminal[constant.AndroidPlatformID] = struct{}{}
|
|
|
|
s.pushTerminal[constant.AndroidPlatformID] = struct{}{}
|
|
|
@ -125,55 +131,93 @@ func (s *Server) OnlineBatchPushOneMsg(ctx context.Context, req *msggateway.Onli
|
|
|
|
return nil, nil
|
|
|
|
return nil, nil
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func (s *Server) SuperGroupOnlineBatchPushOneMsg(ctx context.Context, req *msggateway.OnlineBatchPushOneMsgReq,
|
|
|
|
func (s *Server) pushToUser(ctx context.Context, userID string, msgData *sdkws.MsgData) *msggateway.SingleMsgToUserResults {
|
|
|
|
) (*msggateway.OnlineBatchPushOneMsgResp, error) {
|
|
|
|
clients, ok := s.LongConnServer.GetUserAllCons(userID)
|
|
|
|
var singleUserResults []*msggateway.SingleMsgToUserResults
|
|
|
|
|
|
|
|
for _, v := range req.PushToUserIDs {
|
|
|
|
|
|
|
|
var resp []*msggateway.SingleMsgToUserPlatform
|
|
|
|
|
|
|
|
results := &msggateway.SingleMsgToUserResults{
|
|
|
|
|
|
|
|
UserID: v,
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
clients, ok := s.LongConnServer.GetUserAllCons(v)
|
|
|
|
|
|
|
|
if !ok {
|
|
|
|
if !ok {
|
|
|
|
log.ZDebug(ctx, "push user not online", "userID", v)
|
|
|
|
log.ZDebug(ctx, "push user not online", "userID", userID)
|
|
|
|
results.Resp = resp
|
|
|
|
return &msggateway.SingleMsgToUserResults{
|
|
|
|
singleUserResults = append(singleUserResults, results)
|
|
|
|
UserID: userID,
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
log.ZDebug(ctx, "push user online", "clients", clients, "userID", userID)
|
|
|
|
|
|
|
|
result := &msggateway.SingleMsgToUserResults{
|
|
|
|
|
|
|
|
UserID: userID,
|
|
|
|
|
|
|
|
Resp: make([]*msggateway.SingleMsgToUserPlatform, 0, len(clients)),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
log.ZDebug(ctx, "push user online", "clients", clients, "userID", v)
|
|
|
|
|
|
|
|
for _, client := range clients {
|
|
|
|
for _, client := range clients {
|
|
|
|
if client == nil {
|
|
|
|
if client == nil {
|
|
|
|
continue
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
userPlatform := &msggateway.SingleMsgToUserPlatform{
|
|
|
|
userPlatform := &msggateway.SingleMsgToUserPlatform{
|
|
|
|
RecvPlatFormID: int32(client.PlatformID),
|
|
|
|
RecvPlatFormID: int32(client.PlatformID),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if !client.IsBackground ||
|
|
|
|
if !client.IsBackground ||
|
|
|
|
(client.IsBackground && client.PlatformID != constant.IOSPlatformID) {
|
|
|
|
(client.IsBackground && client.PlatformID != constant.IOSPlatformID) {
|
|
|
|
err := client.PushMessage(ctx, req.MsgData)
|
|
|
|
err := client.PushMessage(ctx, msgData)
|
|
|
|
if err != nil {
|
|
|
|
if err != nil {
|
|
|
|
userPlatform.ResultCode = int64(servererrs.ErrPushMsgErr.Code())
|
|
|
|
userPlatform.ResultCode = int64(servererrs.ErrPushMsgErr.Code())
|
|
|
|
resp = append(resp, userPlatform)
|
|
|
|
|
|
|
|
} else {
|
|
|
|
} else {
|
|
|
|
if _, ok := s.pushTerminal[client.PlatformID]; ok {
|
|
|
|
if _, ok := s.pushTerminal[client.PlatformID]; ok {
|
|
|
|
results.OnlinePush = true
|
|
|
|
result.OnlinePush = true
|
|
|
|
resp = append(resp, userPlatform)
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
} else {
|
|
|
|
userPlatform.ResultCode = int64(servererrs.ErrIOSBackgroundPushErr.Code())
|
|
|
|
userPlatform.ResultCode = int64(servererrs.ErrIOSBackgroundPushErr.Code())
|
|
|
|
resp = append(resp, userPlatform)
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
result.Resp = append(result.Resp, userPlatform)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
results.Resp = resp
|
|
|
|
return result
|
|
|
|
singleUserResults = append(singleUserResults, results)
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
return &msggateway.OnlineBatchPushOneMsgResp{
|
|
|
|
func (s *Server) SuperGroupOnlineBatchPushOneMsg(ctx context.Context, req *msggateway.OnlineBatchPushOneMsgReq) (*msggateway.OnlineBatchPushOneMsgResp, error) {
|
|
|
|
SinglePushResult: singleUserResults,
|
|
|
|
if len(req.PushToUserIDs) == 0 {
|
|
|
|
}, nil
|
|
|
|
return &msggateway.OnlineBatchPushOneMsgResp{}, nil
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
ch := make(chan *msggateway.SingleMsgToUserResults, len(req.PushToUserIDs))
|
|
|
|
|
|
|
|
var count atomic.Int64
|
|
|
|
|
|
|
|
count.Add(int64(len(req.PushToUserIDs)))
|
|
|
|
|
|
|
|
for i := range req.PushToUserIDs {
|
|
|
|
|
|
|
|
userID := req.PushToUserIDs[i]
|
|
|
|
|
|
|
|
err := s.queue.PushCtx(ctx, func() {
|
|
|
|
|
|
|
|
ch <- s.pushToUser(ctx, userID, req.MsgData)
|
|
|
|
|
|
|
|
if count.Add(-1) == 0 {
|
|
|
|
|
|
|
|
close(ch)
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
})
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
|
|
if count.Add(-1) == 0 {
|
|
|
|
|
|
|
|
close(ch)
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
log.ZError(ctx, "pushToUser MemoryQueue failed", err, "userID", userID)
|
|
|
|
|
|
|
|
ch <- &msggateway.SingleMsgToUserResults{
|
|
|
|
|
|
|
|
UserID: userID,
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
resp := &msggateway.OnlineBatchPushOneMsgResp{
|
|
|
|
|
|
|
|
SinglePushResult: make([]*msggateway.SingleMsgToUserResults, 0, len(req.PushToUserIDs)),
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
for {
|
|
|
|
|
|
|
|
select {
|
|
|
|
|
|
|
|
case <-ctx.Done():
|
|
|
|
|
|
|
|
log.ZError(ctx, "SuperGroupOnlineBatchPushOneMsg ctx done", context.Cause(ctx))
|
|
|
|
|
|
|
|
userIDSet := datautil.SliceSet(req.PushToUserIDs)
|
|
|
|
|
|
|
|
for _, results := range resp.SinglePushResult {
|
|
|
|
|
|
|
|
delete(userIDSet, results.UserID)
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
for userID := range userIDSet {
|
|
|
|
|
|
|
|
resp.SinglePushResult = append(resp.SinglePushResult, &msggateway.SingleMsgToUserResults{
|
|
|
|
|
|
|
|
UserID: userID,
|
|
|
|
|
|
|
|
})
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
return resp, nil
|
|
|
|
|
|
|
|
case res, ok := <-ch:
|
|
|
|
|
|
|
|
if !ok {
|
|
|
|
|
|
|
|
return resp, nil
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
resp.SinglePushResult = append(resp.SinglePushResult, res)
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func (s *Server) KickUserOffline(
|
|
|
|
func (s *Server) KickUserOffline(
|
|
|
|