From a1cf135e694b9541fead98a8210f0febdaf949b2 Mon Sep 17 00:00:00 2001 From: "lin.huang" Date: Thu, 23 Nov 2023 15:55:37 +0800 Subject: [PATCH] fix:fix error values&logs --- .../msgtransfer/online_history_msg_handler.go | 16 ++++++++-------- pkg/common/db/controller/msg.go | 6 +++--- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go index b019b0120..eb8e500fe 100644 --- a/internal/msgtransfer/online_history_msg_handler.go +++ b/internal/msgtransfer/online_history_msg_handler.go @@ -252,7 +252,10 @@ func (och *OnlineHistoryRedisConsumerHandler) handleNotification( return } log.ZDebug(ctx, "success to next topic", "conversationID", conversationID) - och.msgDatabase.MsgToMongoMQ(ctx, key, conversationID, storageList, lastSeq) + err = och.msgDatabase.MsgToMongoMQ(ctx, key, conversationID, storageList, lastSeq) + if err != nil { + log.ZError(ctx, "MsgToMongoMQ error", err) + } och.toPushTopic(ctx, key, conversationID, storageList) } } @@ -277,9 +280,6 @@ func (och *OnlineHistoryRedisConsumerHandler) handleMsg( lastSeq, isNewConversation, err := och.msgDatabase.BatchInsertChat2Cache(ctx, conversationID, storageList) if err != nil && errs.Unwrap(err) != redis.Nil { log.ZError(ctx, "batch data insert to redis err", err, "storageMsgList", storageList) - och.singleMsgFailedCountMutex.Lock() - och.singleMsgFailedCount += uint64(len(storageList)) - och.singleMsgFailedCountMutex.Unlock() return } if isNewConversation { @@ -311,10 +311,10 @@ func (och *OnlineHistoryRedisConsumerHandler) handleMsg( } log.ZDebug(ctx, "success incr to next topic") - och.singleMsgSuccessCountMutex.Lock() - och.singleMsgSuccessCount += uint64(len(storageList)) - och.singleMsgSuccessCountMutex.Unlock() - och.msgDatabase.MsgToMongoMQ(ctx, key, conversationID, storageList, lastSeq) + err = och.msgDatabase.MsgToMongoMQ(ctx, key, conversationID, storageList, lastSeq) + if err != nil { + log.ZError(ctx, "MsgToMongoMQ error", err) + } och.toPushTopic(ctx, key, conversationID, storageList) } } diff --git a/pkg/common/db/controller/msg.go b/pkg/common/db/controller/msg.go index fb0a9c702..fa1f400a6 100644 --- a/pkg/common/db/controller/msg.go +++ b/pkg/common/db/controller/msg.go @@ -384,7 +384,7 @@ func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversa prommetrics.MsgInsertRedisFailedCounter.Add(float64(failedNum)) log.ZError(ctx, "setMessageToCache error", err, "len", len(msgs), "conversationID", conversationID) } else { - prommetrics.MsgInsertRedisSuccessCounter.Inc() + prommetrics.MsgInsertRedisSuccessCounter.Add(float64(len(msgs))) } err = db.cache.SetMaxSeq(ctx, conversationID, currentMaxSeq) if err != nil { @@ -392,11 +392,11 @@ func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversa prommetrics.SeqSetFailedCounter.Inc() } err2 := db.cache.SetHasReadSeqs(ctx, conversationID, userSeqMap) - if err != nil { + if err2 != nil { log.ZError(ctx, "SetHasReadSeqs error", err2, "userSeqMap", userSeqMap, "conversationID", conversationID) prommetrics.SeqSetFailedCounter.Inc() } - return lastMaxSeq, isNew, utils.Wrap(err, "") + return lastMaxSeq, isNew, errs.Wrap(err, "redis SetMaxSeq error") } func (db *commonMsgDatabase) getMsgBySeqs(ctx context.Context, userID, conversationID string, seqs []int64) (totalMsgs []*sdkws.MsgData, err error) {