pull/344/head
wangchuxiao 2 years ago
parent f384bb2aff
commit 59d1090a6d

@ -83,11 +83,47 @@ func (mmc *ModifyMsgConsumerHandler) ModifyMsg(cMsg *sarama.ConsumerMessage, msg
LatestUpdateTime: v.LatestUpdateTime, LatestUpdateTime: v.LatestUpdateTime,
} }
} }
// modify old msg
if msgDataToMQ.MsgData.SessionType == constant.SuperGroupChatType && utils.GetSwitchFromOptions(msgDataToMQ.MsgData.Options, constant.IsHistory) {
if msgDataToMQ.MsgData.Seq == 0 {
log.NewError(msgDataToMQ.OperationID, "seq==0, error msg", msgDataToMQ.MsgData)
continue
}
msg, err := db.DB.GetMsgBySeqIndex(notification.SourceID, notification.Seq, msgDataToMQ.OperationID)
if (msg != nil && msg.Seq != notification.Seq) || err != nil {
if err != nil {
log.NewError(msgDataToMQ.OperationID, "GetMsgBySeqIndex failed", notification, err.Error())
}
msgs, indexes, err := db.DB.GetSuperGroupMsgBySeqListMongo(notification.SourceID, []uint32{notification.Seq}, msgDataToMQ.OperationID)
if err != nil {
log.NewError(msgDataToMQ.OperationID, "GetSuperGroupMsgBySeqListMongo failed", notification, err.Error())
continue
}
var index int
if len(msgs) < 1 || len(indexes) < 1 {
log.NewError(msgDataToMQ.OperationID, "GetSuperGroupMsgBySeqListMongo failed", notification, "len<1", msgs, indexes)
continue
} else {
msg = msgs[0]
index = indexes[msg.Seq]
}
if err := db.DB.ReplaceMsgByIndex(notification.SourceID, msg, index); err != nil {
log.NewError(msgDataToMQ.OperationID, "ReplaceMsgByIndex failed", notification.SourceID, *msg)
}
} else {
msg.IsReact = true
if err = db.DB.ReplaceMsgBySeq(notification.SourceID, msg, msgDataToMQ.OperationID); err != nil {
log.NewError(msgDataToMQ.OperationID, "ReplaceMsgBySeq failed", notification.SourceID, *msg)
}
}
}
if err := db.DB.InsertExtendMsg(notification.SourceID, notification.SessionType, &extendMsg); err != nil { if err := db.DB.InsertExtendMsg(notification.SourceID, notification.SessionType, &extendMsg); err != nil {
log.NewError(msgDataToMQ.OperationID, "MsgFirstModify InsertExtendMsg failed", notification.SourceID, notification.SessionType, extendMsg, err.Error()) log.NewError(msgDataToMQ.OperationID, "MsgFirstModify InsertExtendMsg failed", notification.SourceID, notification.SessionType, extendMsg, err.Error())
continue continue
} }
} else { } else {
var reactionExtensionList = make(map[string]*server_api_params.KeyValue) var reactionExtensionList = make(map[string]*server_api_params.KeyValue)
for _, v := range notification.SuccessReactionExtensionList { for _, v := range notification.SuccessReactionExtensionList {

@ -85,7 +85,7 @@ func (rpc *rpcChat) PullMessageBySeqList(_ context.Context, in *open_im_sdk.Pull
} else { } else {
log.Debug(in.OperationID, "get message from redis is nil", failedSeqList) log.Debug(in.OperationID, "get message from redis is nil", failedSeqList)
} }
msgList, err1 := commonDB.DB.GetSuperGroupMsgBySeqListMongo(k, failedSeqList, in.OperationID) msgList, _, err1 := commonDB.DB.GetSuperGroupMsgBySeqListMongo(k, failedSeqList, in.OperationID)
if err1 != nil { if err1 != nil {
promePkg.PromeAdd(promePkg.MsgPullFromMongoFailedCounter, len(failedSeqList)) promePkg.PromeAdd(promePkg.MsgPullFromMongoFailedCounter, len(failedSeqList))
log.Error(in.OperationID, "PullMessageBySeqList data error", in.String(), err1.Error()) log.Error(in.OperationID, "PullMessageBySeqList data error", in.String(), err1.Error())

@ -21,7 +21,7 @@ func (rpc *rpcChat) GetSuperGroupMsg(context context.Context, req *msg.GetSuperG
} else { } else {
log.Debug(req.OperationID, "get message from redis is nil", failedSeqList) log.Debug(req.OperationID, "get message from redis is nil", failedSeqList)
} }
msgList, err1 := commonDB.DB.GetSuperGroupMsgBySeqListMongo(req.GroupID, failedSeqList, req.OperationID) msgList, _, err1 := commonDB.DB.GetSuperGroupMsgBySeqListMongo(req.GroupID, failedSeqList, req.OperationID)
if err1 != nil { if err1 != nil {
promePkg.PromeAdd(promePkg.MsgPullFromMongoFailedCounter, len(failedSeqList)) promePkg.PromeAdd(promePkg.MsgPullFromMongoFailedCounter, len(failedSeqList))
log.Error(req.OperationID, "GetSuperGroupMsg data error", req.String(), err.Error()) log.Error(req.OperationID, "GetSuperGroupMsg data error", req.String(), err.Error())

@ -131,6 +131,7 @@ type ReactionMessageModifierNotification struct {
IsReact bool `json:"isReact"` IsReact bool `json:"isReact"`
IsExternalExtensions bool `json:"isExternalExtensions"` IsExternalExtensions bool `json:"isExternalExtensions"`
MsgFirstModifyTime int64 `json:"msgFirstModifyTime"` MsgFirstModifyTime int64 `json:"msgFirstModifyTime"`
Seq uint32 `json:"seq"`
} }
type ReactionMessageDeleteNotification struct { type ReactionMessageDeleteNotification struct {

@ -137,7 +137,7 @@ func (d *DataBases) DelMsgBySeqListInOneDoc(suffixUserID string, seqList []uint3
return nil, utils.Wrap(err, "") return nil, utils.Wrap(err, "")
} }
for i, v := range seqMsgList { for i, v := range seqMsgList {
if err := d.ReplaceMsgByIndex(suffixUserID, v, operationID, indexList[i]); err != nil { if err := d.DelMsgByIndex(suffixUserID, v, operationID, indexList[i]); err != nil {
return nil, utils.Wrap(err, "") return nil, utils.Wrap(err, "")
} }
} }
@ -161,7 +161,7 @@ func (d *DataBases) DelMsgLogic(uid string, seqList []uint32, operationID string
return nil return nil
} }
func (d *DataBases) ReplaceMsgByIndex(suffixUserID string, msg *open_im_sdk.MsgData, operationID string, seqIndex int) error { func (d *DataBases) DelMsgByIndex(suffixUserID string, msg *open_im_sdk.MsgData, operationID string, seqIndex int) error {
log.NewInfo(operationID, utils.GetSelfFuncName(), suffixUserID, *msg) log.NewInfo(operationID, utils.GetSelfFuncName(), suffixUserID, *msg)
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat) c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat)
@ -195,7 +195,6 @@ func (d *DataBases) ReplaceMsgBySeq(uid string, msg *open_im_sdk.MsgData, operat
log.NewError(operationID, utils.GetSelfFuncName(), "proto marshal", err.Error()) log.NewError(operationID, utils.GetSelfFuncName(), "proto marshal", err.Error())
return utils.Wrap(err, "") return utils.Wrap(err, "")
} }
updateResult, err := c.UpdateOne( updateResult, err := c.UpdateOne(
ctx, bson.M{"uid": uid}, ctx, bson.M{"uid": uid},
bson.M{"$set": bson.M{s: bytes}}) bson.M{"$set": bson.M{s: bytes}})
@ -207,6 +206,48 @@ func (d *DataBases) ReplaceMsgBySeq(uid string, msg *open_im_sdk.MsgData, operat
return nil return nil
} }
func (d *DataBases) GetMsgBySeqIndex(uid string, seq uint32, operationID string) (*open_im_sdk.MsgData, error) {
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat)
uid = getSeqUid(uid, seq)
seqIndex := getMsgIndex(seq)
result, err := c.Find(ctx, bson.M{"msg": bson.M{"$slice": []int{seqIndex, 1}}})
if err != nil {
return nil, err
}
var msgInfos []MsgInfo
if err := result.Decode(&msgInfos); err != nil {
return nil, err
}
if len(msgInfos) < 1 {
return nil, ErrMsgListNotExist
}
var msg open_im_sdk.MsgData
if err := proto.Unmarshal(msgInfos[0].Msg, &msg); err != nil {
return nil, err
}
return &msg, nil
}
func (d *DataBases) ReplaceMsgByIndex(uid string, msg *open_im_sdk.MsgData, index int) error {
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat)
uid = getSeqUid(uid, msg.Seq)
seqIndex := getMsgIndex(msg.Seq)
s := fmt.Sprintf("msg.%d.msg", seqIndex)
bytes, err := proto.Marshal(msg)
if err != nil {
return utils.Wrap(err, "")
}
_, err = c.UpdateOne(
ctx, bson.M{"uid": uid},
bson.M{"$set": bson.M{s: bytes}})
if err != nil {
return utils.Wrap(err, "")
}
return nil
}
func (d *DataBases) UpdateOneMsgList(msg *UserChat) error { func (d *DataBases) UpdateOneMsgList(msg *UserChat) error {
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat) c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat)
@ -441,8 +482,9 @@ func (d *DataBases) GetMsgBySeqListMongo2(uid string, seqList []uint32, operatio
} }
return seqMsg, nil return seqMsg, nil
} }
func (d *DataBases) GetSuperGroupMsgBySeqListMongo(groupID string, seqList []uint32, operationID string) (seqMsg []*open_im_sdk.MsgData, err error) { func (d *DataBases) GetSuperGroupMsgBySeqListMongo(groupID string, seqList []uint32, operationID string) (seqMsg []*open_im_sdk.MsgData, indexes map[uint32]int, err error) {
var hasSeqList []uint32 var hasSeqList []uint32
indexes = make(map[uint32]int)
singleCount := 0 singleCount := 0
ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second)
c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat) c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat)
@ -471,10 +513,11 @@ func (d *DataBases) GetSuperGroupMsgBySeqListMongo(groupID string, seqList []uin
msg := new(open_im_sdk.MsgData) msg := new(open_im_sdk.MsgData)
if err = proto.Unmarshal(sChat.Msg[i].Msg, msg); err != nil { if err = proto.Unmarshal(sChat.Msg[i].Msg, msg); err != nil {
log.NewError(operationID, "Unmarshal err", seqUid, value, groupID, seqList, err.Error()) log.NewError(operationID, "Unmarshal err", seqUid, value, groupID, seqList, err.Error())
return nil, err return nil, nil, err
} }
if isContainInt32(msg.Seq, value) { if isContainInt32(msg.Seq, value) {
seqMsg = append(seqMsg, msg) seqMsg = append(seqMsg, msg)
indexes[msg.Seq] = i
hasSeqList = append(hasSeqList, msg.Seq) hasSeqList = append(hasSeqList, msg.Seq)
singleCount++ singleCount++
if singleCount == len(value) { if singleCount == len(value) {
@ -488,9 +531,8 @@ func (d *DataBases) GetSuperGroupMsgBySeqListMongo(groupID string, seqList []uin
diff = utils.Difference(hasSeqList, seqList) diff = utils.Difference(hasSeqList, seqList)
exceptionMSg := genExceptionSuperGroupMessageBySeqList(diff, groupID) exceptionMSg := genExceptionSuperGroupMessageBySeqList(diff, groupID)
seqMsg = append(seqMsg, exceptionMSg...) seqMsg = append(seqMsg, exceptionMSg...)
} }
return seqMsg, nil return seqMsg, nil, nil
} }
func (d *DataBases) GetMsgAndIndexBySeqListInOneMongo2(suffixUserID string, seqList []uint32, operationID string) (seqMsg []*open_im_sdk.MsgData, indexList []int, unexistSeqList []uint32, err error) { func (d *DataBases) GetMsgAndIndexBySeqListInOneMongo2(suffixUserID string, seqList []uint32, operationID string) (seqMsg []*open_im_sdk.MsgData, indexList []int, unexistSeqList []uint32, err error) {

Loading…
Cancel
Save