From d3b328dc1ed47e83b0301a7caed685e88698e82e Mon Sep 17 00:00:00 2001 From: Gordon <46924906+FGadvancer@users.noreply.github.com> Date: Thu, 26 Jun 2025 18:17:47 +0800 Subject: [PATCH] fix: correctly aggregate read seqs by conversation and user before DB update. --- .../msgtransfer/online_history_msg_handler.go | 56 +++++++++---------- 1 file changed, 26 insertions(+), 30 deletions(-) diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go index a2d0cca67..05775a1e6 100644 --- a/internal/msgtransfer/online_history_msg_handler.go +++ b/internal/msgtransfer/online_history_msg_handler.go @@ -26,6 +26,8 @@ import ( "github.com/openimsdk/tools/discovery" "github.com/go-redis/redis" + "google.golang.org/protobuf/proto" + "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller" "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" @@ -37,7 +39,6 @@ import ( "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/mcontext" "github.com/openimsdk/tools/utils/stringutil" - "google.golang.org/protobuf/proto" ) const ( @@ -134,53 +135,48 @@ func (och *OnlineHistoryRedisConsumerHandler) do(ctx context.Context, channelID func (och *OnlineHistoryRedisConsumerHandler) doSetReadSeq(ctx context.Context, msgs []*ContextMsg) { - var conversationID string - var userSeqMap map[string]int64 + // Outer map: conversationID -> (userID -> maxHasReadSeq) + conversationUserSeq := make(map[string]map[string]int64) + for _, msg := range msgs { if msg.message.ContentType != constant.HasReadReceipt { continue } var elem sdkws.NotificationElem if err := json.Unmarshal(msg.message.Content, &elem); err != nil { - log.ZWarn(ctx, "handlerConversationRead Unmarshal NotificationElem msg err", err, "msg", msg) + log.ZWarn(ctx, "Unmarshal NotificationElem error", err, "msg", msg) continue } var tips sdkws.MarkAsReadTips if err := json.Unmarshal([]byte(elem.Detail), &tips); err != nil { - log.ZWarn(ctx, "handlerConversationRead Unmarshal MarkAsReadTips msg err", err, "msg", msg) + log.ZWarn(ctx, "Unmarshal MarkAsReadTips error", err, "msg", msg) continue } - //The conversation ID for each batch of messages processed by the batcher is the same. - conversationID = tips.ConversationID - if len(tips.Seqs) > 0 { - for _, seq := range tips.Seqs { - if tips.HasReadSeq < seq { - tips.HasReadSeq = seq - } - } - clear(tips.Seqs) - tips.Seqs = nil - } - if tips.HasReadSeq < 0 { + if len(tips.ConversationID) == 0 || tips.HasReadSeq < 0 { continue } - if userSeqMap == nil { - userSeqMap = make(map[string]int64) + + // Calculate the max seq from tips.Seqs + for _, seq := range tips.Seqs { + if tips.HasReadSeq < seq { + tips.HasReadSeq = seq + } } - if userSeqMap[tips.MarkAsReadUserID] > tips.HasReadSeq { - continue + if _, ok := conversationUserSeq[tips.ConversationID]; !ok { + conversationUserSeq[tips.ConversationID] = make(map[string]int64) + } + if conversationUserSeq[tips.ConversationID][tips.MarkAsReadUserID] < tips.HasReadSeq { + conversationUserSeq[tips.ConversationID][tips.MarkAsReadUserID] = tips.HasReadSeq } - userSeqMap[tips.MarkAsReadUserID] = tips.HasReadSeq - } - if userSeqMap == nil { - return - } - if len(conversationID) == 0 { - log.ZWarn(ctx, "conversation err", nil, "conversationID", conversationID) } - if err := och.msgTransferDatabase.SetHasReadSeqToDB(ctx, conversationID, userSeqMap); err != nil { - log.ZWarn(ctx, "set read seq to db error", err, "conversationID", conversationID, "userSeqMap", userSeqMap) + log.ZInfo(ctx, "doSetReadSeq", "conversationUserSeq", conversationUserSeq) + + // persist to db + for convID, userSeqMap := range conversationUserSeq { + if err := och.msgTransferDatabase.SetHasReadSeqToDB(ctx, convID, userSeqMap); err != nil { + log.ZWarn(ctx, "SetHasReadSeqToDB error", err, "conversationID", convID, "userSeqMap", userSeqMap) + } } }