fix:fix error values&logs

pull/1476/head
lin.huang 2 years ago
parent 7a13284b2e
commit a1cf135e69

@ -252,7 +252,10 @@ func (och *OnlineHistoryRedisConsumerHandler) handleNotification(
return return
} }
log.ZDebug(ctx, "success to next topic", "conversationID", conversationID) log.ZDebug(ctx, "success to next topic", "conversationID", conversationID)
och.msgDatabase.MsgToMongoMQ(ctx, key, conversationID, storageList, lastSeq) err = och.msgDatabase.MsgToMongoMQ(ctx, key, conversationID, storageList, lastSeq)
if err != nil {
log.ZError(ctx, "MsgToMongoMQ error", err)
}
och.toPushTopic(ctx, key, conversationID, storageList) och.toPushTopic(ctx, key, conversationID, storageList)
} }
} }
@ -277,9 +280,6 @@ func (och *OnlineHistoryRedisConsumerHandler) handleMsg(
lastSeq, isNewConversation, err := och.msgDatabase.BatchInsertChat2Cache(ctx, conversationID, storageList) lastSeq, isNewConversation, err := och.msgDatabase.BatchInsertChat2Cache(ctx, conversationID, storageList)
if err != nil && errs.Unwrap(err) != redis.Nil { if err != nil && errs.Unwrap(err) != redis.Nil {
log.ZError(ctx, "batch data insert to redis err", err, "storageMsgList", storageList) log.ZError(ctx, "batch data insert to redis err", err, "storageMsgList", storageList)
och.singleMsgFailedCountMutex.Lock()
och.singleMsgFailedCount += uint64(len(storageList))
och.singleMsgFailedCountMutex.Unlock()
return return
} }
if isNewConversation { if isNewConversation {
@ -311,10 +311,10 @@ func (och *OnlineHistoryRedisConsumerHandler) handleMsg(
} }
log.ZDebug(ctx, "success incr to next topic") log.ZDebug(ctx, "success incr to next topic")
och.singleMsgSuccessCountMutex.Lock() err = och.msgDatabase.MsgToMongoMQ(ctx, key, conversationID, storageList, lastSeq)
och.singleMsgSuccessCount += uint64(len(storageList)) if err != nil {
och.singleMsgSuccessCountMutex.Unlock() log.ZError(ctx, "MsgToMongoMQ error", err)
och.msgDatabase.MsgToMongoMQ(ctx, key, conversationID, storageList, lastSeq) }
och.toPushTopic(ctx, key, conversationID, storageList) och.toPushTopic(ctx, key, conversationID, storageList)
} }
} }

@ -384,7 +384,7 @@ func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversa
prommetrics.MsgInsertRedisFailedCounter.Add(float64(failedNum)) prommetrics.MsgInsertRedisFailedCounter.Add(float64(failedNum))
log.ZError(ctx, "setMessageToCache error", err, "len", len(msgs), "conversationID", conversationID) log.ZError(ctx, "setMessageToCache error", err, "len", len(msgs), "conversationID", conversationID)
} else { } else {
prommetrics.MsgInsertRedisSuccessCounter.Inc() prommetrics.MsgInsertRedisSuccessCounter.Add(float64(len(msgs)))
} }
err = db.cache.SetMaxSeq(ctx, conversationID, currentMaxSeq) err = db.cache.SetMaxSeq(ctx, conversationID, currentMaxSeq)
if err != nil { if err != nil {
@ -392,11 +392,11 @@ func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversa
prommetrics.SeqSetFailedCounter.Inc() prommetrics.SeqSetFailedCounter.Inc()
} }
err2 := db.cache.SetHasReadSeqs(ctx, conversationID, userSeqMap) err2 := db.cache.SetHasReadSeqs(ctx, conversationID, userSeqMap)
if err != nil { if err2 != nil {
log.ZError(ctx, "SetHasReadSeqs error", err2, "userSeqMap", userSeqMap, "conversationID", conversationID) log.ZError(ctx, "SetHasReadSeqs error", err2, "userSeqMap", userSeqMap, "conversationID", conversationID)
prommetrics.SeqSetFailedCounter.Inc() prommetrics.SeqSetFailedCounter.Inc()
} }
return lastMaxSeq, isNew, utils.Wrap(err, "") return lastMaxSeq, isNew, errs.Wrap(err, "redis SetMaxSeq error")
} }
func (db *commonMsgDatabase) getMsgBySeqs(ctx context.Context, userID, conversationID string, seqs []int64) (totalMsgs []*sdkws.MsgData, err error) { func (db *commonMsgDatabase) getMsgBySeqs(ctx context.Context, userID, conversationID string, seqs []int64) (totalMsgs []*sdkws.MsgData, err error) {

Loading…
Cancel
Save