|
|
|
@ -68,6 +68,42 @@ func (mmc *ModifyMsgConsumerHandler) ModifyMsg(cMsg *sarama.ConsumerMessage, msg
|
|
|
|
|
log.NewInfo(msgDataToMQ.OperationID, "msg:", notification, "this is external extensions")
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if msgDataToMQ.MsgData.SessionType == constant.SuperGroupChatType && utils.GetSwitchFromOptions(msgDataToMQ.MsgData.Options, constant.IsHistory) {
|
|
|
|
|
if msgDataToMQ.MsgData.Seq == 0 {
|
|
|
|
|
log.NewError(msgDataToMQ.OperationID, "seq==0, error msg", msgDataToMQ.MsgData)
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
msg, err := db.DB.GetMsgBySeqIndex(notification.SourceID, notification.Seq, msgDataToMQ.OperationID)
|
|
|
|
|
if (msg != nil && msg.Seq != notification.Seq) || err != nil {
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.NewError(msgDataToMQ.OperationID, "GetMsgBySeqIndex failed", notification, err.Error())
|
|
|
|
|
}
|
|
|
|
|
msgs, indexes, err := db.DB.GetSuperGroupMsgBySeqListMongo(notification.SourceID, []uint32{notification.Seq}, msgDataToMQ.OperationID)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.NewError(msgDataToMQ.OperationID, "GetSuperGroupMsgBySeqListMongo failed", notification, err.Error())
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
var index int
|
|
|
|
|
if len(msgs) < 1 || len(indexes) < 1 {
|
|
|
|
|
log.NewError(msgDataToMQ.OperationID, "GetSuperGroupMsgBySeqListMongo failed", notification, "len<1", msgs, indexes)
|
|
|
|
|
continue
|
|
|
|
|
} else {
|
|
|
|
|
msg = msgs[0]
|
|
|
|
|
index = indexes[msg.Seq]
|
|
|
|
|
}
|
|
|
|
|
if err := db.DB.ReplaceMsgByIndex(notification.SourceID, msg, index); err != nil {
|
|
|
|
|
log.NewError(msgDataToMQ.OperationID, "ReplaceMsgByIndex failed", notification.SourceID, *msg)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
} else {
|
|
|
|
|
msg.IsReact = true
|
|
|
|
|
if err = db.DB.ReplaceMsgBySeq(notification.SourceID, msg, msgDataToMQ.OperationID); err != nil {
|
|
|
|
|
log.NewError(msgDataToMQ.OperationID, "ReplaceMsgBySeq failed", notification.SourceID, *msg)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if !notification.IsReact {
|
|
|
|
|
// first time to modify
|
|
|
|
|
var reactionExtensionList = make(map[string]db.KeyValue)
|
|
|
|
@ -84,41 +120,6 @@ func (mmc *ModifyMsgConsumerHandler) ModifyMsg(cMsg *sarama.ConsumerMessage, msg
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
// modify old msg
|
|
|
|
|
if msgDataToMQ.MsgData.SessionType == constant.SuperGroupChatType && utils.GetSwitchFromOptions(msgDataToMQ.MsgData.Options, constant.IsHistory) {
|
|
|
|
|
if msgDataToMQ.MsgData.Seq == 0 {
|
|
|
|
|
log.NewError(msgDataToMQ.OperationID, "seq==0, error msg", msgDataToMQ.MsgData)
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
msg, err := db.DB.GetMsgBySeqIndex(notification.SourceID, notification.Seq, msgDataToMQ.OperationID)
|
|
|
|
|
if (msg != nil && msg.Seq != notification.Seq) || err != nil {
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.NewError(msgDataToMQ.OperationID, "GetMsgBySeqIndex failed", notification, err.Error())
|
|
|
|
|
}
|
|
|
|
|
msgs, indexes, err := db.DB.GetSuperGroupMsgBySeqListMongo(notification.SourceID, []uint32{notification.Seq}, msgDataToMQ.OperationID)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.NewError(msgDataToMQ.OperationID, "GetSuperGroupMsgBySeqListMongo failed", notification, err.Error())
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
var index int
|
|
|
|
|
if len(msgs) < 1 || len(indexes) < 1 {
|
|
|
|
|
log.NewError(msgDataToMQ.OperationID, "GetSuperGroupMsgBySeqListMongo failed", notification, "len<1", msgs, indexes)
|
|
|
|
|
continue
|
|
|
|
|
} else {
|
|
|
|
|
msg = msgs[0]
|
|
|
|
|
index = indexes[msg.Seq]
|
|
|
|
|
}
|
|
|
|
|
if err := db.DB.ReplaceMsgByIndex(notification.SourceID, msg, index); err != nil {
|
|
|
|
|
log.NewError(msgDataToMQ.OperationID, "ReplaceMsgByIndex failed", notification.SourceID, *msg)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
} else {
|
|
|
|
|
msg.IsReact = true
|
|
|
|
|
if err = db.DB.ReplaceMsgBySeq(notification.SourceID, msg, msgDataToMQ.OperationID); err != nil {
|
|
|
|
|
log.NewError(msgDataToMQ.OperationID, "ReplaceMsgBySeq failed", notification.SourceID, *msg)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if err := db.DB.InsertExtendMsg(notification.SourceID, notification.SessionType, &extendMsg); err != nil {
|
|
|
|
|
log.NewError(msgDataToMQ.OperationID, "MsgFirstModify InsertExtendMsg failed", notification.SourceID, notification.SessionType, extendMsg, err.Error())
|
|
|
|
|
continue
|
|
|
|
|