diff --git a/internal/msg_transfer/logic/online_history_msg_handler.go b/internal/msg_transfer/logic/online_history_msg_handler.go
index 35cb2a0e8..7d74d4c22 100644
--- a/internal/msg_transfer/logic/online_history_msg_handler.go
+++ b/internal/msg_transfer/logic/online_history_msg_handler.go
@@ -97,36 +97,50 @@ func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) {
 				//	return
 				//}
 				log.Debug(triggerID, "msg storage length", len(storageMsgList), "push length", len(notStoragePushMsgList))
-				err, lastSeq := saveUserChatList(msgChannelValue.aggregationID, storageMsgList, triggerID)
-				if err != nil {
-					singleMsgFailedCount += uint64(len(storageMsgList))
-					log.NewError(triggerID, "single data insert to redis err", err.Error(), storageMsgList)
+				if len(storageMsgList) > 0 {
+					err, lastSeq := saveUserChatList(msgChannelValue.aggregationID, storageMsgList, triggerID)
+					if err != nil {
+						singleMsgFailedCount += uint64(len(storageMsgList))
+						log.NewError(triggerID, "single data insert to redis err", err.Error(), storageMsgList)
+					} else {
+						singleMsgSuccessCountMutex.Lock()
+						singleMsgSuccessCount += uint64(len(storageMsgList))
+						singleMsgSuccessCountMutex.Unlock()
+						och.SendMessageToMongoCH(msgChannelValue.aggregationID, triggerID, storageMsgList, lastSeq)
+						go func(push, storage []*pbMsg.MsgDataToMQ) {
+							for _, v := range storage {
+								sendMessageToPush(v, msgChannelValue.aggregationID)
+							}
+							for _, x := range push {
+								sendMessageToPush(x, msgChannelValue.aggregationID)
+							}
+
+						}(notStoragePushMsgList, storageMsgList)
+
+					}
+
 				} else {
-					singleMsgSuccessCountMutex.Lock()
-					singleMsgSuccessCount += uint64(len(storageMsgList))
-					singleMsgSuccessCountMutex.Unlock()
-					och.SendMessageToMongoCH(msgChannelValue.aggregationID, triggerID, storageMsgList, lastSeq)
-					go func(push, storage []*pbMsg.MsgDataToMQ) {
-						for _, v := range storage {
-							sendMessageToPush(v, msgChannelValue.aggregationID)
-						}
+					go func(push []*pbMsg.MsgDataToMQ) {
 						for _, x := range push {
 							sendMessageToPush(x, msgChannelValue.aggregationID)
 						}
-
-					}(notStoragePushMsgList, storageMsgList)
+					}(notStoragePushMsgList)
 				}
 			}
+
 		}
 	}
+
 }
 
 func (och *OnlineHistoryRedisConsumerHandler) SendMessageToMongoCH(aggregationID string, triggerID string, messages []*pbMsg.MsgDataToMQ, lastSeq uint64) {
-	pid, offset, err := producerToMongo.SendMessage(&pbMsg.MsgDataToMongoByMQ{LastSeq: lastSeq, AggregationID: aggregationID, MessageList: messages, TriggerID: triggerID}, aggregationID, triggerID)
-	if err != nil {
-		log.Error(triggerID, "kafka send failed", "send data", len(messages), "pid", pid, "offset", offset, "err", err.Error(), "key", aggregationID)
-	} else {
-		// log.NewWarn(m.OperationID, "sendMsgToKafka client msgID ", m.MsgData.ClientMsgID)
+	if len(messages) > 0 {
+		pid, offset, err := producerToMongo.SendMessage(&pbMsg.MsgDataToMongoByMQ{LastSeq: lastSeq, AggregationID: aggregationID, MessageList: messages, TriggerID: triggerID}, aggregationID, triggerID)
+		if err != nil {
+			log.Error(triggerID, "kafka send failed", "send data", len(messages), "pid", pid, "offset", offset, "err", err.Error(), "key", aggregationID)
+		} else {
+			// log.NewWarn(m.OperationID, "sendMsgToKafka client msgID ", m.MsgData.ClientMsgID)
+		}
 	}
 	//hashCode := getHashCode(aggregationID)
 	//channelID := hashCode % ChannelNum