// Copyright © 2023 OpenIM. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package controller import ( "context" "encoding/json" "errors" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "strings" "time" "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/convert" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" "github.com/openimsdk/protocol/constant" pbmsg "github.com/openimsdk/protocol/msg" "github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/mq/kafka" "github.com/openimsdk/tools/utils/datautil" "github.com/openimsdk/tools/utils/timeutil" "github.com/redis/go-redis/v9" "go.mongodb.org/mongo-driver/mongo" ) const ( updateKeyMsg = iota updateKeyRevoke ) // CommonMsgDatabase defines the interface for message database operations. type CommonMsgDatabase interface { // BatchInsertChat2DB inserts a batch of messages into the database for a specific conversation. BatchInsertChat2DB(ctx context.Context, conversationID string, msgs []*sdkws.MsgData, currentMaxSeq int64) error // RevokeMsg revokes a message in a conversation. RevokeMsg(ctx context.Context, conversationID string, seq int64, revoke *model.RevokeModel) error // MarkSingleChatMsgsAsRead marks messages as read for a single chat by sequence numbers. MarkSingleChatMsgsAsRead(ctx context.Context, userID string, conversationID string, seqs []int64) error // DeleteMessagesFromCache deletes message caches from Redis by sequence numbers. DeleteMessagesFromCache(ctx context.Context, conversationID string, seqs []int64) error // BatchInsertChat2Cache increments the sequence number and then batch inserts messages into the cache. BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNewConversation bool, err error) // GetMsgBySeqsRange retrieves messages from MongoDB by a range of sequence numbers. GetMsgBySeqsRange(ctx context.Context, userID string, conversationID string, begin, end, num, userMaxSeq int64) (minSeq int64, maxSeq int64, seqMsg []*sdkws.MsgData, err error) // GetMsgBySeqs retrieves messages for large groups from MongoDB by sequence numbers. GetMsgBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) (minSeq int64, maxSeq int64, seqMsg []*sdkws.MsgData, err error) // DeleteConversationMsgsAndSetMinSeq deletes conversation messages and resets the minimum sequence number. If `remainTime` is 0, all messages are deleted (this method does not delete Redis // cache). DeleteConversationMsgsAndSetMinSeq(ctx context.Context, conversationID string, remainTime int64) error // UserMsgsDestruct marks messages for deletion based on destruct time and returns a list of sequence numbers for marked messages. UserMsgsDestruct(ctx context.Context, userID string, conversationID string, destructTime int64, lastMsgDestructTime time.Time) (seqs []int64, err error) // DeleteUserMsgsBySeqs allows a user to delete messages based on sequence numbers. DeleteUserMsgsBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) error // DeleteMsgsPhysicalBySeqs physically deletes messages by emptying them based on sequence numbers. DeleteMsgsPhysicalBySeqs(ctx context.Context, conversationID string, seqs []int64) error //SetMaxSeq(ctx context.Context, conversationID string, maxSeq int64) error GetMaxSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) GetMaxSeq(ctx context.Context, conversationID string) (int64, error) SetMinSeqs(ctx context.Context, seqs map[string]int64) error SetUserConversationsMinSeqs(ctx context.Context, userID string, seqs map[string]int64) (err error) SetHasReadSeq(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error GetHasReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error) GetHasReadSeq(ctx context.Context, userID string, conversationID string) (int64, error) UserSetHasReadSeqs(ctx context.Context, userID string, hasReadSeqs map[string]int64) error //GetMongoMaxAndMinSeq(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo int64, err error) //GetConversationMinMaxSeqInMongoAndCache(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error) SetSendMsgStatus(ctx context.Context, id string, status int32) error GetSendMsgStatus(ctx context.Context, id string) (int32, error) SearchMessage(ctx context.Context, req *pbmsg.SearchMessageReq) (total int32, msgData []*sdkws.MsgData, err error) FindOneByDocIDs(ctx context.Context, docIDs []string, seqs map[string]int64) (map[string]*sdkws.MsgData, error) // to mq MsgToMQ(ctx context.Context, key string, msg2mq *sdkws.MsgData) error MsgToPushMQ(ctx context.Context, key, conversarionID string, msg2mq *sdkws.MsgData) (int32, int64, error) MsgToMongoMQ(ctx context.Context, key, conversarionID string, msgs []*sdkws.MsgData, lastSeq int64) error RangeUserSendCount(ctx context.Context, start time.Time, end time.Time, group bool, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, users []*model.UserCount, dateCount map[string]int64, err error) RangeGroupSendCount(ctx context.Context, start time.Time, end time.Time, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, groups []*model.GroupCount, dateCount map[string]int64, err error) ConvertMsgsDocLen(ctx context.Context, conversationIDs []string) // clear msg GetBeforeMsg(ctx context.Context, ts int64, limit int) ([]*model.MsgDocModel, error) DeleteDocMsgBefore(ctx context.Context, ts int64, doc *model.MsgDocModel) ([]int, error) } func NewCommonMsgDatabase(msgDocModel database.Msg, msg cache.MsgCache, seqUser cache.SeqUser, seqConversation cache.SeqConversationCache, kafkaConf *config.Kafka) (CommonMsgDatabase, error) { conf, err := kafka.BuildProducerConfig(*kafkaConf.Build()) if err != nil { return nil, err } producerToRedis, err := kafka.NewKafkaProducer(conf, kafkaConf.Address, kafkaConf.ToRedisTopic) if err != nil { return nil, err } producerToMongo, err := kafka.NewKafkaProducer(conf, kafkaConf.Address, kafkaConf.ToMongoTopic) if err != nil { return nil, err } producerToPush, err := kafka.NewKafkaProducer(conf, kafkaConf.Address, kafkaConf.ToPushTopic) if err != nil { return nil, err } return &commonMsgDatabase{ msgDocDatabase: msgDocModel, msg: msg, seqUser: seqUser, seqConversation: seqConversation, producer: producerToRedis, producerToMongo: producerToMongo, producerToPush: producerToPush, }, nil } type commonMsgDatabase struct { msgDocDatabase database.Msg msgTable model.MsgDocModel msg cache.MsgCache seqConversation cache.SeqConversationCache seqUser cache.SeqUser producer *kafka.Producer producerToMongo *kafka.Producer producerToPush *kafka.Producer } func (db *commonMsgDatabase) MsgToMQ(ctx context.Context, key string, msg2mq *sdkws.MsgData) error { _, _, err := db.producer.SendMessage(ctx, key, msg2mq) return err } func (db *commonMsgDatabase) MsgToPushMQ(ctx context.Context, key, conversationID string, msg2mq *sdkws.MsgData) (int32, int64, error) { partition, offset, err := db.producerToPush.SendMessage(ctx, key, &pbmsg.PushMsgDataToMQ{MsgData: msg2mq, ConversationID: conversationID}) if err != nil { log.ZError(ctx, "MsgToPushMQ", err, "key", key, "msg2mq", msg2mq) return 0, 0, err } return partition, offset, nil } func (db *commonMsgDatabase) MsgToMongoMQ(ctx context.Context, key, conversationID string, messages []*sdkws.MsgData, lastSeq int64) error { if len(messages) > 0 { _, _, err := db.producerToMongo.SendMessage(ctx, key, &pbmsg.MsgDataToMongoByMQ{LastSeq: lastSeq, ConversationID: conversationID, MsgData: messages}) return err } return nil } func (db *commonMsgDatabase) BatchInsertBlock(ctx context.Context, conversationID string, fields []any, key int8, firstSeq int64) error { if len(fields) == 0 { return nil } num := db.msgTable.GetSingleGocMsgNum() // num = 100 for i, field := range fields { // Check the type of the field var ok bool switch key { case updateKeyMsg: var msg *model.MsgDataModel msg, ok = field.(*model.MsgDataModel) if msg != nil && msg.Seq != firstSeq+int64(i) { return errs.ErrInternalServer.WrapMsg("seq is invalid") } case updateKeyRevoke: _, ok = field.(*model.RevokeModel) default: return errs.ErrInternalServer.WrapMsg("key is invalid") } if !ok { return errs.ErrInternalServer.WrapMsg("field type is invalid") } } // Returns true if the document exists in the database, false if the document does not exist in the database updateMsgModel := func(seq int64, i int) (bool, error) { var ( res *mongo.UpdateResult err error ) docID := db.msgTable.GetDocID(conversationID, seq) index := db.msgTable.GetMsgIndex(seq) field := fields[i] switch key { case updateKeyMsg: res, err = db.msgDocDatabase.UpdateMsg(ctx, docID, index, "msg", field) case updateKeyRevoke: res, err = db.msgDocDatabase.UpdateMsg(ctx, docID, index, "revoke", field) } if err != nil { return false, err } return res.MatchedCount > 0, nil } tryUpdate := true for i := 0; i < len(fields); i++ { seq := firstSeq + int64(i) // Current sequence number if tryUpdate { matched, err := updateMsgModel(seq, i) if err != nil { return err } if matched { continue // The current data has been updated, skip the current data } } doc := model.MsgDocModel{ DocID: db.msgTable.GetDocID(conversationID, seq), Msg: make([]*model.MsgInfoModel, num), } var insert int // Inserted data number for j := i; j < len(fields); j++ { seq = firstSeq + int64(j) if db.msgTable.GetDocID(conversationID, seq) != doc.DocID { break } insert++ switch key { case updateKeyMsg: doc.Msg[db.msgTable.GetMsgIndex(seq)] = &model.MsgInfoModel{ Msg: fields[j].(*model.MsgDataModel), } case updateKeyRevoke: doc.Msg[db.msgTable.GetMsgIndex(seq)] = &model.MsgInfoModel{ Revoke: fields[j].(*model.RevokeModel), } } } for i, msgInfo := range doc.Msg { if msgInfo == nil { msgInfo = &model.MsgInfoModel{} doc.Msg[i] = msgInfo } if msgInfo.DelList == nil { doc.Msg[i].DelList = []string{} } } if err := db.msgDocDatabase.Create(ctx, &doc); err != nil { if mongo.IsDuplicateKeyError(err) { i-- // already inserted tryUpdate = true // next block use update mode continue } return err } tryUpdate = false // The current block is inserted successfully, and the next block is inserted preferentially i += insert - 1 // Skip the inserted data } return nil } func (db *commonMsgDatabase) BatchInsertChat2DB(ctx context.Context, conversationID string, msgList []*sdkws.MsgData, currentMaxSeq int64) error { if len(msgList) == 0 { return errs.ErrArgs.WrapMsg("msgList is empty") } msgs := make([]any, len(msgList)) for i, msg := range msgList { if msg == nil { continue } var offlinePushModel *model.OfflinePushModel if msg.OfflinePushInfo != nil { offlinePushModel = &model.OfflinePushModel{ Title: msg.OfflinePushInfo.Title, Desc: msg.OfflinePushInfo.Desc, Ex: msg.OfflinePushInfo.Ex, IOSPushSound: msg.OfflinePushInfo.IOSPushSound, IOSBadgeCount: msg.OfflinePushInfo.IOSBadgeCount, } } msgs[i] = &model.MsgDataModel{ SendID: msg.SendID, RecvID: msg.RecvID, GroupID: msg.GroupID, ClientMsgID: msg.ClientMsgID, ServerMsgID: msg.ServerMsgID, SenderPlatformID: msg.SenderPlatformID, SenderNickname: msg.SenderNickname, SenderFaceURL: msg.SenderFaceURL, SessionType: msg.SessionType, MsgFrom: msg.MsgFrom, ContentType: msg.ContentType, Content: string(msg.Content), Seq: msg.Seq, SendTime: msg.SendTime, CreateTime: msg.CreateTime, Status: msg.Status, Options: msg.Options, OfflinePush: offlinePushModel, AtUserIDList: msg.AtUserIDList, AttachedInfo: msg.AttachedInfo, Ex: msg.Ex, } } return db.BatchInsertBlock(ctx, conversationID, msgs, updateKeyMsg, msgList[0].Seq) } func (db *commonMsgDatabase) RevokeMsg(ctx context.Context, conversationID string, seq int64, revoke *model.RevokeModel) error { return db.BatchInsertBlock(ctx, conversationID, []any{revoke}, updateKeyRevoke, seq) } func (db *commonMsgDatabase) MarkSingleChatMsgsAsRead(ctx context.Context, userID string, conversationID string, totalSeqs []int64) error { for docID, seqs := range db.msgTable.GetDocIDSeqsMap(conversationID, totalSeqs) { var indexes []int64 for _, seq := range seqs { indexes = append(indexes, db.msgTable.GetMsgIndex(seq)) } log.ZDebug(ctx, "MarkSingleChatMsgsAsRead", "userID", userID, "docID", docID, "indexes", indexes) if err := db.msgDocDatabase.MarkSingleChatMsgsAsRead(ctx, userID, docID, indexes); err != nil { log.ZError(ctx, "MarkSingleChatMsgsAsRead", err, "userID", userID, "docID", docID, "indexes", indexes) return err } } return nil } func (db *commonMsgDatabase) DeleteMessagesFromCache(ctx context.Context, conversationID string, seqs []int64) error { return db.msg.DeleteMessagesFromCache(ctx, conversationID, seqs) } func (db *commonMsgDatabase) setHasReadSeqs(ctx context.Context, conversationID string, userSeqMap map[string]int64) error { for userID, seq := range userSeqMap { if err := db.seqUser.SetReadSeq(ctx, conversationID, userID, seq); err != nil { return err } } return nil } func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNew bool, err error) { lenList := len(msgs) if int64(lenList) > db.msgTable.GetSingleGocMsgNum() { return 0, false, errs.New("message count exceeds limit", "limit", db.msgTable.GetSingleGocMsgNum()).Wrap() } if lenList < 1 { return 0, false, errs.New("no messages to insert", "minCount", 1).Wrap() } currentMaxSeq, err := db.seqConversation.Malloc(ctx, conversationID, int64(len(msgs))) if err != nil { log.ZError(ctx, "storage.seq.Malloc", err) return 0, false, err } isNew = currentMaxSeq == 0 lastMaxSeq := currentMaxSeq userSeqMap := make(map[string]int64) for _, m := range msgs { currentMaxSeq++ m.Seq = currentMaxSeq userSeqMap[m.SendID] = m.Seq } failedNum, err := db.msg.SetMessagesToCache(ctx, conversationID, msgs) if err != nil { prommetrics.MsgInsertRedisFailedCounter.Add(float64(failedNum)) log.ZError(ctx, "setMessageToCache error", err, "len", len(msgs), "conversationID", conversationID) } else { prommetrics.MsgInsertRedisSuccessCounter.Inc() } err = db.setHasReadSeqs(ctx, conversationID, userSeqMap) if err != nil { log.ZError(ctx, "SetHasReadSeqs error", err, "userSeqMap", userSeqMap, "conversationID", conversationID) prommetrics.SeqSetFailedCounter.Inc() } return lastMaxSeq, isNew, errs.Wrap(err) } func (db *commonMsgDatabase) getMsgBySeqs(ctx context.Context, userID, conversationID string, seqs []int64) (totalMsgs []*sdkws.MsgData, err error) { for docID, seqs := range db.msgTable.GetDocIDSeqsMap(conversationID, seqs) { // log.ZDebug(ctx, "getMsgBySeqs", "docID", docID, "seqs", seqs) msgs, err := db.findMsgInfoBySeq(ctx, userID, docID, conversationID, seqs) if err != nil { return nil, err } for _, msg := range msgs { totalMsgs = append(totalMsgs, convert.MsgDB2Pb(msg.Msg)) } } return totalMsgs, nil } func (db *commonMsgDatabase) handlerDBMsg(ctx context.Context, cache map[int64][]*model.MsgInfoModel, userID, conversationID string, msg *model.MsgInfoModel) { if msg.IsRead { msg.Msg.IsRead = true } if msg.Msg.ContentType != constant.Quote { return } if msg.Msg.Content == "" { return } var quoteMsg struct { Text string `json:"text,omitempty"` QuoteMessage *sdkws.MsgData `json:"quoteMessage,omitempty"` MessageEntityList json.RawMessage `json:"messageEntityList,omitempty"` } if err := json.Unmarshal([]byte(msg.Msg.Content), "eMsg); err != nil { log.ZError(ctx, "json.Unmarshal", err) return } if quoteMsg.QuoteMessage == nil || quoteMsg.QuoteMessage.ContentType == constant.MsgRevokeNotification { return } var msgs []*model.MsgInfoModel if v, ok := cache[quoteMsg.QuoteMessage.Seq]; ok { msgs = v } else { if quoteMsg.QuoteMessage.Seq > 0 { ms, err := db.msgDocDatabase.GetMsgBySeqIndexIn1Doc(ctx, userID, db.msgTable.GetDocID(conversationID, quoteMsg.QuoteMessage.Seq), []int64{quoteMsg.QuoteMessage.Seq}) if err != nil { log.ZError(ctx, "GetMsgBySeqIndexIn1Doc", err, "conversationID", conversationID, "seq", quoteMsg.QuoteMessage.Seq) return } msgs = ms cache[quoteMsg.QuoteMessage.Seq] = ms } } if len(msgs) != 0 && msgs[0].Msg.ContentType != constant.MsgRevokeNotification { return } quoteMsg.QuoteMessage.ContentType = constant.MsgRevokeNotification if len(msgs) > 0 { quoteMsg.QuoteMessage.Content = []byte(msgs[0].Msg.Content) } else { quoteMsg.QuoteMessage.Content = []byte("{}") } data, err := json.Marshal("eMsg) if err != nil { log.ZError(ctx, "json.Marshal", err) return } msg.Msg.Content = string(data) if _, err := db.msgDocDatabase.UpdateMsg(ctx, db.msgTable.GetDocID(conversationID, msg.Msg.Seq), db.msgTable.GetMsgIndex(msg.Msg.Seq), "msg", msg.Msg); err != nil { log.ZError(ctx, "UpdateMsgContent", err) } } func (db *commonMsgDatabase) findMsgInfoBySeq(ctx context.Context, userID, docID string, conversationID string, seqs []int64) (totalMsgs []*model.MsgInfoModel, err error) { msgs, err := db.msgDocDatabase.GetMsgBySeqIndexIn1Doc(ctx, userID, docID, seqs) if err != nil { return nil, err } tempCache := make(map[int64][]*model.MsgInfoModel) for _, msg := range msgs { db.handlerDBMsg(ctx, tempCache, userID, conversationID, msg) } return msgs, err } func (db *commonMsgDatabase) getMsgBySeqsRange(ctx context.Context, userID string, conversationID string, allSeqs []int64, begin, end int64) (seqMsgs []*sdkws.MsgData, err error) { log.ZDebug(ctx, "getMsgBySeqsRange", "conversationID", conversationID, "allSeqs", allSeqs, "begin", begin, "end", end) for docID, seqs := range db.msgTable.GetDocIDSeqsMap(conversationID, allSeqs) { log.ZDebug(ctx, "getMsgBySeqsRange", "docID", docID, "seqs", seqs) msgs, err := db.findMsgInfoBySeq(ctx, userID, docID, conversationID, seqs) if err != nil { return nil, err } for _, msg := range msgs { if msg.IsRead { msg.Msg.IsRead = true } seqMsgs = append(seqMsgs, convert.MsgDB2Pb(msg.Msg)) } } return seqMsgs, nil } // GetMsgBySeqsRange In the context of group chat, we have the following parameters: // // "maxSeq" of a conversation: It represents the maximum value of messages in the group conversation. // "minSeq" of a conversation (default: 1): It represents the minimum value of messages in the group conversation. // // For a user's perspective regarding the group conversation, we have the following parameters: // // "userMaxSeq": It represents the user's upper limit for message retrieval in the group. If not set (default: 0), // it means the upper limit is the same as the conversation's "maxSeq". // "userMinSeq": It represents the user's starting point for message retrieval in the group. If not set (default: 0), // it means the starting point is the same as the conversation's "minSeq". // // The scenarios for these parameters are as follows: // // For users who have been kicked out of the group, "userMaxSeq" can be set as the maximum value they had before // being kicked out. This limits their ability to retrieve messages up to a certain point. // For new users joining the group, if they don't need to receive old messages, // "userMinSeq" can be set as the same value as the conversation's "maxSeq" at the moment they join the group. // This ensures that their message retrieval starts from the point they joined. func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID string, conversationID string, begin, end, num, userMaxSeq int64) (int64, int64, []*sdkws.MsgData, error) { userMinSeq, err := db.seqUser.GetMinSeq(ctx, conversationID, userID) if err != nil && errs.Unwrap(err) != redis.Nil { return 0, 0, nil, err } minSeq, err := db.seqConversation.GetMinSeq(ctx, conversationID) if err != nil { return 0, 0, nil, err } if userMinSeq > minSeq { minSeq = userMinSeq } // "minSeq" represents the startSeq value that the user can retrieve. if minSeq > end { log.ZWarn(ctx, "minSeq > end", errs.New("minSeq>end"), "minSeq", minSeq, "end", end) return 0, 0, nil, nil } maxSeq, err := db.seqConversation.GetMaxSeq(ctx, conversationID) if err != nil { return 0, 0, nil, err } log.ZDebug(ctx, "GetMsgBySeqsRange", "userMinSeq", userMinSeq, "conMinSeq", minSeq, "conMaxSeq", maxSeq, "userMaxSeq", userMaxSeq) if userMaxSeq != 0 { if userMaxSeq < maxSeq { maxSeq = userMaxSeq } } // "maxSeq" represents the endSeq value that the user can retrieve. if begin < minSeq { begin = minSeq } if end > maxSeq { end = maxSeq } // "begin" and "end" represent the actual startSeq and endSeq values that the user can retrieve. if end < begin { return 0, 0, nil, errs.ErrArgs.WrapMsg("seq end < begin") } var seqs []int64 if end-begin+1 <= num { for i := begin; i <= end; i++ { seqs = append(seqs, i) } } else { for i := end - num + 1; i <= end; i++ { seqs = append(seqs, i) } } if len(seqs) == 0 { return 0, 0, nil, nil } newBegin := seqs[0] newEnd := seqs[len(seqs)-1] var successMsgs []*sdkws.MsgData log.ZDebug(ctx, "GetMsgBySeqsRange", "first seqs", seqs, "newBegin", newBegin, "newEnd", newEnd) cachedMsgs, failedSeqs, err := db.msg.GetMessagesBySeq(ctx, conversationID, seqs) if err != nil && !errors.Is(err, redis.Nil) { log.ZError(ctx, "get message from redis exception", err, "conversationID", conversationID, "seqs", seqs) } successMsgs = append(successMsgs, cachedMsgs...) log.ZDebug(ctx, "get msgs from cache", "cachedMsgs", cachedMsgs) // get from cache or db if len(failedSeqs) > 0 { log.ZDebug(ctx, "msgs not exist in redis", "seqs", failedSeqs) mongoMsgs, err := db.getMsgBySeqsRange(ctx, userID, conversationID, failedSeqs, begin, end) if err != nil { return 0, 0, nil, err } successMsgs = append(mongoMsgs, successMsgs...) } return minSeq, maxSeq, successMsgs, nil } func (db *commonMsgDatabase) GetMsgBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) (int64, int64, []*sdkws.MsgData, error) { userMinSeq, err := db.seqUser.GetMinSeq(ctx, conversationID, userID) if err != nil && errs.Unwrap(err) != redis.Nil { return 0, 0, nil, err } minSeq, err := db.seqConversation.GetMinSeq(ctx, conversationID) if err != nil { return 0, 0, nil, err } maxSeq, err := db.seqConversation.GetMaxSeq(ctx, conversationID) if err != nil { return 0, 0, nil, err } if userMinSeq < minSeq { minSeq = userMinSeq } var newSeqs []int64 for _, seq := range seqs { if seq >= minSeq && seq <= maxSeq { newSeqs = append(newSeqs, seq) } } successMsgs, failedSeqs, err := db.msg.GetMessagesBySeq(ctx, conversationID, newSeqs) if err != nil { if err != redis.Nil { log.ZError(ctx, "get message from redis exception", err, "failedSeqs", failedSeqs, "conversationID", conversationID) } } log.ZDebug(ctx, "db.seq.GetMessagesBySeq", "userID", userID, "conversationID", conversationID, "seqs", seqs, "len(successMsgs)", len(successMsgs), "failedSeqs", failedSeqs) if len(failedSeqs) > 0 { mongoMsgs, err := db.getMsgBySeqs(ctx, userID, conversationID, failedSeqs) if err != nil { return 0, 0, nil, err } successMsgs = append(successMsgs, mongoMsgs...) } return minSeq, maxSeq, successMsgs, nil } func (db *commonMsgDatabase) DeleteConversationMsgsAndSetMinSeq(ctx context.Context, conversationID string, remainTime int64) error { var delStruct delMsgRecursionStruct var skip int64 minSeq, err := db.deleteMsgRecursion(ctx, conversationID, skip, &delStruct, remainTime) if err != nil { return err } log.ZDebug(ctx, "DeleteConversationMsgsAndSetMinSeq", "conversationID", conversationID, "minSeq", minSeq) if minSeq == 0 { return nil } return db.seqConversation.SetMinSeq(ctx, conversationID, minSeq) } func (db *commonMsgDatabase) UserMsgsDestruct(ctx context.Context, userID string, conversationID string, destructTime int64, lastMsgDestructTime time.Time) (seqs []int64, err error) { var index int64 for { // from oldest 2 newest msgDocModel, err := db.msgDocDatabase.GetMsgDocModelByIndex(ctx, conversationID, index, 1) if err != nil || msgDocModel.DocID == "" { if err != nil { if err == model.ErrMsgListNotExist { log.ZDebug(ctx, "not doc find", "conversationID", conversationID, "userID", userID, "index", index) } else { log.ZError(ctx, "deleteMsgRecursion GetUserMsgListByIndex failed", err, "conversationID", conversationID, "index", index) } } // If an error is reported, or the error cannot be obtained, it is physically deleted and seq delMongoMsgsPhysical(delStruct.delDocIDList) is returned to end the recursion break } index++ // && msgDocModel.Msg[0].Msg.SendTime > lastMsgDestructTime.UnixMilli() if len(msgDocModel.Msg) > 0 { i := 0 var over bool for _, msg := range msgDocModel.Msg { i++ if msg != nil && msg.Msg != nil && msg.Msg.SendTime+destructTime*1000 <= time.Now().UnixMilli() { if msg.Msg.SendTime+destructTime*1000 > lastMsgDestructTime.UnixMilli() && !datautil.Contain(userID, msg.DelList...) { seqs = append(seqs, msg.Msg.Seq) } } else { log.ZDebug(ctx, "all msg need destruct is found", "conversationID", conversationID, "userID", userID, "index", index, "stop index", i) over = true break } } if over { break } } } log.ZDebug(ctx, "UserMsgsDestruct", "conversationID", conversationID, "userID", userID, "seqs", seqs) if len(seqs) > 0 { userMinSeq := seqs[len(seqs)-1] + 1 currentUserMinSeq, err := db.seqUser.GetMinSeq(ctx, conversationID, userID) if err != nil { return nil, err } if currentUserMinSeq < userMinSeq { if err := db.seqUser.SetMinSeq(ctx, conversationID, userID, userMinSeq); err != nil { return nil, err } } } return seqs, nil } // this is struct for recursion. type delMsgRecursionStruct struct { minSeq int64 delDocIDs []string } func (d *delMsgRecursionStruct) getSetMinSeq() int64 { return d.minSeq } // index 0....19(del) 20...69 // seq 70 // set minSeq 21 // recursion deletes the list and returns the set minimum seq. func (db *commonMsgDatabase) deleteMsgRecursion(ctx context.Context, conversationID string, index int64, delStruct *delMsgRecursionStruct, remainTime int64) (int64, error) { // find from oldest list msgDocModel, err := db.msgDocDatabase.GetMsgDocModelByIndex(ctx, conversationID, index, 1) if err != nil || msgDocModel.DocID == "" { if err != nil { if err == model.ErrMsgListNotExist { log.ZDebug(ctx, "deleteMsgRecursion ErrMsgListNotExist", "conversationID", conversationID, "index:", index) } else { log.ZError(ctx, "deleteMsgRecursion GetUserMsgListByIndex failed", err, "conversationID", conversationID, "index", index) } } // If an error is reported, or the error cannot be obtained, it is physically deleted and seq delMongoMsgsPhysical(delStruct.delDocIDList) is returned to end the recursion err = db.msgDocDatabase.DeleteDocs(ctx, delStruct.delDocIDs) if err != nil { return 0, err } return delStruct.getSetMinSeq() + 1, nil } log.ZDebug(ctx, "doc info", "conversationID", conversationID, "index", index, "docID", msgDocModel.DocID, "len", len(msgDocModel.Msg)) if int64(len(msgDocModel.Msg)) > db.msgTable.GetSingleGocMsgNum() { log.ZWarn(ctx, "msgs too large", nil, "lenth", len(msgDocModel.Msg), "docID:", msgDocModel.DocID) } if msgDocModel.IsFull() && msgDocModel.Msg[len(msgDocModel.Msg)-1].Msg.SendTime+(remainTime*1000) < timeutil.GetCurrentTimestampByMill() { log.ZDebug(ctx, "doc is full and all msg is expired", "docID", msgDocModel.DocID) delStruct.delDocIDs = append(delStruct.delDocIDs, msgDocModel.DocID) delStruct.minSeq = msgDocModel.Msg[len(msgDocModel.Msg)-1].Msg.Seq } else { var delMsgIndexs []int for i, MsgInfoModel := range msgDocModel.Msg { if MsgInfoModel != nil && MsgInfoModel.Msg != nil { if timeutil.GetCurrentTimestampByMill() > MsgInfoModel.Msg.SendTime+(remainTime*1000) { delMsgIndexs = append(delMsgIndexs, i) } } } if len(delMsgIndexs) > 0 { if err = db.msgDocDatabase.DeleteMsgsInOneDocByIndex(ctx, msgDocModel.DocID, delMsgIndexs); err != nil { log.ZError(ctx, "deleteMsgRecursion DeleteMsgsInOneDocByIndex failed", err, "conversationID", conversationID, "index", index) } delStruct.minSeq = int64(msgDocModel.Msg[delMsgIndexs[len(delMsgIndexs)-1]].Msg.Seq) } } seq, err := db.deleteMsgRecursion(ctx, conversationID, index+1, delStruct, remainTime) return seq, err } func (db *commonMsgDatabase) DeleteMsgsPhysicalBySeqs(ctx context.Context, conversationID string, allSeqs []int64) error { if err := db.msg.DeleteMessagesFromCache(ctx, conversationID, allSeqs); err != nil { return err } for docID, seqs := range db.msgTable.GetDocIDSeqsMap(conversationID, allSeqs) { var indexes []int for _, seq := range seqs { indexes = append(indexes, int(db.msgTable.GetMsgIndex(seq))) } if err := db.msgDocDatabase.DeleteMsgsInOneDocByIndex(ctx, docID, indexes); err != nil { return err } } return nil } func (db *commonMsgDatabase) DeleteUserMsgsBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) error { if err := db.msg.DeleteMessagesFromCache(ctx, conversationID, seqs); err != nil { return err } for docID, seqs := range db.msgTable.GetDocIDSeqsMap(conversationID, seqs) { for _, seq := range seqs { if _, err := db.msgDocDatabase.PushUnique(ctx, docID, db.msgTable.GetMsgIndex(seq), "del_list", []string{userID}); err != nil { return err } } } return nil } func (db *commonMsgDatabase) GetMaxSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) { return db.seqConversation.GetMaxSeqs(ctx, conversationIDs) } func (db *commonMsgDatabase) GetMaxSeq(ctx context.Context, conversationID string) (int64, error) { return db.seqConversation.GetMaxSeq(ctx, conversationID) } func (db *commonMsgDatabase) SetMinSeq(ctx context.Context, conversationID string, minSeq int64) error { return db.seqConversation.SetMinSeq(ctx, conversationID, minSeq) } func (db *commonMsgDatabase) SetMinSeqs(ctx context.Context, seqs map[string]int64) error { return db.seqConversation.SetMinSeqs(ctx, seqs) } func (db *commonMsgDatabase) SetUserConversationsMinSeqs(ctx context.Context, userID string, seqs map[string]int64) error { return db.seqUser.SetMinSeqs(ctx, userID, seqs) } func (db *commonMsgDatabase) UserSetHasReadSeqs(ctx context.Context, userID string, hasReadSeqs map[string]int64) error { return db.seqUser.SetReadSeqs(ctx, userID, hasReadSeqs) } func (db *commonMsgDatabase) SetHasReadSeq(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error { return db.seqUser.SetReadSeq(ctx, conversationID, userID, hasReadSeq) } func (db *commonMsgDatabase) GetHasReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error) { return db.seqUser.GetReadSeqs(ctx, userID, conversationIDs) } func (db *commonMsgDatabase) GetHasReadSeq(ctx context.Context, userID string, conversationID string) (int64, error) { return db.seqUser.GetReadSeq(ctx, conversationID, userID) } func (db *commonMsgDatabase) SetSendMsgStatus(ctx context.Context, id string, status int32) error { return db.msg.SetSendMsgStatus(ctx, id, status) } func (db *commonMsgDatabase) GetSendMsgStatus(ctx context.Context, id string) (int32, error) { return db.msg.GetSendMsgStatus(ctx, id) } func (db *commonMsgDatabase) GetConversationMinMaxSeqInMongoAndCache(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error) { minSeqMongo, maxSeqMongo, err = db.GetMinMaxSeqMongo(ctx, conversationID) if err != nil { return } minSeqCache, err = db.seqConversation.GetMinSeq(ctx, conversationID) if err != nil { return } maxSeqCache, err = db.seqConversation.GetMaxSeq(ctx, conversationID) if err != nil { return } return } func (db *commonMsgDatabase) GetMongoMaxAndMinSeq(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo int64, err error) { return db.GetMinMaxSeqMongo(ctx, conversationID) } func (db *commonMsgDatabase) GetMinMaxSeqMongo(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo int64, err error) { oldestMsgMongo, err := db.msgDocDatabase.GetOldestMsg(ctx, conversationID) if err != nil { return } minSeqMongo = oldestMsgMongo.Msg.Seq newestMsgMongo, err := db.msgDocDatabase.GetNewestMsg(ctx, conversationID) if err != nil { return } maxSeqMongo = newestMsgMongo.Msg.Seq return } func (db *commonMsgDatabase) RangeUserSendCount( ctx context.Context, start time.Time, end time.Time, group bool, ase bool, pageNumber int32, showNumber int32, ) (msgCount int64, userCount int64, users []*model.UserCount, dateCount map[string]int64, err error) { return db.msgDocDatabase.RangeUserSendCount(ctx, start, end, group, ase, pageNumber, showNumber) } func (db *commonMsgDatabase) RangeGroupSendCount( ctx context.Context, start time.Time, end time.Time, ase bool, pageNumber int32, showNumber int32, ) (msgCount int64, userCount int64, groups []*model.GroupCount, dateCount map[string]int64, err error) { return db.msgDocDatabase.RangeGroupSendCount(ctx, start, end, ase, pageNumber, showNumber) } func (db *commonMsgDatabase) SearchMessage(ctx context.Context, req *pbmsg.SearchMessageReq) (total int32, msgData []*sdkws.MsgData, err error) { var totalMsgs []*sdkws.MsgData total, msgs, err := db.msgDocDatabase.SearchMessage(ctx, req) if err != nil { return 0, nil, err } for _, msg := range msgs { if msg.IsRead { msg.Msg.IsRead = true } totalMsgs = append(totalMsgs, convert.MsgDB2Pb(msg.Msg)) } return total, totalMsgs, nil } func (db *commonMsgDatabase) FindOneByDocIDs(ctx context.Context, conversationIDs []string, seqs map[string]int64) (map[string]*sdkws.MsgData, error) { totalMsgs := make(map[string]*sdkws.MsgData) for _, conversationID := range conversationIDs { seq := seqs[conversationID] docID := db.msgTable.GetDocID(conversationID, seq) msgs, err := db.msgDocDatabase.FindOneByDocID(ctx, docID) if err != nil { return nil, err } index := db.msgTable.GetMsgIndex(seq) totalMsgs[conversationID] = convert.MsgDB2Pb(msgs.Msg[index].Msg) } return totalMsgs, nil } func (db *commonMsgDatabase) ConvertMsgsDocLen(ctx context.Context, conversationIDs []string) { db.msgDocDatabase.ConvertMsgsDocLen(ctx, conversationIDs) } func (db *commonMsgDatabase) GetBeforeMsg(ctx context.Context, ts int64, limit int) ([]*model.MsgDocModel, error) { return db.msgDocDatabase.GetBeforeMsg(ctx, ts, limit) } func (db *commonMsgDatabase) DeleteDocMsgBefore(ctx context.Context, ts int64, doc *model.MsgDocModel) ([]int, error) { var notNull int index := make([]int, 0, len(doc.Msg)) for i, message := range doc.Msg { if message.Msg != nil { notNull++ if message.Msg.SendTime < ts { index = append(index, i) } } } if len(index) == 0 { return index, nil } maxSeq := doc.Msg[index[len(index)-1]].Msg.Seq conversationID := doc.DocID[:strings.LastIndex(doc.DocID, ":")] if err := db.setMinSeq(ctx, conversationID, maxSeq+1); err != nil { return index, err } if len(index) == notNull { return index, db.msgDocDatabase.DeleteDoc(ctx, doc.DocID) } else { return index, db.msgDocDatabase.DeleteMsgByIndex(ctx, doc.DocID, index) } } func (db *commonMsgDatabase) setMinSeq(ctx context.Context, conversationID string, seq int64) error { dbSeq, err := db.seqConversation.GetMinSeq(ctx, conversationID) if err != nil { if errors.Is(errs.Unwrap(err), redis.Nil) { return nil } return err } if dbSeq >= seq { return nil } return db.seqConversation.SetMinSeq(ctx, conversationID, seq) }