From 59d1090a6d1c2910ad609ee56354a6bfad025a8a Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Mon, 27 Feb 2023 12:29:40 +0800 Subject: [PATCH] modify msg --- .../msg_transfer/logic/modify_msg_handler.go | 36 ++++++++++++ internal/rpc/msg/pull_message.go | 2 +- internal/rpc/msg/query_msg.go | 2 +- pkg/base_info/msg.go | 1 + pkg/common/db/mongoModel.go | 56 ++++++++++++++++--- 5 files changed, 88 insertions(+), 9 deletions(-) diff --git a/internal/msg_transfer/logic/modify_msg_handler.go b/internal/msg_transfer/logic/modify_msg_handler.go index 782c6fcc0..40b0d0317 100644 --- a/internal/msg_transfer/logic/modify_msg_handler.go +++ b/internal/msg_transfer/logic/modify_msg_handler.go @@ -83,11 +83,47 @@ func (mmc *ModifyMsgConsumerHandler) ModifyMsg(cMsg *sarama.ConsumerMessage, msg LatestUpdateTime: v.LatestUpdateTime, } } + // modify old msg + if msgDataToMQ.MsgData.SessionType == constant.SuperGroupChatType && utils.GetSwitchFromOptions(msgDataToMQ.MsgData.Options, constant.IsHistory) { + if msgDataToMQ.MsgData.Seq == 0 { + log.NewError(msgDataToMQ.OperationID, "seq==0, error msg", msgDataToMQ.MsgData) + continue + } + msg, err := db.DB.GetMsgBySeqIndex(notification.SourceID, notification.Seq, msgDataToMQ.OperationID) + if (msg != nil && msg.Seq != notification.Seq) || err != nil { + if err != nil { + log.NewError(msgDataToMQ.OperationID, "GetMsgBySeqIndex failed", notification, err.Error()) + } + msgs, indexes, err := db.DB.GetSuperGroupMsgBySeqListMongo(notification.SourceID, []uint32{notification.Seq}, msgDataToMQ.OperationID) + if err != nil { + log.NewError(msgDataToMQ.OperationID, "GetSuperGroupMsgBySeqListMongo failed", notification, err.Error()) + continue + } + var index int + if len(msgs) < 1 || len(indexes) < 1 { + log.NewError(msgDataToMQ.OperationID, "GetSuperGroupMsgBySeqListMongo failed", notification, "len<1", msgs, indexes) + continue + } else { + msg = msgs[0] + index = indexes[msg.Seq] + } + if err := db.DB.ReplaceMsgByIndex(notification.SourceID, msg, index); err != nil { + log.NewError(msgDataToMQ.OperationID, "ReplaceMsgByIndex failed", notification.SourceID, *msg) + } + + } else { + msg.IsReact = true + if err = db.DB.ReplaceMsgBySeq(notification.SourceID, msg, msgDataToMQ.OperationID); err != nil { + log.NewError(msgDataToMQ.OperationID, "ReplaceMsgBySeq failed", notification.SourceID, *msg) + } + } + } if err := db.DB.InsertExtendMsg(notification.SourceID, notification.SessionType, &extendMsg); err != nil { log.NewError(msgDataToMQ.OperationID, "MsgFirstModify InsertExtendMsg failed", notification.SourceID, notification.SessionType, extendMsg, err.Error()) continue } + } else { var reactionExtensionList = make(map[string]*server_api_params.KeyValue) for _, v := range notification.SuccessReactionExtensionList { diff --git a/internal/rpc/msg/pull_message.go b/internal/rpc/msg/pull_message.go index 83f2275f2..ae482523b 100644 --- a/internal/rpc/msg/pull_message.go +++ b/internal/rpc/msg/pull_message.go @@ -85,7 +85,7 @@ func (rpc *rpcChat) PullMessageBySeqList(_ context.Context, in *open_im_sdk.Pull } else { log.Debug(in.OperationID, "get message from redis is nil", failedSeqList) } - msgList, err1 := commonDB.DB.GetSuperGroupMsgBySeqListMongo(k, failedSeqList, in.OperationID) + msgList, _, err1 := commonDB.DB.GetSuperGroupMsgBySeqListMongo(k, failedSeqList, in.OperationID) if err1 != nil { promePkg.PromeAdd(promePkg.MsgPullFromMongoFailedCounter, len(failedSeqList)) log.Error(in.OperationID, "PullMessageBySeqList data error", in.String(), err1.Error()) diff --git a/internal/rpc/msg/query_msg.go b/internal/rpc/msg/query_msg.go index f8a2afc7c..fea05114d 100644 --- a/internal/rpc/msg/query_msg.go +++ b/internal/rpc/msg/query_msg.go @@ -21,7 +21,7 @@ func (rpc *rpcChat) GetSuperGroupMsg(context context.Context, req *msg.GetSuperG } else { log.Debug(req.OperationID, "get message from redis is nil", failedSeqList) } - msgList, err1 := commonDB.DB.GetSuperGroupMsgBySeqListMongo(req.GroupID, failedSeqList, req.OperationID) + msgList, _, err1 := commonDB.DB.GetSuperGroupMsgBySeqListMongo(req.GroupID, failedSeqList, req.OperationID) if err1 != nil { promePkg.PromeAdd(promePkg.MsgPullFromMongoFailedCounter, len(failedSeqList)) log.Error(req.OperationID, "GetSuperGroupMsg data error", req.String(), err.Error()) diff --git a/pkg/base_info/msg.go b/pkg/base_info/msg.go index d3ece59f5..09d5a0821 100644 --- a/pkg/base_info/msg.go +++ b/pkg/base_info/msg.go @@ -131,6 +131,7 @@ type ReactionMessageModifierNotification struct { IsReact bool `json:"isReact"` IsExternalExtensions bool `json:"isExternalExtensions"` MsgFirstModifyTime int64 `json:"msgFirstModifyTime"` + Seq uint32 `json:"seq"` } type ReactionMessageDeleteNotification struct { diff --git a/pkg/common/db/mongoModel.go b/pkg/common/db/mongoModel.go index 22165efd3..61ad642b3 100644 --- a/pkg/common/db/mongoModel.go +++ b/pkg/common/db/mongoModel.go @@ -137,7 +137,7 @@ func (d *DataBases) DelMsgBySeqListInOneDoc(suffixUserID string, seqList []uint3 return nil, utils.Wrap(err, "") } for i, v := range seqMsgList { - if err := d.ReplaceMsgByIndex(suffixUserID, v, operationID, indexList[i]); err != nil { + if err := d.DelMsgByIndex(suffixUserID, v, operationID, indexList[i]); err != nil { return nil, utils.Wrap(err, "") } } @@ -161,7 +161,7 @@ func (d *DataBases) DelMsgLogic(uid string, seqList []uint32, operationID string return nil } -func (d *DataBases) ReplaceMsgByIndex(suffixUserID string, msg *open_im_sdk.MsgData, operationID string, seqIndex int) error { +func (d *DataBases) DelMsgByIndex(suffixUserID string, msg *open_im_sdk.MsgData, operationID string, seqIndex int) error { log.NewInfo(operationID, utils.GetSelfFuncName(), suffixUserID, *msg) ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat) @@ -195,7 +195,6 @@ func (d *DataBases) ReplaceMsgBySeq(uid string, msg *open_im_sdk.MsgData, operat log.NewError(operationID, utils.GetSelfFuncName(), "proto marshal", err.Error()) return utils.Wrap(err, "") } - updateResult, err := c.UpdateOne( ctx, bson.M{"uid": uid}, bson.M{"$set": bson.M{s: bytes}}) @@ -207,6 +206,48 @@ func (d *DataBases) ReplaceMsgBySeq(uid string, msg *open_im_sdk.MsgData, operat return nil } +func (d *DataBases) GetMsgBySeqIndex(uid string, seq uint32, operationID string) (*open_im_sdk.MsgData, error) { + ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) + c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat) + uid = getSeqUid(uid, seq) + seqIndex := getMsgIndex(seq) + result, err := c.Find(ctx, bson.M{"msg": bson.M{"$slice": []int{seqIndex, 1}}}) + if err != nil { + return nil, err + } + var msgInfos []MsgInfo + if err := result.Decode(&msgInfos); err != nil { + return nil, err + } + if len(msgInfos) < 1 { + return nil, ErrMsgListNotExist + } + var msg open_im_sdk.MsgData + if err := proto.Unmarshal(msgInfos[0].Msg, &msg); err != nil { + return nil, err + } + return &msg, nil +} + +func (d *DataBases) ReplaceMsgByIndex(uid string, msg *open_im_sdk.MsgData, index int) error { + ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) + c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat) + uid = getSeqUid(uid, msg.Seq) + seqIndex := getMsgIndex(msg.Seq) + s := fmt.Sprintf("msg.%d.msg", seqIndex) + bytes, err := proto.Marshal(msg) + if err != nil { + return utils.Wrap(err, "") + } + _, err = c.UpdateOne( + ctx, bson.M{"uid": uid}, + bson.M{"$set": bson.M{s: bytes}}) + if err != nil { + return utils.Wrap(err, "") + } + return nil +} + func (d *DataBases) UpdateOneMsgList(msg *UserChat) error { ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat) @@ -441,8 +482,9 @@ func (d *DataBases) GetMsgBySeqListMongo2(uid string, seqList []uint32, operatio } return seqMsg, nil } -func (d *DataBases) GetSuperGroupMsgBySeqListMongo(groupID string, seqList []uint32, operationID string) (seqMsg []*open_im_sdk.MsgData, err error) { +func (d *DataBases) GetSuperGroupMsgBySeqListMongo(groupID string, seqList []uint32, operationID string) (seqMsg []*open_im_sdk.MsgData, indexes map[uint32]int, err error) { var hasSeqList []uint32 + indexes = make(map[uint32]int) singleCount := 0 ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat) @@ -471,10 +513,11 @@ func (d *DataBases) GetSuperGroupMsgBySeqListMongo(groupID string, seqList []uin msg := new(open_im_sdk.MsgData) if err = proto.Unmarshal(sChat.Msg[i].Msg, msg); err != nil { log.NewError(operationID, "Unmarshal err", seqUid, value, groupID, seqList, err.Error()) - return nil, err + return nil, nil, err } if isContainInt32(msg.Seq, value) { seqMsg = append(seqMsg, msg) + indexes[msg.Seq] = i hasSeqList = append(hasSeqList, msg.Seq) singleCount++ if singleCount == len(value) { @@ -488,9 +531,8 @@ func (d *DataBases) GetSuperGroupMsgBySeqListMongo(groupID string, seqList []uin diff = utils.Difference(hasSeqList, seqList) exceptionMSg := genExceptionSuperGroupMessageBySeqList(diff, groupID) seqMsg = append(seqMsg, exceptionMSg...) - } - return seqMsg, nil + return seqMsg, nil, nil } func (d *DataBases) GetMsgAndIndexBySeqListInOneMongo2(suffixUserID string, seqList []uint32, operationID string) (seqMsg []*open_im_sdk.MsgData, indexList []int, unexistSeqList []uint32, err error) {