You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
287 lines
10 KiB
287 lines
10 KiB
4 months ago
|
package controller
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
|
||
|
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||
|
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
|
||
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
|
||
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
|
||
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
||
|
pbmsg "github.com/openimsdk/protocol/msg"
|
||
|
"github.com/openimsdk/protocol/sdkws"
|
||
|
"github.com/openimsdk/tools/errs"
|
||
|
"github.com/openimsdk/tools/log"
|
||
|
"github.com/openimsdk/tools/mq/kafka"
|
||
|
"go.mongodb.org/mongo-driver/mongo"
|
||
|
)
|
||
|
|
||
|
type MsgTransferDatabase interface {
|
||
|
// BatchInsertChat2DB inserts a batch of messages into the database for a specific conversation.
|
||
|
BatchInsertChat2DB(ctx context.Context, conversationID string, msgs []*sdkws.MsgData, currentMaxSeq int64) error
|
||
|
// DeleteMessagesFromCache deletes message caches from Redis by sequence numbers.
|
||
|
DeleteMessagesFromCache(ctx context.Context, conversationID string, seqs []int64) error
|
||
|
|
||
|
// BatchInsertChat2Cache increments the sequence number and then batch inserts messages into the cache.
|
||
|
BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNewConversation bool, err error)
|
||
|
SetHasReadSeqToDB(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error
|
||
|
|
||
|
// to mq
|
||
|
MsgToPushMQ(ctx context.Context, key, conversationID string, msg2mq *sdkws.MsgData) (int32, int64, error)
|
||
|
MsgToMongoMQ(ctx context.Context, key, conversationID string, msgs []*sdkws.MsgData, lastSeq int64) error
|
||
|
}
|
||
|
|
||
|
func NewMsgTransferDatabase(msgDocModel database.Msg, msg cache.MsgCache, seqUser cache.SeqUser, seqConversation cache.SeqConversationCache, kafkaConf *config.Kafka) (MsgTransferDatabase, error) {
|
||
|
conf, err := kafka.BuildProducerConfig(*kafkaConf.Build())
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
producerToMongo, err := kafka.NewKafkaProducer(conf, kafkaConf.Address, kafkaConf.ToMongoTopic)
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
producerToPush, err := kafka.NewKafkaProducer(conf, kafkaConf.Address, kafkaConf.ToPushTopic)
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
return &msgTransferDatabase{
|
||
|
msgDocDatabase: msgDocModel,
|
||
|
msg: msg,
|
||
|
seqUser: seqUser,
|
||
|
seqConversation: seqConversation,
|
||
|
producerToMongo: producerToMongo,
|
||
|
producerToPush: producerToPush,
|
||
|
}, nil
|
||
|
}
|
||
|
|
||
|
type msgTransferDatabase struct {
|
||
|
msgDocDatabase database.Msg
|
||
|
msgTable model.MsgDocModel
|
||
|
msg cache.MsgCache
|
||
|
seqConversation cache.SeqConversationCache
|
||
|
seqUser cache.SeqUser
|
||
|
producerToMongo *kafka.Producer
|
||
|
producerToPush *kafka.Producer
|
||
|
}
|
||
|
|
||
|
func (db *msgTransferDatabase) BatchInsertChat2DB(ctx context.Context, conversationID string, msgList []*sdkws.MsgData, currentMaxSeq int64) error {
|
||
|
if len(msgList) == 0 {
|
||
|
return errs.ErrArgs.WrapMsg("msgList is empty")
|
||
|
}
|
||
|
msgs := make([]any, len(msgList))
|
||
|
for i, msg := range msgList {
|
||
|
if msg == nil {
|
||
|
continue
|
||
|
}
|
||
|
var offlinePushModel *model.OfflinePushModel
|
||
|
if msg.OfflinePushInfo != nil {
|
||
|
offlinePushModel = &model.OfflinePushModel{
|
||
|
Title: msg.OfflinePushInfo.Title,
|
||
|
Desc: msg.OfflinePushInfo.Desc,
|
||
|
Ex: msg.OfflinePushInfo.Ex,
|
||
|
IOSPushSound: msg.OfflinePushInfo.IOSPushSound,
|
||
|
IOSBadgeCount: msg.OfflinePushInfo.IOSBadgeCount,
|
||
|
}
|
||
|
}
|
||
|
msgs[i] = &model.MsgDataModel{
|
||
|
SendID: msg.SendID,
|
||
|
RecvID: msg.RecvID,
|
||
|
GroupID: msg.GroupID,
|
||
|
ClientMsgID: msg.ClientMsgID,
|
||
|
ServerMsgID: msg.ServerMsgID,
|
||
|
SenderPlatformID: msg.SenderPlatformID,
|
||
|
SenderNickname: msg.SenderNickname,
|
||
|
SenderFaceURL: msg.SenderFaceURL,
|
||
|
SessionType: msg.SessionType,
|
||
|
MsgFrom: msg.MsgFrom,
|
||
|
ContentType: msg.ContentType,
|
||
|
Content: string(msg.Content),
|
||
|
Seq: msg.Seq,
|
||
|
SendTime: msg.SendTime,
|
||
|
CreateTime: msg.CreateTime,
|
||
|
Status: msg.Status,
|
||
|
Options: msg.Options,
|
||
|
OfflinePush: offlinePushModel,
|
||
|
AtUserIDList: msg.AtUserIDList,
|
||
|
AttachedInfo: msg.AttachedInfo,
|
||
|
Ex: msg.Ex,
|
||
|
}
|
||
|
}
|
||
|
return db.BatchInsertBlock(ctx, conversationID, msgs, updateKeyMsg, msgList[0].Seq)
|
||
|
}
|
||
|
|
||
|
func (db *msgTransferDatabase) BatchInsertBlock(ctx context.Context, conversationID string, fields []any, key int8, firstSeq int64) error {
|
||
|
if len(fields) == 0 {
|
||
|
return nil
|
||
|
}
|
||
|
num := db.msgTable.GetSingleGocMsgNum()
|
||
|
// num = 100
|
||
|
for i, field := range fields { // Check the type of the field
|
||
|
var ok bool
|
||
|
switch key {
|
||
|
case updateKeyMsg:
|
||
|
var msg *model.MsgDataModel
|
||
|
msg, ok = field.(*model.MsgDataModel)
|
||
|
if msg != nil && msg.Seq != firstSeq+int64(i) {
|
||
|
return errs.ErrInternalServer.WrapMsg("seq is invalid")
|
||
|
}
|
||
|
case updateKeyRevoke:
|
||
|
_, ok = field.(*model.RevokeModel)
|
||
|
default:
|
||
|
return errs.ErrInternalServer.WrapMsg("key is invalid")
|
||
|
}
|
||
|
if !ok {
|
||
|
return errs.ErrInternalServer.WrapMsg("field type is invalid")
|
||
|
}
|
||
|
}
|
||
|
// Returns true if the document exists in the database, false if the document does not exist in the database
|
||
|
updateMsgModel := func(seq int64, i int) (bool, error) {
|
||
|
var (
|
||
|
res *mongo.UpdateResult
|
||
|
err error
|
||
|
)
|
||
|
docID := db.msgTable.GetDocID(conversationID, seq)
|
||
|
index := db.msgTable.GetMsgIndex(seq)
|
||
|
field := fields[i]
|
||
|
switch key {
|
||
|
case updateKeyMsg:
|
||
|
res, err = db.msgDocDatabase.UpdateMsg(ctx, docID, index, "msg", field)
|
||
|
case updateKeyRevoke:
|
||
|
res, err = db.msgDocDatabase.UpdateMsg(ctx, docID, index, "revoke", field)
|
||
|
}
|
||
|
if err != nil {
|
||
|
return false, err
|
||
|
}
|
||
|
return res.MatchedCount > 0, nil
|
||
|
}
|
||
|
tryUpdate := true
|
||
|
for i := 0; i < len(fields); i++ {
|
||
|
seq := firstSeq + int64(i) // Current sequence number
|
||
|
if tryUpdate {
|
||
|
matched, err := updateMsgModel(seq, i)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
if matched {
|
||
|
continue // The current data has been updated, skip the current data
|
||
|
}
|
||
|
}
|
||
|
doc := model.MsgDocModel{
|
||
|
DocID: db.msgTable.GetDocID(conversationID, seq),
|
||
|
Msg: make([]*model.MsgInfoModel, num),
|
||
|
}
|
||
|
var insert int // Inserted data number
|
||
|
for j := i; j < len(fields); j++ {
|
||
|
seq = firstSeq + int64(j)
|
||
|
if db.msgTable.GetDocID(conversationID, seq) != doc.DocID {
|
||
|
break
|
||
|
}
|
||
|
insert++
|
||
|
switch key {
|
||
|
case updateKeyMsg:
|
||
|
doc.Msg[db.msgTable.GetMsgIndex(seq)] = &model.MsgInfoModel{
|
||
|
Msg: fields[j].(*model.MsgDataModel),
|
||
|
}
|
||
|
case updateKeyRevoke:
|
||
|
doc.Msg[db.msgTable.GetMsgIndex(seq)] = &model.MsgInfoModel{
|
||
|
Revoke: fields[j].(*model.RevokeModel),
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
for i, msgInfo := range doc.Msg {
|
||
|
if msgInfo == nil {
|
||
|
msgInfo = &model.MsgInfoModel{}
|
||
|
doc.Msg[i] = msgInfo
|
||
|
}
|
||
|
if msgInfo.DelList == nil {
|
||
|
doc.Msg[i].DelList = []string{}
|
||
|
}
|
||
|
}
|
||
|
if err := db.msgDocDatabase.Create(ctx, &doc); err != nil {
|
||
|
if mongo.IsDuplicateKeyError(err) {
|
||
|
i-- // already inserted
|
||
|
tryUpdate = true // next block use update mode
|
||
|
continue
|
||
|
}
|
||
|
return err
|
||
|
}
|
||
|
tryUpdate = false // The current block is inserted successfully, and the next block is inserted preferentially
|
||
|
i += insert - 1 // Skip the inserted data
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (db *msgTransferDatabase) DeleteMessagesFromCache(ctx context.Context, conversationID string, seqs []int64) error {
|
||
|
return db.msg.DeleteMessagesFromCache(ctx, conversationID, seqs)
|
||
|
}
|
||
|
|
||
|
func (db *msgTransferDatabase) BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNew bool, err error) {
|
||
|
lenList := len(msgs)
|
||
|
if int64(lenList) > db.msgTable.GetSingleGocMsgNum() {
|
||
|
return 0, false, errs.New("message count exceeds limit", "limit", db.msgTable.GetSingleGocMsgNum()).Wrap()
|
||
|
}
|
||
|
if lenList < 1 {
|
||
|
return 0, false, errs.New("no messages to insert", "minCount", 1).Wrap()
|
||
|
}
|
||
|
currentMaxSeq, err := db.seqConversation.Malloc(ctx, conversationID, int64(len(msgs)))
|
||
|
if err != nil {
|
||
|
log.ZError(ctx, "storage.seq.Malloc", err)
|
||
|
return 0, false, err
|
||
|
}
|
||
|
isNew = currentMaxSeq == 0
|
||
|
lastMaxSeq := currentMaxSeq
|
||
|
userSeqMap := make(map[string]int64)
|
||
|
for _, m := range msgs {
|
||
|
currentMaxSeq++
|
||
|
m.Seq = currentMaxSeq
|
||
|
userSeqMap[m.SendID] = m.Seq
|
||
|
}
|
||
|
|
||
|
failedNum, err := db.msg.SetMessagesToCache(ctx, conversationID, msgs)
|
||
|
if err != nil {
|
||
|
prommetrics.MsgInsertRedisFailedCounter.Add(float64(failedNum))
|
||
|
log.ZError(ctx, "setMessageToCache error", err, "len", len(msgs), "conversationID", conversationID)
|
||
|
} else {
|
||
|
prommetrics.MsgInsertRedisSuccessCounter.Inc()
|
||
|
}
|
||
|
err = db.setHasReadSeqs(ctx, conversationID, userSeqMap)
|
||
|
if err != nil {
|
||
|
log.ZError(ctx, "SetHasReadSeqs error", err, "userSeqMap", userSeqMap, "conversationID", conversationID)
|
||
|
prommetrics.SeqSetFailedCounter.Inc()
|
||
|
}
|
||
|
return lastMaxSeq, isNew, errs.Wrap(err)
|
||
|
}
|
||
|
|
||
|
func (db *msgTransferDatabase) setHasReadSeqs(ctx context.Context, conversationID string, userSeqMap map[string]int64) error {
|
||
|
for userID, seq := range userSeqMap {
|
||
|
if err := db.seqUser.SetUserReadSeq(ctx, conversationID, userID, seq); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (db *msgTransferDatabase) SetHasReadSeqToDB(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error {
|
||
|
return db.seqUser.SetUserReadSeqToDB(ctx, conversationID, userID, hasReadSeq)
|
||
|
}
|
||
|
|
||
|
func (db *msgTransferDatabase) MsgToPushMQ(ctx context.Context, key, conversationID string, msg2mq *sdkws.MsgData) (int32, int64, error) {
|
||
|
partition, offset, err := db.producerToPush.SendMessage(ctx, key, &pbmsg.PushMsgDataToMQ{MsgData: msg2mq, ConversationID: conversationID})
|
||
|
if err != nil {
|
||
|
log.ZError(ctx, "MsgToPushMQ", err, "key", key, "msg2mq", msg2mq)
|
||
|
return 0, 0, err
|
||
|
}
|
||
|
return partition, offset, nil
|
||
|
}
|
||
|
|
||
|
func (db *msgTransferDatabase) MsgToMongoMQ(ctx context.Context, key, conversationID string, messages []*sdkws.MsgData, lastSeq int64) error {
|
||
|
if len(messages) > 0 {
|
||
|
_, _, err := db.producerToMongo.SendMessage(ctx, key, &pbmsg.MsgDataToMongoByMQ{LastSeq: lastSeq, ConversationID: conversationID, MsgData: messages})
|
||
|
if err != nil {
|
||
|
log.ZError(ctx, "MsgToMongoMQ", err, "key", key, "conversationID", conversationID, "lastSeq", lastSeq)
|
||
|
return err
|
||
|
}
|
||
|
}
|
||
|
return nil
|
||
|
}
|