diff --git a/internal/msg_transfer/logic/online_history_msg_handler.go b/internal/msg_transfer/logic/online_history_msg_handler.go index 905d39b10..8568f32b4 100644 --- a/internal/msg_transfer/logic/online_history_msg_handler.go +++ b/internal/msg_transfer/logic/online_history_msg_handler.go @@ -97,7 +97,7 @@ func (och *OnlineHistoryConsumerHandler) Run(channelID int) { msgList := msgChannelValue.msgList triggerID := msgChannelValue.triggerID storageMsgList := make([]*pbMsg.MsgDataToMQ, 0, 80) - pushMsgList := make([]*pbMsg.MsgDataToMQ, 0, 80) + noStoragepushMsgList := make([]*pbMsg.MsgDataToMQ, 0, 80) log.Debug(triggerID, "msg arrived channel", "channel id", channelID, msgList, msgChannelValue.userID, len(msgList)) for _, v := range msgList { log.Debug(triggerID, "msg come to storage center", v.String()) @@ -106,10 +106,12 @@ func (och *OnlineHistoryConsumerHandler) Run(channelID int) { if isHistory { storageMsgList = append(storageMsgList, v) log.NewWarn(triggerID, "storageMsgList to mongodb client msgID: ", v.MsgData.ClientMsgID) + } else { + if !(!isSenderSync && msgChannelValue.userID == v.MsgData.SendID) { + noStoragepushMsgList = append(noStoragepushMsgList, v) + } } - if !(!isSenderSync && msgChannelValue.userID == v.MsgData.SendID) { - pushMsgList = append(pushMsgList, v) - } + } //switch msgChannelValue.msg.MsgData.SessionType { @@ -120,7 +122,7 @@ func (och *OnlineHistoryConsumerHandler) Run(channelID int) { // log.NewError(msgFromMQ.OperationID, "SessionType error", msgFromMQ.String()) // return //} - log.Debug(triggerID, "msg storage length", len(storageMsgList), "push length", len(pushMsgList)) + log.Debug(triggerID, "msg storage length", len(storageMsgList), "push length", len(noStoragepushMsgList)) err := saveUserChatList(msgChannelValue.userID, storageMsgList, triggerID) if err != nil { singleMsgFailedCount += uint64(len(storageMsgList)) @@ -129,15 +131,15 @@ func (och *OnlineHistoryConsumerHandler) Run(channelID int) { singleMsgSuccessCountMutex.Lock() singleMsgSuccessCount += uint64(len(storageMsgList)) singleMsgSuccessCountMutex.Unlock() - //go func(push,storage []*pbMsg.MsgDataToMQ) { - // for _, v := range storage { - // sendMessageToPush(v, msgChannelValue.userID) - // } - // for _, x := range utils.DifferenceString() { - // sendMessageToPush(v, msgChannelValue.userID) - // } - // - //}(pushMsgList,storageMsgList) + go func(push, storage []*pbMsg.MsgDataToMQ) { + for _, v := range storage { + sendMessageToPush(v, msgChannelValue.userID) + } + for _, x := range push { + sendMessageToPush(x, msgChannelValue.userID) + } + + }(noStoragepushMsgList, storageMsgList) } }