From 5f783fa07875f12d3bad67fa69e938ff59b2051d Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Fri, 26 May 2023 11:11:34 +0800 Subject: [PATCH] delete msg --- internal/msgtransfer/init.go | 3 + .../online_msg_to_mongo_handler.go | 11 - internal/rpc/msg/server.go | 3 + internal/rpc/msg/sync_msg.go | 4 +- internal/rpc/msg/verify.go | 60 ---- internal/tools/msg_test.go | 17 +- pkg/common/constant/constant.go | 4 +- pkg/common/convert/msg.go | 74 ++++ pkg/common/db/cache/msg.go | 10 +- pkg/common/db/controller/msg.go | 328 +++++++----------- pkg/common/db/table/unrelation/msg.go | 34 +- pkg/common/db/unrelation/mongo.go | 2 +- pkg/common/db/unrelation/msg.go | 208 ++++------- pkg/utils/utils.go | 30 ++ 14 files changed, 337 insertions(+), 451 deletions(-) create mode 100644 pkg/common/convert/msg.go diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index f2e36e02b..e7595bc99 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -43,6 +43,9 @@ func StartTransfer(prometheusPort int) error { if err != nil { return err } + if err := mongo.CreateMsgIndex(); err != nil { + return err + } client, err := openKeeper.NewClient(config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema, openKeeper.WithFreq(time.Hour), openKeeper.WithRoundRobin(), openKeeper.WithUserNameAndPassword(config.Config.Zookeeper.UserName, config.Config.Zookeeper.Password), openKeeper.WithTimeout(10)) diff --git a/internal/msgtransfer/online_msg_to_mongo_handler.go b/internal/msgtransfer/online_msg_to_mongo_handler.go index 85f7211cc..48ac22e7f 100644 --- a/internal/msgtransfer/online_msg_to_mongo_handler.go +++ b/internal/msgtransfer/online_msg_to_mongo_handler.go @@ -55,17 +55,6 @@ func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo(ctx context.Cont } for _, v := range msgFromMQ.MsgData { switch v.ContentType { - case constant.DeleteMessageNotification: - deleteMessageTips := sdkws.DeleteMessageTips{} - err := proto.Unmarshal(v.Content, &deleteMessageTips) - if err != nil { - log.ZError(ctx, "tips unmarshal err:", err, "msg", msg) - continue - } - if totalUnExistSeqs, err := mc.msgDatabase.DelMsgBySeqs(ctx, deleteMessageTips.UserID, deleteMessageTips.Seqs); err != nil { - log.ZError(ctx, "DelMsgBySeqs", err, "userIDs", deleteMessageTips.UserID, "seqs", deleteMessageTips.Seqs, "totalUnExistSeqs", totalUnExistSeqs) - continue - } case constant.MsgRevokeNotification: var elem sdkws.NotificationElem if err := json.Unmarshal(v.Content, &elem); err != nil { diff --git a/internal/rpc/msg/server.go b/internal/rpc/msg/server.go index 15392ada2..a6e95bdd0 100644 --- a/internal/rpc/msg/server.go +++ b/internal/rpc/msg/server.go @@ -55,6 +55,9 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e if err != nil { return err } + if err := mongo.CreateMsgIndex(); err != nil { + return err + } cacheModel := cache.NewMsgCacheModel(rdb) msgDocModel := unrelation.NewMsgMongoDriver(mongo.GetDatabase()) extendMsgModel := unrelation.NewExtendMsgSetMongoDriver(mongo.GetDatabase()) diff --git a/internal/rpc/msg/sync_msg.go b/internal/rpc/msg/sync_msg.go index da010b0f5..bf2beb6fc 100644 --- a/internal/rpc/msg/sync_msg.go +++ b/internal/rpc/msg/sync_msg.go @@ -15,7 +15,7 @@ func (m *msgServer) PullMessageBySeqs(ctx context.Context, req *sdkws.PullMessag resp.NotificationMsgs = make(map[string]*sdkws.PullMsgs) for _, seq := range req.SeqRanges { if !utils.IsNotification(seq.ConversationID) { - msgs, err := m.MsgDatabase.GetMsgBySeqsRange(ctx, seq.ConversationID, seq.Begin, seq.End, seq.Num) + msgs, err := m.MsgDatabase.GetMsgBySeqsRange(ctx, req.UserID, seq.ConversationID, seq.Begin, seq.End, seq.Num) if err != nil { log.ZWarn(ctx, "GetMsgBySeqsRange error", err, "conversationID", seq.ConversationID, "seq", seq) continue @@ -26,7 +26,7 @@ func (m *msgServer) PullMessageBySeqs(ctx context.Context, req *sdkws.PullMessag for i := seq.Begin; i <= seq.End; i++ { seqs = append(seqs, i) } - notificationMsgs, err := m.MsgDatabase.GetMsgBySeqs(ctx, seq.ConversationID, seqs) + notificationMsgs, err := m.MsgDatabase.GetMsgBySeqs(ctx, req.UserID, seq.ConversationID, seqs) if err != nil { log.ZWarn(ctx, "GetMsgBySeqs error", err, "conversationID", seq.ConversationID, "seq", seq) continue diff --git a/internal/rpc/msg/verify.go b/internal/rpc/msg/verify.go index 446363354..4a7a4ba9b 100644 --- a/internal/rpc/msg/verify.go +++ b/internal/rpc/msg/verify.go @@ -94,74 +94,14 @@ func (m *msgServer) messageVerification(ctx context.Context, data *msg.SendMsgRe return nil, nil } return nil, nil - case constant.GroupChatType: - if utils.IsContain(data.MsgData.SendID, config.Config.Manager.AppManagerUid) { - return nil, nil - } - userIDList, err := m.GetGroupMemberIDs(ctx, data.MsgData.GroupID) - if err != nil { - return nil, err - } - if utils.IsContain(data.MsgData.SendID, config.Config.Manager.AppManagerUid) { - return userIDList, nil - } - if data.MsgData.ContentType <= constant.NotificationEnd && data.MsgData.ContentType >= constant.NotificationBegin { - return userIDList, nil - } - if !utils.IsContain(data.MsgData.SendID, userIDList) { - return nil, errs.ErrNotInGroupYet.Wrap() - } - isMute, err := m.userIsMuteAndIsAdminInGroup(ctx, data.MsgData.GroupID, data.MsgData.SendID) - if err != nil { - return nil, err - } - if isMute { - return nil, errs.ErrMutedInGroup.Wrap() - } - - isMute, isAdmin, err := m.groupIsMuted(ctx, data.MsgData.GroupID, data.MsgData.SendID) - if err != nil { - return nil, err - } - if isAdmin { - return userIDList, nil - } - - if isMute { - return nil, errs.ErrMutedGroup.Wrap() - } - return userIDList, nil case constant.SuperGroupChatType: groupInfo, err := m.Group.GetGroupInfo(ctx, data.MsgData.GroupID) if err != nil { return nil, err } - if data.MsgData.ContentType == constant.AdvancedRevoke { - revokeMessage := new(MessageRevoked) - err := utils.JsonStringToStruct(string(data.MsgData.Content), revokeMessage) - if err != nil { - return nil, errs.ErrArgs.Wrap() - } - - if revokeMessage.RevokerID != revokeMessage.SourceMessageSendID { - resp, err := m.MsgDatabase.GetMsgBySeqs(ctx, data.MsgData.GroupID, []int64{int64(revokeMessage.Seq)}) - if err != nil { - return nil, err - } - if resp[0].ClientMsgID == revokeMessage.ClientMsgID && resp[0].Seq == int64(revokeMessage.Seq) { - revokeMessage.SourceMessageSendTime = resp[0].SendTime - revokeMessage.SourceMessageSenderNickname = resp[0].SenderNickname - revokeMessage.SourceMessageSendID = resp[0].SendID - data.MsgData.Content = []byte(utils.StructToJsonString(revokeMessage)) - } else { - return nil, errs.ErrData.Wrap("MsgData") - } - } - } if groupInfo.GroupType == constant.SuperGroup { return nil, nil } - userIDList, err := m.GetGroupMemberIDs(ctx, data.MsgData.GroupID) if err != nil { return nil, err diff --git a/internal/tools/msg_test.go b/internal/tools/msg_test.go index c1c61f08d..bf548903d 100644 --- a/internal/tools/msg_test.go +++ b/internal/tools/msg_test.go @@ -7,10 +7,8 @@ import ( "github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext" - "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws" "github.com/OpenIMSDK/Open-IM-Server/pkg/utils" "go.mongodb.org/mongo-driver/bson" - "google.golang.org/protobuf/proto" unRelationTb "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/unrelation" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/unrelation" @@ -22,10 +20,10 @@ import ( func GenMsgDoc(startSeq, stopSeq, delSeq, index int64, conversationID string) *unRelationTb.MsgDocModel { msgDoc := &unRelationTb.MsgDocModel{DocID: conversationID + strconv.Itoa(int(index))} for i := 0; i < 5000; i++ { - msgDoc.Msg = append(msgDoc.Msg, unRelationTb.MsgInfoModel{SendTime: 0, Msg: []byte{}}) + msgDoc.Msg = append(msgDoc.Msg, &unRelationTb.MsgInfoModel{}) } for i := startSeq; i <= stopSeq; i++ { - msg := sdkws.MsgData{ + msg := &unRelationTb.MsgDataModel{ SendID: "sendID1", RecvID: "recvID1", GroupID: "", @@ -37,20 +35,17 @@ func GenMsgDoc(startSeq, stopSeq, delSeq, index int64, conversationID string) *u SessionType: 1, MsgFrom: 100, ContentType: 101, - Content: []byte("testFaceURL"), + Content: "testContent", Seq: i, - SendTime: time.Now().Unix(), CreateTime: time.Now().Unix(), Status: 1, } - bytes, _ := proto.Marshal(&msg) - var sendTime int64 if i <= delSeq { - sendTime = 10000 + msg.SendTime = 10000 } else { - sendTime = utils.GetCurrentTimestampByMill() + msg.SendTime = utils.GetCurrentTimestampByMill() } - msgDoc.Msg[i-1] = unRelationTb.MsgInfoModel{SendTime: int64(sendTime), Msg: bytes} + msgDoc.Msg[i-1] = &unRelationTb.MsgInfoModel{Msg: msg} } return msgDoc } diff --git a/pkg/common/constant/constant.go b/pkg/common/constant/constant.go index f12589f92..00c1c8094 100644 --- a/pkg/common/constant/constant.go +++ b/pkg/common/constant/constant.go @@ -32,8 +32,8 @@ const ( CustomNotification = 203 //SysRelated - NotificationBegin = 1000 - DeleteMessageNotification = 1100 + NotificationBegin = 1000 + FriendApplicationApprovedNotification = 1201 //add_friend_response FriendApplicationRejectedNotification = 1202 //add_friend_response FriendApplicationNotification = 1203 //add_friend diff --git a/pkg/common/convert/msg.go b/pkg/common/convert/msg.go new file mode 100644 index 000000000..1476ff6ee --- /dev/null +++ b/pkg/common/convert/msg.go @@ -0,0 +1,74 @@ +package convert + +import ( + "encoding/json" + + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/unrelation" + "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws" +) + +func MsgPb2DB(msg *sdkws.MsgData) unrelation.MsgDataModel { + var msgDataModel unrelation.MsgDataModel + msgDataModel.SendID = msg.SendID + msgDataModel.RecvID = msg.RecvID + msgDataModel.GroupID = msg.GroupID + msgDataModel.ClientMsgID = msg.ClientMsgID + msgDataModel.ServerMsgID = msg.ServerMsgID + msgDataModel.SenderPlatformID = msg.SenderPlatformID + msgDataModel.SenderNickname = msg.SenderNickname + msgDataModel.SenderFaceURL = msg.SenderFaceURL + msgDataModel.SessionType = msg.SessionType + msgDataModel.MsgFrom = msg.MsgFrom + msgDataModel.ContentType = msg.ContentType + msgDataModel.Content = string(msg.Content) + msgDataModel.Seq = msg.Seq + msgDataModel.SendTime = msg.SendTime + msgDataModel.CreateTime = msg.CreateTime + msgDataModel.Status = msg.Status + msgDataModel.Options = msg.Options + msgDataModel.OfflinePush = &unrelation.OfflinePushModel{ + Title: msg.OfflinePushInfo.Title, + Desc: msg.OfflinePushInfo.Desc, + Ex: msg.OfflinePushInfo.Ex, + IOSPushSound: msg.OfflinePushInfo.IOSPushSound, + IOSBadgeCount: msg.OfflinePushInfo.IOSBadgeCount, + } + msgDataModel.AtUserIDList = msg.AtUserIDList + msgDataModel.AttachedInfo = msg.AttachedInfo + msgDataModel.Ex = msg.Ex + return msgDataModel + +} + +func MsgDB2Pb(msgModel *unrelation.MsgDataModel) *sdkws.MsgData { + var msg sdkws.MsgData + msg.SendID = msgModel.SendID + msg.RecvID = msgModel.RecvID + msg.GroupID = msgModel.GroupID + msg.ClientMsgID = msgModel.ClientMsgID + msg.ServerMsgID = msgModel.ServerMsgID + msg.SenderPlatformID = msgModel.SenderPlatformID + msg.SenderNickname = msgModel.SenderNickname + msg.SenderFaceURL = msgModel.SenderFaceURL + msg.SessionType = msgModel.SessionType + msg.MsgFrom = msgModel.MsgFrom + msg.ContentType = msgModel.ContentType + b, _ := json.Marshal(msgModel.Content) + msg.Content = b + msg.Seq = msgModel.Seq + msg.SendTime = msgModel.SendTime + msg.CreateTime = msgModel.CreateTime + msg.Status = msgModel.Status + msg.Options = msgModel.Options + msg.OfflinePushInfo = &sdkws.OfflinePushInfo{ + Title: msgModel.OfflinePush.Title, + Desc: msgModel.OfflinePush.Desc, + Ex: msgModel.OfflinePush.Ex, + IOSPushSound: msgModel.OfflinePush.IOSPushSound, + IOSBadgeCount: msgModel.OfflinePush.IOSBadgeCount, + } + msg.AtUserIDList = msgModel.AtUserIDList + msg.AttachedInfo = msgModel.AttachedInfo + msg.Ex = msgModel.Ex + return &msg +} diff --git a/pkg/common/db/cache/msg.go b/pkg/common/db/cache/msg.go index 4f5afd05e..5d6574568 100644 --- a/pkg/common/db/cache/msg.go +++ b/pkg/common/db/cache/msg.go @@ -91,7 +91,7 @@ type MsgModel interface { LockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error UnLockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error - GetMsgsByConversationIDAndSeq(ctx context.Context, docID string, seqs []int64) ([]*sdkws.MsgData, error) + GetMsgsByConversationIDAndSeq(ctx context.Context, docID string, seqs []int64) ([]*unRelationTb.MsgInfoModel, error) DeleteMsgByConversationIDAndSeq(ctx context.Context, docID string, seq int64) MsgModel } @@ -558,8 +558,8 @@ func (c msgCache) getMsgReadCacheKey(docID string, seq int64) string { return messageReadCache + docID + "_" + strconv.Itoa(int(seq)) } -func (c *msgCache) getMsgsIndex(msg *sdkws.MsgData, keys []string) (int, error) { - key := c.getMsgReadCacheKey(utils.GetConversationIDByMsg(msg), msg.Seq) +func (c *msgCache) getMsgsIndex(msg *unRelationTb.MsgInfoModel, keys []string) (int, error) { + key := c.getMsgReadCacheKey(utils.GetConversationIDByMsgModel(msg.Msg), msg.Msg.Seq) for i, _key := range keys { if key == _key { return i, nil @@ -568,12 +568,12 @@ func (c *msgCache) getMsgsIndex(msg *sdkws.MsgData, keys []string) (int, error) return 0, errIndex } -func (c *msgCache) GetMsgsByConversationIDAndSeq(ctx context.Context, docID string, seqs []int64) ([]*sdkws.MsgData, error) { +func (c *msgCache) GetMsgsByConversationIDAndSeq(ctx context.Context, docID string, seqs []int64) ([]*unRelationTb.MsgInfoModel, error) { var keys []string for _, seq := range seqs { keys = append(keys, c.getMsgReadCacheKey(docID, seq)) } - return batchGetCache(ctx, c.rcClient, keys, c.expireTime, c.getMsgsIndex, func(ctx context.Context) ([]*sdkws.MsgData, error) { + return batchGetCache(ctx, c.rcClient, keys, c.expireTime, c.getMsgsIndex, func(ctx context.Context) ([]*unRelationTb.MsgInfoModel, error) { return c.msgDocDatabase.GetMsgBySeqIndexIn1Doc(ctx, docID, seqs) }) } diff --git a/pkg/common/db/controller/msg.go b/pkg/common/db/controller/msg.go index 24faab98e..22628179b 100644 --- a/pkg/common/db/controller/msg.go +++ b/pkg/common/db/controller/msg.go @@ -4,11 +4,11 @@ import ( "fmt" "sort" "strconv" - "sync" "time" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/convert" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache" unRelationTb "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/unrelation" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/unrelation" @@ -16,7 +16,6 @@ import ( "github.com/OpenIMSDK/Open-IM-Server/pkg/common/log" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/prome" "github.com/OpenIMSDK/Open-IM-Server/pkg/errs" - "github.com/gogo/protobuf/sortkeys" "context" "errors" @@ -37,15 +36,15 @@ type CommonMsgDatabase interface { DeleteMessageFromCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) error // incrSeq然后批量插入缓存 BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNewConversation bool, err error) - // 删除消息 返回不存在的seqList - DelMsgBySeqs(ctx context.Context, conversationID string, seqs []int64) (totalUnExistSeqs []int64, err error) + // 通过seqList获取mongo中写扩散消息 - GetMsgBySeqsRange(ctx context.Context, conversationID string, begin, end, num int64) (seqMsg []*sdkws.MsgData, err error) + GetMsgBySeqsRange(ctx context.Context, userID string, conversationID string, begin, end, num int64) (seqMsg []*sdkws.MsgData, err error) // 通过seqList获取大群在 mongo里面的消息 - GetMsgBySeqs(ctx context.Context, conversationID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error) + GetMsgBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error) + // 删除会话消息重置最小seq, remainTime为消息保留的时间单位秒,超时消息删除, 传0删除所有消息(此方法不删除redis cache) DeleteConversationMsgsAndSetMinSeq(ctx context.Context, conversationID string, remainTime int64) error - CleanUpUserConversationsMsgs(ctx context.Context, userID string, conversationIDs []string) + SetMaxSeq(ctx context.Context, conversationID string, maxSeq int64) error GetMaxSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) GetMaxSeq(ctx context.Context, conversationID string) (int64, error) @@ -168,8 +167,8 @@ func (db *commonMsgDatabase) BatchInsertBlock(ctx context.Context, conversationI res, err = db.msgDocDatabase.UpdateMsg(ctx, docID, index, "revoke", msg.Revoke) } else if msg.DelList != nil { res, err = db.msgDocDatabase.PushUnique(ctx, docID, index, "del_list", msg.DelList) - } else if msg.ReadList != nil { - res, err = db.msgDocDatabase.PushUnique(ctx, docID, index, "read_list", msg.ReadList) + // } else if msg.ReadList != nil { + // res, err = db.msgDocDatabase.PushUnique(ctx, docID, index, "read_list", msg.ReadList) } else { return false, errs.ErrArgs.Wrap("msg all field is nil") } @@ -194,7 +193,7 @@ func (db *commonMsgDatabase) BatchInsertBlock(ctx context.Context, conversationI } doc := unRelationTb.MsgDocModel{ DocID: docID, - Msg: make([]unRelationTb.MsgInfoModel, num), + Msg: make([]*unRelationTb.MsgInfoModel, num), } var insert int for j := i; j < len(msgList); j++ { @@ -203,15 +202,15 @@ func (db *commonMsgDatabase) BatchInsertBlock(ctx context.Context, conversationI break } insert++ - doc.Msg[getIndex(seq)] = *msgList[j] + doc.Msg[getIndex(seq)] = msgList[j] } for i, model := range doc.Msg { if model.DelList == nil { doc.Msg[i].DelList = []string{} } - if model.ReadList == nil { - doc.Msg[i].ReadList = []string{} - } + // if model.ReadList == nil { + // doc.Msg[i].ReadList = []string{} + // } } if err := db.msgDocDatabase.Create(ctx, &doc); err != nil { if mongo.IsDuplicateKeyError(err) { @@ -364,76 +363,17 @@ func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversa return lastMaxSeq, isNew, utils.Wrap(err, "") } -func (db *commonMsgDatabase) DelMsgBySeqs(ctx context.Context, conversationID string, seqs []int64) (totalUnExistSeqs []int64, err error) { - sortkeys.Int64s(seqs) - docIDSeqsMap := db.msg.GetDocIDSeqsMap(conversationID, seqs) - lock := sync.Mutex{} - var wg sync.WaitGroup - wg.Add(len(docIDSeqsMap)) - for k, v := range docIDSeqsMap { - go func(docID string, seqs []int64) { - defer wg.Done() - unExistSeqList, err := db.DelMsgBySeqsInOneDoc(ctx, docID, seqs) - if err != nil { - return - } - lock.Lock() - totalUnExistSeqs = append(totalUnExistSeqs, unExistSeqList...) - lock.Unlock() - }(k, v) - } - return totalUnExistSeqs, nil -} - -func (db *commonMsgDatabase) DelMsgBySeqsInOneDoc(ctx context.Context, docID string, seqs []int64) (unExistSeqs []int64, err error) { - seqMsgs, indexes, unExistSeqs, err := db.msgDocDatabase.GetMsgAndIndexBySeqsInOneDoc(ctx, docID, seqs) - if err != nil { - return nil, err - } - for i, v := range seqMsgs { - if err = db.msgDocDatabase.UpdateMsgStatusByIndexInOneDoc(ctx, docID, v, indexes[i], constant.MsgDeleted); err != nil { - log.ZError(ctx, "UpdateMsgStatusByIndexInOneDoc", err, "docID", docID, "msg", v, "index", indexes[i]) - } - } - return unExistSeqs, nil -} - -func (db *commonMsgDatabase) GetNewestMsg(ctx context.Context, conversationID string) (msgPb *sdkws.MsgData, err error) { - msgInfo, err := db.msgDocDatabase.GetNewestMsg(ctx, conversationID) - if err != nil { - return nil, err - } - return db.unmarshalMsg(msgInfo) -} - -func (db *commonMsgDatabase) GetOldestMsg(ctx context.Context, conversationID string) (msgPb *sdkws.MsgData, err error) { - msgInfo, err := db.msgDocDatabase.GetOldestMsg(ctx, conversationID) - if err != nil { - return nil, err - } - return db.unmarshalMsg(msgInfo) -} - -func (db *commonMsgDatabase) unmarshalMsg(msgInfo *unRelationTb.MsgInfoModel) (msgPb *sdkws.MsgData, err error) { - msgPb = &sdkws.MsgData{} - // todo: unmarshal - //err = proto.Unmarshal(msgInfo.Msg, msgPb) - //if err != nil { - // return nil, utils.Wrap(err, "") - //} - return msgPb, nil -} - func (db *commonMsgDatabase) getMsgBySeqs(ctx context.Context, conversationID string, seqs []int64) (totalMsgs []*sdkws.MsgData, err error) { - m := db.msg.GetDocIDSeqsMap(conversationID, seqs) var totalUnExistSeqs []int64 - for docID, seqs := range m { + for docID, seqs := range db.msg.GetDocIDSeqsMap(conversationID, seqs) { log.ZDebug(ctx, "getMsgBySeqs", "docID", docID, "seqs", seqs) - seqMsgs, unexistSeqs, err := db.findMsgBySeq(ctx, docID, seqs) + msgs, unexistSeqs, err := db.findMsgInfoBySeq(ctx, docID, seqs) if err != nil { return nil, err } - totalMsgs = append(totalMsgs, seqMsgs...) + for _, msg := range msgs { + totalMsgs = append(totalMsgs, convert.MsgDB2Pb(msg.Msg)) + } totalUnExistSeqs = append(totalUnExistSeqs, unexistSeqs...) } for _, unexistSeq := range totalUnExistSeqs { @@ -442,7 +382,7 @@ func (db *commonMsgDatabase) getMsgBySeqs(ctx context.Context, conversationID st return totalMsgs, nil } -func (db *commonMsgDatabase) refetchDelSeqsMsgs(ctx context.Context, conversationID string, delNums, rangeBegin, begin int64) (seqMsgs []*sdkws.MsgData, err error) { +func (db *commonMsgDatabase) refetchDelSeqsMsgs(ctx context.Context, conversationID string, delNums, rangeBegin, begin int64) (seqMsgs []*unRelationTb.MsgDataModel, err error) { var reFetchSeqs []int64 if delNums > 0 { newBeginSeq := rangeBegin - delNums @@ -457,18 +397,18 @@ func (db *commonMsgDatabase) refetchDelSeqsMsgs(ctx context.Context, conversatio return } if len(reFetchSeqs) > 0 { - m := db.msg.GetDocIDSeqsMap(conversationID, reFetchSeqs) - for docID, seqs := range m { - msgs, _, err := db.findMsgBySeq(ctx, docID, seqs) - if err != nil { - return nil, err - } - for _, msg := range msgs { - if msg.Status != constant.MsgDeleted { - seqMsgs = append(seqMsgs, msg) - } - } - } + // m := db.msg.GetDocIDSeqsMap(conversationID, reFetchSeqs) + // for docID, seqs := range m { + // msgs, _, err := db.findMsgInfoBySeq(ctx, docID, seqs) + // if err != nil { + // return nil, err + // } + // for _, msg := range msgs { + // if msg.Status != constant.MsgDeleted { + // seqMsgs = append(seqMsgs, msg) + // } + // } + // } } if len(seqMsgs) < int(delNums) { seqMsgs2, err := db.refetchDelSeqsMsgs(ctx, conversationID, delNums-int64(len(seqMsgs)), rangeBegin-1, begin) @@ -480,19 +420,19 @@ func (db *commonMsgDatabase) refetchDelSeqsMsgs(ctx context.Context, conversatio return seqMsgs, nil } -func (db *commonMsgDatabase) findMsgBySeq(ctx context.Context, docID string, seqs []int64) (seqMsgs []*sdkws.MsgData, unExistSeqs []int64, err error) { +func (db *commonMsgDatabase) findMsgInfoBySeq(ctx context.Context, docID string, seqs []int64) (totalMsgs []*unRelationTb.MsgInfoModel, unExistSeqs []int64, err error) { msgs, err := db.msgDocDatabase.GetMsgBySeqIndexIn1Doc(ctx, docID, seqs) if err != nil { return nil, nil, err } - log.ZDebug(ctx, "findMsgBySeq", "docID", docID, "seqs", seqs, "len(msgs)", len(msgs)) - seqMsgs = append(seqMsgs, msgs...) + log.ZDebug(ctx, "findMsgInfoBySeq", "docID", docID, "seqs", seqs, "len(msgs)", len(msgs)) + totalMsgs = append(totalMsgs, msgs...) if len(msgs) == 0 { unExistSeqs = seqs } else { for _, seq := range seqs { for i, msg := range msgs { - if seq == msg.Seq { + if seq == msg.Msg.Seq { break } if i == len(msgs)-1 { @@ -501,50 +441,27 @@ func (db *commonMsgDatabase) findMsgBySeq(ctx context.Context, docID string, seq } } } - msgs, _, unExistSeqs, err = db.msgDocDatabase.GetMsgAndIndexBySeqsInOneDoc(ctx, docID, unExistSeqs) - if err != nil { - return nil, nil, err - } - seqMsgs = append(seqMsgs, msgs...) - return seqMsgs, unExistSeqs, nil + return totalMsgs, unExistSeqs, nil } -func (db *commonMsgDatabase) getMsgBySeqsRange(ctx context.Context, conversationID string, allSeqs []int64, begin, end int64) (seqMsgs []*sdkws.MsgData, err error) { +func (db *commonMsgDatabase) getMsgBySeqsRange(ctx context.Context, userID string, conversationID string, allSeqs []int64, begin, end int64) (seqMsgs []*sdkws.MsgData, err error) { log.ZDebug(ctx, "getMsgBySeqsRange", "conversationID", conversationID, "allSeqs", allSeqs, "begin", begin, "end", end) - m := db.msg.GetDocIDSeqsMap(conversationID, allSeqs) var totalNotExistSeqs []int64 // mongo index - for docID, seqs := range m { + for docID, seqs := range db.msg.GetDocIDSeqsMap(conversationID, allSeqs) { log.ZDebug(ctx, "getMsgBySeqsRange", "docID", docID, "seqs", seqs) - msgs, notExistSeqs, err := db.findMsgBySeq(ctx, docID, seqs) + msgs, notExistSeqs, err := db.findMsgInfoBySeq(ctx, docID, seqs) if err != nil { return nil, err } log.ZDebug(ctx, "getMsgBySeqsRange", "unExistSeqs", notExistSeqs, "msgs", len(msgs)) - seqMsgs = append(seqMsgs, msgs...) + for _, msg := range msgs { + seqMsgs = append(seqMsgs, convert.MsgDB2Pb(msg.Msg)) + } totalNotExistSeqs = append(totalNotExistSeqs, notExistSeqs...) } log.ZDebug(ctx, "getMsgBySeqsRange", "totalNotExistSeqs", totalNotExistSeqs) - // find by next doc - var missedSeqs []int64 - if len(totalNotExistSeqs) > 0 { - m = db.msg.GetDocIDSeqsMap(conversationID, totalNotExistSeqs) - for docID, seqs := range m { - docID = db.msg.ToNextDoc(docID) - msgs, _, unExistSeqs, err := db.msgDocDatabase.GetMsgAndIndexBySeqsInOneDoc(ctx, docID, seqs) - if err != nil { - missedSeqs = append(missedSeqs, seqs...) - log.ZError(ctx, "get message from mongo exception", err, "docID", docID, "seqs", seqs) - continue - } - missedSeqs = append(missedSeqs, unExistSeqs...) - seqMsgs = append(seqMsgs, msgs...) - if len(unExistSeqs) > 0 { - log.ZWarn(ctx, "some seqs lost in mongo", err, "docID", docID, "seqs", seqs, "unExistSeqs", unExistSeqs) - } - } - } - seqMsgs = append(seqMsgs, db.msg.GenExceptionMessageBySeqs(missedSeqs)...) + seqMsgs = append(seqMsgs, db.msg.GenExceptionMessageBySeqs(totalNotExistSeqs)...) var delSeqs []int64 for _, msg := range seqMsgs { if msg.Status == constant.MsgDeleted { @@ -556,7 +473,9 @@ func (db *commonMsgDatabase) getMsgBySeqsRange(ctx context.Context, conversation if err != nil { log.ZWarn(ctx, "refetchDelSeqsMsgs", err, "delSeqs", delSeqs, "begin", begin) } - seqMsgs = append(seqMsgs, msgs...) + for _, msg := range msgs { + seqMsgs = append(seqMsgs, convert.MsgDB2Pb(msg)) + } } // sort by seq if len(totalNotExistSeqs) > 0 || len(delSeqs) > 0 { @@ -565,7 +484,7 @@ func (db *commonMsgDatabase) getMsgBySeqsRange(ctx context.Context, conversation return seqMsgs, nil } -func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, conversationID string, begin, end, num int64) (seqMsg []*sdkws.MsgData, err error) { +func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID string, conversationID string, begin, end, num int64) (seqMsg []*sdkws.MsgData, err error) { var seqs []int64 for i := end; i > end-num; i-- { if i >= begin { @@ -587,7 +506,7 @@ func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, conversation // get from cache or db prome.Add(prome.MsgPullFromRedisSuccessCounter, len(successMsgs)) if len(failedSeqs) > 0 { - mongoMsgs, err := db.getMsgBySeqsRange(ctx, conversationID, failedSeqs, begin, end) + mongoMsgs, err := db.getMsgBySeqsRange(ctx, userID, conversationID, failedSeqs, begin, end) if err != nil { prome.Add(prome.MsgPullFromMongoFailedCounter, len(failedSeqs)) return nil, err @@ -598,8 +517,25 @@ func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, conversation return successMsgs, nil } -func (db *commonMsgDatabase) GetMsgBySeqs(ctx context.Context, conversationID string, seqs []int64) (successMsgs []*sdkws.MsgData, err error) { - successMsgs, failedSeqs, err := db.cache.GetMessagesBySeq(ctx, conversationID, seqs) +func (db *commonMsgDatabase) GetMsgBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) (successMsgs []*sdkws.MsgData, err error) { + userMinSeq, err := db.cache.GetConversationUserMinSeq(ctx, conversationID, userID) + if err != nil { + return nil, err + } + minSeq, err := db.cache.GetMinSeq(ctx, conversationID) + if err != nil { + return nil, err + } + if userMinSeq < minSeq { + minSeq = userMinSeq + } + var newSeqs []int64 + for _, seq := range seqs { + if seq >= minSeq { + newSeqs = append(newSeqs, seq) + } + } + successMsgs, failedSeqs, err := db.cache.GetMessagesBySeq(ctx, conversationID, newSeqs) if err != nil { if err != redis.Nil { prome.Add(prome.MsgPullFromRedisFailedCounter, len(failedSeqs)) @@ -608,7 +544,7 @@ func (db *commonMsgDatabase) GetMsgBySeqs(ctx context.Context, conversationID st } prome.Add(prome.MsgPullFromRedisSuccessCounter, len(successMsgs)) if len(failedSeqs) > 0 { - mongoMsgs, err := db.getMsgBySeqs(ctx, conversationID, seqs) + mongoMsgs, err := db.getMsgBySeqs(ctx, conversationID, failedSeqs) if err != nil { prome.Add(prome.MsgPullFromMongoFailedCounter, len(failedSeqs)) return nil, err @@ -621,7 +557,8 @@ func (db *commonMsgDatabase) GetMsgBySeqs(ctx context.Context, conversationID st func (db *commonMsgDatabase) DeleteConversationMsgsAndSetMinSeq(ctx context.Context, conversationID string, remainTime int64) error { var delStruct delMsgRecursionStruct - minSeq, err := db.deleteMsgRecursion(ctx, conversationID, unRelationTb.OldestList, &delStruct, remainTime) + var skip int64 + minSeq, err := db.deleteMsgRecursion(ctx, conversationID, skip, &delStruct, remainTime) if err != nil { return err } @@ -653,69 +590,56 @@ func (d *delMsgRecursionStruct) getSetMinSeq() int64 { // recursion 删除list并且返回设置的最小seq func (db *commonMsgDatabase) deleteMsgRecursion(ctx context.Context, conversationID string, index int64, delStruct *delMsgRecursionStruct, remainTime int64) (int64, error) { // find from oldest list - //msgs, err := db.msgDocDatabase.GetMsgsByIndex(ctx, conversationID, index) - //if err != nil || msgs.DocID == "" { - // if err != nil { - // if err == unrelation.ErrMsgListNotExist { - // log.ZDebug(ctx, "deleteMsgRecursion ErrMsgListNotExist", "conversationID", conversationID, "index:", index) - // } else { - // log.ZError(ctx, "deleteMsgRecursion GetUserMsgListByIndex failed", err, "conversationID", conversationID, "index", index) - // } - // } - // // 获取报错,或者获取不到了,物理删除并且返回seq delMongoMsgsPhysical(delStruct.delDocIDList), 结束递归 - // err = db.msgDocDatabase.Delete(ctx, delStruct.delDocIDs) - // if err != nil { - // return 0, err - // } - // return delStruct.getSetMinSeq() + 1, nil - //} - //log.ZDebug(ctx, "doc info", "conversationID", conversationID, "index", index, "docID", msgs.DocID, "len", len(msgs.Msg)) - //if int64(len(msgs.Msg)) > db.msg.GetSingleGocMsgNum() { - // log.ZWarn(ctx, "msgs too large", nil, "lenth", len(msgs.Msg), "docID:", msgs.DocID) - //} - //if msgs.Msg[len(msgs.Msg)-1].SendTime+(remainTime*1000) < utils.GetCurrentTimestampByMill() && msgs.IsFull() { - // delStruct.delDocIDs = append(delStruct.delDocIDs, msgs.DocID) - // lastMsgPb := &sdkws.MsgData{} - // err = proto.Unmarshal(msgs.Msg[len(msgs.Msg)-1].Msg, lastMsgPb) - // if err != nil { - // log.ZError(ctx, "proto.Unmarshal failed", err, "index", len(msgs.Msg)-1, "docID", msgs.DocID) - // return 0, utils.Wrap(err, "proto.Unmarshal failed") - // } - // delStruct.minSeq = lastMsgPb.Seq - //} else { - // var hasMarkDelFlag bool - // for i, msg := range msgs.Msg { - // if msg.SendTime != 0 { - // msgPb := &sdkws.MsgData{} - // err = proto.Unmarshal(msg.Msg, msgPb) - // if err != nil { - // log.ZError(ctx, "proto.Unmarshal failed", err, "index", i, "docID", msgs.DocID) - // return 0, utils.Wrap(err, "proto.Unmarshal failed") - // } - // if utils.GetCurrentTimestampByMill() > msg.SendTime+(remainTime*1000) { - // msgPb.Status = constant.MsgDeleted - // bytes, _ := proto.Marshal(msgPb) - // msg.Msg = bytes - // msg.SendTime = 0 - // hasMarkDelFlag = true - // } else { - // // 到本条消息不需要删除, minSeq置为这条消息的seq - // if err := db.msgDocDatabase.Delete(ctx, delStruct.delDocIDs); err != nil { - // return 0, err - // } - // if hasMarkDelFlag { - // if err := db.msgDocDatabase.UpdateOneDoc(ctx, msgs); err != nil { - // return delStruct.getSetMinSeq(), err - // } - // } - // return msgPb.Seq, nil - // } - // } - // } - //} - //// 继续递归 index+1 - //seq, err := db.deleteMsgRecursion(ctx, conversationID, index+1, delStruct, remainTime) - //return seq, err + msgDocModel, err := db.msgDocDatabase.GetMsgDocModelByIndex(ctx, conversationID, index, 1) + if err != nil || msgDocModel.DocID == "" { + if err != nil { + if err == unrelation.ErrMsgListNotExist { + log.ZDebug(ctx, "deleteMsgRecursion ErrMsgListNotExist", "conversationID", conversationID, "index:", index) + } else { + log.ZError(ctx, "deleteMsgRecursion GetUserMsgListByIndex failed", err, "conversationID", conversationID, "index", index) + } + } + // 获取报错,或者获取不到了,物理删除并且返回seq delMongoMsgsPhysical(delStruct.delDocIDList), 结束递归 + err = db.msgDocDatabase.DeleteDocs(ctx, delStruct.delDocIDs) + if err != nil { + return 0, err + } + return delStruct.getSetMinSeq() + 1, nil + } + log.ZDebug(ctx, "doc info", "conversationID", conversationID, "index", index, "docID", msgDocModel.DocID, "len", len(msgDocModel.Msg)) + if int64(len(msgDocModel.Msg)) > db.msg.GetSingleGocMsgNum() { + log.ZWarn(ctx, "msgs too large", nil, "lenth", len(msgDocModel.Msg), "docID:", msgDocModel.DocID) + } + if msgDocModel.Msg[len(msgDocModel.Msg)-1].Msg.SendTime+(remainTime*1000) < utils.GetCurrentTimestampByMill() && msgDocModel.IsFull() { + delStruct.delDocIDs = append(delStruct.delDocIDs, msgDocModel.DocID) + delStruct.minSeq = msgDocModel.Msg[len(msgDocModel.Msg)-1].Msg.Seq + } else { + var hasMarkDelFlag bool + var delMsgIndexs []int + for i, MsgInfoModel := range msgDocModel.Msg { + if MsgInfoModel != nil { + if utils.GetCurrentTimestampByMill() > MsgInfoModel.Msg.SendTime+(remainTime*1000) { + delMsgIndexs = append(delMsgIndexs, i) + hasMarkDelFlag = true + } else { + // 到本条消息不需要删除, minSeq置为这条消息的seq + if err := db.msgDocDatabase.DeleteDocs(ctx, delStruct.delDocIDs); err != nil { + return 0, err + } + if hasMarkDelFlag { + // mark del all delMsgIndexs + // if err := db.msgDocDatabase.UpdateOneDoc(ctx, msgDocModel); err != nil { + // return delStruct.getSetMinSeq(), err + // } + } + return MsgInfoModel.Msg.Seq, nil + } + } + } + } + // 继续递归 index+1 + seq, err := db.deleteMsgRecursion(ctx, conversationID, index+1, delStruct, remainTime) + return seq, err return 0, nil } @@ -805,20 +729,12 @@ func (db *commonMsgDatabase) GetMinMaxSeqMongo(ctx context.Context, conversation if err != nil { return } - msgPb, err := db.unmarshalMsg(oldestMsgMongo) - if err != nil { - return - } - minSeqMongo = msgPb.Seq + minSeqMongo = oldestMsgMongo.Msg.Seq newestMsgMongo, err := db.msgDocDatabase.GetNewestMsg(ctx, conversationID) if err != nil { return } - msgPb, err = db.unmarshalMsg(newestMsgMongo) - if err != nil { - return - } - maxSeqMongo = msgPb.Seq + maxSeqMongo = newestMsgMongo.Msg.Seq return } diff --git a/pkg/common/db/table/unrelation/msg.go b/pkg/common/db/table/unrelation/msg.go index 139350441..2a9d99583 100644 --- a/pkg/common/db/table/unrelation/msg.go +++ b/pkg/common/db/table/unrelation/msg.go @@ -2,10 +2,11 @@ package unrelation import ( "context" - "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws" - "go.mongodb.org/mongo-driver/mongo" "strconv" "strings" + + "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws" + "go.mongodb.org/mongo-driver/mongo" ) const ( @@ -16,8 +17,8 @@ const ( ) type MsgDocModel struct { - DocID string `bson:"doc_id"` - Msg []MsgInfoModel `bson:"msgs"` + DocID string `bson:"doc_id"` + Msg []*MsgInfoModel `bson:"msgs"` } type RevokeModel struct { @@ -59,10 +60,9 @@ type MsgDataModel struct { } type MsgInfoModel struct { - Msg *MsgDataModel `bson:"msg"` - Revoke *RevokeModel `bson:"revoke"` - DelList []string `bson:"del_list"` - ReadList []string `bson:"read_list"` + Msg *MsgDataModel `bson:"msg"` + Revoke *RevokeModel `bson:"revoke"` + DelList []string `bson:"del_list"` } type MsgDocModelInterface interface { @@ -72,15 +72,12 @@ type MsgDocModelInterface interface { PushUnique(ctx context.Context, docID string, index int64, key string, value any) (*mongo.UpdateResult, error) UpdateMsgContent(ctx context.Context, docID string, index int64, msg []byte) error IsExistDocID(ctx context.Context, docID string) (bool, error) - UpdateMsgStatusByIndexInOneDoc(ctx context.Context, docID string, msg *sdkws.MsgData, seqIndex int, status int32) error FindOneByDocID(ctx context.Context, docID string) (*MsgDocModel, error) - GetMsgBySeqIndexIn1Doc(ctx context.Context, docID string, seqs []int64) ([]*sdkws.MsgData, error) - GetMsgAndIndexBySeqsInOneDoc(ctx context.Context, docID string, seqs []int64) (seqMsgs []*sdkws.MsgData, indexes []int, unExistSeqs []int64, err error) + GetMsgBySeqIndexIn1Doc(ctx context.Context, docID string, seqs []int64) ([]*MsgInfoModel, error) GetNewestMsg(ctx context.Context, conversationID string) (*MsgInfoModel, error) GetOldestMsg(ctx context.Context, conversationID string) (*MsgInfoModel, error) - Delete(ctx context.Context, docIDs []string) error - GetMsgsByIndex(ctx context.Context, conversationID string, index int64) (*MsgDocModel, error) - UpdateOneDoc(ctx context.Context, msg *MsgDocModel) error + DeleteDocs(ctx context.Context, docIDs []string) error + GetMsgDocModelByIndex(ctx context.Context, conversationID string, index, sort int64) (*MsgDocModel, error) } func (MsgDocModel) TableName() string { @@ -92,8 +89,7 @@ func (MsgDocModel) GetSingleGocMsgNum() int64 { } func (m *MsgDocModel) IsFull() bool { - //return m.Msg[len(m.Msg)-1].SendTime != 0 - return false + return m.Msg[len(m.Msg)-1].Msg != nil } func (m MsgDocModel) GetDocID(conversationID string, seq int64) string { @@ -153,9 +149,9 @@ func (m MsgDocModel) indexGen(conversationID string, seqSuffix int64) string { func (MsgDocModel) GenExceptionMessageBySeqs(seqs []int64) (exceptionMsg []*sdkws.MsgData) { for _, v := range seqs { - msg := new(sdkws.MsgData) - msg.Seq = v - exceptionMsg = append(exceptionMsg, msg) + msgModel := new(sdkws.MsgData) + msgModel.Seq = v + exceptionMsg = append(exceptionMsg, msgModel) } return exceptionMsg } diff --git a/pkg/common/db/unrelation/mongo.go b/pkg/common/db/unrelation/mongo.go index 4491d9d9b..fc98b85eb 100644 --- a/pkg/common/db/unrelation/mongo.go +++ b/pkg/common/db/unrelation/mongo.go @@ -65,7 +65,7 @@ func (m *Mongo) GetDatabase() *mongo.Database { } func (m *Mongo) CreateMsgIndex() error { - return m.createMongoIndex(unrelation.Msg, false, "uid") + return m.createMongoIndex(unrelation.Msg, true, "doc_id") } func (m *Mongo) CreateSuperGroupIndex() error { diff --git a/pkg/common/db/unrelation/msg.go b/pkg/common/db/unrelation/msg.go index 6a290d582..870d07ab4 100644 --- a/pkg/common/db/unrelation/msg.go +++ b/pkg/common/db/unrelation/msg.go @@ -6,6 +6,7 @@ import ( "fmt" table "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/unrelation" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/log" "github.com/OpenIMSDK/Open-IM-Server/pkg/errs" "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws" "github.com/OpenIMSDK/Open-IM-Server/pkg/utils" @@ -17,23 +18,14 @@ import ( ) var ErrMsgListNotExist = errors.New("user not have msg in mongoDB") -var ErrMsgNotFound = errors.New("msg not found") type MsgMongoDriver struct { MsgCollection *mongo.Collection - msg table.MsgDocModel + model table.MsgDocModel } func NewMsgMongoDriver(database *mongo.Database) table.MsgDocModelInterface { collection := database.Collection(table.MsgDocModel{}.TableName()) - indexModel := mongo.IndexModel{ - Keys: bson.M{"doc_id": 1}, - Options: options.Index().SetUnique(true), - } - _, err := collection.Indexes().CreateOne(context.Background(), indexModel) - if err != nil { - panic(err) - } return &MsgMongoDriver{MsgCollection: collection} } @@ -110,45 +102,17 @@ func (m *MsgMongoDriver) FindOneByDocID(ctx context.Context, docID string) (*tab return doc, err } -func (m *MsgMongoDriver) GetMsgAndIndexBySeqsInOneDoc(ctx context.Context, docID string, seqs []int64) (seqMsgs []*sdkws.MsgData, indexes []int, unExistSeqs []int64, err error) { - //doc, err := m.FindOneByDocID(ctx, docID) - //if err != nil { - // return nil, nil, nil, err - //} - //singleCount := 0 - //var hasSeqList []int64 - //for i := 0; i < len(doc.Msg); i++ { - // var msg sdkws.MsgData - // if err := proto.Unmarshal(doc.Msg[i].Msg, &msg); err != nil { - // return nil, nil, nil, err - // } - // if utils.Contain(msg.Seq, seqs...) { - // indexes = append(indexes, i) - // seqMsgs = append(seqMsgs, &msg) - // hasSeqList = append(hasSeqList, msg.Seq) - // singleCount++ - // if singleCount == len(seqs) { - // break - // } - // } - //} - //for _, i := range seqs { - // if utils.Contain(i, hasSeqList...) { - // continue - // } - // unExistSeqs = append(unExistSeqs, i) - //} - return seqMsgs, indexes, unExistSeqs, nil -} - -func (m *MsgMongoDriver) GetMsgsByIndex(ctx context.Context, conversationID string, index int64) (*table.MsgDocModel, error) { - findOpts := options.Find().SetLimit(1).SetSkip(index).SetSort(bson.M{"doc_id": 1}) +func (m *MsgMongoDriver) GetMsgDocModelByIndex(ctx context.Context, conversationID string, index, sort int64) (*table.MsgDocModel, error) { + if sort != 1 && sort != -1 { + return nil, errs.ErrArgs.Wrap("mongo sort must be 1 or -1") + } + findOpts := options.Find().SetLimit(1).SetSkip(index).SetSort(bson.M{"doc_id": sort}) cursor, err := m.MsgCollection.Find(ctx, bson.M{"doc_id": primitive.Regex{Pattern: fmt.Sprintf("^%s:", conversationID)}}, findOpts) if err != nil { return nil, utils.Wrap(err, "") } var msgs []table.MsgDocModel - err = cursor.All(context.Background(), &msgs) + err = cursor.All(ctx, &msgs) if err != nil { return nil, utils.Wrap(err, fmt.Sprintf("cursor is %s", cursor.Current.String())) } @@ -159,53 +123,38 @@ func (m *MsgMongoDriver) GetMsgsByIndex(ctx context.Context, conversationID stri } func (m *MsgMongoDriver) GetNewestMsg(ctx context.Context, conversationID string) (*table.MsgInfoModel, error) { - var msgDocs []table.MsgDocModel - cursor, err := m.MsgCollection.Find(ctx, bson.M{"doc_id": bson.M{"$regex": fmt.Sprintf("^%s:", conversationID)}}, options.Find().SetLimit(1).SetSort(bson.M{"doc_id": -1})) - if err != nil { - return nil, utils.Wrap(err, "") - } - err = cursor.All(ctx, &msgDocs) - if err != nil { - return nil, utils.Wrap(err, "") - } - if len(msgDocs) > 0 { - if len(msgDocs[0].Msg) > 0 { - return &msgDocs[0].Msg[len(msgDocs[0].Msg)-1], nil + var skip int64 = 0 + for { + msgDocModel, err := m.GetMsgDocModelByIndex(ctx, conversationID, skip, -1) + if err != nil { + return nil, err + } + for i := len(msgDocModel.Msg) - 1; i >= 0; i-- { + if msgDocModel.Msg[i].Msg != nil { + return msgDocModel.Msg[i], nil + } } - return nil, errs.ErrRecordNotFound.Wrap("len(msgDocs[0].Msgs) < 0") + skip++ } - return nil, ErrMsgNotFound } func (m *MsgMongoDriver) GetOldestMsg(ctx context.Context, conversationID string) (*table.MsgInfoModel, error) { - //var msgDocs []table.MsgDocModel - //cursor, err := m.MsgCollection.Find(ctx, bson.M{"doc_id": bson.M{"$regex": fmt.Sprintf("^%s:", conversationID)}}, options.Find().SetLimit(1).SetSort(bson.M{"doc_id": 1})) - //if err != nil { - // return nil, err - //} - //err = cursor.All(ctx, &msgDocs) - //if err != nil { - // return nil, utils.Wrap(err, "") - //} - //var oldestMsg table.MsgInfoModel - //if len(msgDocs) > 0 { - // for _, v := range msgDocs[0].Msg { - // if v.SendTime != 0 { - // oldestMsg = v - // break - // } - // } - // if len(oldestMsg.Msg) == 0 { - // if len(msgDocs[0].Msg) > 0 { - // oldestMsg = msgDocs[0].Msg[0] - // } - // } - // return &oldestMsg, nil - //} - return nil, ErrMsgNotFound + var skip int64 = 0 + for { + msgDocModel, err := m.GetMsgDocModelByIndex(ctx, conversationID, skip, 1) + if err != nil { + return nil, err + } + for i, v := range msgDocModel.Msg { + if v.Msg != nil { + return msgDocModel.Msg[i], nil + } + } + skip++ + } } -func (m *MsgMongoDriver) Delete(ctx context.Context, docIDs []string) error { +func (m *MsgMongoDriver) DeleteDocs(ctx context.Context, docIDs []string) error { if docIDs == nil { return nil } @@ -213,56 +162,47 @@ func (m *MsgMongoDriver) Delete(ctx context.Context, docIDs []string) error { return err } -func (m *MsgMongoDriver) UpdateOneDoc(ctx context.Context, msg *table.MsgDocModel) error { - _, err := m.MsgCollection.UpdateOne(ctx, bson.M{"doc_id": msg.DocID}, bson.M{"$set": bson.M{"msgs": msg.Msg}}) - return err -} - -func (m *MsgMongoDriver) GetMsgBySeqIndexIn1Doc(ctx context.Context, docID string, seqs []int64) (msgs []*sdkws.MsgData, err error) { - //beginSeq, endSeq := utils.GetSeqsBeginEnd(seqs) - //beginIndex := m.msg.GetMsgIndex(beginSeq) - //num := endSeq - beginSeq + 1 - //pipeline := bson.A{ - // bson.M{ - // "$match": bson.M{"doc_id": docID}, - // }, - // bson.M{ - // "$project": bson.M{ - // "msgs": bson.M{ - // "$slice": bson.A{"$msgs", beginIndex, num}, - // }, - // }, - // }, - //} - //cursor, err := m.MsgCollection.Aggregate(ctx, pipeline) - //if err != nil { - // return nil, errs.Wrap(err) - //} - //defer cursor.Close(ctx) - //var doc table.MsgDocModel - //i := 0 - //for cursor.Next(ctx) { - // err := cursor.Decode(&doc) - // if err != nil { - // return nil, err - // } - // if i == 0 { - // break - // } - //} - //log.ZDebug(ctx, "msgInfos", "num", len(doc.Msg), "docID", docID) - //for _, v := range doc.Msg { - // var msg sdkws.MsgData - // if err := proto.Unmarshal(v.Msg, &msg); err != nil { - // return nil, err - // } - // if msg.Seq >= beginSeq && msg.Seq <= endSeq { - // log.ZDebug(ctx, "find msg", "msg", &msg) - // msgs = append(msgs, &msg) - // } else { - // log.ZWarn(ctx, "this msg is at wrong position", nil, "msg", &msg) - // } - //} +func (m *MsgMongoDriver) GetMsgBySeqIndexIn1Doc(ctx context.Context, docID string, seqs []int64) (msgs []*table.MsgInfoModel, err error) { + beginSeq, endSeq := utils.GetSeqsBeginEnd(seqs) + beginIndex := m.model.GetMsgIndex(beginSeq) + num := endSeq - beginSeq + 1 + pipeline := bson.A{ + bson.M{ + "$match": bson.M{"doc_id": docID}, + }, + bson.M{ + "$project": bson.M{ + "msgs": bson.M{ + "$slice": bson.A{"$msgs", beginIndex, num}, + }, + }, + }, + } + cursor, err := m.MsgCollection.Aggregate(ctx, pipeline) + if err != nil { + return nil, errs.Wrap(err) + } + defer cursor.Close(ctx) + var doc table.MsgDocModel + i := 0 + for cursor.Next(ctx) { + err := cursor.Decode(&doc) + if err != nil { + return nil, err + } + if i == 0 { + break + } + } + log.ZDebug(ctx, "msgInfos", "num", len(doc.Msg), "docID", docID) + for _, v := range doc.Msg { + if v.Msg.Seq >= beginSeq && v.Msg.Seq <= endSeq { + log.ZDebug(ctx, "find msg", "msg", v.Msg) + msgs = append(msgs, v) + } else { + log.ZWarn(ctx, "this msg is at wrong position", nil, "msg", v.Msg) + } + } return msgs, nil } diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go index fbf75b3fd..a6422c73e 100644 --- a/pkg/utils/utils.go +++ b/pkg/utils/utils.go @@ -10,6 +10,7 @@ import ( "time" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/unrelation" sdkws "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws" "github.com/jinzhu/copier" "github.com/pkg/errors" @@ -230,6 +231,35 @@ func GenConversationUniqueKey(msg *sdkws.MsgData) string { return "" } +func GetConversationIDByMsgModel(msg *unrelation.MsgDataModel) string { + options := Options(msg.Options) + switch msg.SessionType { + case constant.SingleChatType: + l := []string{msg.SendID, msg.RecvID} + sort.Strings(l) + if !options.IsNotNotification() { + return "n_" + strings.Join(l, "_") + } + return "si_" + strings.Join(l, "_") // single chat + case constant.GroupChatType: + if !options.IsNotNotification() { + return "n_" + msg.GroupID // group chat + } + return "g_" + msg.GroupID // group chat + case constant.SuperGroupChatType: + if !options.IsNotNotification() { + return "n_" + msg.GroupID // super group chat + } + return "sg_" + msg.GroupID // super group chat + case constant.NotificationChatType: + if !options.IsNotNotification() { + return "n_" + msg.SendID + "_" + msg.RecvID // super group chat + } + return "sn_" + msg.SendID + "_" + msg.RecvID // server notification chat + } + return "" +} + func GetConversationIDByMsg(msg *sdkws.MsgData) string { options := Options(msg.Options) switch msg.SessionType {