read seq is written to mongo

pull/2558/head
withchao 1 year ago
parent 5f8713ad7e
commit 764b47ed92

@ -17,9 +17,13 @@ package main
import (
"github.com/openimsdk/open-im-server/v3/pkg/common/cmd"
"github.com/openimsdk/tools/system/program"
"os"
)
func main() {
if len(os.Args) == 1 {
os.Args = append(os.Args, "-i", "0", "-c", "/Users/chao/Desktop/withchao/open-im-server/config/")
}
if err := cmd.NewMsgTransferCmd().Exec(); err != nil {
program.ExitWithError(err)
}

@ -16,6 +16,7 @@ package msgtransfer
import (
"context"
"encoding/json"
"errors"
"github.com/IBM/sarama"
"github.com/go-redis/redis"
@ -89,6 +90,7 @@ func NewOnlineHistoryRedisConsumerHandler(kafkaConf *config.Kafka, database cont
och.conversationRpcClient = conversationRpcClient
och.groupRpcClient = groupRpcClient
och.historyConsumerGroup = historyConsumerGroup
return &och, err
}
func (och *OnlineHistoryRedisConsumerHandler) do(ctx context.Context, channelID int, val *batcher.Msg[sarama.ConsumerMessage]) {
@ -97,6 +99,7 @@ func (och *OnlineHistoryRedisConsumerHandler) do(ctx context.Context, channelID
ctx = withAggregationCtx(ctx, ctxMessages)
log.ZInfo(ctx, "msg arrived channel", "channel id", channelID, "msgList length", len(ctxMessages),
"key", val.Key())
och.doSetReadSeq(ctx, ctxMessages)
storageMsgList, notStorageMsgList, storageNotificationList, notStorageNotificationList :=
och.categorizeMessageLists(ctxMessages)
@ -110,6 +113,60 @@ func (och *OnlineHistoryRedisConsumerHandler) do(ctx context.Context, channelID
och.handleNotification(ctx, val.Key(), conversationIDNotification, storageNotificationList, notStorageNotificationList)
}
func (och *OnlineHistoryRedisConsumerHandler) doSetReadSeq(ctx context.Context, msgs []*ContextMsg) {
type seqKey struct {
conversationID string
userID string
}
var readSeq map[seqKey]int64
for _, msg := range msgs {
if msg.message.ContentType != constant.HasReadReceipt {
continue
}
var elem sdkws.NotificationElem
if err := json.Unmarshal(msg.message.Content, &elem); err != nil {
log.ZError(ctx, "handlerConversationRead Unmarshal NotificationElem msg err", err, "msg", msg)
continue
}
var tips sdkws.MarkAsReadTips
if err := json.Unmarshal([]byte(elem.Detail), &tips); err != nil {
log.ZError(ctx, "handlerConversationRead Unmarshal MarkAsReadTips msg err", err, "msg", msg)
continue
}
if len(tips.Seqs) > 0 {
for _, seq := range tips.Seqs {
if tips.HasReadSeq < seq {
tips.HasReadSeq = seq
}
}
clear(tips.Seqs)
tips.Seqs = nil
}
if tips.HasReadSeq < 0 {
continue
}
if readSeq == nil {
readSeq = make(map[seqKey]int64)
}
key := seqKey{
conversationID: tips.ConversationID,
userID: tips.MarkAsReadUserID,
}
if readSeq[key] > tips.HasReadSeq {
continue
}
readSeq[key] = tips.HasReadSeq
}
if readSeq == nil {
return
}
for key, seq := range readSeq {
if err := och.msgDatabase.SetHasReadSeqToDB(ctx, key.userID, key.conversationID, seq); err != nil {
log.ZError(ctx, "set read seq to db error", err, "userID", key.userID, "conversationID", key.conversationID, "seq", seq)
}
}
}
func (och *OnlineHistoryRedisConsumerHandler) parseConsumerMessages(ctx context.Context, consumerMessages []*sarama.ConsumerMessage) []*ContextMsg {
var ctxMessages []*ContextMsg
for i := 0; i < len(consumerMessages); i++ {

@ -16,7 +16,6 @@ package msgtransfer
import (
"context"
"github.com/IBM/sarama"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"

@ -17,7 +17,6 @@ package push
import (
"context"
"encoding/json"
"fmt"
"github.com/IBM/sarama"
"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush"
"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/options"
@ -36,17 +35,11 @@ import (
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mcontext"
"github.com/openimsdk/tools/mq/kafka"
"github.com/openimsdk/tools/mq/memamq"
"github.com/openimsdk/tools/utils/datautil"
"github.com/openimsdk/tools/utils/jsonutil"
"github.com/openimsdk/tools/utils/timeutil"
"github.com/redis/go-redis/v9"
"google.golang.org/protobuf/proto"
"math/rand"
"os"
"strconv"
"sync/atomic"
"time"
)
type ConsumerHandler struct {
@ -61,7 +54,6 @@ type ConsumerHandler struct {
groupRpcClient rpcclient.GroupRpcClient
webhookClient *webhook.Client
config *Config
readCh chan *sdkws.MarkAsReadTips
}
func NewConsumerHandler(config *Config, offlinePusher offlinepush.OfflinePusher, rdb redis.UniversalClient,
@ -84,8 +76,6 @@ func NewConsumerHandler(config *Config, offlinePusher offlinepush.OfflinePusher,
consumerHandler.webhookClient = webhook.NewWebhookClient(config.WebhooksConfig.URL)
consumerHandler.config = config
consumerHandler.onlineCache = rpccache.NewOnlineCache(userRpcClient, consumerHandler.groupLocalCache, rdb, nil)
consumerHandler.readCh = make(chan *sdkws.MarkAsReadTips, 1024*8)
go consumerHandler.loopRead()
return &consumerHandler, nil
}
@ -99,7 +89,6 @@ func (c *ConsumerHandler) handleMs2PsChat(ctx context.Context, msg []byte) {
MsgData: msgFromMQ.MsgData,
ConversationID: msgFromMQ.ConversationID,
}
c.handlerConversationRead(ctx, pbData.MsgData)
sec := msgFromMQ.MsgData.SendTime / 1000
nowSec := timeutil.GetCurrentTimestampBySecond()
log.ZDebug(ctx, "push msg", "msg", pbData.String(), "sec", sec, "nowSec", nowSec)
@ -129,98 +118,6 @@ func (*ConsumerHandler) Setup(sarama.ConsumerGroupSession) error { return nil }
func (*ConsumerHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil }
func (c *ConsumerHandler) loopRead() {
type markKey struct {
ConversationID string
UserID string
}
type markSeq struct {
ReadSeq int64
MarkSeq int64
Count int64
}
type asyncRequest struct {
ConversationID string
UserID string
ReadSeq int64
}
ctx := context.Background()
ctx = mcontext.WithOpUserIDContext(ctx, c.config.Share.IMAdminUserID[0])
opIDPrefix := fmt.Sprintf("mark_read_%d_%d_", os.Getpid(), rand.Uint32())
var incr atomic.Uint64
maxSeq := make(map[markKey]*markSeq, 1024*8)
queue := memamq.NewMemoryQueue(32, 1024)
defer queue.Stop()
ticker := time.NewTicker(time.Second * 10)
defer ticker.Stop()
for {
select {
case <-ticker.C:
var markSeqs []asyncRequest
for key, seq := range maxSeq {
if seq.MarkSeq >= seq.ReadSeq {
seq.Count++
if seq.Count > 6 {
delete(maxSeq, key)
}
continue
}
seq.Count = 0
seq.MarkSeq = seq.ReadSeq
markSeqs = append(markSeqs, asyncRequest{
ConversationID: key.ConversationID,
UserID: key.UserID,
ReadSeq: seq.ReadSeq,
})
}
if len(markSeqs) == 0 {
continue
}
go func() {
for i := range markSeqs {
seq := markSeqs[i]
_ = queue.PushCtx(ctx, func() {
ctx = mcontext.SetOperationID(ctx, opIDPrefix+strconv.FormatUint(incr.Add(1), 10))
_, err := c.msgRpcClient.Client.SetConversationHasReadSeq(ctx, &pbchat.SetConversationHasReadSeqReq{
ConversationID: seq.ConversationID,
UserID: seq.UserID,
HasReadSeq: seq.ReadSeq,
NoNotification: true,
})
if err != nil {
log.ZError(ctx, "ConsumerHandler SetConversationHasReadSeq", err, "conversationID", seq.ConversationID, "userID", seq.UserID, "readSeq", seq.ReadSeq)
}
})
}
}()
case tips, ok := <-c.readCh:
if !ok {
return
}
if tips.HasReadSeq <= 0 {
continue
}
key := markKey{
ConversationID: tips.ConversationID,
UserID: tips.MarkAsReadUserID,
}
ms, ok := maxSeq[key]
if ok {
if ms.ReadSeq < tips.HasReadSeq {
ms.ReadSeq = tips.HasReadSeq
}
} else {
ms = &markSeq{
ReadSeq: tips.HasReadSeq,
MarkSeq: 0,
}
maxSeq[key] = ms
}
}
}
}
func (c *ConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for msg := range claim.Messages() {
ctx := c.pushConsumerGroup.GetContextFromMsg(msg)
@ -318,39 +215,6 @@ func (c *ConsumerHandler) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.
return result, nil
}
func (c *ConsumerHandler) handlerConversationRead(ctx context.Context, msg *sdkws.MsgData) {
if msg.ContentType != constant.HasReadReceipt {
return
}
var elem sdkws.NotificationElem
if err := json.Unmarshal(msg.Content, &elem); err != nil {
log.ZError(ctx, "handlerConversationRead Unmarshal NotificationElem msg err", err, "msg", msg)
return
}
var tips sdkws.MarkAsReadTips
if err := json.Unmarshal([]byte(elem.Detail), &tips); err != nil {
log.ZError(ctx, "handlerConversationRead Unmarshal MarkAsReadTips msg err", err, "msg", msg)
return
}
if len(tips.Seqs) > 0 {
for _, seq := range tips.Seqs {
if tips.HasReadSeq < seq {
tips.HasReadSeq = seq
}
}
clear(tips.Seqs)
tips.Seqs = nil
}
if tips.HasReadSeq < 0 {
return
}
select {
case c.readCh <- &tips:
default:
log.ZWarn(ctx, "handlerConversationRead readCh is full", nil, "markAsReadTips", &tips)
}
}
func (c *ConsumerHandler) Push2Group(ctx context.Context, groupID string, msg *sdkws.MsgData) (err error) {
log.ZDebug(ctx, "Get group msg from msg_transfer and push msg", "msg", msg.String(), "groupID", groupID)
var pushToUserIDs []string

@ -80,16 +80,10 @@ func (m *msgServer) SetConversationHasReadSeq(ctx context.Context, req *msg.SetC
if req.HasReadSeq > maxSeq {
return nil, errs.ErrArgs.WrapMsg("hasReadSeq must not be bigger than maxSeq")
}
if req.NoNotification {
if err := m.MsgDatabase.SetHasReadSeqToDB(ctx, req.UserID, req.ConversationID, req.HasReadSeq); err != nil {
return nil, err
}
} else {
if err := m.MsgDatabase.SetHasReadSeq(ctx, req.UserID, req.ConversationID, req.HasReadSeq); err != nil {
return nil, err
}
m.sendMarkAsReadNotification(ctx, req.ConversationID, constant.SingleChatType, req.UserID, req.UserID, nil, req.HasReadSeq)
if err := m.MsgDatabase.SetHasReadSeq(ctx, req.UserID, req.ConversationID, req.HasReadSeq); err != nil {
return nil, err
}
m.sendMarkAsReadNotification(ctx, req.ConversationID, constant.SingleChatType, req.UserID, req.UserID, nil, req.HasReadSeq)
return &msg.SetConversationHasReadSeqResp{}, nil
}

@ -73,18 +73,23 @@ func (s *seqUserCacheRedis) GetUserReadSeq(ctx context.Context, conversationID s
})
}
func (s *seqUserCacheRedis) SetUserReadSeq(ctx context.Context, conversationID string, userID string, seq int64, writeDB bool) error {
if err := s.rocks.RawSet(ctx, s.getSeqUserReadSeqKey(conversationID, userID), strconv.Itoa(int(seq)), s.readExpireTime); err != nil {
return errs.Wrap(err)
func (s *seqUserCacheRedis) SetUserReadSeq(ctx context.Context, conversationID string, userID string, seq int64) error {
dbSeq, err := s.GetUserReadSeq(ctx, conversationID, userID)
if err != nil {
return err
}
if writeDB {
if err := s.mgo.SetUserReadSeq(ctx, conversationID, userID, seq); err != nil {
return err
if dbSeq < seq {
if err := s.rocks.RawSet(ctx, s.getSeqUserReadSeqKey(conversationID, userID), strconv.Itoa(int(seq)), s.readExpireTime); err != nil {
return errs.Wrap(err)
}
}
return nil
}
func (s *seqUserCacheRedis) SetUserReadSeqToDB(ctx context.Context, conversationID string, userID string, seq int64) error {
return s.mgo.SetUserReadSeq(ctx, conversationID, userID, seq)
}
func (s *seqUserCacheRedis) SetUserMinSeqs(ctx context.Context, userID string, seqs map[string]int64) error {
keys := make([]string, 0, len(seqs))
for conversationID, seq := range seqs {
@ -128,13 +133,6 @@ func (s *seqUserCacheRedis) SetUserReadSeqs(ctx context.Context, userID string,
if err := s.setUserRedisReadSeqs(ctx, userID, seqs); err != nil {
return err
}
for conversationID, seq := range seqs {
if seq%s.readSeqWriteRatio == 0 {
if err := s.mgo.SetUserReadSeq(ctx, conversationID, userID, seq); err != nil {
return err
}
}
}
return nil
}

@ -8,7 +8,8 @@ type SeqUser interface {
GetUserMinSeq(ctx context.Context, conversationID string, userID string) (int64, error)
SetUserMinSeq(ctx context.Context, conversationID string, userID string, seq int64) error
GetUserReadSeq(ctx context.Context, conversationID string, userID string) (int64, error)
SetUserReadSeq(ctx context.Context, conversationID string, userID string, seq int64, writeDB bool) error
SetUserReadSeq(ctx context.Context, conversationID string, userID string, seq int64) error
SetUserReadSeqToDB(ctx context.Context, conversationID string, userID string, seq int64) error
SetUserMinSeqs(ctx context.Context, userID string, seqs map[string]int64) error
SetUserReadSeqs(ctx context.Context, userID string, seqs map[string]int64) error
GetUserReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error)

@ -338,7 +338,7 @@ func (db *commonMsgDatabase) DeleteMessagesFromCache(ctx context.Context, conver
func (db *commonMsgDatabase) setHasReadSeqs(ctx context.Context, conversationID string, userSeqMap map[string]int64) error {
for userID, seq := range userSeqMap {
if err := db.seqUser.SetUserReadSeq(ctx, conversationID, userID, seq, false); err != nil {
if err := db.seqUser.SetUserReadSeq(ctx, conversationID, userID, seq); err != nil {
return err
}
}
@ -806,11 +806,11 @@ func (db *commonMsgDatabase) UserSetHasReadSeqs(ctx context.Context, userID stri
}
func (db *commonMsgDatabase) SetHasReadSeq(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error {
return db.seqUser.SetUserReadSeq(ctx, conversationID, userID, hasReadSeq, false)
return db.seqUser.SetUserReadSeq(ctx, conversationID, userID, hasReadSeq)
}
func (db *commonMsgDatabase) SetHasReadSeqToDB(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error {
return db.seqUser.SetUserReadSeq(ctx, conversationID, userID, hasReadSeq, true)
return db.seqUser.SetUserReadSeqToDB(ctx, conversationID, userID, hasReadSeq)
}
func (db *commonMsgDatabase) GetHasReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error) {

@ -115,5 +115,12 @@ func (s *seqUserMongo) GetUserReadSeqs(ctx context.Context, userID string, conve
}
func (s *seqUserMongo) SetUserReadSeq(ctx context.Context, conversationID string, userID string, seq int64) error {
dbSeq, err := s.GetUserReadSeq(ctx, conversationID, userID)
if err != nil {
return err
}
if dbSeq > seq {
return nil
}
return s.setSeq(ctx, conversationID, userID, seq, "read_seq")
}

Loading…
Cancel
Save