|
|
@ -115,10 +115,10 @@ func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) {
|
|
|
|
if len(storageMsgList) > 0 {
|
|
|
|
if len(storageMsgList) > 0 {
|
|
|
|
lastSeq, err := och.msgDatabase.BatchInsertChat2Cache(ctx, msgChannelValue.aggregationID, storageMsgList)
|
|
|
|
lastSeq, err := och.msgDatabase.BatchInsertChat2Cache(ctx, msgChannelValue.aggregationID, storageMsgList)
|
|
|
|
if err != nil {
|
|
|
|
if err != nil {
|
|
|
|
|
|
|
|
log.NewError(triggerID, "single data insert to redis err", err.Error(), storageMsgList)
|
|
|
|
och.singleMsgFailedCountMutex.Lock()
|
|
|
|
och.singleMsgFailedCountMutex.Lock()
|
|
|
|
och.singleMsgFailedCount += uint64(len(storageMsgList))
|
|
|
|
och.singleMsgFailedCount += uint64(len(storageMsgList))
|
|
|
|
och.singleMsgFailedCountMutex.Unlock()
|
|
|
|
och.singleMsgFailedCountMutex.Unlock()
|
|
|
|
log.NewError(triggerID, "single data insert to redis err", err.Error(), storageMsgList)
|
|
|
|
|
|
|
|
} else {
|
|
|
|
} else {
|
|
|
|
och.singleMsgSuccessCountMutex.Lock()
|
|
|
|
och.singleMsgSuccessCountMutex.Lock()
|
|
|
|
och.singleMsgSuccessCount += uint64(len(storageMsgList))
|
|
|
|
och.singleMsgSuccessCount += uint64(len(storageMsgList))
|
|
|
|