refactor: improve db structure in `storage/controller` (#2604)

* refactor: refactor workflows contents.

* add tool workflows.

* update field.

* fix: remove chat error.

* Fix err.

* fix error.

* remove cn comment.

* update workflows files.

* update infra config.

* move workflows.

* feat: update bot.

* fix: solve uncorrect outdated msg get.

* update get docIDs logic.

* update

* update skip logic.

* fix

* update.

* fix: delay deleteObject func.

* remove unused content.

* update log type.

* feat: implement request batch count limit.

* update

* update

* refactor: improve db structure in `storage/controller`
pull/2611/head
Monet Lee 4 months ago committed by GitHub
parent eea2627a28
commit c581d43f17
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -18,19 +18,20 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"net/http"
"os"
"os/signal"
"syscall"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo"
"github.com/openimsdk/tools/db/mongoutil" "github.com/openimsdk/tools/db/mongoutil"
"github.com/openimsdk/tools/db/redisutil" "github.com/openimsdk/tools/db/redisutil"
"github.com/openimsdk/tools/utils/datautil" "github.com/openimsdk/tools/utils/datautil"
"net/http"
"os"
"os/signal"
"syscall"
"github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/config"
kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister" discRegister "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
"github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/errs"
@ -65,6 +66,7 @@ type Config struct {
func Start(ctx context.Context, index int, config *Config) error { func Start(ctx context.Context, index int, config *Config) error {
log.CInfo(ctx, "MSG-TRANSFER server is initializing", "prometheusPorts", log.CInfo(ctx, "MSG-TRANSFER server is initializing", "prometheusPorts",
config.MsgTransfer.Prometheus.Ports, "index", index) config.MsgTransfer.Prometheus.Ports, "index", index)
mgocli, err := mongoutil.NewMongoDB(ctx, config.MongodbConfig.Build()) mgocli, err := mongoutil.NewMongoDB(ctx, config.MongodbConfig.Build())
if err != nil { if err != nil {
return err return err
@ -73,12 +75,13 @@ func Start(ctx context.Context, index int, config *Config) error {
if err != nil { if err != nil {
return err return err
} }
client, err := kdisc.NewDiscoveryRegister(&config.Discovery, &config.Share) client, err := discRegister.NewDiscoveryRegister(&config.Discovery, &config.Share)
if err != nil { if err != nil {
return err return err
} }
client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin"))) grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin")))
msgModel := redis.NewMsgCache(rdb) msgModel := redis.NewMsgCache(rdb)
msgDocModel, err := mgo.NewMsgMongo(mgocli.GetDB()) msgDocModel, err := mgo.NewMsgMongo(mgocli.GetDB())
if err != nil { if err != nil {
@ -94,17 +97,17 @@ func Start(ctx context.Context, index int, config *Config) error {
return err return err
} }
seqUserCache := redis.NewSeqUserCacheRedis(rdb, seqUser) seqUserCache := redis.NewSeqUserCacheRedis(rdb, seqUser)
msgDatabase, err := controller.NewCommonMsgDatabase(msgDocModel, msgModel, seqUserCache, seqConversationCache, &config.KafkaConfig) msgTransferDatabase, err := controller.NewMsgTransferDatabase(msgDocModel, msgModel, seqUserCache, seqConversationCache, &config.KafkaConfig)
if err != nil { if err != nil {
return err return err
} }
conversationRpcClient := rpcclient.NewConversationRpcClient(client, config.Share.RpcRegisterName.Conversation) conversationRpcClient := rpcclient.NewConversationRpcClient(client, config.Share.RpcRegisterName.Conversation)
groupRpcClient := rpcclient.NewGroupRpcClient(client, config.Share.RpcRegisterName.Group) groupRpcClient := rpcclient.NewGroupRpcClient(client, config.Share.RpcRegisterName.Group)
historyCH, err := NewOnlineHistoryRedisConsumerHandler(&config.KafkaConfig, msgDatabase, &conversationRpcClient, &groupRpcClient) historyCH, err := NewOnlineHistoryRedisConsumerHandler(&config.KafkaConfig, msgTransferDatabase, &conversationRpcClient, &groupRpcClient)
if err != nil { if err != nil {
return err return err
} }
historyMongoCH, err := NewOnlineHistoryMongoConsumerHandler(&config.KafkaConfig, msgDatabase) historyMongoCH, err := NewOnlineHistoryMongoConsumerHandler(&config.KafkaConfig, msgTransferDatabase)
if err != nil { if err != nil {
return err return err
} }

@ -18,6 +18,10 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"errors" "errors"
"strconv"
"strings"
"time"
"github.com/IBM/sarama" "github.com/IBM/sarama"
"github.com/go-redis/redis" "github.com/go-redis/redis"
"github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/config"
@ -33,9 +37,6 @@ import (
"github.com/openimsdk/tools/mq/kafka" "github.com/openimsdk/tools/mq/kafka"
"github.com/openimsdk/tools/utils/stringutil" "github.com/openimsdk/tools/utils/stringutil"
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
"strconv"
"strings"
"time"
) )
const ( const (
@ -56,19 +57,19 @@ type OnlineHistoryRedisConsumerHandler struct {
redisMessageBatches *batcher.Batcher[sarama.ConsumerMessage] redisMessageBatches *batcher.Batcher[sarama.ConsumerMessage]
msgDatabase controller.CommonMsgDatabase msgTransferDatabase controller.MsgTransferDatabase
conversationRpcClient *rpcclient.ConversationRpcClient conversationRpcClient *rpcclient.ConversationRpcClient
groupRpcClient *rpcclient.GroupRpcClient groupRpcClient *rpcclient.GroupRpcClient
} }
func NewOnlineHistoryRedisConsumerHandler(kafkaConf *config.Kafka, database controller.CommonMsgDatabase, func NewOnlineHistoryRedisConsumerHandler(kafkaConf *config.Kafka, database controller.MsgTransferDatabase,
conversationRpcClient *rpcclient.ConversationRpcClient, groupRpcClient *rpcclient.GroupRpcClient) (*OnlineHistoryRedisConsumerHandler, error) { conversationRpcClient *rpcclient.ConversationRpcClient, groupRpcClient *rpcclient.GroupRpcClient) (*OnlineHistoryRedisConsumerHandler, error) {
historyConsumerGroup, err := kafka.NewMConsumerGroup(kafkaConf.Build(), kafkaConf.ToRedisGroupID, []string{kafkaConf.ToRedisTopic}, false) historyConsumerGroup, err := kafka.NewMConsumerGroup(kafkaConf.Build(), kafkaConf.ToRedisGroupID, []string{kafkaConf.ToRedisTopic}, false)
if err != nil { if err != nil {
return nil, err return nil, err
} }
var och OnlineHistoryRedisConsumerHandler var och OnlineHistoryRedisConsumerHandler
och.msgDatabase = database och.msgTransferDatabase = database
b := batcher.New[sarama.ConsumerMessage]( b := batcher.New[sarama.ConsumerMessage](
batcher.WithSize(size), batcher.WithSize(size),
@ -161,7 +162,7 @@ func (och *OnlineHistoryRedisConsumerHandler) doSetReadSeq(ctx context.Context,
return return
} }
for key, seq := range readSeq { for key, seq := range readSeq {
if err := och.msgDatabase.SetHasReadSeqToDB(ctx, key.userID, key.conversationID, seq); err != nil { if err := och.msgTransferDatabase.SetHasReadSeqToDB(ctx, key.userID, key.conversationID, seq); err != nil {
log.ZError(ctx, "set read seq to db error", err, "userID", key.userID, "conversationID", key.conversationID, "seq", seq) log.ZError(ctx, "set read seq to db error", err, "userID", key.userID, "conversationID", key.conversationID, "seq", seq)
} }
} }
@ -248,7 +249,7 @@ func (och *OnlineHistoryRedisConsumerHandler) handleMsg(ctx context.Context, key
} }
if len(storageMessageList) > 0 { if len(storageMessageList) > 0 {
msg := storageMessageList[0] msg := storageMessageList[0]
lastSeq, isNewConversation, err := och.msgDatabase.BatchInsertChat2Cache(ctx, conversationID, storageMessageList) lastSeq, isNewConversation, err := och.msgTransferDatabase.BatchInsertChat2Cache(ctx, conversationID, storageMessageList)
if err != nil && !errors.Is(errs.Unwrap(err), redis.Nil) { if err != nil && !errors.Is(errs.Unwrap(err), redis.Nil) {
log.ZError(ctx, "batch data insert to redis err", err, "storageMsgList", storageMessageList) log.ZError(ctx, "batch data insert to redis err", err, "storageMsgList", storageMessageList)
return return
@ -282,7 +283,7 @@ func (och *OnlineHistoryRedisConsumerHandler) handleMsg(ctx context.Context, key
} }
log.ZDebug(ctx, "success incr to next topic") log.ZDebug(ctx, "success incr to next topic")
err = och.msgDatabase.MsgToMongoMQ(ctx, key, conversationID, storageMessageList, lastSeq) err = och.msgTransferDatabase.MsgToMongoMQ(ctx, key, conversationID, storageMessageList, lastSeq)
if err != nil { if err != nil {
log.ZError(ctx, "Msg To MongoDB MQ error", err, "conversationID", log.ZError(ctx, "Msg To MongoDB MQ error", err, "conversationID",
conversationID, "storageList", storageMessageList, "lastSeq", lastSeq) conversationID, "storageList", storageMessageList, "lastSeq", lastSeq)
@ -299,14 +300,14 @@ func (och *OnlineHistoryRedisConsumerHandler) handleNotification(ctx context.Con
storageMessageList = append(storageMessageList, msg.message) storageMessageList = append(storageMessageList, msg.message)
} }
if len(storageMessageList) > 0 { if len(storageMessageList) > 0 {
lastSeq, _, err := och.msgDatabase.BatchInsertChat2Cache(ctx, conversationID, storageMessageList) lastSeq, _, err := och.msgTransferDatabase.BatchInsertChat2Cache(ctx, conversationID, storageMessageList)
if err != nil { if err != nil {
log.ZError(ctx, "notification batch insert to redis error", err, "conversationID", conversationID, log.ZError(ctx, "notification batch insert to redis error", err, "conversationID", conversationID,
"storageList", storageMessageList) "storageList", storageMessageList)
return return
} }
log.ZDebug(ctx, "success to next topic", "conversationID", conversationID) log.ZDebug(ctx, "success to next topic", "conversationID", conversationID)
err = och.msgDatabase.MsgToMongoMQ(ctx, key, conversationID, storageMessageList, lastSeq) err = och.msgTransferDatabase.MsgToMongoMQ(ctx, key, conversationID, storageMessageList, lastSeq)
if err != nil { if err != nil {
log.ZError(ctx, "Msg To MongoDB MQ error", err, "conversationID", log.ZError(ctx, "Msg To MongoDB MQ error", err, "conversationID",
conversationID, "storageList", storageMessageList, "lastSeq", lastSeq) conversationID, "storageList", storageMessageList, "lastSeq", lastSeq)
@ -318,7 +319,7 @@ func (och *OnlineHistoryRedisConsumerHandler) handleNotification(ctx context.Con
func (och *OnlineHistoryRedisConsumerHandler) toPushTopic(ctx context.Context, key, conversationID string, msgs []*ContextMsg) { func (och *OnlineHistoryRedisConsumerHandler) toPushTopic(ctx context.Context, key, conversationID string, msgs []*ContextMsg) {
for _, v := range msgs { for _, v := range msgs {
log.ZDebug(ctx, "push msg to topic", "msg", v.message.String()) log.ZDebug(ctx, "push msg to topic", "msg", v.message.String())
och.msgDatabase.MsgToPushMQ(v.ctx, key, conversationID, v.message) och.msgTransferDatabase.MsgToPushMQ(v.ctx, key, conversationID, v.message)
} }
} }

@ -16,6 +16,7 @@ package msgtransfer
import ( import (
"context" "context"
"github.com/IBM/sarama" "github.com/IBM/sarama"
"github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
@ -28,10 +29,10 @@ import (
type OnlineHistoryMongoConsumerHandler struct { type OnlineHistoryMongoConsumerHandler struct {
historyConsumerGroup *kafka.MConsumerGroup historyConsumerGroup *kafka.MConsumerGroup
msgDatabase controller.CommonMsgDatabase msgTransferDatabase controller.MsgTransferDatabase
} }
func NewOnlineHistoryMongoConsumerHandler(kafkaConf *config.Kafka, database controller.CommonMsgDatabase) (*OnlineHistoryMongoConsumerHandler, error) { func NewOnlineHistoryMongoConsumerHandler(kafkaConf *config.Kafka, database controller.MsgTransferDatabase) (*OnlineHistoryMongoConsumerHandler, error) {
historyConsumerGroup, err := kafka.NewMConsumerGroup(kafkaConf.Build(), kafkaConf.ToMongoGroupID, []string{kafkaConf.ToMongoTopic}, true) historyConsumerGroup, err := kafka.NewMConsumerGroup(kafkaConf.Build(), kafkaConf.ToMongoGroupID, []string{kafkaConf.ToMongoTopic}, true)
if err != nil { if err != nil {
return nil, err return nil, err
@ -39,7 +40,7 @@ func NewOnlineHistoryMongoConsumerHandler(kafkaConf *config.Kafka, database cont
mc := &OnlineHistoryMongoConsumerHandler{ mc := &OnlineHistoryMongoConsumerHandler{
historyConsumerGroup: historyConsumerGroup, historyConsumerGroup: historyConsumerGroup,
msgDatabase: database, msgTransferDatabase: database,
} }
return mc, nil return mc, nil
} }
@ -57,7 +58,7 @@ func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo(ctx context.Cont
return return
} }
log.ZInfo(ctx, "mongo consumer recv msg", "msgs", msgFromMQ.String()) log.ZInfo(ctx, "mongo consumer recv msg", "msgs", msgFromMQ.String())
err = mc.msgDatabase.BatchInsertChat2DB(ctx, msgFromMQ.ConversationID, msgFromMQ.MsgData, msgFromMQ.LastSeq) err = mc.msgTransferDatabase.BatchInsertChat2DB(ctx, msgFromMQ.ConversationID, msgFromMQ.MsgData, msgFromMQ.LastSeq)
if err != nil { if err != nil {
log.ZError( log.ZError(
ctx, ctx,
@ -76,7 +77,7 @@ func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo(ctx context.Cont
for _, msg := range msgFromMQ.MsgData { for _, msg := range msgFromMQ.MsgData {
seqs = append(seqs, msg.Seq) seqs = append(seqs, msg.Seq)
} }
err = mc.msgDatabase.DeleteMessagesFromCache(ctx, msgFromMQ.ConversationID, seqs) err = mc.msgTransferDatabase.DeleteMessagesFromCache(ctx, msgFromMQ.ConversationID, seqs)
if err != nil { if err != nil {
log.ZError( log.ZError(
ctx, ctx,

@ -26,7 +26,6 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/convert" "github.com/openimsdk/open-im-server/v3/pkg/common/convert"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/protocol/constant" "github.com/openimsdk/protocol/constant"
pbmsg "github.com/openimsdk/protocol/msg" pbmsg "github.com/openimsdk/protocol/msg"
@ -47,16 +46,10 @@ const (
// CommonMsgDatabase defines the interface for message database operations. // CommonMsgDatabase defines the interface for message database operations.
type CommonMsgDatabase interface { type CommonMsgDatabase interface {
// BatchInsertChat2DB inserts a batch of messages into the database for a specific conversation.
BatchInsertChat2DB(ctx context.Context, conversationID string, msgs []*sdkws.MsgData, currentMaxSeq int64) error
// RevokeMsg revokes a message in a conversation. // RevokeMsg revokes a message in a conversation.
RevokeMsg(ctx context.Context, conversationID string, seq int64, revoke *model.RevokeModel) error RevokeMsg(ctx context.Context, conversationID string, seq int64, revoke *model.RevokeModel) error
// MarkSingleChatMsgsAsRead marks messages as read for a single chat by sequence numbers. // MarkSingleChatMsgsAsRead marks messages as read for a single chat by sequence numbers.
MarkSingleChatMsgsAsRead(ctx context.Context, userID string, conversationID string, seqs []int64) error MarkSingleChatMsgsAsRead(ctx context.Context, userID string, conversationID string, seqs []int64) error
// DeleteMessagesFromCache deletes message caches from Redis by sequence numbers.
DeleteMessagesFromCache(ctx context.Context, conversationID string, seqs []int64) error
// BatchInsertChat2Cache increments the sequence number and then batch inserts messages into the cache.
BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNewConversation bool, err error)
// GetMsgBySeqsRange retrieves messages from MongoDB by a range of sequence numbers. // GetMsgBySeqsRange retrieves messages from MongoDB by a range of sequence numbers.
GetMsgBySeqsRange(ctx context.Context, userID string, conversationID string, begin, end, num, userMaxSeq int64) (minSeq int64, maxSeq int64, seqMsg []*sdkws.MsgData, err error) GetMsgBySeqsRange(ctx context.Context, userID string, conversationID string, begin, end, num, userMaxSeq int64) (minSeq int64, maxSeq int64, seqMsg []*sdkws.MsgData, err error)
// GetMsgBySeqs retrieves messages for large groups from MongoDB by sequence numbers. // GetMsgBySeqs retrieves messages for large groups from MongoDB by sequence numbers.
@ -77,7 +70,6 @@ type CommonMsgDatabase interface {
SetUserConversationsMinSeqs(ctx context.Context, userID string, seqs map[string]int64) (err error) SetUserConversationsMinSeqs(ctx context.Context, userID string, seqs map[string]int64) (err error)
SetHasReadSeq(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error SetHasReadSeq(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error
SetHasReadSeqToDB(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error
GetHasReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error) GetHasReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error)
GetHasReadSeq(ctx context.Context, userID string, conversationID string) (int64, error) GetHasReadSeq(ctx context.Context, userID string, conversationID string) (int64, error)
UserSetHasReadSeqs(ctx context.Context, userID string, hasReadSeqs map[string]int64) error UserSetHasReadSeqs(ctx context.Context, userID string, hasReadSeqs map[string]int64) error
@ -91,8 +83,6 @@ type CommonMsgDatabase interface {
// to mq // to mq
MsgToMQ(ctx context.Context, key string, msg2mq *sdkws.MsgData) error MsgToMQ(ctx context.Context, key string, msg2mq *sdkws.MsgData) error
MsgToPushMQ(ctx context.Context, key, conversationID string, msg2mq *sdkws.MsgData) (int32, int64, error)
MsgToMongoMQ(ctx context.Context, key, conversationID string, msgs []*sdkws.MsgData, lastSeq int64) error
RangeUserSendCount(ctx context.Context, start time.Time, end time.Time, group bool, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, users []*model.UserCount, dateCount map[string]int64, err error) RangeUserSendCount(ctx context.Context, start time.Time, end time.Time, group bool, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, users []*model.UserCount, dateCount map[string]int64, err error)
RangeGroupSendCount(ctx context.Context, start time.Time, end time.Time, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, groups []*model.GroupCount, dateCount map[string]int64, err error) RangeGroupSendCount(ctx context.Context, start time.Time, end time.Time, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, groups []*model.GroupCount, dateCount map[string]int64, err error)
@ -114,22 +104,12 @@ func NewCommonMsgDatabase(msgDocModel database.Msg, msg cache.MsgCache, seqUser
if err != nil { if err != nil {
return nil, err return nil, err
} }
producerToMongo, err := kafka.NewKafkaProducer(conf, kafkaConf.Address, kafkaConf.ToMongoTopic)
if err != nil {
return nil, err
}
producerToPush, err := kafka.NewKafkaProducer(conf, kafkaConf.Address, kafkaConf.ToPushTopic)
if err != nil {
return nil, err
}
return &commonMsgDatabase{ return &commonMsgDatabase{
msgDocDatabase: msgDocModel, msgDocDatabase: msgDocModel,
msg: msg, msg: msg,
seqUser: seqUser, seqUser: seqUser,
seqConversation: seqConversation, seqConversation: seqConversation,
producer: producerToRedis, producer: producerToRedis,
producerToMongo: producerToMongo,
producerToPush: producerToPush,
}, nil }, nil
} }
@ -140,8 +120,6 @@ type commonMsgDatabase struct {
seqConversation cache.SeqConversationCache seqConversation cache.SeqConversationCache
seqUser cache.SeqUser seqUser cache.SeqUser
producer *kafka.Producer producer *kafka.Producer
producerToMongo *kafka.Producer
producerToPush *kafka.Producer
} }
func (db *commonMsgDatabase) MsgToMQ(ctx context.Context, key string, msg2mq *sdkws.MsgData) error { func (db *commonMsgDatabase) MsgToMQ(ctx context.Context, key string, msg2mq *sdkws.MsgData) error {
@ -149,23 +127,6 @@ func (db *commonMsgDatabase) MsgToMQ(ctx context.Context, key string, msg2mq *sd
return err return err
} }
func (db *commonMsgDatabase) MsgToPushMQ(ctx context.Context, key, conversationID string, msg2mq *sdkws.MsgData) (int32, int64, error) {
partition, offset, err := db.producerToPush.SendMessage(ctx, key, &pbmsg.PushMsgDataToMQ{MsgData: msg2mq, ConversationID: conversationID})
if err != nil {
log.ZError(ctx, "MsgToPushMQ", err, "key", key, "msg2mq", msg2mq)
return 0, 0, err
}
return partition, offset, nil
}
func (db *commonMsgDatabase) MsgToMongoMQ(ctx context.Context, key, conversationID string, messages []*sdkws.MsgData, lastSeq int64) error {
if len(messages) > 0 {
_, _, err := db.producerToMongo.SendMessage(ctx, key, &pbmsg.MsgDataToMongoByMQ{LastSeq: lastSeq, ConversationID: conversationID, MsgData: messages})
return err
}
return nil
}
func (db *commonMsgDatabase) BatchInsertBlock(ctx context.Context, conversationID string, fields []any, key int8, firstSeq int64) error { func (db *commonMsgDatabase) BatchInsertBlock(ctx context.Context, conversationID string, fields []any, key int8, firstSeq int64) error {
if len(fields) == 0 { if len(fields) == 0 {
return nil return nil
@ -267,52 +228,6 @@ func (db *commonMsgDatabase) BatchInsertBlock(ctx context.Context, conversationI
return nil return nil
} }
func (db *commonMsgDatabase) BatchInsertChat2DB(ctx context.Context, conversationID string, msgList []*sdkws.MsgData, currentMaxSeq int64) error {
if len(msgList) == 0 {
return errs.ErrArgs.WrapMsg("msgList is empty")
}
msgs := make([]any, len(msgList))
for i, msg := range msgList {
if msg == nil {
continue
}
var offlinePushModel *model.OfflinePushModel
if msg.OfflinePushInfo != nil {
offlinePushModel = &model.OfflinePushModel{
Title: msg.OfflinePushInfo.Title,
Desc: msg.OfflinePushInfo.Desc,
Ex: msg.OfflinePushInfo.Ex,
IOSPushSound: msg.OfflinePushInfo.IOSPushSound,
IOSBadgeCount: msg.OfflinePushInfo.IOSBadgeCount,
}
}
msgs[i] = &model.MsgDataModel{
SendID: msg.SendID,
RecvID: msg.RecvID,
GroupID: msg.GroupID,
ClientMsgID: msg.ClientMsgID,
ServerMsgID: msg.ServerMsgID,
SenderPlatformID: msg.SenderPlatformID,
SenderNickname: msg.SenderNickname,
SenderFaceURL: msg.SenderFaceURL,
SessionType: msg.SessionType,
MsgFrom: msg.MsgFrom,
ContentType: msg.ContentType,
Content: string(msg.Content),
Seq: msg.Seq,
SendTime: msg.SendTime,
CreateTime: msg.CreateTime,
Status: msg.Status,
Options: msg.Options,
OfflinePush: offlinePushModel,
AtUserIDList: msg.AtUserIDList,
AttachedInfo: msg.AttachedInfo,
Ex: msg.Ex,
}
}
return db.BatchInsertBlock(ctx, conversationID, msgs, updateKeyMsg, msgList[0].Seq)
}
func (db *commonMsgDatabase) RevokeMsg(ctx context.Context, conversationID string, seq int64, revoke *model.RevokeModel) error { func (db *commonMsgDatabase) RevokeMsg(ctx context.Context, conversationID string, seq int64, revoke *model.RevokeModel) error {
return db.BatchInsertBlock(ctx, conversationID, []any{revoke}, updateKeyRevoke, seq) return db.BatchInsertBlock(ctx, conversationID, []any{revoke}, updateKeyRevoke, seq)
} }
@ -332,56 +247,6 @@ func (db *commonMsgDatabase) MarkSingleChatMsgsAsRead(ctx context.Context, userI
return nil return nil
} }
func (db *commonMsgDatabase) DeleteMessagesFromCache(ctx context.Context, conversationID string, seqs []int64) error {
return db.msg.DeleteMessagesFromCache(ctx, conversationID, seqs)
}
func (db *commonMsgDatabase) setHasReadSeqs(ctx context.Context, conversationID string, userSeqMap map[string]int64) error {
for userID, seq := range userSeqMap {
if err := db.seqUser.SetUserReadSeq(ctx, conversationID, userID, seq); err != nil {
return err
}
}
return nil
}
func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNew bool, err error) {
lenList := len(msgs)
if int64(lenList) > db.msgTable.GetSingleGocMsgNum() {
return 0, false, errs.New("message count exceeds limit", "limit", db.msgTable.GetSingleGocMsgNum()).Wrap()
}
if lenList < 1 {
return 0, false, errs.New("no messages to insert", "minCount", 1).Wrap()
}
currentMaxSeq, err := db.seqConversation.Malloc(ctx, conversationID, int64(len(msgs)))
if err != nil {
log.ZError(ctx, "storage.seq.Malloc", err)
return 0, false, err
}
isNew = currentMaxSeq == 0
lastMaxSeq := currentMaxSeq
userSeqMap := make(map[string]int64)
for _, m := range msgs {
currentMaxSeq++
m.Seq = currentMaxSeq
userSeqMap[m.SendID] = m.Seq
}
failedNum, err := db.msg.SetMessagesToCache(ctx, conversationID, msgs)
if err != nil {
prommetrics.MsgInsertRedisFailedCounter.Add(float64(failedNum))
log.ZError(ctx, "setMessageToCache error", err, "len", len(msgs), "conversationID", conversationID)
} else {
prommetrics.MsgInsertRedisSuccessCounter.Inc()
}
err = db.setHasReadSeqs(ctx, conversationID, userSeqMap)
if err != nil {
log.ZError(ctx, "SetHasReadSeqs error", err, "userSeqMap", userSeqMap, "conversationID", conversationID)
prommetrics.SeqSetFailedCounter.Inc()
}
return lastMaxSeq, isNew, errs.Wrap(err)
}
func (db *commonMsgDatabase) getMsgBySeqs(ctx context.Context, userID, conversationID string, seqs []int64) (totalMsgs []*sdkws.MsgData, err error) { func (db *commonMsgDatabase) getMsgBySeqs(ctx context.Context, userID, conversationID string, seqs []int64) (totalMsgs []*sdkws.MsgData, err error) {
for docID, seqs := range db.msgTable.GetDocIDSeqsMap(conversationID, seqs) { for docID, seqs := range db.msgTable.GetDocIDSeqsMap(conversationID, seqs) {
// log.ZDebug(ctx, "getMsgBySeqs", "docID", docID, "seqs", seqs) // log.ZDebug(ctx, "getMsgBySeqs", "docID", docID, "seqs", seqs)
@ -809,10 +674,6 @@ func (db *commonMsgDatabase) SetHasReadSeq(ctx context.Context, userID string, c
return db.seqUser.SetUserReadSeq(ctx, conversationID, userID, hasReadSeq) return db.seqUser.SetUserReadSeq(ctx, conversationID, userID, hasReadSeq)
} }
func (db *commonMsgDatabase) SetHasReadSeqToDB(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error {
return db.seqUser.SetUserReadSeqToDB(ctx, conversationID, userID, hasReadSeq)
}
func (db *commonMsgDatabase) GetHasReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error) { func (db *commonMsgDatabase) GetHasReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error) {
return db.seqUser.GetUserReadSeqs(ctx, userID, conversationIDs) return db.seqUser.GetUserReadSeqs(ctx, userID, conversationIDs)
} }

@ -0,0 +1,286 @@
package controller
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
pbmsg "github.com/openimsdk/protocol/msg"
"github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mq/kafka"
"go.mongodb.org/mongo-driver/mongo"
)
type MsgTransferDatabase interface {
// BatchInsertChat2DB inserts a batch of messages into the database for a specific conversation.
BatchInsertChat2DB(ctx context.Context, conversationID string, msgs []*sdkws.MsgData, currentMaxSeq int64) error
// DeleteMessagesFromCache deletes message caches from Redis by sequence numbers.
DeleteMessagesFromCache(ctx context.Context, conversationID string, seqs []int64) error
// BatchInsertChat2Cache increments the sequence number and then batch inserts messages into the cache.
BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNewConversation bool, err error)
SetHasReadSeqToDB(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error
// to mq
MsgToPushMQ(ctx context.Context, key, conversationID string, msg2mq *sdkws.MsgData) (int32, int64, error)
MsgToMongoMQ(ctx context.Context, key, conversationID string, msgs []*sdkws.MsgData, lastSeq int64) error
}
func NewMsgTransferDatabase(msgDocModel database.Msg, msg cache.MsgCache, seqUser cache.SeqUser, seqConversation cache.SeqConversationCache, kafkaConf *config.Kafka) (MsgTransferDatabase, error) {
conf, err := kafka.BuildProducerConfig(*kafkaConf.Build())
if err != nil {
return nil, err
}
producerToMongo, err := kafka.NewKafkaProducer(conf, kafkaConf.Address, kafkaConf.ToMongoTopic)
if err != nil {
return nil, err
}
producerToPush, err := kafka.NewKafkaProducer(conf, kafkaConf.Address, kafkaConf.ToPushTopic)
if err != nil {
return nil, err
}
return &msgTransferDatabase{
msgDocDatabase: msgDocModel,
msg: msg,
seqUser: seqUser,
seqConversation: seqConversation,
producerToMongo: producerToMongo,
producerToPush: producerToPush,
}, nil
}
type msgTransferDatabase struct {
msgDocDatabase database.Msg
msgTable model.MsgDocModel
msg cache.MsgCache
seqConversation cache.SeqConversationCache
seqUser cache.SeqUser
producerToMongo *kafka.Producer
producerToPush *kafka.Producer
}
func (db *msgTransferDatabase) BatchInsertChat2DB(ctx context.Context, conversationID string, msgList []*sdkws.MsgData, currentMaxSeq int64) error {
if len(msgList) == 0 {
return errs.ErrArgs.WrapMsg("msgList is empty")
}
msgs := make([]any, len(msgList))
for i, msg := range msgList {
if msg == nil {
continue
}
var offlinePushModel *model.OfflinePushModel
if msg.OfflinePushInfo != nil {
offlinePushModel = &model.OfflinePushModel{
Title: msg.OfflinePushInfo.Title,
Desc: msg.OfflinePushInfo.Desc,
Ex: msg.OfflinePushInfo.Ex,
IOSPushSound: msg.OfflinePushInfo.IOSPushSound,
IOSBadgeCount: msg.OfflinePushInfo.IOSBadgeCount,
}
}
msgs[i] = &model.MsgDataModel{
SendID: msg.SendID,
RecvID: msg.RecvID,
GroupID: msg.GroupID,
ClientMsgID: msg.ClientMsgID,
ServerMsgID: msg.ServerMsgID,
SenderPlatformID: msg.SenderPlatformID,
SenderNickname: msg.SenderNickname,
SenderFaceURL: msg.SenderFaceURL,
SessionType: msg.SessionType,
MsgFrom: msg.MsgFrom,
ContentType: msg.ContentType,
Content: string(msg.Content),
Seq: msg.Seq,
SendTime: msg.SendTime,
CreateTime: msg.CreateTime,
Status: msg.Status,
Options: msg.Options,
OfflinePush: offlinePushModel,
AtUserIDList: msg.AtUserIDList,
AttachedInfo: msg.AttachedInfo,
Ex: msg.Ex,
}
}
return db.BatchInsertBlock(ctx, conversationID, msgs, updateKeyMsg, msgList[0].Seq)
}
func (db *msgTransferDatabase) BatchInsertBlock(ctx context.Context, conversationID string, fields []any, key int8, firstSeq int64) error {
if len(fields) == 0 {
return nil
}
num := db.msgTable.GetSingleGocMsgNum()
// num = 100
for i, field := range fields { // Check the type of the field
var ok bool
switch key {
case updateKeyMsg:
var msg *model.MsgDataModel
msg, ok = field.(*model.MsgDataModel)
if msg != nil && msg.Seq != firstSeq+int64(i) {
return errs.ErrInternalServer.WrapMsg("seq is invalid")
}
case updateKeyRevoke:
_, ok = field.(*model.RevokeModel)
default:
return errs.ErrInternalServer.WrapMsg("key is invalid")
}
if !ok {
return errs.ErrInternalServer.WrapMsg("field type is invalid")
}
}
// Returns true if the document exists in the database, false if the document does not exist in the database
updateMsgModel := func(seq int64, i int) (bool, error) {
var (
res *mongo.UpdateResult
err error
)
docID := db.msgTable.GetDocID(conversationID, seq)
index := db.msgTable.GetMsgIndex(seq)
field := fields[i]
switch key {
case updateKeyMsg:
res, err = db.msgDocDatabase.UpdateMsg(ctx, docID, index, "msg", field)
case updateKeyRevoke:
res, err = db.msgDocDatabase.UpdateMsg(ctx, docID, index, "revoke", field)
}
if err != nil {
return false, err
}
return res.MatchedCount > 0, nil
}
tryUpdate := true
for i := 0; i < len(fields); i++ {
seq := firstSeq + int64(i) // Current sequence number
if tryUpdate {
matched, err := updateMsgModel(seq, i)
if err != nil {
return err
}
if matched {
continue // The current data has been updated, skip the current data
}
}
doc := model.MsgDocModel{
DocID: db.msgTable.GetDocID(conversationID, seq),
Msg: make([]*model.MsgInfoModel, num),
}
var insert int // Inserted data number
for j := i; j < len(fields); j++ {
seq = firstSeq + int64(j)
if db.msgTable.GetDocID(conversationID, seq) != doc.DocID {
break
}
insert++
switch key {
case updateKeyMsg:
doc.Msg[db.msgTable.GetMsgIndex(seq)] = &model.MsgInfoModel{
Msg: fields[j].(*model.MsgDataModel),
}
case updateKeyRevoke:
doc.Msg[db.msgTable.GetMsgIndex(seq)] = &model.MsgInfoModel{
Revoke: fields[j].(*model.RevokeModel),
}
}
}
for i, msgInfo := range doc.Msg {
if msgInfo == nil {
msgInfo = &model.MsgInfoModel{}
doc.Msg[i] = msgInfo
}
if msgInfo.DelList == nil {
doc.Msg[i].DelList = []string{}
}
}
if err := db.msgDocDatabase.Create(ctx, &doc); err != nil {
if mongo.IsDuplicateKeyError(err) {
i-- // already inserted
tryUpdate = true // next block use update mode
continue
}
return err
}
tryUpdate = false // The current block is inserted successfully, and the next block is inserted preferentially
i += insert - 1 // Skip the inserted data
}
return nil
}
func (db *msgTransferDatabase) DeleteMessagesFromCache(ctx context.Context, conversationID string, seqs []int64) error {
return db.msg.DeleteMessagesFromCache(ctx, conversationID, seqs)
}
func (db *msgTransferDatabase) BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNew bool, err error) {
lenList := len(msgs)
if int64(lenList) > db.msgTable.GetSingleGocMsgNum() {
return 0, false, errs.New("message count exceeds limit", "limit", db.msgTable.GetSingleGocMsgNum()).Wrap()
}
if lenList < 1 {
return 0, false, errs.New("no messages to insert", "minCount", 1).Wrap()
}
currentMaxSeq, err := db.seqConversation.Malloc(ctx, conversationID, int64(len(msgs)))
if err != nil {
log.ZError(ctx, "storage.seq.Malloc", err)
return 0, false, err
}
isNew = currentMaxSeq == 0
lastMaxSeq := currentMaxSeq
userSeqMap := make(map[string]int64)
for _, m := range msgs {
currentMaxSeq++
m.Seq = currentMaxSeq
userSeqMap[m.SendID] = m.Seq
}
failedNum, err := db.msg.SetMessagesToCache(ctx, conversationID, msgs)
if err != nil {
prommetrics.MsgInsertRedisFailedCounter.Add(float64(failedNum))
log.ZError(ctx, "setMessageToCache error", err, "len", len(msgs), "conversationID", conversationID)
} else {
prommetrics.MsgInsertRedisSuccessCounter.Inc()
}
err = db.setHasReadSeqs(ctx, conversationID, userSeqMap)
if err != nil {
log.ZError(ctx, "SetHasReadSeqs error", err, "userSeqMap", userSeqMap, "conversationID", conversationID)
prommetrics.SeqSetFailedCounter.Inc()
}
return lastMaxSeq, isNew, errs.Wrap(err)
}
func (db *msgTransferDatabase) setHasReadSeqs(ctx context.Context, conversationID string, userSeqMap map[string]int64) error {
for userID, seq := range userSeqMap {
if err := db.seqUser.SetUserReadSeq(ctx, conversationID, userID, seq); err != nil {
return err
}
}
return nil
}
func (db *msgTransferDatabase) SetHasReadSeqToDB(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error {
return db.seqUser.SetUserReadSeqToDB(ctx, conversationID, userID, hasReadSeq)
}
func (db *msgTransferDatabase) MsgToPushMQ(ctx context.Context, key, conversationID string, msg2mq *sdkws.MsgData) (int32, int64, error) {
partition, offset, err := db.producerToPush.SendMessage(ctx, key, &pbmsg.PushMsgDataToMQ{MsgData: msg2mq, ConversationID: conversationID})
if err != nil {
log.ZError(ctx, "MsgToPushMQ", err, "key", key, "msg2mq", msg2mq)
return 0, 0, err
}
return partition, offset, nil
}
func (db *msgTransferDatabase) MsgToMongoMQ(ctx context.Context, key, conversationID string, messages []*sdkws.MsgData, lastSeq int64) error {
if len(messages) > 0 {
_, _, err := db.producerToMongo.SendMessage(ctx, key, &pbmsg.MsgDataToMongoByMQ{LastSeq: lastSeq, ConversationID: conversationID, MsgData: messages})
if err != nil {
log.ZError(ctx, "MsgToMongoMQ", err, "key", key, "conversationID", conversationID, "lastSeq", lastSeq)
return err
}
}
return nil
}
Loading…
Cancel
Save