diff --git a/internal/msg_transfer/logic/online_msg_to_mongo_handler.go b/internal/msg_transfer/logic/online_msg_to_mongo_handler.go index a18186714..9659c759f 100644 --- a/internal/msg_transfer/logic/online_msg_to_mongo_handler.go +++ b/internal/msg_transfer/logic/online_msg_to_mongo_handler.go @@ -100,12 +100,10 @@ func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo(cMsg *sarama.Con } } else if v.MsgData.ContentType == 2301 { var req pbMsg.OperateMessageListReactionExtensionsReq - var clientMsgIDList []string for _, v := range req.MessageReactionKeyList { - clientMsgIDList = append(clientMsgIDList, v.ClientMsgID) - } - if err := db.DB.DeleteReactionExtendMsgSet(req.SourceID, req.SessionType, clientMsgIDList, req.OpUserID); err != nil { - log.NewError(req.OperationID, "InsertOrUpdateReactionExtendMsgSet failed") + if err := db.DB.DeleteReactionExtendMsgSet(req.SourceID, req.SessionType, v.ClientMsgID, v.MsgFirstModifyTime, v.ReactionExtensionList); err != nil { + log.NewError(req.OperationID, "InsertOrUpdateReactionExtendMsgSet failed") + } } } } diff --git a/pkg/common/db/extend_msg_mongo_model.go b/pkg/common/db/extend_msg_mongo_model.go index 1c5e3d10b..29ab12c20 100644 --- a/pkg/common/db/extend_msg_mongo_model.go +++ b/pkg/common/db/extend_msg_mongo_model.go @@ -2,12 +2,15 @@ package db import ( "Open_IM/pkg/common/config" + server_api_params "Open_IM/pkg/proto/sdk_ws" "Open_IM/pkg/utils" "context" + "errors" "fmt" "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo/options" "strconv" + "strings" "time" "go.mongodb.org/mongo-driver/bson" @@ -17,12 +20,12 @@ const cExtendMsgSet = "extend_msgs" const MaxNum = 100 type ExtendMsgSet struct { - SourceID string `bson:"source_id" json:"ID"` + SourceID string `bson:"source_id" json:"sourceID"` SessionType int32 `bson:"session_type" json:"sessionType"` ExtendMsgs map[string]ExtendMsg `bson:"extend_msgs" json:"extendMsgs"` ExtendMsgNum int32 `bson:"extend_msg_num" json:"extendMsgNum"` - CreateTime int64 `bson:"create_time" json:"createTime"` // this block's create time - MaxMsgUpdateTime int64 `bson:"max_msg_update_time"` // index find msg + CreateTime int64 `bson:"create_time" json:"createTime"` // this block's create time + MaxMsgUpdateTime int64 `bson:"max_msg_update_time" json:"maxMsgUpdateTime"` // index find msg } type KeyValue struct { @@ -39,10 +42,20 @@ type ExtendMsg struct { Ex string `bson:"ex" json:"ex"` } -func GetExtendMsgSetID(ID string, index int32) string { +func GetExtendMsgMaxNum() int32 { + return MaxNum +} + +func GetExtendMsgSourceID(ID string, index int32) string { return ID + ":" + strconv.Itoa(int(index)) } +func SplitSourceIDAndGetIndex(sourceID string) int32 { + l := strings.Split(sourceID, ":") + index, _ := strconv.Atoi(l[len(l)-1]) + return int32(index) +} + func (d *DataBases) CreateExtendMsgSet(set *ExtendMsgSet) error { ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cExtendMsgSet) @@ -84,49 +97,108 @@ type GetExtendMsgSetOpts struct { func (d *DataBases) InsertExtendMsg(sourceID string, sessionType int32, msg *ExtendMsg) error { ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cExtendMsgSet) - result, err := c.UpdateOne(ctx, bson.M{"source_id": sourceID, "session_type": sessionType}, bson.M{"$set": bson.M{"max_msg_update_time": msg.MsgFirstModifyTime, "$inc": bson.M{"extend_msg_num": 1}, fmt.Sprintf("extend_msgs.%s", msg.ClientMsgID): msg}}) + regex := fmt.Sprintf("^%s", sourceID) + var err error + findOpts := options.Find().SetLimit(1).SetSkip(0).SetSort(bson.M{"source_id": -1}).SetProjection(bson.M{"extend_msgs": 0}) + // update newest + result, err := c.Find(ctx, bson.M{"source_id": primitive.Regex{Pattern: regex}, "session_type": sessionType}, findOpts) if err != nil { return utils.Wrap(err, "") } - if result.UpsertedCount == 0 { - if err := d.CreateExtendMsgSet(&ExtendMsgSet{ - SourceID: sourceID, + var setList []ExtendMsgSet + if err := result.All(ctx, &setList); err != nil { + return utils.Wrap(err, "") + } + if len(setList) == 0 || setList[0].ExtendMsgNum >= GetExtendMsgMaxNum() { + var index int32 + if len(setList) > 0 { + index = SplitSourceIDAndGetIndex(setList[0].SourceID) + } + err = d.CreateExtendMsgSet(&ExtendMsgSet{ + SourceID: GetExtendMsgSourceID(sourceID, index), SessionType: sessionType, ExtendMsgs: map[string]ExtendMsg{msg.ClientMsgID: *msg}, ExtendMsgNum: 1, CreateTime: msg.MsgFirstModifyTime, MaxMsgUpdateTime: msg.MsgFirstModifyTime, - }); err != nil { - return err - } + }) + } else { + _, err = c.UpdateOne(ctx, bson.M{"source_id": setList[0].SourceID, "session_type": sessionType}, bson.M{"$set": bson.M{"max_msg_update_time": msg.MsgFirstModifyTime, "$inc": bson.M{"extend_msg_num": 1}, fmt.Sprintf("extend_msgs.%s", msg.ClientMsgID): msg}}) } - return nil + return utils.Wrap(err, "") } // insert or update -func (d *DataBases) InsertOrUpdateReactionExtendMsgSet(sourceID string, sessionType int32, clientMsgID, typeKey, value string) error { +func (d *DataBases) InsertOrUpdateReactionExtendMsgSet(sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*server_api_params.KeyValue) error { ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cExtendMsgSet) - reactionExtendMsgSet := KeyValue{ - TypeKey: typeKey, - Value: value, - LatestUpdateTime: utils.GetCurrentTimestampBySecond(), + var updateBson = bson.M{} + for _, v := range reactionExtensionList { + updateBson[fmt.Sprintf("extend_msgs.%s.%s", clientMsgID, v.TypeKey)] = v } upsert := true opt := &options.UpdateOptions{ Upsert: &upsert, } - _, err := c.UpdateOne(ctx, bson.M{"source_id": sourceID, "session_type": sessionType}, bson.M{"$set": bson.M{fmt.Sprintf("extend_msgs.%s.%s", clientMsgID, typeKey): reactionExtendMsgSet}}, opt) - return err + findOpts := options.Find().SetLimit(1).SetSkip(0).SetSort(bson.M{"source_id": -1}).SetProjection(bson.M{"extend_msgs": 0}) + regex := fmt.Sprintf("^%s", sourceID) + result, err := c.Find(ctx, bson.M{"source_id": primitive.Regex{Pattern: regex}, "session_type": sessionType, "max_msg_update_time": bson.M{"$lte": msgFirstModifyTime}}, findOpts) + if err != nil { + return utils.Wrap(err, "") + } + var setList []ExtendMsgSet + if err := result.All(ctx, &setList); err != nil { + return utils.Wrap(err, "") + } + if len(setList) == 0 { + return utils.Wrap(errors.New("InsertOrUpdateReactionExtendMsgSet failed, len(setList) == 0"), "") + } + + _, err = c.UpdateOne(ctx, bson.M{"source_id": setList[0].SourceID, "session_type": sessionType}, bson.M{"$set": updateBson}, opt) + return utils.Wrap(err, "") } // delete TypeKey -func (d *DataBases) DeleteReactionExtendMsgSet(sourceID string, sessionType int32, clientMsgID []string, userID string) error { +func (d *DataBases) DeleteReactionExtendMsgSet(sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList []*server_api_params.KeyValue) error { ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cExtendMsgSet) - _, err := c.UpdateOne(ctx, bson.M{"source_id": sourceID, "session_type": sessionType}, bson.M{"$unset": bson.M{fmt.Sprintf("extend_msgs.%s.%s", clientMsgID, userID): ""}}) + var updateBson = bson.M{} + for _, v := range reactionExtensionList { + updateBson[fmt.Sprintf("extend_msgs.%s.%s", clientMsgID, v.TypeKey)] = "" + } + + findOpts := options.Find().SetLimit(1).SetSkip(0).SetSort(bson.M{"source_id": -1}).SetProjection(bson.M{"extend_msgs": 0}) + regex := fmt.Sprintf("^%s", sourceID) + result, err := c.Find(ctx, bson.M{"source_id": primitive.Regex{Pattern: regex}, "session_type": sessionType, "max_msg_update_time": bson.M{"$lte": msgFirstModifyTime}}, findOpts) + if err != nil { + return utils.Wrap(err, "") + } + var setList []ExtendMsgSet + if err := result.All(ctx, &setList); err != nil { + return utils.Wrap(err, "") + } + if len(setList) == 0 { + return utils.Wrap(errors.New("InsertOrUpdateReactionExtendMsgSet failed, len(setList) == 0"), "") + } + _, err = c.UpdateOne(ctx, bson.M{"source_id": setList[0].SourceID, "session_type": sessionType}, bson.M{"$unset": updateBson}) return err } -func (d *DataBases) GetExtendMsg(sourceID string, sessionType int32, clientMsgID string, maxMsgUpdateTime int64) error { +func (d *DataBases) GetExtendMsg(sourceID string, sessionType int32, clientMsgID string, maxMsgUpdateTime int64) (extendMsg *ExtendMsg, err error) { + ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) + c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cExtendMsgSet) + findOpts := options.Find().SetLimit(1).SetSkip(0).SetSort(bson.M{"source_id": -1}).SetProjection(bson.M{fmt.Sprintf("extend_msgs.%s", clientMsgID): 1}) + regex := fmt.Sprintf("^%s", sourceID) + result, err := c.Find(ctx, bson.M{"source_id": primitive.Regex{Pattern: regex}, "session_type": sessionType, "msgFirstModifyTime": bson.M{"$lte": maxMsgUpdateTime}}, findOpts) + if err != nil { + return nil, utils.Wrap(err, "") + } + var extendMsgList []ExtendMsg + if err := result.All(ctx, &extendMsgList); err != nil { + return nil, utils.Wrap(err, "") + } + if len(extendMsgList) == 0 { + return nil, utils.Wrap(errors.New("GetExtendMsg failed, len(setList) == 0"), "") + } + return &extendMsgList[0], nil } diff --git a/pkg/common/db/rocks_cache/rocks_cache.go b/pkg/common/db/rocks_cache/rocks_cache.go index 3edb9f238..5ff836fd7 100644 --- a/pkg/common/db/rocks_cache/rocks_cache.go +++ b/pkg/common/db/rocks_cache/rocks_cache.go @@ -567,37 +567,9 @@ func DelConversationFromCache(ownerUserID, conversationID string) error { return utils.Wrap(db.DB.Rc.TagAsDeleted(conversationCache+ownerUserID+":"+conversationID), "DelConversationFromCache err") } -func GetExtendMsgSetFromCache(ID string, index int32) (*db.ExtendMsgSet, error) { - getExtendMsgSet := func() (string, error) { - extendMsgSet, err := db.DB.GetExtendMsgSet(ID, index, &db.GetExtendMsgSetOpts{ExcludeExtendMsgs: false}) - if err != nil { - return "", utils.Wrap(err, "GetExtendMsgSet failed") - } - bytes, err := json.Marshal(extendMsgSet) - if err != nil { - return "", utils.Wrap(err, "Marshal failed") - } - return string(bytes), nil - } - extendMsgSetStr, err := db.DB.Rc.Fetch(extendMsgSetCache+db.GetExtendMsgSetID(ID, index), time.Second*30*60, getExtendMsgSet) - if err != nil { - return nil, utils.Wrap(err, "Fetch failed") - } - extendMsgSet := &db.ExtendMsgSet{} - err = json.Unmarshal([]byte(extendMsgSetStr), extendMsgSet) - if err != nil { - return nil, utils.Wrap(err, "Unmarshal failed") - } - return extendMsgSet, nil -} - -func DelExtendMsgSetFromCache(ID string, index int32) error { - return utils.Wrap(db.DB.Rc.TagAsDeleted(extendMsgSetCache+db.GetExtendMsgSetID(ID, index)), "DelExtendMsgSetFromCache err") -} - -func GetExtendMsg(ID string, index int32, clientMsgID string) (*db.ExtendMsg, error) { +func GetExtendMsg(sourceID string, sessionType int32, clientMsgID string, maxMsgUpdateTime int64) (*db.ExtendMsg, error) { getExtendMsg := func() (string, error) { - extendMsg, err := db.DB.GetExtendMsgList(ID, index, clientMsgID) + extendMsg, err := db.DB.GetExtendMsg(sourceID, sessionType, clientMsgID, maxMsgUpdateTime) if err != nil { return "", utils.Wrap(err, "GetExtendMsgList failed") } @@ -608,7 +580,7 @@ func GetExtendMsg(ID string, index int32, clientMsgID string) (*db.ExtendMsg, er return string(bytes), nil } - extendMsgStr, err := db.DB.Rc.Fetch(extendMsgCache+db.GetExtendMsgSetID(ID, index)+":"+clientMsgID, time.Second*30*60, getExtendMsg) + extendMsgStr, err := db.DB.Rc.Fetch(extendMsgCache+clientMsgID, time.Second*30*60, getExtendMsg) if err != nil { return nil, utils.Wrap(err, "Fetch failed") } @@ -621,5 +593,5 @@ func GetExtendMsg(ID string, index int32, clientMsgID string) (*db.ExtendMsg, er } func DelExtendMsg(ID string, index int32, clientMsgID string) error { - return utils.Wrap(db.DB.Rc.TagAsDeleted(extendMsgCache+db.GetExtendMsgSetID(ID, index)+":"+clientMsgID), "DelExtendMsg err") + return utils.Wrap(db.DB.Rc.TagAsDeleted(extendMsgCache+clientMsgID), "DelExtendMsg err") }