diff --git a/go.mod b/go.mod index 0637492eb..fa40effdd 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/mitchellh/mapstructure v1.5.0 github.com/openimsdk/protocol v0.0.69-alpha.38 - github.com/openimsdk/tools v0.0.49-alpha.51 + github.com/openimsdk/tools v0.0.49-alpha.52 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 github.com/stretchr/testify v1.9.0 diff --git a/go.sum b/go.sum index 92a783c06..cc0f5f766 100644 --- a/go.sum +++ b/go.sum @@ -321,8 +321,8 @@ github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCF github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= github.com/openimsdk/protocol v0.0.69-alpha.38 h1:kVZCHIXg/el8YJFoIBWhZu1sbbTUqmzgF4l0W3sUH24= github.com/openimsdk/protocol v0.0.69-alpha.38/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= -github.com/openimsdk/tools v0.0.49-alpha.51 h1:JTPEetVSNOczw1n+XjiPozaH2SBPQAc+9VlPE41wEeY= -github.com/openimsdk/tools v0.0.49-alpha.51/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4= +github.com/openimsdk/tools v0.0.49-alpha.52 h1:NwAAtBO4BV96qG6Z0P2btGEqn4AI2DFgaHvLMXNHal0= +github.com/openimsdk/tools v0.0.49-alpha.52/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4= github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4= github.com/pelletier/go-toml/v2 v2.1.0/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc= github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= diff --git a/internal/msggateway/hub_server.go b/internal/msggateway/hub_server.go index 3891aa532..33e761a22 100644 --- a/internal/msggateway/hub_server.go +++ b/internal/msggateway/hub_server.go @@ -22,11 +22,15 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" "github.com/openimsdk/protocol/constant" "github.com/openimsdk/protocol/msggateway" + "github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/tools/discovery" "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/mcontext" + "github.com/openimsdk/tools/mq/memamq" + "github.com/openimsdk/tools/utils/datautil" "google.golang.org/grpc" + "sync/atomic" ) func (s *Server) InitServer(ctx context.Context, config *Config, disCov discovery.SvcDiscoveryRegistry, server *grpc.Server) error { @@ -57,6 +61,7 @@ type Server struct { pushTerminal map[int]struct{} ready func(srv *Server) error userRcp rpcclient.UserRpcClient + queue *memamq.MemoryQueue } func (s *Server) SetLongConnServer(LongConnServer LongConnServer) { @@ -70,6 +75,7 @@ func NewServer(rpcPort int, longConnServer LongConnServer, conf *Config, ready f pushTerminal: make(map[int]struct{}), config: conf, ready: ready, + queue: memamq.NewMemoryQueue(512, 1024*16), } s.pushTerminal[constant.IOSPlatformID] = struct{}{} s.pushTerminal[constant.AndroidPlatformID] = struct{}{} @@ -125,57 +131,150 @@ func (s *Server) OnlineBatchPushOneMsg(ctx context.Context, req *msggateway.Onli return nil, nil } -func (s *Server) SuperGroupOnlineBatchPushOneMsg(ctx context.Context, req *msggateway.OnlineBatchPushOneMsgReq, -) (*msggateway.OnlineBatchPushOneMsgResp, error) { - var singleUserResults []*msggateway.SingleMsgToUserResults - for _, v := range req.PushToUserIDs { - var resp []*msggateway.SingleMsgToUserPlatform - results := &msggateway.SingleMsgToUserResults{ - UserID: v, +func (s *Server) pushToUser(ctx context.Context, userID string, msgData *sdkws.MsgData) *msggateway.SingleMsgToUserResults { + clients, ok := s.LongConnServer.GetUserAllCons(userID) + if !ok { + log.ZDebug(ctx, "push user not online", "userID", userID) + return &msggateway.SingleMsgToUserResults{ + UserID: userID, } - clients, ok := s.LongConnServer.GetUserAllCons(v) - if !ok { - log.ZDebug(ctx, "push user not online", "userID", v) - results.Resp = resp - singleUserResults = append(singleUserResults, results) + } + log.ZDebug(ctx, "push user online", "clients", clients, "userID", userID) + result := &msggateway.SingleMsgToUserResults{ + UserID: userID, + Resp: make([]*msggateway.SingleMsgToUserPlatform, 0, len(clients)), + } + for _, client := range clients { + if client == nil { continue } - - log.ZDebug(ctx, "push user online", "clients", clients, "userID", v) - for _, client := range clients { - if client == nil { - continue + userPlatform := &msggateway.SingleMsgToUserPlatform{ + RecvPlatFormID: int32(client.PlatformID), + } + if !client.IsBackground || + (client.IsBackground && client.PlatformID != constant.IOSPlatformID) { + err := client.PushMessage(ctx, msgData) + if err != nil { + userPlatform.ResultCode = int64(servererrs.ErrPushMsgErr.Code()) + //result.Resp = append(result.Resp, userPlatform) + //resp = append(resp, userPlatform) + } else { + if _, ok := s.pushTerminal[client.PlatformID]; ok { + result.OnlinePush = true + //result.Resp = append(result.Resp, userPlatform) + //results.OnlinePush = true + //resp = append(resp, userPlatform) + } } + } else { + userPlatform.ResultCode = int64(servererrs.ErrIOSBackgroundPushErr.Code()) + //resp = append(resp, userPlatform) + } + result.Resp = append(result.Resp, userPlatform) + } + return result +} - userPlatform := &msggateway.SingleMsgToUserPlatform{ - RecvPlatFormID: int32(client.PlatformID), +func (s *Server) SuperGroupOnlineBatchPushOneMsg(ctx context.Context, req *msggateway.OnlineBatchPushOneMsgReq) (*msggateway.OnlineBatchPushOneMsgResp, error) { + if len(req.PushToUserIDs) == 0 { + return &msggateway.OnlineBatchPushOneMsgResp{}, nil + } + ch := make(chan *msggateway.SingleMsgToUserResults, len(req.PushToUserIDs)) + var count atomic.Int64 + count.Add(int64(len(req.PushToUserIDs))) + for _, userID := range req.PushToUserIDs { + err := s.queue.PushCtx(ctx, func() { + ch <- s.pushToUser(ctx, userID, req.MsgData) + if count.Add(-1) == 0 { + close(ch) } - if !client.IsBackground || - (client.IsBackground && client.PlatformID != constant.IOSPlatformID) { - err := client.PushMessage(ctx, req.MsgData) - if err != nil { - userPlatform.ResultCode = int64(servererrs.ErrPushMsgErr.Code()) - resp = append(resp, userPlatform) - } else { - if _, ok := s.pushTerminal[client.PlatformID]; ok { - results.OnlinePush = true - resp = append(resp, userPlatform) - } - } - } else { - userPlatform.ResultCode = int64(servererrs.ErrIOSBackgroundPushErr.Code()) - resp = append(resp, userPlatform) + }) + if err != nil { + if count.Add(-1) == 0 { + close(ch) + } + log.ZError(ctx, "pushToUser MemoryQueue failed", err, "userID", userID) + ch <- &msggateway.SingleMsgToUserResults{ + UserID: userID, } } - results.Resp = resp - singleUserResults = append(singleUserResults, results) } - - return &msggateway.OnlineBatchPushOneMsgResp{ - SinglePushResult: singleUserResults, - }, nil + resp := &msggateway.OnlineBatchPushOneMsgResp{ + SinglePushResult: make([]*msggateway.SingleMsgToUserResults, 0, len(req.PushToUserIDs)), + } + for { + select { + case <-ctx.Done(): + log.ZError(ctx, "SuperGroupOnlineBatchPushOneMsg ctx done", context.Cause(ctx)) + userIDSet := datautil.SliceSet(req.PushToUserIDs) + for _, results := range resp.SinglePushResult { + delete(userIDSet, results.UserID) + } + for userID := range userIDSet { + resp.SinglePushResult = append(resp.SinglePushResult, &msggateway.SingleMsgToUserResults{ + UserID: userID, + }) + } + return resp, nil + case res, ok := <-ch: + if !ok { + return resp, nil + } + resp.SinglePushResult = append(resp.SinglePushResult, res) + } + } } +//func (s *Server) SuperGroupOnlineBatchPushOneMsg(ctx context.Context, req *msggateway.OnlineBatchPushOneMsgReq) (*msggateway.OnlineBatchPushOneMsgResp, error) { +// var singleUserResults []*msggateway.SingleMsgToUserResults +// for _, userID := range req.PushToUserIDs { +// var resp []*msggateway.SingleMsgToUserPlatform +// results := &msggateway.SingleMsgToUserResults{ +// UserID: userID, +// } +// clients, ok := s.LongConnServer.GetUserAllCons(userID) +// if !ok { +// log.ZDebug(ctx, "push user not online", "userID", userID) +// results.Resp = resp +// singleUserResults = append(singleUserResults, results) +// continue +// } +// +// log.ZDebug(ctx, "push user online", "clients", clients, "userID", userID) +// for _, client := range clients { +// if client == nil { +// continue +// } +// +// userPlatform := &msggateway.SingleMsgToUserPlatform{ +// RecvPlatFormID: int32(client.PlatformID), +// } +// if !client.IsBackground || +// (client.IsBackground && client.PlatformID != constant.IOSPlatformID) { +// err := client.PushMessage(ctx, req.MsgData) +// if err != nil { +// userPlatform.ResultCode = int64(servererrs.ErrPushMsgErr.Code()) +// resp = append(resp, userPlatform) +// } else { +// if _, ok := s.pushTerminal[client.PlatformID]; ok { +// results.OnlinePush = true +// resp = append(resp, userPlatform) +// } +// } +// } else { +// userPlatform.ResultCode = int64(servererrs.ErrIOSBackgroundPushErr.Code()) +// resp = append(resp, userPlatform) +// } +// } +// results.Resp = resp +// singleUserResults = append(singleUserResults, results) +// } +// +// return &msggateway.OnlineBatchPushOneMsgResp{ +// SinglePushResult: singleUserResults, +// }, nil +//} + func (s *Server) KickUserOffline( ctx context.Context, req *msggateway.KickUserOfflineReq,