@@ -18,8 +18,10 @@ import (
 	"context"
 	"encoding/json"
 	"errors"
+	"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
 	"strconv"
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/IBM/sarama"
@@ -40,11 +42,12 @@ import (
 )
 
 const (
 	size = 500
 	mainDataBuffer = 500
 	subChanBuffer = 50
 	worker = 50
 	interval = 100 * time.Millisecond
+	hasReadChanBuffer = 1000
 )
 
 type ContextMsg struct {
@@ -52,14 +55,23 @@ type ContextMsg struct {
 	ctx context.Context
 }
 
+// This structure is used for asynchronously writing the sender's read sequence (seq) regarding a message into MongoDB.
+// For example, if the sender sends a message with a seq of 10, then their own read seq for this conversation should be set to 10.
+type userHasReadSeq struct {
+	conversationID string
+	userHasReadMap map[string]int64
+}
+
 type OnlineHistoryRedisConsumerHandler struct {
 	historyConsumerGroup *kafka.MConsumerGroup
 	redisMessageBatches *batcher.Batcher[sarama.ConsumerMessage]
 	msgTransferDatabase controller.MsgTransferDatabase
 	conversationRpcClient *rpcclient.ConversationRpcClient
 	groupRpcClient *rpcclient.GroupRpcClient
+	conversationUserHasReadChan chan *userHasReadSeq
+	wg sync.WaitGroup
 }
 
 func NewOnlineHistoryRedisConsumerHandler(kafkaConf *config.Kafka, database controller.MsgTransferDatabase,
@@ -70,6 +82,8 @@ func NewOnlineHistoryRedisConsumerHandler(kafkaConf *config.Kafka, database cont
 	}
 	var och OnlineHistoryRedisConsumerHandler
 	och.msgTransferDatabase = database
+	och.conversationUserHasReadChan = make(chan *userHasReadSeq, hasReadChanBuffer)
+	och.wg.Add(1)
 
 	b := batcher.New[sarama.ConsumerMessage](
 		batcher.WithSize(size),
@@ -115,25 +129,25 @@ func (och *OnlineHistoryRedisConsumerHandler) do(ctx context.Context, channelID
 }
 
 func (och *OnlineHistoryRedisConsumerHandler) doSetReadSeq(ctx context.Context, msgs []*ContextMsg) {
-	type seqKey struct {
-		conversationID string
-		userID string
-	}
-	var readSeq map[seqKey]int64
+	var conversationID string
+	var userSeqMap map[string]int64
 	for _, msg := range msgs {
 		if msg.message.ContentType != constant.HasReadReceipt {
 			continue
 		}
 		var elem sdkws.NotificationElem
 		if err := json.Unmarshal(msg.message.Content, &elem); err != nil {
-			log.ZError(ctx, "handlerConversationRead Unmarshal NotificationElem msg err", err, "msg", msg)
+			log.ZWarn(ctx, "handlerConversationRead Unmarshal NotificationElem msg err", err, "msg", msg)
 			continue
 		}
 		var tips sdkws.MarkAsReadTips
 		if err := json.Unmarshal([]byte(elem.Detail), &tips); err != nil {
-			log.ZError(ctx, "handlerConversationRead Unmarshal MarkAsReadTips msg err", err, "msg", msg)
+			log.ZWarn(ctx, "handlerConversationRead Unmarshal MarkAsReadTips msg err", err, "msg", msg)
 			continue
 		}
+		//The conversation ID for each batch of messages processed by the batcher is the same.
+		conversationID = tips.ConversationID
 		if len(tips.Seqs) > 0 {
 			for _, seq := range tips.Seqs {
 				if tips.HasReadSeq < seq {
@@ -146,26 +160,25 @@ func (och *OnlineHistoryRedisConsumerHandler) doSetReadSeq(ctx context.Context,
 		if tips.HasReadSeq < 0 {
 			continue
 		}
-		if readSeq == nil {
-			readSeq = make(map[seqKey]int64)
+		if userSeqMap == nil {
+			userSeqMap = make(map[string]int64)
 		}
-		key := seqKey{
-			conversationID: tips.ConversationID,
-			userID: tips.MarkAsReadUserID,
-		}
-		if readSeq[key] > tips.HasReadSeq {
+		if userSeqMap[tips.MarkAsReadUserID] > tips.HasReadSeq {
 			continue
 		}
-		readSeq[key] = tips.HasReadSeq
+		userSeqMap[tips.MarkAsReadUserID] = tips.HasReadSeq
 	}
-	if readSeq == nil {
+	if userSeqMap == nil {
 		return
 	}
-	for key, seq := range readSeq {
-		if err := och.msgTransferDatabase.SetHasReadSeqToDB(ctx, key.userID, key.conversationID, seq); err != nil {
-			log.ZError(ctx, "set read seq to db error", err, "userID", key.userID, "conversationID", key.conversationID, "seq", seq)
-		}
+	if len(conversationID) == 0 {
+		log.ZWarn(ctx, "conversation err", nil, "conversationID", conversationID)
+	}
+	if err := och.msgTransferDatabase.SetHasReadSeqToDB(ctx, conversationID, userSeqMap); err != nil {
+		log.ZWarn(ctx, "set read seq to db error", err, "conversationID", conversationID, "userSeqMap", userSeqMap)
 	}
 }
 
 func (och *OnlineHistoryRedisConsumerHandler) parseConsumerMessages(ctx context.Context, consumerMessages []*sarama.ConsumerMessage) []*ContextMsg {
@@ -250,12 +263,21 @@ func (och *OnlineHistoryRedisConsumerHandler) handleMsg(ctx context.Context, key
 	}
 	if len(storageMessageList) > 0 {
 		msg := storageMessageList[0]
-		lastSeq, isNewConversation, err := och.msgTransferDatabase.BatchInsertChat2Cache(ctx, conversationID, storageMessageList)
+		lastSeq, isNewConversation, userSeqMap, err := och.msgTransferDatabase.BatchInsertChat2Cache(ctx, conversationID, storageMessageList)
 		if err != nil && !errors.Is(errs.Unwrap(err), redis.Nil) {
-			log.ZError(ctx, "batch data insert to redis err", err, "storageMsgList", storageMessageList)
+			log.ZWarn(ctx, "batch data insert to redis err", err, "storageMsgList", storageMessageList)
 			return
 		}
 		log.ZInfo(ctx, "BatchInsertChat2Cache end")
+		err = och.msgTransferDatabase.SetHasReadSeqs(ctx, conversationID, userSeqMap)
+		if err != nil {
+			log.ZWarn(ctx, "SetHasReadSeqs error", err, "userSeqMap", userSeqMap, "conversationID", conversationID)
+			prommetrics.SeqSetFailedCounter.Inc()
+		}
+		och.conversationUserHasReadChan <- &userHasReadSeq{
+			conversationID: conversationID,
+			userHasReadMap: userSeqMap,
+		}
 
 		if isNewConversation {
 			switch msg.SessionType {
@@ -308,7 +330,7 @@ func (och *OnlineHistoryRedisConsumerHandler) handleNotification(ctx context.Con
 		storageMessageList = append(storageMessageList, msg.message)
 	}
 	if len(storageMessageList) > 0 {
-		lastSeq, _, err := och.msgTransferDatabase.BatchInsertChat2Cache(ctx, conversationID, storageMessageList)
+		lastSeq, _, _, err := och.msgTransferDatabase.BatchInsertChat2Cache(ctx, conversationID, storageMessageList)
 		if err != nil {
 			log.ZError(ctx, "notification batch insert to redis error", err, "conversationID", conversationID,
 				"storageList", storageMessageList)
@@ -323,6 +345,21 @@ func (och *OnlineHistoryRedisConsumerHandler) handleNotification(ctx context.Con
 		och.toPushTopic(ctx, key, conversationID, storageList)
 	}
 }
 
+func (och *OnlineHistoryRedisConsumerHandler) HandleUserHasReadSeqMessages(ctx context.Context) {
+	defer och.wg.Done()
+	for msg := range och.conversationUserHasReadChan {
+		if err := och.msgTransferDatabase.SetHasReadSeqToDB(ctx, msg.conversationID, msg.userHasReadMap); err != nil {
+			log.ZWarn(ctx, "set read seq to db error", err, "conversationID", msg.conversationID, "userSeqMap", msg.userHasReadMap)
+		}
+	}
+	log.ZInfo(ctx, "Channel closed, exiting handleUserHasReadSeqMessages")
+}
+
+func (och *OnlineHistoryRedisConsumerHandler) Close() {
+	close(och.conversationUserHasReadChan)
+	och.wg.Wait()
+}
+
 func (och *OnlineHistoryRedisConsumerHandler) toPushTopic(ctx context.Context, key, conversationID string, msgs []*ContextMsg) {
 	for _, v := range msgs {
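
Note (not part of the diff above): a minimal caller-side sketch of how the new asynchronous read-seq writer is presumably driven, assuming the handler is built by NewOnlineHistoryRedisConsumerHandler; the runHistoryHandler wrapper and its shutdown flow are hypothetical, while HandleUserHasReadSeqMessages, Close, and the wg.Add(1) in the constructor come from the change itself.

// Hypothetical wiring (assumption, not from this diff): one goroutine drains
// conversationUserHasReadChan and persists read seqs; Close() closes the channel
// and waits so buffered userHasReadSeq entries are flushed before exit.
func runHistoryHandler(ctx context.Context, och *OnlineHistoryRedisConsumerHandler) {
	go och.HandleUserHasReadSeqMessages(ctx) // pairs with och.wg.Add(1) in the constructor

	<-ctx.Done() // run the Kafka consumer group here until shutdown is requested

	och.Close() // close(conversationUserHasReadChan) then wg.Wait()
}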