|
|
|
@ -26,6 +26,8 @@ import (
|
|
|
|
|
"github.com/openimsdk/tools/discovery"
|
|
|
|
|
|
|
|
|
|
"github.com/go-redis/redis"
|
|
|
|
|
"google.golang.org/protobuf/proto"
|
|
|
|
|
|
|
|
|
|
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
|
|
|
|
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
|
|
|
|
|
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
|
|
|
|
@ -37,7 +39,6 @@ import (
|
|
|
|
|
"github.com/openimsdk/tools/log"
|
|
|
|
|
"github.com/openimsdk/tools/mcontext"
|
|
|
|
|
"github.com/openimsdk/tools/utils/stringutil"
|
|
|
|
|
"google.golang.org/protobuf/proto"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
const (
|
|
|
|
@ -134,53 +135,48 @@ func (och *OnlineHistoryRedisConsumerHandler) do(ctx context.Context, channelID
|
|
|
|
|
|
|
|
|
|
func (och *OnlineHistoryRedisConsumerHandler) doSetReadSeq(ctx context.Context, msgs []*ContextMsg) {
|
|
|
|
|
|
|
|
|
|
var conversationID string
|
|
|
|
|
var userSeqMap map[string]int64
|
|
|
|
|
// Outer map: conversationID -> (userID -> maxHasReadSeq)
|
|
|
|
|
conversationUserSeq := make(map[string]map[string]int64)
|
|
|
|
|
|
|
|
|
|
for _, msg := range msgs {
|
|
|
|
|
if msg.message.ContentType != constant.HasReadReceipt {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
var elem sdkws.NotificationElem
|
|
|
|
|
if err := json.Unmarshal(msg.message.Content, &elem); err != nil {
|
|
|
|
|
log.ZWarn(ctx, "handlerConversationRead Unmarshal NotificationElem msg err", err, "msg", msg)
|
|
|
|
|
log.ZWarn(ctx, "Unmarshal NotificationElem error", err, "msg", msg)
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
var tips sdkws.MarkAsReadTips
|
|
|
|
|
if err := json.Unmarshal([]byte(elem.Detail), &tips); err != nil {
|
|
|
|
|
log.ZWarn(ctx, "handlerConversationRead Unmarshal MarkAsReadTips msg err", err, "msg", msg)
|
|
|
|
|
log.ZWarn(ctx, "Unmarshal MarkAsReadTips error", err, "msg", msg)
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
//The conversation ID for each batch of messages processed by the batcher is the same.
|
|
|
|
|
conversationID = tips.ConversationID
|
|
|
|
|
if len(tips.Seqs) > 0 {
|
|
|
|
|
for _, seq := range tips.Seqs {
|
|
|
|
|
if tips.HasReadSeq < seq {
|
|
|
|
|
tips.HasReadSeq = seq
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
clear(tips.Seqs)
|
|
|
|
|
tips.Seqs = nil
|
|
|
|
|
}
|
|
|
|
|
if tips.HasReadSeq < 0 {
|
|
|
|
|
if len(tips.ConversationID) == 0 || tips.HasReadSeq < 0 {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
if userSeqMap == nil {
|
|
|
|
|
userSeqMap = make(map[string]int64)
|
|
|
|
|
|
|
|
|
|
// Calculate the max seq from tips.Seqs
|
|
|
|
|
for _, seq := range tips.Seqs {
|
|
|
|
|
if tips.HasReadSeq < seq {
|
|
|
|
|
tips.HasReadSeq = seq
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if userSeqMap[tips.MarkAsReadUserID] > tips.HasReadSeq {
|
|
|
|
|
continue
|
|
|
|
|
if _, ok := conversationUserSeq[tips.ConversationID]; !ok {
|
|
|
|
|
conversationUserSeq[tips.ConversationID] = make(map[string]int64)
|
|
|
|
|
}
|
|
|
|
|
if conversationUserSeq[tips.ConversationID][tips.MarkAsReadUserID] < tips.HasReadSeq {
|
|
|
|
|
conversationUserSeq[tips.ConversationID][tips.MarkAsReadUserID] = tips.HasReadSeq
|
|
|
|
|
}
|
|
|
|
|
userSeqMap[tips.MarkAsReadUserID] = tips.HasReadSeq
|
|
|
|
|
}
|
|
|
|
|
if userSeqMap == nil {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
if len(conversationID) == 0 {
|
|
|
|
|
log.ZWarn(ctx, "conversation err", nil, "conversationID", conversationID)
|
|
|
|
|
}
|
|
|
|
|
if err := och.msgTransferDatabase.SetHasReadSeqToDB(ctx, conversationID, userSeqMap); err != nil {
|
|
|
|
|
log.ZWarn(ctx, "set read seq to db error", err, "conversationID", conversationID, "userSeqMap", userSeqMap)
|
|
|
|
|
log.ZInfo(ctx, "doSetReadSeq", "conversationUserSeq", conversationUserSeq)
|
|
|
|
|
|
|
|
|
|
// persist to db
|
|
|
|
|
for convID, userSeqMap := range conversationUserSeq {
|
|
|
|
|
if err := och.msgTransferDatabase.SetHasReadSeqToDB(ctx, convID, userSeqMap); err != nil {
|
|
|
|
|
log.ZWarn(ctx, "SetHasReadSeqToDB error", err, "conversationID", convID, "userSeqMap", userSeqMap)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|