You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
Open-IM-Server/pkg/common/db/controller/msg.go

758 lines
30 KiB

2 years ago
package controller
2 years ago
import (
2 years ago
"fmt"
2 years ago
"sync"
"time"
2 years ago
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache"
unRelationTb "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/unrelation"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/unrelation"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/kafka"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
2 years ago
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext"
2 years ago
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/prome"
2 years ago
"github.com/gogo/protobuf/sortkeys"
2 years ago
"context"
2 years ago
"errors"
2 years ago
2 years ago
pbMsg "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg"
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
2 years ago
"github.com/go-redis/redis/v8"
"go.mongodb.org/mongo-driver/mongo"
"github.com/golang/protobuf/proto"
2 years ago
)
2 years ago
type MsgDatabase interface {
2 years ago
// 批量插入消息
BatchInsertChat2DB(ctx context.Context, conversationID string, msgList []*pbMsg.MsgDataToMQ, currentMaxSeq int64) error
2 years ago
// 刪除redis中消息缓存
DeleteMessageFromCache(ctx context.Context, conversationID string, msgList []*pbMsg.MsgDataToMQ) error
2 years ago
// incrSeq然后批量插入缓存
BatchInsertChat2Cache(ctx context.Context, conversationID string, msgList []*pbMsg.MsgDataToMQ, currentMaxSeq int64) (int64, error)
// incrSeq通知seq然后批量插入缓存
NotificationBatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*pbMsg.MsgDataToMQ) (int64, error)
2 years ago
// 删除消息 返回不存在的seqList
2 years ago
DelMsgBySeqs(ctx context.Context, userID string, seqs []int64) (totalUnExistSeqs []int64, err error)
2 years ago
// 获取群ID或者UserID最新一条在mongo里面的消息
2 years ago
// 通过seqList获取mongo中写扩散消息
2 years ago
GetMsgBySeqs(ctx context.Context, userID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error)
GetMsgBySeqsRange(ctx context.Context, conversationID string, begin, end, num int64) (seqMsg []*sdkws.MsgData, err error)
2 years ago
// 通过seqList获取大群在 mongo里面的消息
2 years ago
GetSuperGroupMsgBySeqs(ctx context.Context, groupID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error)
2 years ago
// 删除用户所有消息/redis/mongo然后重置seq
2 years ago
CleanUpUserMsg(ctx context.Context, userID string) error
2 years ago
// 删除大群消息重置群成员最小群seq, remainTime为消息保留的时间单位秒,超时消息删除, 传0删除所有消息(此方法不删除 redis cache)
2 years ago
DeleteUserSuperGroupMsgsAndSetMinSeq(ctx context.Context, groupID string, userIDs []string, remainTime int64) error
2 years ago
// 删除用户消息重置最小seq remainTime为消息保留的时间单位秒,超时消息删除, 传0删除所有消息(此方法不删除redis cache)
DeleteUserMsgsAndSetMinSeq(ctx context.Context, userID string, remainTime int64) error
2 years ago
// 获取用户 seq mongo和redis
GetUserMinMaxSeqInMongoAndCache(ctx context.Context, userID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error)
// 获取群 seq mongo和redis
GetSuperGroupMinMaxSeqInMongoAndCache(ctx context.Context, groupID string) (minSeqMongo, maxSeqMongo, maxSeqCache int64, err error)
// 设置群用户最小seq 直接调用cache
SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error)
2 years ago
GetGroupUserMinSeq(ctx context.Context, groupID, userID string) (int64, error)
2 years ago
// 设置用户最小seq 直接调用cache
SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error)
2 years ago
2 years ago
JudgeMessageReactionExist(ctx context.Context, clientMsgID string, sessionType int32) (bool, error)
2 years ago
SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error
SetMessageReactionExpire(ctx context.Context, clientMsgID string, sessionType int32, expiration time.Duration) (bool, error)
GetExtendMsg(ctx context.Context, conversationID string, sessionType int32, clientMsgID string, maxMsgUpdateTime int64) (*pbMsg.ExtendMsg, error)
InsertOrUpdateReactionExtendMsgSet(ctx context.Context, conversationID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*sdkws.KeyValue) error
2 years ago
GetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey string) (string, error)
GetOneMessageAllReactionList(ctx context.Context, clientMsgID string, sessionType int32) (map[string]string, error)
DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error
DeleteReactionExtendMsgSet(ctx context.Context, conversationID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*sdkws.KeyValue) error
2 years ago
SetSendMsgStatus(ctx context.Context, id string, status int32) error
GetSendMsgStatus(ctx context.Context, id string) (int32, error)
2 years ago
GetUserMaxSeq(ctx context.Context, userID string) (int64, error)
GetUserMinSeq(ctx context.Context, userID string) (int64, error)
GetGroupMaxSeq(ctx context.Context, groupID string) (int64, error)
GetGroupMinSeq(ctx context.Context, groupID string) (int64, error)
2 years ago
MsgToMQ(ctx context.Context, key string, msg2mq *pbMsg.MsgDataToMQ) error
MsgToModifyMQ(ctx context.Context, conversationID string, messages []*pbMsg.MsgDataToMQ) error
MsgToPushMQ(ctx context.Context, conversationID string, msg2mq *pbMsg.MsgDataToMQ) (int32, int64, error)
MsgToMongoMQ(ctx context.Context, conversationID string, messages []*pbMsg.MsgDataToMQ, lastSeq int64) error
2 years ago
}
2 years ago
2 years ago
func NewMsgDatabase(msgDocModel unRelationTb.MsgDocModelInterface, cacheModel cache.Model) MsgDatabase {
return &msgDatabase{
2 years ago
msgDocDatabase: msgDocModel,
cache: cacheModel,
producer: kafka.NewKafkaProducer(config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.Ws2mschat.Topic),
producerToMongo: kafka.NewKafkaProducer(config.Config.Kafka.MsgToMongo.Addr, config.Config.Kafka.MsgToMongo.Topic),
producerToPush: kafka.NewKafkaProducer(config.Config.Kafka.Ms2pschat.Addr, config.Config.Kafka.Ms2pschat.Topic),
producerToModify: kafka.NewKafkaProducer(config.Config.Kafka.MsgToModify.Addr, config.Config.Kafka.MsgToModify.Topic),
2 years ago
}
}
func InitMsgDatabase(rdb redis.UniversalClient, database *mongo.Database) MsgDatabase {
cacheModel := cache.NewCacheModel(rdb)
msgDocModel := unrelation.NewMsgMongoDriver(database)
msgDatabase := NewMsgDatabase(msgDocModel, cacheModel)
return msgDatabase
2 years ago
}
2 years ago
type msgDatabase struct {
2 years ago
msgDocDatabase unRelationTb.MsgDocModelInterface
extendMsgDatabase unRelationTb.ExtendMsgSetModelInterface
cache cache.Model
2 years ago
producer *kafka.Producer
2 years ago
producerToMongo *kafka.Producer
producerToModify *kafka.Producer
producerToPush *kafka.Producer
2 years ago
// model
2 years ago
msg unRelationTb.MsgDocModel
extendMsgSetModel unRelationTb.ExtendMsgSetModel
2 years ago
}
2 years ago
func (db *msgDatabase) JudgeMessageReactionExist(ctx context.Context, clientMsgID string, sessionType int32) (bool, error) {
return db.cache.JudgeMessageReactionExist(ctx, clientMsgID, sessionType)
2 years ago
}
2 years ago
func (db *msgDatabase) SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error {
2 years ago
return db.cache.SetMessageTypeKeyValue(ctx, clientMsgID, sessionType, typeKey, value)
2 years ago
}
2 years ago
func (db *msgDatabase) SetMessageReactionExpire(ctx context.Context, clientMsgID string, sessionType int32, expiration time.Duration) (bool, error) {
2 years ago
return db.cache.SetMessageReactionExpire(ctx, clientMsgID, sessionType, expiration)
2 years ago
}
2 years ago
func (db *msgDatabase) GetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey string) (string, error) {
2 years ago
return db.cache.GetMessageTypeKeyValue(ctx, clientMsgID, sessionType, typeKey)
2 years ago
}
2 years ago
func (db *msgDatabase) GetOneMessageAllReactionList(ctx context.Context, clientMsgID string, sessionType int32) (map[string]string, error) {
2 years ago
return db.cache.GetOneMessageAllReactionList(ctx, clientMsgID, sessionType)
2 years ago
}
2 years ago
func (db *msgDatabase) DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error {
2 years ago
return db.cache.DeleteOneMessageKey(ctx, clientMsgID, sessionType, subKey)
2 years ago
}
func (db *msgDatabase) InsertOrUpdateReactionExtendMsgSet(ctx context.Context, conversationID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensions map[string]*sdkws.KeyValue) error {
return db.extendMsgDatabase.InsertOrUpdateReactionExtendMsgSet(ctx, conversationID, sessionType, clientMsgID, msgFirstModifyTime, db.extendMsgSetModel.Pb2Model(reactionExtensions))
2 years ago
}
func (db *msgDatabase) GetExtendMsg(ctx context.Context, conversationID string, sessionType int32, clientMsgID string, maxMsgUpdateTime int64) (*pbMsg.ExtendMsg, error) {
extendMsgSet, err := db.extendMsgDatabase.GetExtendMsgSet(ctx, conversationID, sessionType, maxMsgUpdateTime)
2 years ago
if err != nil {
return nil, err
}
extendMsg, ok := extendMsgSet.ExtendMsgs[clientMsgID]
if !ok {
return nil, errors.New(fmt.Sprintf("cant find client msg id: %s", clientMsgID))
}
reactionExtensionList := make(map[string]*pbMsg.KeyValueResp)
for key, model := range extendMsg.ReactionExtensionList {
reactionExtensionList[key] = &pbMsg.KeyValueResp{
KeyValue: &sdkws.KeyValue{
TypeKey: model.TypeKey,
Value: model.Value,
LatestUpdateTime: model.LatestUpdateTime,
},
}
}
return &pbMsg.ExtendMsg{
2 years ago
ReactionExtensions: reactionExtensionList,
ClientMsgID: extendMsg.ClientMsgID,
MsgFirstModifyTime: extendMsg.MsgFirstModifyTime,
AttachedInfo: extendMsg.AttachedInfo,
Ex: extendMsg.Ex,
2 years ago
}, nil
2 years ago
}
func (db *msgDatabase) DeleteReactionExtendMsgSet(ctx context.Context, conversationID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensions map[string]*sdkws.KeyValue) error {
return db.extendMsgDatabase.DeleteReactionExtendMsgSet(ctx, conversationID, sessionType, clientMsgID, msgFirstModifyTime, db.extendMsgSetModel.Pb2Model(reactionExtensions))
2 years ago
}
2 years ago
func (db *msgDatabase) SetSendMsgStatus(ctx context.Context, id string, status int32) error {
2 years ago
return db.cache.SetSendMsgStatus(ctx, id, status)
2 years ago
}
2 years ago
func (db *msgDatabase) GetSendMsgStatus(ctx context.Context, id string) (int32, error) {
2 years ago
return db.cache.GetSendMsgStatus(ctx, id)
2 years ago
}
2 years ago
func (db *msgDatabase) MsgToMQ(ctx context.Context, key string, msg2mq *pbMsg.MsgDataToMQ) error {
_, _, err := db.producer.SendMessage(ctx, key, msg2mq)
return err
2 years ago
}
func (db *msgDatabase) MsgToModifyMQ(ctx context.Context, conversationID string, messages []*pbMsg.MsgDataToMQ) error {
2 years ago
if len(messages) > 0 {
_, _, err := db.producerToModify.SendMessage(ctx, conversationID, &pbMsg.MsgDataToModifyByMQ{ConversationID: conversationID, Messages: messages})
2 years ago
return err
}
return nil
}
func (db *msgDatabase) MsgToPushMQ(ctx context.Context, conversationID string, msg2mq *pbMsg.MsgDataToMQ) (int32, int64, error) {
mqPushMsg := pbMsg.PushMsgDataToMQ{MsgData: msg2mq.MsgData, ConversationID: conversationID}
partition, offset, err := db.producerToPush.SendMessage(ctx, conversationID, &mqPushMsg)
if err != nil {
log.ZError(ctx, "MsgToPushMQ", err, "key", conversationID, "msg2mq", msg2mq)
}
return partition, offset, err
2 years ago
}
func (db *msgDatabase) MsgToMongoMQ(ctx context.Context, conversationID string, messages []*pbMsg.MsgDataToMQ, lastSeq int64) error {
2 years ago
if len(messages) > 0 {
_, _, err := db.producerToModify.SendMessage(ctx, conversationID, &pbMsg.MsgDataToMongoByMQ{LastSeq: lastSeq, ConversationID: conversationID, Messages: messages})
2 years ago
return err
}
return nil
}
2 years ago
func (db *msgDatabase) GetUserMaxSeq(ctx context.Context, userID string) (int64, error) {
2 years ago
return db.cache.GetUserMaxSeq(ctx, userID)
2 years ago
}
2 years ago
func (db *msgDatabase) GetUserMinSeq(ctx context.Context, userID string) (int64, error) {
2 years ago
return db.cache.GetUserMinSeq(ctx, userID)
2 years ago
}
2 years ago
func (db *msgDatabase) GetGroupMaxSeq(ctx context.Context, groupID string) (int64, error) {
2 years ago
return db.cache.GetGroupMaxSeq(ctx, groupID)
2 years ago
}
2 years ago
func (db *msgDatabase) GetGroupMinSeq(ctx context.Context, groupID string) (int64, error) {
2 years ago
return db.cache.GetGroupMinSeq(ctx, groupID)
2 years ago
}
func (db *msgDatabase) BatchInsertChat2DB(ctx context.Context, conversationID string, msgList []*pbMsg.MsgDataToMQ, currentMaxSeq int64) error {
2 years ago
//newTime := utils.GetCurrentTimestampByMill()
2 years ago
if int64(len(msgList)) > db.msg.GetSingleGocMsgNum() {
2 years ago
return errors.New("too large")
}
2 years ago
var remain int64
blk0 := db.msg.GetSingleGocMsgNum() - 1
2 years ago
//currentMaxSeq 4998
2 years ago
if currentMaxSeq < db.msg.GetSingleGocMsgNum() {
2 years ago
remain = blk0 - currentMaxSeq //1
} else {
excludeBlk0 := currentMaxSeq - blk0 //=1
//(5000-1)%5000 == 4999
2 years ago
remain = (db.msg.GetSingleGocMsgNum() - (excludeBlk0 % db.msg.GetSingleGocMsgNum())) % db.msg.GetSingleGocMsgNum()
2 years ago
}
//remain=1
2 years ago
var insertCounter int64
2 years ago
msgsToMongo := make([]unRelationTb.MsgInfoModel, 0)
msgsToMongoNext := make([]unRelationTb.MsgInfoModel, 0)
docID := ""
docIDNext := ""
var err error
for _, m := range msgList {
//log.Debug(operationID, "msg node ", m.String(), m.MsgData.ClientMsgID)
currentMaxSeq++
sMsg := unRelationTb.MsgInfoModel{}
sMsg.SendTime = m.MsgData.SendTime
2 years ago
m.MsgData.Seq = currentMaxSeq
2 years ago
if sMsg.Msg, err = proto.Marshal(m.MsgData); err != nil {
return utils.Wrap(err, "")
}
if insertCounter < remain {
msgsToMongo = append(msgsToMongo, sMsg)
insertCounter++
docID = db.msg.GetDocID(conversationID, currentMaxSeq)
2 years ago
//log.Debug(operationID, "msgListToMongo ", seqUid, m.MsgData.Seq, m.MsgData.ClientMsgID, insertCounter, remain, "userID: ", userID)
} else {
msgsToMongoNext = append(msgsToMongoNext, sMsg)
docIDNext = db.msg.GetDocID(conversationID, currentMaxSeq)
2 years ago
//log.Debug(operationID, "msgListToMongoNext ", seqUidNext, m.MsgData.Seq, m.MsgData.ClientMsgID, insertCounter, remain, "userID: ", userID)
}
}
if docID != "" {
//filter := bson.M{"uid": seqUid}
//log.NewDebug(operationID, "filter ", seqUid, "list ", msgListToMongo, "userID: ", userID)
//err := c.FindOneAndUpdate(ctx, filter, bson.M{"$push": bson.M{"msg": bson.M{"$each": msgsToMongo}}}).Err()
2 years ago
err = db.msgDocDatabase.PushMsgsToDoc(ctx, docID, msgsToMongo)
2 years ago
if err != nil {
if err == mongo.ErrNoDocuments {
doc := &unRelationTb.MsgDocModel{}
doc.DocID = docID
doc.Msg = msgsToMongo
2 years ago
if err = db.msgDocDatabase.Create(ctx, doc); err != nil {
2 years ago
prome.Inc(prome.MsgInsertMongoFailedCounter)
2 years ago
//log.NewError(operationID, "InsertOne failed", filter, err.Error(), sChat)
return utils.Wrap(err, "")
}
2 years ago
prome.Inc(prome.MsgInsertMongoSuccessCounter)
2 years ago
} else {
2 years ago
prome.Inc(prome.MsgInsertMongoFailedCounter)
2 years ago
//log.Error(operationID, "FindOneAndUpdate failed ", err.Error(), filter)
return utils.Wrap(err, "")
}
} else {
2 years ago
prome.Inc(prome.MsgInsertMongoSuccessCounter)
2 years ago
}
}
if docIDNext != "" {
nextDoc := &unRelationTb.MsgDocModel{}
nextDoc.DocID = docIDNext
nextDoc.Msg = msgsToMongoNext
//log.NewDebug(operationID, "filter ", seqUidNext, "list ", msgListToMongoNext, "userID: ", userID)
2 years ago
if err = db.msgDocDatabase.Create(ctx, nextDoc); err != nil {
2 years ago
prome.Inc(prome.MsgInsertMongoFailedCounter)
2 years ago
//log.NewError(operationID, "InsertOne failed", filter, err.Error(), sChat)
return utils.Wrap(err, "")
}
2 years ago
prome.Inc(prome.MsgInsertMongoSuccessCounter)
2 years ago
}
//log.Debug(operationID, "batch mgo cost time ", mongo2.getCurrentTimestampByMill()-newTime, userID, len(msgList))
return nil
}
2 years ago
func (db *msgDatabase) DeleteMessageFromCache(ctx context.Context, userID string, msgs []*pbMsg.MsgDataToMQ) error {
2 years ago
return db.cache.DeleteMessageFromCache(ctx, userID, msgs)
2 years ago
}
// NotificationBatchInsertChat2Cache currently only validates the batch size:
// it rejects batches larger than the per-document message count ("too large")
// and empty batches ("too short as 0"). For a valid batch it returns 0 and a
// nil error without inserting anything.
func (db *msgDatabase) NotificationBatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*pbMsg.MsgDataToMQ) (int64, error) {
	count := len(msgs)
	switch {
	case int64(count) > db.msg.GetSingleGocMsgNum():
		return 0, errors.New("too large")
	case count < 1:
		return 0, errors.New("too short as 0")
	default:
		return 0, nil
	}
}
func (db *msgDatabase) BatchInsertChat2Cache(ctx context.Context, conversationID string, msgList []*pbMsg.MsgDataToMQ, currentMaxSeq int64) (int64, error) {
2 years ago
//newTime := utils.GetCurrentTimestampByMill()
lenList := len(msgList)
2 years ago
if int64(lenList) > db.msg.GetSingleGocMsgNum() {
2 years ago
return 0, errors.New("too large")
}
if lenList < 1 {
return 0, errors.New("too short as 0")
}
// judge sessionType to get seq
lastMaxSeq := currentMaxSeq
for _, m := range msgList {
currentMaxSeq++
2 years ago
m.MsgData.Seq = currentMaxSeq
//log.Debug(operationID, "cache msg node ", m.String(), m.MsgData.ClientMsgID, "userID: ", conversationID, "seq: ", currentMaxSeq)
2 years ago
}
//log.Debug(operationID, "SetMessageToCache ", conversationID, len(msgList))
failedNum, err := db.cache.SetMessageToCache(ctx, conversationID, msgList)
2 years ago
if err != nil {
2 years ago
prome.Add(prome.MsgInsertRedisFailedCounter, failedNum)
//log.Error(operationID, "setMessageToCache failed, continue ", err.Error(), len(msgList), conversationID)
2 years ago
} else {
2 years ago
prome.Inc(prome.MsgInsertRedisSuccessCounter)
2 years ago
}
//log.Debug(operationID, "batch to redis cost time ", mongo2.getCurrentTimestampByMill()-newTime, conversationID, len(msgList))
2 years ago
if msgList[0].MsgData.SessionType == constant.SuperGroupChatType {
err = db.cache.SetGroupMaxSeq(ctx, conversationID, currentMaxSeq)
2 years ago
} else {
err = db.cache.SetUserMaxSeq(ctx, conversationID, currentMaxSeq)
2 years ago
}
if err != nil {
2 years ago
prome.Inc(prome.SeqSetFailedCounter)
2 years ago
} else {
2 years ago
prome.Inc(prome.SeqSetSuccessCounter)
2 years ago
}
return lastMaxSeq, utils.Wrap(err, "")
}
2 years ago
func (db *msgDatabase) DelMsgBySeqs(ctx context.Context, userID string, seqs []int64) (totalUnExistSeqs []int64, err error) {
2 years ago
sortkeys.Int64s(seqs)
2 years ago
docIDSeqsMap := db.msg.GetDocIDSeqsMap(userID, seqs)
lock := sync.Mutex{}
var wg sync.WaitGroup
wg.Add(len(docIDSeqsMap))
for k, v := range docIDSeqsMap {
2 years ago
go func(docID string, seqs []int64) {
2 years ago
defer wg.Done()
unExistSeqList, err := db.DelMsgBySeqsInOneDoc(ctx, docID, seqs)
if err != nil {
return
}
lock.Lock()
totalUnExistSeqs = append(totalUnExistSeqs, unExistSeqList...)
lock.Unlock()
}(k, v)
}
return totalUnExistSeqs, nil
}
2 years ago
func (db *msgDatabase) DelMsgBySeqsInOneDoc(ctx context.Context, docID string, seqs []int64) (unExistSeqs []int64, err error) {
2 years ago
seqMsgs, indexes, unExistSeqs, err := db.GetMsgAndIndexBySeqsInOneDoc(ctx, docID, seqs)
if err != nil {
return nil, err
}
for i, v := range seqMsgs {
2 years ago
if err = db.msgDocDatabase.UpdateMsgStatusByIndexInOneDoc(ctx, docID, v, indexes[i], constant.MsgDeleted); err != nil {
2 years ago
return nil, err
}
}
return unExistSeqs, nil
}
2 years ago
func (db *msgDatabase) GetMsgAndIndexBySeqsInOneDoc(ctx context.Context, docID string, seqs []int64) (seqMsgs []*sdkws.MsgData, indexes []int, unExistSeqs []int64, err error) {
2 years ago
doc, err := db.msgDocDatabase.FindOneByDocID(ctx, docID)
2 years ago
if err != nil {
return nil, nil, nil, err
}
singleCount := 0
2 years ago
var hasSeqList []int64
2 years ago
for i := 0; i < len(doc.Msg); i++ {
msgPb, err := db.unmarshalMsg(&doc.Msg[i])
if err != nil {
return nil, nil, nil, err
}
2 years ago
if utils.Contain(msgPb.Seq, seqs...) {
2 years ago
indexes = append(indexes, i)
seqMsgs = append(seqMsgs, msgPb)
hasSeqList = append(hasSeqList, msgPb.Seq)
singleCount++
if singleCount == len(seqs) {
break
}
}
}
for _, i := range seqs {
2 years ago
if utils.Contain(i, hasSeqList...) {
2 years ago
continue
}
unExistSeqs = append(unExistSeqs, i)
}
return seqMsgs, indexes, unExistSeqs, nil
}
func (db *msgDatabase) GetNewestMsg(ctx context.Context, conversationID string) (msgPb *sdkws.MsgData, err error) {
msgInfo, err := db.msgDocDatabase.GetNewestMsg(ctx, conversationID)
2 years ago
if err != nil {
return nil, err
}
return db.unmarshalMsg(msgInfo)
}
func (db *msgDatabase) GetOldestMsg(ctx context.Context, conversationID string) (msgPb *sdkws.MsgData, err error) {
msgInfo, err := db.msgDocDatabase.GetOldestMsg(ctx, conversationID)
2 years ago
if err != nil {
return nil, err
}
return db.unmarshalMsg(msgInfo)
}
2 years ago
func (db *msgDatabase) unmarshalMsg(msgInfo *unRelationTb.MsgInfoModel) (msgPb *sdkws.MsgData, err error) {
2 years ago
msgPb = &sdkws.MsgData{}
err = proto.Unmarshal(msgInfo.Msg, msgPb)
if err != nil {
return nil, utils.Wrap(err, "")
}
return msgPb, nil
}
func (db *msgDatabase) getMsgBySeqs(ctx context.Context, conversationID string, seqs []int64, diffusionType int) (seqMsgs []*sdkws.MsgData, err error) {
2 years ago
var hasSeqs []int64
2 years ago
singleCount := 0
m := db.msg.GetDocIDSeqsMap(conversationID, seqs)
2 years ago
for docID, value := range m {
2 years ago
doc, err := db.msgDocDatabase.FindOneByDocID(ctx, docID)
2 years ago
if err != nil {
//log.NewError(operationID, "not find seqUid", seqUid, value, uid, seqList, err.Error())
continue
}
singleCount = 0
for i := 0; i < len(doc.Msg); i++ {
msgPb, err := db.unmarshalMsg(&doc.Msg[i])
if err != nil {
//log.NewError(operationID, "Unmarshal err", seqUid, value, uid, seqList, err.Error())
return nil, err
}
2 years ago
if utils.Contain(msgPb.Seq, value...) {
2 years ago
seqMsgs = append(seqMsgs, msgPb)
2 years ago
hasSeqs = append(hasSeqs, msgPb.Seq)
singleCount++
if singleCount == len(value) {
break
}
}
}
}
if len(hasSeqs) != len(seqs) {
2 years ago
var diff []int64
2 years ago
var exceptionMsg []*sdkws.MsgData
diff = utils.Difference(hasSeqs, seqs)
if diffusionType == constant.WriteDiffusion {
exceptionMsg = db.msg.GenExceptionMessageBySeqs(diff)
} else if diffusionType == constant.ReadDiffusion {
exceptionMsg = db.msg.GenExceptionSuperGroupMessageBySeqs(diff, conversationID)
2 years ago
}
2 years ago
seqMsgs = append(seqMsgs, exceptionMsg...)
2 years ago
}
2 years ago
return seqMsgs, nil
2 years ago
}
2 years ago
func (db *msgDatabase) GetMsgBySeqs(ctx context.Context, userID string, seqs []int64) (successMsgs []*sdkws.MsgData, err error) {
2 years ago
successMsgs, failedSeqs, err := db.cache.GetMessagesBySeq(ctx, userID, seqs)
2 years ago
if err != nil {
if err != redis.Nil {
2 years ago
prome.Add(prome.MsgPullFromRedisFailedCounter, len(failedSeqs))
2 years ago
log.Error(mcontext.GetOperationID(ctx), "get message from redis exception", err.Error(), failedSeqs)
2 years ago
}
}
2 years ago
prome.Add(prome.MsgPullFromRedisSuccessCounter, len(successMsgs))
2 years ago
if len(failedSeqs) > 0 {
mongoMsgs, err := db.getMsgBySeqs(ctx, userID, seqs, constant.WriteDiffusion)
if err != nil {
2 years ago
prome.Add(prome.MsgPullFromMongoFailedCounter, len(failedSeqs))
2 years ago
return nil, err
}
2 years ago
prome.Add(prome.MsgPullFromMongoSuccessCounter, len(mongoMsgs))
2 years ago
successMsgs = append(successMsgs, mongoMsgs...)
}
return successMsgs, nil
2 years ago
}
// GetMsgBySeqsRange returns the messages of conversationID whose seq lies in
// the inclusive range [begin, end]. An empty range (begin > end) queries with
// no seqs.
//
// The cache is queried first. Only the seqs the cache reports as failed are
// then read from the document store (read-diffusion mode) and appended to the
// cache hits. Previously the fallback re-read the whole range, so messages
// already served by the cache were returned twice.
//
// A cache error other than redis.Nil is logged and counted but does not fail
// the call. A document-store error fails the call and returns nil messages.
//
// NOTE(review): num is not used by this implementation — confirm whether it is
// meant to cap the number of returned messages.
func (db *msgDatabase) GetMsgBySeqsRange(ctx context.Context, conversationID string, begin, end, num int64) (seqMsg []*sdkws.MsgData, err error) {
	var seqs []int64
	for seq := begin; seq <= end; seq++ {
		seqs = append(seqs, seq)
	}

	successMsgs, failedSeqs, err := db.cache.GetMessagesBySeq(ctx, conversationID, seqs)
	if err != nil && err != redis.Nil {
		prome.Add(prome.MsgPullFromRedisFailedCounter, len(failedSeqs))
		log.Error(mcontext.GetOperationID(ctx), "get message from redis exception", err.Error(), failedSeqs)
	}
	prome.Add(prome.MsgPullFromRedisSuccessCounter, len(successMsgs))
	if len(failedSeqs) == 0 {
		return successMsgs, nil
	}

	// Fall back to the document store for the cache misses only.
	mongoMsgs, err := db.getMsgBySeqs(ctx, conversationID, failedSeqs, constant.ReadDiffusion)
	if err != nil {
		prome.Add(prome.MsgPullFromMongoFailedCounter, len(failedSeqs))
		return nil, err
	}
	prome.Add(prome.MsgPullFromMongoSuccessCounter, len(mongoMsgs))
	return append(successMsgs, mongoMsgs...), nil
}
2 years ago
func (db *msgDatabase) GetSuperGroupMsgBySeqs(ctx context.Context, groupID string, seqs []int64) (successMsgs []*sdkws.MsgData, err error) {
2 years ago
successMsgs, failedSeqs, err := db.cache.GetMessagesBySeq(ctx, groupID, seqs)
2 years ago
if err != nil {
2 years ago
if err != redis.Nil {
2 years ago
prome.Add(prome.MsgPullFromRedisFailedCounter, len(failedSeqs))
2 years ago
log.Error(mcontext.GetOperationID(ctx), "get message from redis exception", err.Error(), failedSeqs)
2 years ago
}
2 years ago
}
2 years ago
prome.Add(prome.MsgPullFromRedisSuccessCounter, len(successMsgs))
2 years ago
if len(failedSeqs) > 0 {
mongoMsgs, err := db.getMsgBySeqs(ctx, groupID, seqs, constant.ReadDiffusion)
if err != nil {
2 years ago
prome.Add(prome.MsgPullFromMongoFailedCounter, len(failedSeqs))
2 years ago
return nil, err
}
2 years ago
prome.Add(prome.MsgPullFromMongoSuccessCounter, len(mongoMsgs))
2 years ago
successMsgs = append(successMsgs, mongoMsgs...)
2 years ago
}
2 years ago
return successMsgs, nil
}
2 years ago
func (db *msgDatabase) CleanUpUserMsg(ctx context.Context, userID string) error {
2 years ago
err := db.DeleteUserMsgsAndSetMinSeq(ctx, userID, 0)
2 years ago
if err != nil {
return err
}
2 years ago
err = db.cache.CleanUpOneUserAllMsg(ctx, userID)
2 years ago
return utils.Wrap(err, "")
2 years ago
}
2 years ago
func (db *msgDatabase) DeleteUserSuperGroupMsgsAndSetMinSeq(ctx context.Context, groupID string, userIDs []string, remainTime int64) error {
2 years ago
var delStruct delMsgRecursionStruct
minSeq, err := db.deleteMsgRecursion(ctx, groupID, unRelationTb.OldestList, &delStruct, remainTime)
if err != nil {
//log.NewError(operationID, utils.GetSelfFuncName(), groupID, "deleteMsg failed")
}
if minSeq == 0 {
return nil
}
//log.NewDebug(operationID, utils.GetSelfFuncName(), "delMsgIDList:", delStruct, "minSeq", minSeq)
for _, userID := range userIDs {
2 years ago
userMinSeq, err := db.cache.GetGroupUserMinSeq(ctx, groupID, userID)
2 years ago
if err != nil && err != redis.Nil {
//log.NewError(operationID, utils.GetSelfFuncName(), "GetGroupUserMinSeq failed", groupID, userID, err.Error())
continue
}
2 years ago
if userMinSeq > minSeq {
err = db.cache.SetGroupUserMinSeq(ctx, groupID, userID, userMinSeq)
2 years ago
} else {
2 years ago
err = db.cache.SetGroupUserMinSeq(ctx, groupID, userID, minSeq)
2 years ago
}
if err != nil {
//log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), groupID, userID, userMinSeq, minSeq)
}
}
return nil
}
2 years ago
func (db *msgDatabase) DeleteUserMsgsAndSetMinSeq(ctx context.Context, userID string, remainTime int64) error {
2 years ago
var delStruct delMsgRecursionStruct
minSeq, err := db.deleteMsgRecursion(ctx, userID, unRelationTb.OldestList, &delStruct, remainTime)
if err != nil {
return utils.Wrap(err, "")
}
if minSeq == 0 {
return nil
}
2 years ago
return db.cache.SetUserMinSeq(ctx, userID, minSeq)
2 years ago
}
// delMsgRecursionStruct carries state across the recursive calls of
// deleteMsgRecursion.
type delMsgRecursionStruct struct {
	// minSeq is the seq of the last message of the most recently collected document.
	minSeq int64
	// delDocIDs lists the IDs of the documents collected for deletion.
	delDocIDs []string
}

