|
|
|
@ -9,6 +9,7 @@ import (
|
|
|
|
|
"Open_IM/pkg/grpc-etcdv3/getcdv3"
|
|
|
|
|
pbMsg "Open_IM/pkg/proto/chat"
|
|
|
|
|
pbPush "Open_IM/pkg/proto/push"
|
|
|
|
|
server_api_params "Open_IM/pkg/proto/sdk_ws"
|
|
|
|
|
"Open_IM/pkg/utils"
|
|
|
|
|
"context"
|
|
|
|
|
"errors"
|
|
|
|
@ -103,7 +104,7 @@ func (och *OnlineHistoryConsumerHandler) Run(channelID int) {
|
|
|
|
|
msgList := msgChannelValue.msgList
|
|
|
|
|
triggerID := msgChannelValue.triggerID
|
|
|
|
|
storageMsgList := make([]*pbMsg.MsgDataToMQ, 0, 80)
|
|
|
|
|
noStoragepushMsgList := make([]*pbMsg.MsgDataToMQ, 0, 80)
|
|
|
|
|
notStoragepushMsgList := make([]*pbMsg.MsgDataToMQ, 0, 80)
|
|
|
|
|
log.Debug(triggerID, "msg arrived channel", "channel id", channelID, msgList, msgChannelValue.userID, len(msgList))
|
|
|
|
|
for _, v := range msgList {
|
|
|
|
|
log.Debug(triggerID, "msg come to storage center", v.String())
|
|
|
|
@ -114,7 +115,7 @@ func (och *OnlineHistoryConsumerHandler) Run(channelID int) {
|
|
|
|
|
//log.NewWarn(triggerID, "storageMsgList to mongodb client msgID: ", v.MsgData.ClientMsgID)
|
|
|
|
|
} else {
|
|
|
|
|
if !(!isSenderSync && msgChannelValue.userID == v.MsgData.SendID) {
|
|
|
|
|
noStoragepushMsgList = append(noStoragepushMsgList, v)
|
|
|
|
|
notStoragepushMsgList = append(notStoragepushMsgList, v)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -128,7 +129,7 @@ func (och *OnlineHistoryConsumerHandler) Run(channelID int) {
|
|
|
|
|
// log.NewError(msgFromMQ.OperationID, "SessionType error", msgFromMQ.String())
|
|
|
|
|
// return
|
|
|
|
|
//}
|
|
|
|
|
log.Debug(triggerID, "msg storage length", len(storageMsgList), "push length", len(noStoragepushMsgList))
|
|
|
|
|
log.Debug(triggerID, "msg storage length", len(storageMsgList), "push length", len(notStoragepushMsgList))
|
|
|
|
|
err, lastSeq := saveUserChatList(msgChannelValue.userID, storageMsgList, triggerID)
|
|
|
|
|
if err != nil {
|
|
|
|
|
singleMsgFailedCount += uint64(len(storageMsgList))
|
|
|
|
@ -146,7 +147,7 @@ func (och *OnlineHistoryConsumerHandler) Run(channelID int) {
|
|
|
|
|
sendMessageToPush(x, msgChannelValue.userID)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}(noStoragepushMsgList, storageMsgList)
|
|
|
|
|
}(notStoragepushMsgList, storageMsgList)
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -175,6 +176,26 @@ func (och *OnlineHistoryConsumerHandler) MongoMessageRun(channelID int) {
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.NewError(triggerID, "single data insert to mongo err", err.Error(), msgList)
|
|
|
|
|
}
|
|
|
|
|
for _, v := range msgList {
|
|
|
|
|
if v.MsgData.ContentType == constant.DeleteMessageNotification {
|
|
|
|
|
tips := server_api_params.TipsComm{}
|
|
|
|
|
DeleteMessageTips := server_api_params.DeleteMessageTips{}
|
|
|
|
|
err := proto.Unmarshal(v.MsgData.Content, &tips)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.NewError(triggerID, "tips unmarshal err:", err.Error(), v.String())
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
err = proto.Unmarshal(tips.Detail, &DeleteMessageTips)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.NewError(triggerID, "deleteMessageTips unmarshal err:", err.Error(), v.String())
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
if err := db.DB.DelMsgBySeqList(DeleteMessageTips.UserID, DeleteMessageTips.SeqList, v.OperationID); err != nil {
|
|
|
|
|
log.NewError(v.OperationID, utils.GetSelfFuncName(), "DelMsgBySeqList args: ", DeleteMessageTips.UserID, DeleteMessageTips.SeqList, v.OperationID, err.Error())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|