diff --git a/internal/msggateway/hub_server.go b/internal/msggateway/hub_server.go index 670757850..807c4af3b 100644 --- a/internal/msggateway/hub_server.go +++ b/internal/msggateway/hub_server.go @@ -17,22 +17,19 @@ package msggateway import ( "context" - "github.com/OpenIMSDK/tools/mcontext" - - "github.com/openimsdk/open-im-server/v3/pkg/authverify" - - "github.com/OpenIMSDK/tools/errs" "google.golang.org/grpc" - "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" - "github.com/OpenIMSDK/protocol/constant" "github.com/OpenIMSDK/protocol/msggateway" "github.com/OpenIMSDK/tools/discoveryregistry" + "github.com/OpenIMSDK/tools/errs" "github.com/OpenIMSDK/tools/log" + "github.com/OpenIMSDK/tools/mcontext" "github.com/OpenIMSDK/tools/utils" + "github.com/openimsdk/open-im-server/v3/pkg/authverify" "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/startrpc" ) @@ -41,6 +38,7 @@ func (s *Server) InitServer(disCov discoveryregistry.SvcDiscoveryRegistry, serve if err != nil { return err } + msgModel := cache.NewMsgCacheModel(rdb) s.LongConnServer.SetDiscoveryRegistry(disCov) s.LongConnServer.SetCacheHandler(msgModel) @@ -97,22 +95,25 @@ func (s *Server) GetUsersOnlineStatus( if !ok { continue } - temp := new(msggateway.GetUsersOnlineStatusResp_SuccessResult) - temp.UserID = userID + + uresp := new(msggateway.GetUsersOnlineStatusResp_SuccessResult) + uresp.UserID = userID for _, client := range clients { - if client != nil { - ps := new(msggateway.GetUsersOnlineStatusResp_SuccessDetail) - ps.Platform = constant.PlatformIDToName(client.PlatformID) - ps.Status = constant.OnlineStatus - ps.ConnID = client.ctx.GetConnID() - ps.Token = client.token - ps.IsBackground = client.IsBackground - temp.Status = constant.OnlineStatus - temp.DetailPlatformStatus = append(temp.DetailPlatformStatus, ps) + if client == nil { + continue } + + ps := new(msggateway.GetUsersOnlineStatusResp_SuccessDetail) + ps.Platform = constant.PlatformIDToName(client.PlatformID) + ps.Status = constant.OnlineStatus + ps.ConnID = client.ctx.GetConnID() + ps.Token = client.token + ps.IsBackground = client.IsBackground + uresp.Status = constant.OnlineStatus + uresp.DetailPlatformStatus = append(uresp.DetailPlatformStatus, ps) } - if temp.Status == constant.OnlineStatus { - resp.SuccessResult = append(resp.SuccessResult, temp) + if uresp.Status == constant.OnlineStatus { + resp.SuccessResult = append(resp.SuccessResult, uresp) } } return &resp, nil @@ -129,50 +130,55 @@ func (s *Server) SuperGroupOnlineBatchPushOneMsg( ctx context.Context, req *msggateway.OnlineBatchPushOneMsgReq, ) (*msggateway.OnlineBatchPushOneMsgResp, error) { - var singleUserResult []*msggateway.SingleMsgToUserResults + + var singleUserResults []*msggateway.SingleMsgToUserResults + for _, v := range req.PushToUserIDs { var resp []*msggateway.SingleMsgToUserPlatform - tempT := &msggateway.SingleMsgToUserResults{ + results := &msggateway.SingleMsgToUserResults{ UserID: v, } clients, ok := s.LongConnServer.GetUserAllCons(v) if !ok { log.ZDebug(ctx, "push user not online", "userID", v) - tempT.Resp = resp - singleUserResult = append(singleUserResult, tempT) + results.Resp = resp + singleUserResults = append(singleUserResults, results) continue } + log.ZDebug(ctx, "push user online", "clients", clients, "userID", v) for _, client := range clients { - if client != nil { - temp := &msggateway.SingleMsgToUserPlatform{ - RecvID: v, - RecvPlatFormID: int32(client.PlatformID), - } - if !client.IsBackground || - (client.IsBackground == true && client.PlatformID != constant.IOSPlatformID) { - err := client.PushMessage(ctx, req.MsgData) - if err != nil { - temp.ResultCode = -2 - resp = append(resp, temp) - } else { - if utils.IsContainInt(client.PlatformID, s.pushTerminal) { - tempT.OnlinePush = true - resp = append(resp, temp) - } - } + if client == nil { + continue + } + + userPlatform := &msggateway.SingleMsgToUserPlatform{ + RecvID: v, + RecvPlatFormID: int32(client.PlatformID), + } + if !client.IsBackground || + (client.IsBackground && client.PlatformID != constant.IOSPlatformID) { + err := client.PushMessage(ctx, req.MsgData) + if err != nil { + userPlatform.ResultCode = -2 + resp = append(resp, userPlatform) } else { - temp.ResultCode = -3 - resp = append(resp, temp) + if utils.IsContainInt(client.PlatformID, s.pushTerminal) { + results.OnlinePush = true + resp = append(resp, userPlatform) + } } + } else { + userPlatform.ResultCode = -3 + resp = append(resp, userPlatform) } } - tempT.Resp = resp - singleUserResult = append(singleUserResult, tempT) + results.Resp = resp + singleUserResults = append(singleUserResults, results) } return &msggateway.OnlineBatchPushOneMsgResp{ - SinglePushResult: singleUserResult, + SinglePushResult: singleUserResults, }, nil } @@ -181,17 +187,21 @@ func (s *Server) KickUserOffline( req *msggateway.KickUserOfflineReq, ) (*msggateway.KickUserOfflineResp, error) { for _, v := range req.KickUserIDList { - if clients, _, ok := s.LongConnServer.GetUserPlatformCons(v, int(req.PlatformID)); ok { - for _, client := range clients { - log.ZDebug(ctx, "kick user offline", "userID", v, "platformID", req.PlatformID, "client", client) - if err := client.longConnServer.KickUserConn(client); err != nil { - log.ZWarn(ctx, "kick user offline failed", err, "userID", v, "platformID", req.PlatformID) - } - } - } else { + clients, _, ok := s.LongConnServer.GetUserPlatformCons(v, int(req.PlatformID)) + if !ok { log.ZInfo(ctx, "conn not exist", "userID", v, "platformID", req.PlatformID) + continue + } + + for _, client := range clients { + log.ZDebug(ctx, "kick user offline", "userID", v, "platformID", req.PlatformID, "client", client) + if err := client.longConnServer.KickUserConn(client); err != nil { + log.ZWarn(ctx, "kick user offline failed", err, "userID", v, "platformID", req.PlatformID) + } } + continue } + return &msggateway.KickUserOfflineResp{}, nil } diff --git a/internal/msggateway/n_ws_server.go b/internal/msggateway/n_ws_server.go index a0cecd7f5..10dd988d1 100644 --- a/internal/msggateway/n_ws_server.go +++ b/internal/msggateway/n_ws_server.go @@ -57,12 +57,6 @@ type LongConnServer interface { MessageHandler } -var bufferPool = sync.Pool{ - New: func() interface{} { - return make([]byte, 1024) - }, -} - type WsServer struct { port int wsMaxConnNum int64 @@ -374,7 +368,7 @@ func (ws *WsServer) unregisterClient(client *Client) { } ws.onlineUserConnNum.Add(-1) ws.SetUserOnlineStatus(client.ctx, client, constant.Offline) - log.ZInfo(client.ctx, "user offline", "close reason", client.closedErr, "online user Num", ws.onlineUserNum, "online user conn Num", + log.ZInfo(client.ctx, "user offline", "close reason", client.closedErr, "online user Num", ws.onlineUserNum.Load(), "online user conn Num", ws.onlineUserConnNum.Load(), ) }