// getSetMinSeq returns the minSeq recorded so far.
func (d *delMsgRecursionStruct) getSetMinSeq() int64 {
	recorded := d.minSeq
	return recorded
}
2 years ago
2 years ago
// index 0....19(del) 20...69
// seq 70
// set minSeq 21
// recursion 删除list并且返回设置的最小seq
func (db *msgDatabase) deleteMsgRecursion(ctx context.Context, conversationID string, index int64, delStruct *delMsgRecursionStruct, remainTime int64) (int64, error) {
2 years ago
// find from oldest list
msgs, err := db.msgDocDatabase.GetMsgsByIndex(ctx, conversationID, index)
2 years ago
if err != nil || msgs.DocID == "" {
if err != nil {
if err == unrelation.ErrMsgListNotExist {
log.NewDebug(mcontext.GetOperationID(ctx), utils.GetSelfFuncName(), "ID:", conversationID, "index:", index, err.Error())
2 years ago
} else {
//log.NewError(operationID, utils.GetSelfFuncName(), "GetUserMsgListByIndex failed", err.Error(), index, ID)
}
}
2 years ago
// 获取报错或者获取不到了物理删除并且返回seq delMongoMsgsPhysical(delStruct.delDocIDList), 结束递归
2 years ago
err = db.msgDocDatabase.Delete(ctx, delStruct.delDocIDs)
2 years ago
if err != nil {
return 0, err
}
return delStruct.getSetMinSeq() + 1, nil
}
//log.NewDebug(operationID, "ID:", conversationID, "index:", index, "uid:", msgs.UID, "len:", len(msgs.Msg))
2 years ago
if int64(len(msgs.Msg)) > db.msg.GetSingleGocMsgNum() {
2 years ago
log.ZWarn(ctx, "msgs too large", nil, "lenth", len(msgs.Msg), "docID:", msgs.DocID)
2 years ago
}
if msgs.Msg[len(msgs.Msg)-1].SendTime+(remainTime*1000) < utils.GetCurrentTimestampByMill() && msgs.IsFull() {
2 years ago
delStruct.delDocIDs = append(delStruct.delDocIDs, msgs.DocID)
2 years ago
lastMsgPb := &sdkws.MsgData{}
err = proto.Unmarshal(msgs.Msg[len(msgs.Msg)-1].Msg, lastMsgPb)
if err != nil {
//log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), len(msgs.Msg)-1, msgs.UID)
return 0, utils.Wrap(err, "proto.Unmarshal failed")
}
delStruct.minSeq = lastMsgPb.Seq
} else {
var hasMarkDelFlag bool
for _, msg := range msgs.Msg {
msgPb := &sdkws.MsgData{}
err = proto.Unmarshal(msg.Msg, msgPb)
if err != nil {
//log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), len(msgs.Msg)-1, msgs.UID)
return 0, utils.Wrap(err, "proto.Unmarshal failed")
}
if utils.GetCurrentTimestampByMill() > msg.SendTime+(remainTime*1000) {
msgPb.Status = constant.MsgDeleted
bytes, _ := proto.Marshal(msgPb)
msg.Msg = bytes
msg.SendTime = 0
hasMarkDelFlag = true
} else {
2 years ago
// 到本条消息不需要删除, minSeq置为这条消息的seq
2 years ago
if err := db.msgDocDatabase.Delete(ctx, delStruct.delDocIDs); err != nil {
2 years ago
return 0, err
}
if hasMarkDelFlag {
2 years ago
if err := db.msgDocDatabase.UpdateOneDoc(ctx, msgs); err != nil {
2 years ago
return delStruct.getSetMinSeq(), utils.Wrap(err, "")
}
}
2 years ago
return msgPb.Seq, nil
2 years ago
}
}
}
// 继续递归 index+1
seq, err := db.deleteMsgRecursion(ctx, conversationID, index+1, delStruct, remainTime)
2 years ago
return seq, utils.Wrap(err, "deleteMsg failed")
2 years ago
}
2 years ago
2 years ago
func (db *msgDatabase) GetUserMinMaxSeqInMongoAndCache(ctx context.Context, userID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error) {
2 years ago
minSeqMongo, maxSeqMongo, err = db.GetMinMaxSeqMongo(ctx, userID)
if err != nil {
return 0, 0, 0, 0, err
}
// from cache
minSeqCache, err = db.cache.GetUserMinSeq(ctx, userID)
if err != nil {
return 0, 0, 0, 0, err
}
maxSeqCache, err = db.cache.GetUserMaxSeq(ctx, userID)
if err != nil {
return 0, 0, 0, 0, err
}
return
}
2 years ago
func (db *msgDatabase) GetSuperGroupMinMaxSeqInMongoAndCache(ctx context.Context, groupID string) (minSeqMongo, maxSeqMongo, maxSeqCache int64, err error) {
2 years ago
minSeqMongo, maxSeqMongo, err = db.GetMinMaxSeqMongo(ctx, groupID)
if err != nil {
return 0, 0, 0, err
}
maxSeqCache, err = db.cache.GetGroupMaxSeq(ctx, groupID)
if err != nil {
return 0, 0, 0, err
}
return
}
func (db *msgDatabase) GetMinMaxSeqMongo(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo int64, err error) {
oldestMsgMongo, err := db.msgDocDatabase.GetOldestMsg(ctx, conversationID)
2 years ago
if err != nil {
return 0, 0, err
}
msgPb, err := db.unmarshalMsg(oldestMsgMongo)
if err != nil {
return 0, 0, err
}
minSeqMongo = msgPb.Seq
newestMsgMongo, err := db.msgDocDatabase.GetNewestMsg(ctx, conversationID)
2 years ago
if err != nil {
return 0, 0, err
}
msgPb, err = db.unmarshalMsg(newestMsgMongo)
if err != nil {
return 0, 0, err
}
maxSeqMongo = msgPb.Seq
return
}
2 years ago
func (db *msgDatabase) SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error) {
2 years ago
return db.cache.SetGroupUserMinSeq(ctx, groupID, userID, minSeq)
}
2 years ago
func (db *msgDatabase) SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error) {
2 years ago
return db.cache.SetUserMinSeq(ctx, userID, minSeq)
}
2 years ago
// GetGroupUserMinSeq forwards to the cache's GetGroupUserMinSeq and returns
// its result unchanged.
func (db *msgDatabase) GetGroupUserMinSeq(ctx context.Context, groupID, userID string) (int64, error) {
	seq, err := db.cache.GetGroupUserMinSeq(ctx, groupID, userID)
	return seq, err
}