@ -26,10 +26,12 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
"github.com/openimsdk/tools/discovery"
"github.com/go-redis/redis"
"google.golang.org/protobuf/proto"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
"github.com/IBM/sarama"
"github.com/go-redis/redis"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/kafka"
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
@ -41,7 +43,6 @@ import (
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mcontext"
"github.com/openimsdk/tools/utils/stringutil"
"google.golang.org/protobuf/proto"
)
const (
@ -140,53 +141,48 @@ func (och *OnlineHistoryRedisConsumerHandler) do(ctx context.Context, channelID
func ( och * OnlineHistoryRedisConsumerHandler ) doSetReadSeq ( ctx context . Context , msgs [ ] * ContextMsg ) {
var conversationID string
var userSeqMap map [ string ] int64
// Outer map: conversationID -> (userID -> maxHasReadSeq)
conversationUserSeq := make ( map [ string ] map [ string ] int64 )
for _ , msg := range msgs {
if msg . message . ContentType != constant . HasReadReceipt {
continue
}
var elem sdkws . NotificationElem
if err := json . Unmarshal ( msg . message . Content , & elem ) ; err != nil {
log . ZWarn ( ctx , " handlerConversationRead Unmarshal NotificationElem msg err", err , "msg" , msg )
log . ZWarn ( ctx , " Unmarshal NotificationElem erro r", err , "msg" , msg )
continue
}
var tips sdkws . MarkAsReadTips
if err := json . Unmarshal ( [ ] byte ( elem . Detail ) , & tips ) ; err != nil {
log . ZWarn ( ctx , " handlerConversationRead Unmarshal MarkAsReadTips msg err", err , "msg" , msg )
log . ZWarn ( ctx , " Unmarshal MarkAsReadTips erro r", err , "msg" , msg )
continue
}
//The conversation ID for each batch of messages processed by the batcher is the same.
conversationID = tips . ConversationID
if len ( tips . Seqs ) > 0 {
for _ , seq := range tips . Seqs {
if tips . HasReadSeq < seq {
tips . HasReadSeq = seq
}
}
clear ( tips . Seqs )
tips . Seqs = nil
}
if tips . HasReadSeq < 0 {
if len ( tips . ConversationID ) == 0 || tips . HasReadSeq < 0 {
continue
}
if userSeqMap == nil {
userSeqMap = make ( map [ string ] int64 )
// Calculate the max seq from tips.Seqs
for _ , seq := range tips . Seqs {
if tips . HasReadSeq < seq {
tips . HasReadSeq = seq
}
}
if userSeqMap [ tips . MarkAsReadUserID ] > tips . HasReadSeq {
continue
if _ , ok := conversationUserSeq [ tips . ConversationID ] ; ! ok {
conversationUserSeq [ tips . ConversationID ] = make ( map [ string ] int64 )
}
if conversationUserSeq [ tips . ConversationID ] [ tips . MarkAsReadUserID ] < tips . HasReadSeq {
conversationUserSeq [ tips . ConversationID ] [ tips . MarkAsReadUserID ] = tips . HasReadSeq
}
userSeqMap [ tips . MarkAsReadUserID ] = tips . HasReadSeq
}
if userSeqMap == nil {
return
}
if len ( conversationID ) == 0 {
log . ZWarn ( ctx , "conversation err" , nil , "conversationID" , conversationID )
}
if err := och . msgTransferDatabase . SetHasReadSeqToDB ( ctx , conversationID , userSeqMap ) ; err != nil {
log . ZWarn ( ctx , "set read seq to db error" , err , "conversationID" , conversationID , "userSeqMap" , userSeqMap )
log . ZInfo ( ctx , "doSetReadSeq" , "conversationUserSeq" , conversationUserSeq )
// persist to db
for convID , userSeqMap := range conversationUserSeq {
if err := och . msgTransferDatabase . SetHasReadSeqToDB ( ctx , convID , userSeqMap ) ; err != nil {
log . ZWarn ( ctx , "SetHasReadSeqToDB error" , err , "conversationID" , convID , "userSeqMap" , userSeqMap )
}
}
}