fix: correctly aggregate read seqs by conversation and user before DB update.

pull/3442/head
Gordon 3 months ago
parent 28265f3097
commit d3b328dc1e

@ -26,6 +26,8 @@ import (
"github.com/openimsdk/tools/discovery" "github.com/openimsdk/tools/discovery"
"github.com/go-redis/redis" "github.com/go-redis/redis"
"google.golang.org/protobuf/proto"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
@ -37,7 +39,6 @@ import (
"github.com/openimsdk/tools/log" "github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mcontext" "github.com/openimsdk/tools/mcontext"
"github.com/openimsdk/tools/utils/stringutil" "github.com/openimsdk/tools/utils/stringutil"
"google.golang.org/protobuf/proto"
) )
const ( const (
@ -134,53 +135,48 @@ func (och *OnlineHistoryRedisConsumerHandler) do(ctx context.Context, channelID
func (och *OnlineHistoryRedisConsumerHandler) doSetReadSeq(ctx context.Context, msgs []*ContextMsg) { func (och *OnlineHistoryRedisConsumerHandler) doSetReadSeq(ctx context.Context, msgs []*ContextMsg) {
var conversationID string // Outer map: conversationID -> (userID -> maxHasReadSeq)
var userSeqMap map[string]int64 conversationUserSeq := make(map[string]map[string]int64)
for _, msg := range msgs { for _, msg := range msgs {
if msg.message.ContentType != constant.HasReadReceipt { if msg.message.ContentType != constant.HasReadReceipt {
continue continue
} }
var elem sdkws.NotificationElem var elem sdkws.NotificationElem
if err := json.Unmarshal(msg.message.Content, &elem); err != nil { if err := json.Unmarshal(msg.message.Content, &elem); err != nil {
log.ZWarn(ctx, "handlerConversationRead Unmarshal NotificationElem msg err", err, "msg", msg) log.ZWarn(ctx, "Unmarshal NotificationElem error", err, "msg", msg)
continue continue
} }
var tips sdkws.MarkAsReadTips var tips sdkws.MarkAsReadTips
if err := json.Unmarshal([]byte(elem.Detail), &tips); err != nil { if err := json.Unmarshal([]byte(elem.Detail), &tips); err != nil {
log.ZWarn(ctx, "handlerConversationRead Unmarshal MarkAsReadTips msg err", err, "msg", msg) log.ZWarn(ctx, "Unmarshal MarkAsReadTips error", err, "msg", msg)
continue continue
} }
//The conversation ID for each batch of messages processed by the batcher is the same. if len(tips.ConversationID) == 0 || tips.HasReadSeq < 0 {
conversationID = tips.ConversationID
if len(tips.Seqs) > 0 {
for _, seq := range tips.Seqs {
if tips.HasReadSeq < seq {
tips.HasReadSeq = seq
}
}
clear(tips.Seqs)
tips.Seqs = nil
}
if tips.HasReadSeq < 0 {
continue continue
} }
if userSeqMap == nil {
userSeqMap = make(map[string]int64) // Calculate the max seq from tips.Seqs
for _, seq := range tips.Seqs {
if tips.HasReadSeq < seq {
tips.HasReadSeq = seq
}
} }
if userSeqMap[tips.MarkAsReadUserID] > tips.HasReadSeq { if _, ok := conversationUserSeq[tips.ConversationID]; !ok {
continue conversationUserSeq[tips.ConversationID] = make(map[string]int64)
}
if conversationUserSeq[tips.ConversationID][tips.MarkAsReadUserID] < tips.HasReadSeq {
conversationUserSeq[tips.ConversationID][tips.MarkAsReadUserID] = tips.HasReadSeq
} }
userSeqMap[tips.MarkAsReadUserID] = tips.HasReadSeq
}
if userSeqMap == nil {
return
}
if len(conversationID) == 0 {
log.ZWarn(ctx, "conversation err", nil, "conversationID", conversationID)
} }
if err := och.msgTransferDatabase.SetHasReadSeqToDB(ctx, conversationID, userSeqMap); err != nil { log.ZInfo(ctx, "doSetReadSeq", "conversationUserSeq", conversationUserSeq)
log.ZWarn(ctx, "set read seq to db error", err, "conversationID", conversationID, "userSeqMap", userSeqMap)
// persist to db
for convID, userSeqMap := range conversationUserSeq {
if err := och.msgTransferDatabase.SetHasReadSeqToDB(ctx, convID, userSeqMap); err != nil {
log.ZWarn(ctx, "SetHasReadSeqToDB error", err, "conversationID", convID, "userSeqMap", userSeqMap)
}
} }
} }

Loading…
Cancel
Save