diff --git a/internal/msggateway/client.go b/internal/msggateway/client.go index ef6df3e4f..096fd1463 100644 --- a/internal/msggateway/client.go +++ b/internal/msggateway/client.go @@ -18,6 +18,7 @@ import ( "context" "errors" "fmt" + "github.com/OpenIMSDK/Open-IM-Server/pkg/msgprocessor" "runtime/debug" "sync" @@ -241,9 +242,9 @@ func (c *Client) replyMessage(ctx context.Context, binaryReq *Req, err error, re func (c *Client) PushMessage(ctx context.Context, msgData *sdkws.MsgData) error { var msg sdkws.PushMessages - conversationID := utils.GetConversationIDByMsg(msgData) + conversationID := msgprocessor.GetConversationIDByMsg(msgData) m := map[string]*sdkws.PullMsgs{conversationID: {Msgs: []*sdkws.MsgData{msgData}}} - if utils.IsNotification(conversationID) { + if msgprocessor.IsNotification(conversationID) { msg.NotificationMsgs = m } else { msg.Msgs = m diff --git a/internal/push/push_to_client.go b/internal/push/push_to_client.go index 031fdc188..b787268d0 100644 --- a/internal/push/push_to_client.go +++ b/internal/push/push_to_client.go @@ -18,6 +18,7 @@ import ( "context" "encoding/json" "errors" + "github.com/OpenIMSDK/Open-IM-Server/pkg/msgprocessor" "github.com/OpenIMSDK/Open-IM-Server/internal/push/offlinepush" "github.com/OpenIMSDK/Open-IM-Server/internal/push/offlinepush/fcm" @@ -82,7 +83,7 @@ func NewOfflinePusher(cache cache.MsgModel) offlinepush.OfflinePusher { } func (p *Pusher) DeleteMemberAndSetConversationSeq(ctx context.Context, groupID string, userIDs []string) error { - conevrsationID := utils.GetConversationIDBySessionType(constant.SuperGroupChatType, groupID) + conevrsationID := msgprocessor.GetConversationIDBySessionType(constant.SuperGroupChatType, groupID) maxSeq, err := p.msgRpcClient.GetConversationMaxSeq(ctx, conevrsationID) if err != nil { return err @@ -170,7 +171,7 @@ func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws }(groupID, kickedUsers) pushToUserIDs = append(pushToUserIDs, kickedUsers...) case constant.GroupDismissedNotification: - if utils.IsNotification(utils.GetConversationIDByMsg(msg)) { // 消息先到,通知后到 + if msgprocessor.IsNotification(msgprocessor.GetConversationIDByMsg(msg)) { // 消息先到,通知后到 var tips sdkws.GroupDismissedTips if p.UnmarshalNotificationElem(msg.Content, &tips) != nil { return err diff --git a/internal/rpc/conversation/conversaion.go b/internal/rpc/conversation/conversaion.go index 3d6235149..753a45722 100644 --- a/internal/rpc/conversation/conversaion.go +++ b/internal/rpc/conversation/conversaion.go @@ -16,6 +16,7 @@ package conversation import ( "context" + "github.com/OpenIMSDK/Open-IM-Server/pkg/msgprocessor" "google.golang.org/grpc" @@ -197,7 +198,7 @@ func (c *conversationServer) GetRecvMsgNotNotifyUserIDs(ctx context.Context, req // create conversation without notification for msg redis transfer. func (c *conversationServer) CreateSingleChatConversations(ctx context.Context, req *pbConversation.CreateSingleChatConversationsReq) (*pbConversation.CreateSingleChatConversationsResp, error) { var conversation tableRelation.ConversationModel - conversation.ConversationID = utils.GetConversationIDBySessionType(constant.SingleChatType, req.RecvID, req.SendID) + conversation.ConversationID = msgprocessor.GetConversationIDBySessionType(constant.SingleChatType, req.RecvID, req.SendID) conversation.ConversationType = constant.SingleChatType conversation.OwnerUserID = req.SendID conversation.UserID = req.RecvID diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index bbf09e13d..1a018c17e 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -17,6 +17,7 @@ package group import ( "context" "fmt" + "github.com/OpenIMSDK/Open-IM-Server/pkg/msgprocessor" "math/big" "math/rand" "strconv" @@ -829,7 +830,7 @@ func (s *groupServer) QuitGroup(ctx context.Context, req *pbGroup.QuitGroupReq) } func (s *groupServer) deleteMemberAndSetConversationSeq(ctx context.Context, groupID string, userIDs []string) error { - conevrsationID := utils.GetConversationIDBySessionType(constant.SuperGroupChatType, groupID) + conevrsationID := msgprocessor.GetConversationIDBySessionType(constant.SuperGroupChatType, groupID) maxSeq, err := s.msgRpcClient.GetConversationMaxSeq(ctx, conevrsationID) if err != nil { return err @@ -889,7 +890,7 @@ func (s *groupServer) SetGroupInfo(ctx context.Context, req *pbGroup.SetGroupInf go func() { nctx := mcontext.NewCtx("@@@" + mcontext.GetOperationID(ctx)) conversation := &pbConversation.ConversationReq{ - ConversationID: utils.GetConversationIDBySessionType(constant.SuperGroupChatType, req.GroupInfoForSet.GroupID), + ConversationID: msgprocessor.GetConversationIDBySessionType(constant.SuperGroupChatType, req.GroupInfoForSet.GroupID), ConversationType: constant.SuperGroupChatType, GroupID: req.GroupInfoForSet.GroupID, } diff --git a/internal/rpc/msg/send.go b/internal/rpc/msg/send.go index b3a122bb6..8ec54e7d8 100644 --- a/internal/rpc/msg/send.go +++ b/internal/rpc/msg/send.go @@ -16,6 +16,7 @@ package msg import ( "context" + "github.com/OpenIMSDK/Open-IM-Server/pkg/msgprocessor" promePkg "github.com/OpenIMSDK/Open-IM-Server/pkg/common/prome" "github.com/OpenIMSDK/protocol/constant" @@ -90,7 +91,7 @@ func (m *msgServer) setConversationAtInfo(nctx context.Context, msg *sdkws.MsgDa ctx := mcontext.NewCtx("@@@" + mcontext.GetOperationID(nctx)) var atUserID []string conversation := &pbConversation.ConversationReq{ - ConversationID: utils.GetConversationIDByMsg(msg), + ConversationID: msgprocessor.GetConversationIDByMsg(msg), ConversationType: msg.SessionType, GroupID: msg.GroupID, } @@ -148,8 +149,8 @@ func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *pbMsg.SendMsgReq if err := m.messageVerification(ctx, req); err != nil { return nil, err } - var isSend bool = true - isNotification := utils.IsNotificationByMsg(req.MsgData) + var isSend = true + isNotification := msgprocessor.IsNotificationByMsg(req.MsgData) if !isNotification { isSend, err = m.modifyMessageByUserMessageReceiveOpt( ctx, diff --git a/internal/rpc/msg/sync_msg.go b/internal/rpc/msg/sync_msg.go index 7ae25f6cc..4aaa2bb13 100644 --- a/internal/rpc/msg/sync_msg.go +++ b/internal/rpc/msg/sync_msg.go @@ -16,6 +16,7 @@ package msg import ( "context" + "github.com/OpenIMSDK/Open-IM-Server/pkg/msgprocessor" "github.com/OpenIMSDK/protocol/constant" "github.com/OpenIMSDK/protocol/msg" @@ -34,7 +35,7 @@ func (m *msgServer) PullMessageBySeqs( resp.Msgs = make(map[string]*sdkws.PullMsgs) resp.NotificationMsgs = make(map[string]*sdkws.PullMsgs) for _, seq := range req.SeqRanges { - if !utils.IsNotification(seq.ConversationID) { + if !msgprocessor.IsNotification(seq.ConversationID) { conversation, err := m.Conversation.GetConversation(ctx, req.UserID, seq.ConversationID) if err != nil { log.ZError(ctx, "GetConversation error", err, "conversationID", seq.ConversationID) diff --git a/internal/tools/msg_test.go b/internal/tools/msg_test.go deleted file mode 100644 index 13ac4f97f..000000000 --- a/internal/tools/msg_test.go +++ /dev/null @@ -1,355 +0,0 @@ -// Copyright © 2023 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package tools - -import ( - "context" - "strconv" - - "go.mongodb.org/mongo-driver/bson" - - "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache" - "github.com/OpenIMSDK/protocol/constant" - "github.com/OpenIMSDK/tools/mcontext" - "github.com/OpenIMSDK/tools/utils" - - unRelationTb "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/unrelation" - "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/unrelation" - - "testing" - "time" -) - -func GenMsgDoc(startSeq, stopSeq, delSeq, index int64, conversationID string) *unRelationTb.MsgDocModel { - msgDoc := &unRelationTb.MsgDocModel{DocID: conversationID + strconv.Itoa(int(index))} - for i := 0; i < 5000; i++ { - msgDoc.Msg = append(msgDoc.Msg, &unRelationTb.MsgInfoModel{}) - } - for i := startSeq; i <= stopSeq; i++ { - msg := &unRelationTb.MsgDataModel{ - SendID: "sendID1", - RecvID: "recvID1", - GroupID: "", - ClientMsgID: "xxx", - ServerMsgID: "xxx", - SenderPlatformID: 1, - SenderNickname: "testNickName", - SenderFaceURL: "testFaceURL", - SessionType: 1, - MsgFrom: 100, - ContentType: 101, - Content: "testContent", - Seq: i, - CreateTime: time.Now().Unix(), - Status: 1, - } - if i <= delSeq { - msg.SendTime = 10000 - } else { - msg.SendTime = utils.GetCurrentTimestampByMill() - } - msgDoc.Msg[i-1] = &unRelationTb.MsgInfoModel{Msg: msg} - } - return msgDoc -} - -func TestDeleteMongoMsgAndResetRedisSeq(t *testing.T) { - operationID := "test" - rdb, err := cache.NewRedis() - if err != nil { - return - } - mgo, err := unrelation.NewMongo() - if err != nil { - return - } - cacheModel := cache.NewMsgCacheModel(rdb) - mongoClient := mgo.GetDatabase().Collection(unRelationTb.MsgDocModel{}.TableName()) - ctx := context.Background() - ctx = mcontext.SetOperationID(ctx, operationID) - - testUID1 := "test_del_id1" - var conversationID string - conversationID = utils.GetConversationIDBySessionType(constant.SuperGroupChatType, testUID1) - _, err = mongoClient.DeleteOne(ctx, bson.M{"doc_id": conversationID + ":" + strconv.Itoa(0)}) - if err != nil { - t.Error("DeleteOne failed") - return - } - - err = cacheModel.SetMaxSeq(ctx, conversationID, 600) - if err != nil { - t.Error("SetUserMaxSeq failed") - } - msgDoc := GenMsgDoc(1, 600, 200, 0, conversationID) - if _, err := mongoClient.InsertOne(ctx, msgDoc); err != nil { - t.Error("InsertOne failed", conversationID) - } - - msgTools, err := InitMsgTool() - if err != nil { - t.Error("init failed") - return - } - msgTools.ClearConversationsMsg(ctx, []string{conversationID}) - minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache, err := msgTools.msgDatabase.GetConversationMinMaxSeqInMongoAndCache( - ctx, - conversationID, - ) - if err != nil { - t.Error("GetSuperGroupMinMaxSeqInMongoAndCache failed") - return - } - if maxSeqCache != maxSeqMongo { - t.Error("checkMaxSeqWithMongo failed", conversationID) - } - if minSeqMongo != minSeqCache { - t.Error("minSeqMongo != minSeqCache", minSeqMongo, minSeqCache) - } - if minSeqCache != 201 { - t.Error("test1 is not the same", "minSeq:", minSeqCache, "targetSeq", 201) - } - - /////// uid2 - - testUID2 := "test_del_id2" - conversationID = utils.GetConversationIDBySessionType(constant.SuperGroupChatType, testUID2) - - _, err = mongoClient.DeleteOne(ctx, bson.M{"uid": conversationID + ":" + strconv.Itoa(0)}) - if err != nil { - t.Error("delete failed") - } - _, err = mongoClient.DeleteOne(ctx, bson.M{"uid": conversationID + ":" + strconv.Itoa(1)}) - if err != nil { - t.Error("delete failed") - } - - err = cacheModel.SetMaxSeq(ctx, conversationID, 7000) - if err != nil { - t.Error("SetUserMaxSeq failed") - } - msgDoc = GenMsgDoc(1, 4999, 5000, 0, conversationID) - msgDoc2 := GenMsgDoc(5000, 7000, 6000, 1, conversationID) - if _, err := mongoClient.InsertOne(ctx, msgDoc); err != nil { - t.Error("InsertOne failed", testUID1) - } - if _, err := mongoClient.InsertOne(ctx, msgDoc2); err != nil { - t.Error("InsertOne failed", testUID1) - } - - msgTools.ClearConversationsMsg(ctx, []string{conversationID}) - minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache, err = msgTools.msgDatabase.GetConversationMinMaxSeqInMongoAndCache( - ctx, - conversationID, - ) - if err != nil { - t.Error("GetSuperGroupMinMaxSeqInMongoAndCache failed") - return - } - if maxSeqCache != maxSeqMongo { - t.Error("checkMaxSeqWithMongo failed", conversationID) - } - if minSeqMongo != minSeqCache { - t.Error("minSeqMongo != minSeqCache", minSeqMongo, minSeqCache) - } - if minSeqCache != 6001 { - t.Error("test1 is not the same", "minSeq:", minSeqCache, "targetSeq", 201) - } - - /////// uid3 - testUID3 := "test_del_id3" - conversationID = utils.GetConversationIDBySessionType(constant.SuperGroupChatType, testUID3) - _, err = mongoClient.DeleteOne(ctx, bson.M{"uid": conversationID + ":" + strconv.Itoa(0)}) - if err != nil { - t.Error("delete failed") - } - err = cacheModel.SetMaxSeq(ctx, conversationID, 4999) - if err != nil { - t.Error("SetUserMaxSeq failed") - } - msgDoc = GenMsgDoc(1, 4999, 5000, 0, conversationID) - if _, err := mongoClient.InsertOne(ctx, msgDoc); err != nil { - t.Error("InsertOne failed", conversationID) - } - - msgTools.ClearConversationsMsg(ctx, []string{conversationID}) - minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache, err = msgTools.msgDatabase.GetConversationMinMaxSeqInMongoAndCache( - ctx, - conversationID, - ) - if err != nil { - t.Error("GetSuperGroupMinMaxSeqInMongoAndCache failed") - return - } - if maxSeqCache != maxSeqMongo { - t.Error("checkMaxSeqWithMongo failed", conversationID) - } - if minSeqMongo != minSeqCache { - t.Error("minSeqMongo != minSeqCache", minSeqMongo, minSeqCache) - } - if minSeqCache != 5000 { - t.Error("test1 is not the same", "minSeq:", minSeqCache, "targetSeq", 201) - } - - //// uid4 - testUID4 := "test_del_id4" - conversationID = utils.GetConversationIDBySessionType(constant.SuperGroupChatType, testUID4) - _, err = mongoClient.DeleteOne(ctx, bson.M{"uid": conversationID + ":" + strconv.Itoa(0)}) - if err != nil { - t.Error("delete failed") - } - _, err = mongoClient.DeleteOne(ctx, bson.M{"uid": conversationID + ":" + strconv.Itoa(1)}) - if err != nil { - t.Error("delete failed") - } - _, err = mongoClient.DeleteOne(ctx, bson.M{"uid": conversationID + ":" + strconv.Itoa(2)}) - if err != nil { - t.Error("delete failed") - } - - err = cacheModel.SetMaxSeq(ctx, conversationID, 12000) - msgDoc = GenMsgDoc(1, 4999, 5000, 0, conversationID) - msgDoc2 = GenMsgDoc(5000, 9999, 10000, 1, conversationID) - msgDoc3 := GenMsgDoc(10000, 12000, 11000, 2, conversationID) - if _, err := mongoClient.InsertOne(ctx, msgDoc); err != nil { - t.Error("InsertOne failed", conversationID) - } - if _, err := mongoClient.InsertOne(ctx, msgDoc2); err != nil { - t.Error("InsertOne failed", conversationID) - } - if _, err := mongoClient.InsertOne(ctx, msgDoc3); err != nil { - t.Error("InsertOne failed", conversationID) - } - - msgTools.ClearConversationsMsg(ctx, []string{conversationID}) - if err != nil { - t.Error("GetSuperGroupMinMaxSeqInMongoAndCache failed") - return - } - minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache, err = msgTools.msgDatabase.GetConversationMinMaxSeqInMongoAndCache( - ctx, - conversationID, - ) - if err != nil { - t.Error("GetSuperGroupMinMaxSeqInMongoAndCache failed") - return - } - if maxSeqCache != maxSeqMongo { - t.Error("checkMaxSeqWithMongo failed", conversationID) - } - if minSeqMongo != minSeqCache { - t.Error("minSeqMongo != minSeqCache", minSeqMongo, minSeqCache) - } - if minSeqCache != 5000 { - t.Error("test1 is not the same", "minSeq:", minSeqCache) - } - - testUID5 := "test_del_id5" - conversationID = utils.GetConversationIDBySessionType(constant.SuperGroupChatType, testUID5) - - _, err = mongoClient.DeleteOne(ctx, bson.M{"uid": conversationID + ":" + strconv.Itoa(0)}) - if err != nil { - t.Error("delete failed") - } - _, err = mongoClient.DeleteOne(ctx, bson.M{"uid": conversationID + ":" + strconv.Itoa(1)}) - if err != nil { - t.Error("delete failed") - } - err = cacheModel.SetMaxSeq(ctx, conversationID, 9999) - msgDoc = GenMsgDoc(1, 4999, 5000, 0, conversationID) - msgDoc2 = GenMsgDoc(5000, 9999, 10000, 1, conversationID) - if _, err := mongoClient.InsertOne(ctx, msgDoc); err != nil { - t.Error("InsertOne failed", conversationID) - } - if _, err := mongoClient.InsertOne(ctx, msgDoc2); err != nil { - t.Error("InsertOne failed", conversationID) - } - - msgTools.ClearConversationsMsg(ctx, []string{conversationID}) - if err != nil { - t.Error("GetSuperGroupMinMaxSeqInMongoAndCache failed") - return - } - minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache, err = msgTools.msgDatabase.GetConversationMinMaxSeqInMongoAndCache( - ctx, - conversationID, - ) - if err != nil { - t.Error("GetSuperGroupMinMaxSeqInMongoAndCache failed") - return - } - if maxSeqCache != maxSeqMongo { - t.Error("checkMaxSeqWithMongo failed", conversationID) - } - if minSeqMongo != minSeqCache { - t.Error("minSeqMongo != minSeqCache", minSeqMongo, minSeqCache) - } - if minSeqCache != 10000 { - t.Error("test1 is not the same", "minSeq:", minSeqCache) - } - - testUID6 := "test_del_id6" - conversationID = utils.GetConversationIDBySessionType(constant.SuperGroupChatType, testUID6) - - _, err = mongoClient.DeleteOne(ctx, bson.M{"uid": conversationID + ":" + strconv.Itoa(0)}) - if err != nil { - t.Error("delete failed") - } - _, err = mongoClient.DeleteOne(ctx, bson.M{"uid": conversationID + ":" + strconv.Itoa(1)}) - if err != nil { - t.Error("delete failed") - } - _, err = mongoClient.DeleteOne(ctx, bson.M{"uid": conversationID + ":" + strconv.Itoa(2)}) - if err != nil { - t.Error("delete failed") - } - _, err = mongoClient.DeleteOne(ctx, bson.M{"uid": conversationID + ":" + strconv.Itoa(3)}) - if err != nil { - t.Error("delete failed") - } - msgDoc = GenMsgDoc(1, 4999, 5000, 0, conversationID) - msgDoc2 = GenMsgDoc(5000, 9999, 10000, 1, conversationID) - msgDoc3 = GenMsgDoc(10000, 14999, 13000, 2, conversationID) - msgDoc4 := GenMsgDoc(15000, 19999, 0, 3, conversationID) - if _, err := mongoClient.InsertOne(ctx, msgDoc); err != nil { - t.Error("InsertOne failed", testUID4) - } - if _, err := mongoClient.InsertOne(ctx, msgDoc2); err != nil { - t.Error("InsertOne failed", testUID4) - } - if _, err := mongoClient.InsertOne(ctx, msgDoc3); err != nil { - t.Error("InsertOne failed", testUID4) - } - if _, err := mongoClient.InsertOne(ctx, msgDoc4); err != nil { - t.Error("InsertOne failed", testUID4) - } - minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache, err = msgTools.msgDatabase.GetConversationMinMaxSeqInMongoAndCache( - ctx, - conversationID, - ) - if err != nil { - t.Error("GetSuperGroupMinMaxSeqInMongoAndCache failed") - return - } - if maxSeqCache != maxSeqMongo { - t.Error("checkMaxSeqWithMongo failed", conversationID) - } - if minSeqMongo != minSeqCache { - t.Error("minSeqMongo != minSeqCache", minSeqMongo, minSeqCache) - } - if minSeqCache != 13001 { - t.Error("test1 is not the same", "minSeq:", minSeqCache) - } -}