// Open-IM-Server/pkg/common/db/controller/msg.go (786 lines, 31 KiB)

package controller
import (
2 years ago
"fmt"
"sort"
"time"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
2 years ago
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/convert"
2 years ago
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache"
unRelationTb "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/unrelation"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/unrelation"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/kafka"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/prome"
"github.com/OpenIMSDK/Open-IM-Server/pkg/errs"
"context"
2 years ago
"errors"
2 years ago
pbMsg "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg"
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
2 years ago
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
"github.com/go-redis/redis/v8"
"go.mongodb.org/mongo-driver/mongo"
)
2 years ago
// Keys accepted by BatchInsertBlock; they select which sub-field of a stored
// message slot is written.
const (
	updateKeyMsg = iota // fields are *unRelationTb.MsgDataModel, written to the "msg" sub-field
	updateKeyRevoke     // fields are *unRelationTb.RevokeModel, written to the "revoke" sub-field
)
2 years ago
type CommonMsgDatabase interface {
// 批量插入消息
2 years ago
BatchInsertChat2DB(ctx context.Context, conversationID string, msgs []*sdkws.MsgData, currentMaxSeq int64) error
2 years ago
// 撤回消息
2 years ago
RevokeMsg(ctx context.Context, conversationID string, seq int64, revoke *unRelationTb.RevokeModel) error
2 years ago
// 刪除redis中消息缓存
2 years ago
DeleteMessageFromCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) error
2 years ago
// incrSeq然后批量插入缓存
BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNewConversation bool, err error)
2 years ago
2 years ago
// 通过seqList获取mongo中写扩散消息
2 years ago
GetMsgBySeqsRange(ctx context.Context, userID string, conversationID string, begin, end, num int64) (seqMsg []*sdkws.MsgData, err error)
2 years ago
// 通过seqList获取大群在 mongo里面的消息
2 years ago
GetMsgBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error)
2 years ago
// 删除会话消息重置最小seq remainTime为消息保留的时间单位秒,超时消息删除, 传0删除所有消息(此方法不删除redis cache)
DeleteConversationMsgsAndSetMinSeq(ctx context.Context, conversationID string, remainTime int64) error
2 years ago
// 用户根据seq删除消息
DeleteUserMsgsBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) error
// 物理删除消息置空
DeleteMsgsPhysicalBySeqs(ctx context.Context, conversationID string, seqs []int64) error
2 years ago
2 years ago
SetMaxSeq(ctx context.Context, conversationID string, maxSeq int64) error
2 years ago
GetMaxSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error)
2 years ago
GetMaxSeq(ctx context.Context, conversationID string) (int64, error)
SetMinSeq(ctx context.Context, conversationID string, minSeq int64) error
2 years ago
GetMinSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error)
2 years ago
GetMinSeq(ctx context.Context, conversationID string) (int64, error)
GetConversationUserMinSeq(ctx context.Context, conversationID string, userID string) (int64, error)
GetConversationUserMinSeqs(ctx context.Context, conversationID string, userIDs []string) (map[string]int64, error)
SetConversationUserMinSeq(ctx context.Context, conversationID string, userID string, minSeq int64) error
SetConversationUserMinSeqs(ctx context.Context, conversationID string, seqs map[string]int64) (err error)
2 years ago
SetUserConversationsMinSeqs(ctx context.Context, userID string, seqs map[string]int64) (err error)
2 years ago
GetMongoMaxAndMinSeq(ctx context.Context, conversationID string) (maxSeq, minSeq int64, err error)
2 years ago
GetConversationMinMaxSeqInMongoAndCache(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error)
SetSendMsgStatus(ctx context.Context, id string, status int32) error
GetSendMsgStatus(ctx context.Context, id string) (int32, error)
2 years ago
// to mq
MsgToMQ(ctx context.Context, key string, msg2mq *sdkws.MsgData) error
2 years ago
MsgToModifyMQ(ctx context.Context, key, conversarionID string, msgs []*sdkws.MsgData) error
MsgToPushMQ(ctx context.Context, key, conversarionID string, msg2mq *sdkws.MsgData) (int32, int64, error)
MsgToMongoMQ(ctx context.Context, key, conversarionID string, msgs []*sdkws.MsgData, lastSeq int64) error
2 years ago
// modify
JudgeMessageReactionExist(ctx context.Context, clientMsgID string, sessionType int32) (bool, error)
SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error
SetMessageReactionExpire(ctx context.Context, clientMsgID string, sessionType int32, expiration time.Duration) (bool, error)
GetExtendMsg(ctx context.Context, conversationID string, sessionType int32, clientMsgID string, maxMsgUpdateTime int64) (*pbMsg.ExtendMsg, error)
InsertOrUpdateReactionExtendMsgSet(ctx context.Context, conversationID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*sdkws.KeyValue) error
GetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey string) (string, error)
GetOneMessageAllReactionList(ctx context.Context, clientMsgID string, sessionType int32) (map[string]string, error)
DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error
DeleteReactionExtendMsgSet(ctx context.Context, conversationID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*sdkws.KeyValue) error
}
// NewCommonMsgDatabase builds a CommonMsgDatabase on top of the given message
// document store and cache, and creates one Kafka producer for each of the
// configured Ws2mschat, MsgToMongo, Ms2pschat and MsgToModify topics.
//
// The extendMsgDatabase, extendMsgSetModel and msg fields of the returned
// value are left at their zero values.
func NewCommonMsgDatabase(msgDocModel unRelationTb.MsgDocModelInterface, cacheModel cache.MsgModel) CommonMsgDatabase {
	db := &commonMsgDatabase{
		msgDocDatabase: msgDocModel,
		cache:          cacheModel,
	}
	// Producers are created in the same order as before: chat, mongo, push, modify.
	db.producer = kafka.NewKafkaProducer(config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.Ws2mschat.Topic)
	db.producerToMongo = kafka.NewKafkaProducer(config.Config.Kafka.MsgToMongo.Addr, config.Config.Kafka.MsgToMongo.Topic)
	db.producerToPush = kafka.NewKafkaProducer(config.Config.Kafka.Ms2pschat.Addr, config.Config.Kafka.Ms2pschat.Topic)
	db.producerToModify = kafka.NewKafkaProducer(config.Config.Kafka.MsgToModify.Addr, config.Config.Kafka.MsgToModify.Topic)
	return db
}
// InitCommonMsgDatabase creates the message cache model from rdb and the
// message document driver from database, then wires them together with
// NewCommonMsgDatabase.
func InitCommonMsgDatabase(rdb redis.UniversalClient, database *mongo.Database) CommonMsgDatabase {
	msgCache := cache.NewMsgCacheModel(rdb)
	msgDocs := unrelation.NewMsgMongoDriver(database)
	return NewCommonMsgDatabase(msgDocs, msgCache)
}
type commonMsgDatabase struct {
msgDocDatabase unRelationTb.MsgDocModelInterface
extendMsgDatabase unRelationTb.ExtendMsgSetModelInterface
extendMsgSetModel unRelationTb.ExtendMsgSetModel
2 years ago
msg unRelationTb.MsgDocModel
2 years ago
cache cache.MsgModel
producer *kafka.Producer
producerToMongo *kafka.Producer
producerToModify *kafka.Producer
producerToPush *kafka.Producer
}
// MsgToMQ publishes msg2mq under key through the default producer and reports
// only the send error.
func (db *commonMsgDatabase) MsgToMQ(ctx context.Context, key string, msg2mq *sdkws.MsgData) error {
	if _, _, err := db.producer.SendMessage(ctx, key, msg2mq); err != nil {
		return err
	}
	return nil
}
2 years ago
func (db *commonMsgDatabase) MsgToModifyMQ(ctx context.Context, key, conversationID string, messages []*sdkws.MsgData) error {
2 years ago
if len(messages) > 0 {
2 years ago
_, _, err := db.producerToModify.SendMessage(ctx, key, &pbMsg.MsgDataToModifyByMQ{ConversationID: conversationID, Messages: messages})
2 years ago
return err
}
return nil
}
2 years ago
func (db *commonMsgDatabase) MsgToPushMQ(ctx context.Context, key, conversationID string, msg2mq *sdkws.MsgData) (int32, int64, error) {
partition, offset, err := db.producerToPush.SendMessage(ctx, key, &pbMsg.PushMsgDataToMQ{MsgData: msg2mq, ConversationID: conversationID})
2 years ago
if err != nil {
2 years ago
log.ZError(ctx, "MsgToPushMQ", err, "key", key, "msg2mq", msg2mq)
2 years ago
return 0, 0, err
2 years ago
}
2 years ago
return partition, offset, nil
2 years ago
}
2 years ago
func (db *commonMsgDatabase) MsgToMongoMQ(ctx context.Context, key, conversationID string, messages []*sdkws.MsgData, lastSeq int64) error {
2 years ago
if len(messages) > 0 {
2 years ago
_, _, err := db.producerToMongo.SendMessage(ctx, key, &pbMsg.MsgDataToMongoByMQ{LastSeq: lastSeq, ConversationID: conversationID, MsgData: messages})
2 years ago
return err
}
return nil
}
2 years ago
func (db *commonMsgDatabase) BatchInsertBlock(ctx context.Context, conversationID string, fields []any, key int8, firstSeq int64) error {
if len(fields) == 0 {
2 years ago
return nil
}
num := db.msg.GetSingleGocMsgNum()
2 years ago
//num = 100
2 years ago
for i, field := range fields { // 检查类型
var ok bool
switch key {
case updateKeyMsg:
var msg *unRelationTb.MsgDataModel
msg, ok = field.(*unRelationTb.MsgDataModel)
if msg != nil && msg.Seq != firstSeq+int64(i) {
return errs.ErrInternalServer.Wrap("seq is invalid")
}
case updateKeyRevoke:
_, ok = field.(*unRelationTb.RevokeModel)
default:
return errs.ErrInternalServer.Wrap("key is invalid")
}
if !ok {
return errs.ErrInternalServer.Wrap("field type is invalid")
}
2 years ago
}
// 返回值为true表示数据库存在该文档false表示数据库不存在该文档
2 years ago
updateMsgModel := func(seq int64, i int) (bool, error) {
2 years ago
var (
res *mongo.UpdateResult
err error
)
2 years ago
docID := db.msg.GetDocID(conversationID, seq)
index := db.msg.GetMsgIndex(seq)
2 years ago
field := fields[i]
switch key {
case updateKeyMsg:
res, err = db.msgDocDatabase.UpdateMsg(ctx, docID, index, "msg", field)
case updateKeyRevoke:
res, err = db.msgDocDatabase.UpdateMsg(ctx, docID, index, "revoke", field)
2 years ago
}
if err != nil {
return false, err
}
return res.MatchedCount > 0, nil
}
tryUpdate := true
2 years ago
for i := 0; i < len(fields); i++ {
seq := firstSeq + int64(i) // 当前seq
2 years ago
if tryUpdate {
2 years ago
matched, err := updateMsgModel(seq, i)
2 years ago
if err != nil {
return err
}
if matched {
2 years ago
continue // 匹配到了,继续下一个(不一定修改)
2 years ago
}
}
doc := unRelationTb.MsgDocModel{
2 years ago
DocID: db.msg.GetDocID(conversationID, seq),
2 years ago
Msg: make([]*unRelationTb.MsgInfoModel, num),
2 years ago
}
2 years ago
var insert int // 插入的数量
for j := i; j < len(fields); j++ {
2 years ago
seq = firstSeq + int64(j)
2 years ago
if db.msg.GetDocID(conversationID, seq) != doc.DocID {
2 years ago
break
}
insert++
2 years ago
switch key {
case updateKeyMsg:
2 years ago
doc.Msg[db.msg.GetMsgIndex(seq)] = &unRelationTb.MsgInfoModel{
2 years ago
Msg: fields[j].(*unRelationTb.MsgDataModel),
}
case updateKeyRevoke:
2 years ago
doc.Msg[db.msg.GetMsgIndex(seq)] = &unRelationTb.MsgInfoModel{
2 years ago
Revoke: fields[j].(*unRelationTb.RevokeModel),
2 years ago
}
2 years ago
}
2 years ago
}
2 years ago
for i, model := range doc.Msg {
2 years ago
if model == nil {
model = &unRelationTb.MsgInfoModel{}
doc.Msg[i] = model
}
2 years ago
if model.DelList == nil {
doc.Msg[i].DelList = []string{}
}
}
2 years ago
if err := db.msgDocDatabase.Create(ctx, &doc); err != nil {
if mongo.IsDuplicateKeyError(err) {
2 years ago
i-- // 存在并发,重试当前数据
tryUpdate = true // 以修改模式
2 years ago
continue
}
return err
}
2 years ago
tryUpdate = false // 当前以插入成功,下一块优先插入模式
i += insert - 1 // 跳过已插入的数据
2 years ago
}
return nil
}
2 years ago
func (db *commonMsgDatabase) BatchInsertChat2DB(ctx context.Context, conversationID string, msgList []*sdkws.MsgData, currentMaxSeq int64) error {
2 years ago
if len(msgList) == 0 {
return errs.ErrArgs.Wrap("msgList is empty")
}
2 years ago
msgs := make([]any, len(msgList))
for i, msg := range msgList {
if msg == nil {
continue
}
var offlinePushModel *unRelationTb.OfflinePushModel
if msg.OfflinePushInfo != nil {
offlinePushModel = &unRelationTb.OfflinePushModel{
Title: msg.OfflinePushInfo.Title,
Desc: msg.OfflinePushInfo.Desc,
Ex: msg.OfflinePushInfo.Ex,
IOSPushSound: msg.OfflinePushInfo.IOSPushSound,
IOSBadgeCount: msg.OfflinePushInfo.IOSBadgeCount,
}
}
msgs[i] = &unRelationTb.MsgDataModel{
SendID: msg.SendID,
RecvID: msg.RecvID,
GroupID: msg.GroupID,
ClientMsgID: msg.ClientMsgID,
ServerMsgID: msg.ServerMsgID,
SenderPlatformID: msg.SenderPlatformID,
SenderNickname: msg.SenderNickname,
SenderFaceURL: msg.SenderFaceURL,
SessionType: msg.SessionType,
MsgFrom: msg.MsgFrom,
ContentType: msg.ContentType,
Content: string(msg.Content),
Seq: msg.Seq,
SendTime: msg.SendTime,
CreateTime: msg.CreateTime,
Status: msg.Status,
Options: msg.Options,
OfflinePush: offlinePushModel,
AtUserIDList: msg.AtUserIDList,
AttachedInfo: msg.AttachedInfo,
Ex: msg.Ex,
}
}
2 years ago
return db.BatchInsertBlock(ctx, conversationID, msgs, updateKeyMsg, msgList[0].Seq)
}
2 years ago
func (db *commonMsgDatabase) RevokeMsg(ctx context.Context, conversationID string, seq int64, revoke *unRelationTb.RevokeModel) error {
2 years ago
return db.BatchInsertBlock(ctx, conversationID, []any{revoke}, updateKeyRevoke, seq)
2 years ago
}
2 years ago
// DeleteMessageFromCache forwards to the cache layer's DeleteMessageFromCache
// for the given conversation and messages.
func (db *commonMsgDatabase) DeleteMessageFromCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) error {
	return db.cache.DeleteMessageFromCache(ctx, conversationID, msgs)
}
func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNew bool, err error) {
currentMaxSeq, err := db.cache.GetMaxSeq(ctx, conversationID)
2 years ago
if err != nil && errs.Unwrap(err) != redis.Nil {
2 years ago
prome.Inc(prome.SeqGetFailedCounter)
return 0, false, err
}
prome.Inc(prome.SeqGetSuccessCounter)
lenList := len(msgs)
if int64(lenList) > db.msg.GetSingleGocMsgNum() {
return 0, false, errors.New("too large")
}
if lenList < 1 {
return 0, false, errors.New("too short as 0")
}
2 years ago
if errs.Unwrap(err) == redis.Nil {
2 years ago
isNew = true
}
lastMaxSeq := currentMaxSeq
for _, m := range msgs {
currentMaxSeq++
m.Seq = currentMaxSeq
}
failedNum, err := db.cache.SetMessageToCache(ctx, conversationID, msgs)
if err != nil {
prome.Add(prome.MsgInsertRedisFailedCounter, failedNum)
log.ZError(ctx, "setMessageToCache error", err, "len", len(msgs), "conversationID", conversationID)
} else {
prome.Inc(prome.MsgInsertRedisSuccessCounter)
}
err = db.cache.SetMaxSeq(ctx, conversationID, currentMaxSeq)
if err != nil {
prome.Inc(prome.SeqSetFailedCounter)
} else {
prome.Inc(prome.SeqSetSuccessCounter)
}
return lastMaxSeq, isNew, utils.Wrap(err, "")
}
func (db *commonMsgDatabase) getMsgBySeqs(ctx context.Context, conversationID string, seqs []int64) (totalMsgs []*sdkws.MsgData, err error) {
var totalUnExistSeqs []int64
2 years ago
for docID, seqs := range db.msg.GetDocIDSeqsMap(conversationID, seqs) {
2 years ago
log.ZDebug(ctx, "getMsgBySeqs", "docID", docID, "seqs", seqs)
2 years ago
msgs, unexistSeqs, err := db.findMsgInfoBySeq(ctx, docID, seqs)
if err != nil {
return nil, err
}
2 years ago
for _, msg := range msgs {
totalMsgs = append(totalMsgs, convert.MsgDB2Pb(msg.Msg))
}
totalUnExistSeqs = append(totalUnExistSeqs, unexistSeqs...)
2 years ago
}
for _, unexistSeq := range totalUnExistSeqs {
totalMsgs = append(totalMsgs, db.msg.GenExceptionMessageBySeqs([]int64{unexistSeq})...)
2 years ago
}
return totalMsgs, nil
2 years ago
}
2 years ago
// func (db *commonMsgDatabase) refetchDelSeqsMsgs(ctx context.Context, conversationID string, delNums, rangeBegin, begin int64) (seqMsgs []*unRelationTb.MsgDataModel, err error) {
// var reFetchSeqs []int64
// if delNums > 0 {
// newBeginSeq := rangeBegin - delNums
// if newBeginSeq >= begin {
// newEndSeq := rangeBegin - 1
// for i := newBeginSeq; i <= newEndSeq; i++ {
// reFetchSeqs = append(reFetchSeqs, i)
// }
// }
// }
// if len(reFetchSeqs) == 0 {
// return
// }
// if len(reFetchSeqs) > 0 {
// m := db.msg.GetDocIDSeqsMap(conversationID, reFetchSeqs)
// for docID, seqs := range m {
// msgs, _, err := db.findMsgInfoBySeq(ctx, docID, seqs)
// if err != nil {
// return nil, err
// }
// for _, msg := range msgs {
// if msg.Status != constant.MsgDeleted {
// seqMsgs = append(seqMsgs, msg)
// }
// }
// }
// }
// if len(seqMsgs) < int(delNums) {
// seqMsgs2, err := db.refetchDelSeqsMsgs(ctx, conversationID, delNums-int64(len(seqMsgs)), rangeBegin-1, begin)
// if err != nil {
// return seqMsgs, err
// }
// seqMsgs = append(seqMsgs, seqMsgs2...)
// }
// return seqMsgs, nil
// }
2 years ago
2 years ago
func (db *commonMsgDatabase) findMsgInfoBySeq(ctx context.Context, docID string, seqs []int64) (totalMsgs []*unRelationTb.MsgInfoModel, unExistSeqs []int64, err error) {
2 years ago
msgs, err := db.msgDocDatabase.GetMsgBySeqIndexIn1Doc(ctx, docID, seqs)
2 years ago
if err != nil {
return nil, nil, err
}
2 years ago
log.ZDebug(ctx, "findMsgInfoBySeq", "docID", docID, "seqs", seqs, "len(msgs)", len(msgs))
totalMsgs = append(totalMsgs, msgs...)
2 years ago
if len(msgs) == 0 {
unExistSeqs = seqs
} else {
for _, seq := range seqs {
for i, msg := range msgs {
2 years ago
if seq == msg.Msg.Seq {
2 years ago
break
}
if i == len(msgs)-1 {
unExistSeqs = append(unExistSeqs, seq)
}
2 years ago
}
}
}
2 years ago
return totalMsgs, unExistSeqs, nil
2 years ago
}
2 years ago
func (db *commonMsgDatabase) getMsgBySeqsRange(ctx context.Context, userID string, conversationID string, allSeqs []int64, begin, end int64) (seqMsgs []*sdkws.MsgData, err error) {
2 years ago
log.ZDebug(ctx, "getMsgBySeqsRange", "conversationID", conversationID, "allSeqs", allSeqs, "begin", begin, "end", end)
2 years ago
var totalNotExistSeqs []int64
// mongo index
2 years ago
var delSeqs []int64
2 years ago
for docID, seqs := range db.msg.GetDocIDSeqsMap(conversationID, allSeqs) {
2 years ago
log.ZDebug(ctx, "getMsgBySeqsRange", "docID", docID, "seqs", seqs)
2 years ago
msgs, notExistSeqs, err := db.findMsgInfoBySeq(ctx, docID, seqs)
2 years ago
if err != nil {
return nil, err
}
2 years ago
log.ZDebug(ctx, "getMsgBySeqsRange", "unExistSeqs", notExistSeqs, "msgs", len(msgs))
2 years ago
for _, msg := range msgs {
2 years ago
if utils.IsContain(userID, msg.DelList) {
delSeqs = append(delSeqs, msg.Msg.Seq)
}
2 years ago
seqMsgs = append(seqMsgs, convert.MsgDB2Pb(msg.Msg))
}
2 years ago
totalNotExistSeqs = append(totalNotExistSeqs, notExistSeqs...)
}
2 years ago
log.ZDebug(ctx, "getMsgBySeqsRange", "totalNotExistSeqs", totalNotExistSeqs, "del seqs", delSeqs)
// 补未找到的消息
2 years ago
seqMsgs = append(seqMsgs, db.msg.GenExceptionMessageBySeqs(totalNotExistSeqs)...)
2 years ago
if len(totalNotExistSeqs) > 0 || len(delSeqs) > 0 {
sort.Sort(utils.MsgBySeq(seqMsgs))
}
return seqMsgs, nil
}
2 years ago
func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID string, conversationID string, begin, end, num int64) (seqMsg []*sdkws.MsgData, err error) {
2 years ago
var seqs []int64
for i := end; i > end-num; i-- {
if i >= begin {
2 years ago
seqs = append([]int64{i}, seqs...)
2 years ago
} else {
break
}
}
successMsgs, failedSeqs, err := db.cache.GetMessagesBySeq(ctx, conversationID, seqs)
if err != nil {
if err != redis.Nil {
prome.Add(prome.MsgPullFromRedisFailedCounter, len(failedSeqs))
log.ZError(ctx, "get message from redis exception", err, conversationID, seqs)
}
}
2 years ago
if len(failedSeqs) != 0 {
log.ZDebug(ctx, "get message from redis failed", err, "seqs", seqs)
}
2 years ago
// get from cache or db
prome.Add(prome.MsgPullFromRedisSuccessCounter, len(successMsgs))
if len(failedSeqs) > 0 {
2 years ago
mongoMsgs, err := db.getMsgBySeqsRange(ctx, userID, conversationID, failedSeqs, begin, end)
2 years ago
if err != nil {
prome.Add(prome.MsgPullFromMongoFailedCounter, len(failedSeqs))
return nil, err
}
prome.Add(prome.MsgPullFromMongoSuccessCounter, len(mongoMsgs))
successMsgs = append(successMsgs, mongoMsgs...)
}
return successMsgs, nil
}
2 years ago
func (db *commonMsgDatabase) GetMsgBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) (successMsgs []*sdkws.MsgData, err error) {
userMinSeq, err := db.cache.GetConversationUserMinSeq(ctx, conversationID, userID)
if err != nil {
return nil, err
}
minSeq, err := db.cache.GetMinSeq(ctx, conversationID)
if err != nil {
return nil, err
}
if userMinSeq < minSeq {
minSeq = userMinSeq
}
var newSeqs []int64
for _, seq := range seqs {
if seq >= minSeq {
newSeqs = append(newSeqs, seq)
}
}
successMsgs, failedSeqs, err := db.cache.GetMessagesBySeq(ctx, conversationID, newSeqs)
2 years ago
if err != nil {
if err != redis.Nil {
prome.Add(prome.MsgPullFromRedisFailedCounter, len(failedSeqs))
log.ZError(ctx, "get message from redis exception", err, "failedSeqs", failedSeqs, "conversationID", conversationID)
}
}
prome.Add(prome.MsgPullFromRedisSuccessCounter, len(successMsgs))
if len(failedSeqs) > 0 {
2 years ago
mongoMsgs, err := db.getMsgBySeqs(ctx, conversationID, failedSeqs)
2 years ago
if err != nil {
prome.Add(prome.MsgPullFromMongoFailedCounter, len(failedSeqs))
return nil, err
}
prome.Add(prome.MsgPullFromMongoSuccessCounter, len(mongoMsgs))
successMsgs = append(successMsgs, mongoMsgs...)
}
return successMsgs, nil
}
func (db *commonMsgDatabase) DeleteConversationMsgsAndSetMinSeq(ctx context.Context, conversationID string, remainTime int64) error {
var delStruct delMsgRecursionStruct
2 years ago
var skip int64
minSeq, err := db.deleteMsgRecursion(ctx, conversationID, skip, &delStruct, remainTime)
2 years ago
if err != nil {
return err
}
if minSeq == 0 {
return nil
}
if remainTime == 0 {
err = db.cache.CleanUpOneConversationAllMsg(ctx, conversationID)
if err != nil {
log.ZWarn(ctx, "CleanUpOneUserAllMsg", err, "conversationID", conversationID)
}
}
return db.cache.SetMinSeq(ctx, conversationID, minSeq)
}
// delMsgRecursionStruct carries state across the recursive calls of
// deleteMsgRecursion.
type delMsgRecursionStruct struct {
	minSeq    int64    // seq of the last message of the most recent document queued for deletion
	delDocIDs []string // IDs of the documents queued for deletion
}
// getSetMinSeq returns the minSeq recorded so far.
func (d *delMsgRecursionStruct) getSetMinSeq() int64 {
	return d.minSeq
}
// index 0....19(del) 20...69
// seq 70
// set minSeq 21
// recursion 删除list并且返回设置的最小seq
func (db *commonMsgDatabase) deleteMsgRecursion(ctx context.Context, conversationID string, index int64, delStruct *delMsgRecursionStruct, remainTime int64) (int64, error) {
// find from oldest list
2 years ago
msgDocModel, err := db.msgDocDatabase.GetMsgDocModelByIndex(ctx, conversationID, index, 1)
if err != nil || msgDocModel.DocID == "" {
if err != nil {
if err == unrelation.ErrMsgListNotExist {
log.ZDebug(ctx, "deleteMsgRecursion ErrMsgListNotExist", "conversationID", conversationID, "index:", index)
} else {
log.ZError(ctx, "deleteMsgRecursion GetUserMsgListByIndex failed", err, "conversationID", conversationID, "index", index)
}
}
// 获取报错或者获取不到了物理删除并且返回seq delMongoMsgsPhysical(delStruct.delDocIDList), 结束递归
err = db.msgDocDatabase.DeleteDocs(ctx, delStruct.delDocIDs)
if err != nil {
return 0, err
}
return delStruct.getSetMinSeq() + 1, nil
}
log.ZDebug(ctx, "doc info", "conversationID", conversationID, "index", index, "docID", msgDocModel.DocID, "len", len(msgDocModel.Msg))
if int64(len(msgDocModel.Msg)) > db.msg.GetSingleGocMsgNum() {
log.ZWarn(ctx, "msgs too large", nil, "lenth", len(msgDocModel.Msg), "docID:", msgDocModel.DocID)
}
if msgDocModel.Msg[len(msgDocModel.Msg)-1].Msg.SendTime+(remainTime*1000) < utils.GetCurrentTimestampByMill() && msgDocModel.IsFull() {
delStruct.delDocIDs = append(delStruct.delDocIDs, msgDocModel.DocID)
delStruct.minSeq = msgDocModel.Msg[len(msgDocModel.Msg)-1].Msg.Seq
} else {
var hasMarkDelFlag bool
var delMsgIndexs []int
for i, MsgInfoModel := range msgDocModel.Msg {
if MsgInfoModel != nil {
if utils.GetCurrentTimestampByMill() > MsgInfoModel.Msg.SendTime+(remainTime*1000) {
delMsgIndexs = append(delMsgIndexs, i)
hasMarkDelFlag = true
} else {
// 到本条消息不需要删除, minSeq置为这条消息的seq
if err := db.msgDocDatabase.DeleteDocs(ctx, delStruct.delDocIDs); err != nil {
return 0, err
}
if hasMarkDelFlag {
// mark del all delMsgIndexs
2 years ago
if err := db.msgDocDatabase.DeleteMsgsInOneDocByIndex(ctx, msgDocModel.DocID, delMsgIndexs); err != nil {
return delStruct.getSetMinSeq(), err
}
2 years ago
}
return MsgInfoModel.Msg.Seq, nil
}
}
}
}
// 继续递归 index+1
seq, err := db.deleteMsgRecursion(ctx, conversationID, index+1, delStruct, remainTime)
return seq, err
2 years ago
}
2 years ago
// DeleteMsgsPhysicalBySeqs groups allSeqs by document and, for each document,
// deletes the messages at the corresponding slot indexes. It stops at the
// first error.
func (db *commonMsgDatabase) DeleteMsgsPhysicalBySeqs(ctx context.Context, conversationID string, allSeqs []int64) error {
	seqsByDoc := db.msg.GetDocIDSeqsMap(conversationID, allSeqs)
	for docID, docSeqs := range seqsByDoc {
		var slots []int
		for n := 0; n < len(docSeqs); n++ {
			slots = append(slots, int(db.msg.GetMsgIndex(docSeqs[n])))
		}
		err := db.msgDocDatabase.DeleteMsgsInOneDocByIndex(ctx, docID, slots)
		if err != nil {
			return err
		}
	}
	return nil
}
// DeleteUserMsgsBySeqs adds userID to the "del_list" of every message slot
// addressed by seqs, one PushUnique call per seq. It stops at the first error.
func (db *commonMsgDatabase) DeleteUserMsgsBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) error {
	seqsByDoc := db.msg.GetDocIDSeqsMap(conversationID, seqs)
	for docID, docSeqs := range seqsByDoc {
		for n := range docSeqs {
			slot := db.msg.GetMsgIndex(docSeqs[n])
			_, err := db.msgDocDatabase.PushUnique(ctx, docID, slot, "del_list", []string{userID})
			if err != nil {
				return err
			}
		}
	}
	return nil
}
2 years ago
// CleanUpUserConversationsMsgs sets, for each conversation, the cached minimum
// seq to the cached max seq plus one. Conversations whose max seq cannot be
// read are skipped, and all failures are only logged.
//
// NOTE(review): the user parameter is not used here and the conversation-wide
// min seq is written — confirm this is intended.
func (db *commonMsgDatabase) CleanUpUserConversationsMsgs(ctx context.Context, user string, conversationIDs []string) {
	for n := range conversationIDs {
		conversationID := conversationIDs[n]
		maxSeq, err := db.cache.GetMaxSeq(ctx, conversationID)
		switch {
		case err == nil:
			// proceed below
		case err == redis.Nil:
			log.ZInfo(ctx, "max seq is nil", "conversationID", conversationID)
			continue
		default:
			log.ZError(ctx, "get max seq failed", err, "conversationID", conversationID)
			continue
		}
		newMinSeq := maxSeq + 1
		if setErr := db.cache.SetMinSeq(ctx, conversationID, newMinSeq); setErr != nil {
			log.ZError(ctx, "set min seq failed", setErr, "conversationID", conversationID, "minSeq", newMinSeq)
		}
	}
}
// SetMaxSeq forwards to the cache layer's SetMaxSeq for conversationID.
func (db *commonMsgDatabase) SetMaxSeq(ctx context.Context, conversationID string, maxSeq int64) error {
	return db.cache.SetMaxSeq(ctx, conversationID, maxSeq)
}
// GetMaxSeqs forwards to the cache layer's GetMaxSeqs for conversationIDs.
func (db *commonMsgDatabase) GetMaxSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) {
	return db.cache.GetMaxSeqs(ctx, conversationIDs)
}
// GetMaxSeq forwards to the cache layer's GetMaxSeq for conversationID.
func (db *commonMsgDatabase) GetMaxSeq(ctx context.Context, conversationID string) (int64, error) {
	return db.cache.GetMaxSeq(ctx, conversationID)
}
// SetMinSeq forwards to the cache layer's SetMinSeq for conversationID.
func (db *commonMsgDatabase) SetMinSeq(ctx context.Context, conversationID string, minSeq int64) error {
	return db.cache.SetMinSeq(ctx, conversationID, minSeq)
}
// GetMinSeqs forwards to the cache layer's GetMinSeqs for conversationIDs.
func (db *commonMsgDatabase) GetMinSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) {
	return db.cache.GetMinSeqs(ctx, conversationIDs)
}
// GetMinSeq forwards to the cache layer's GetMinSeq for conversationID.
func (db *commonMsgDatabase) GetMinSeq(ctx context.Context, conversationID string) (int64, error) {
	return db.cache.GetMinSeq(ctx, conversationID)
}
// GetConversationUserMinSeq forwards to the cache layer's
// GetConversationUserMinSeq for the given conversation and user.
func (db *commonMsgDatabase) GetConversationUserMinSeq(ctx context.Context, conversationID string, userID string) (int64, error) {
	return db.cache.GetConversationUserMinSeq(ctx, conversationID, userID)
}
// GetConversationUserMinSeqs forwards to the cache layer's
// GetConversationUserMinSeqs for the given conversation and users.
func (db *commonMsgDatabase) GetConversationUserMinSeqs(ctx context.Context, conversationID string, userIDs []string) (map[string]int64, error) {
	return db.cache.GetConversationUserMinSeqs(ctx, conversationID, userIDs)
}
// SetConversationUserMinSeq forwards to the cache layer's
// SetConversationUserMinSeq for the given conversation and user.
func (db *commonMsgDatabase) SetConversationUserMinSeq(ctx context.Context, conversationID string, userID string, minSeq int64) error {
	return db.cache.SetConversationUserMinSeq(ctx, conversationID, userID, minSeq)
}
// SetConversationUserMinSeqs forwards to the cache layer's
// SetConversationUserMinSeqs, passing the seqs map through unchanged.
func (db *commonMsgDatabase) SetConversationUserMinSeqs(ctx context.Context, conversationID string, seqs map[string]int64) (err error) {
	return db.cache.SetConversationUserMinSeqs(ctx, conversationID, seqs)
}
2 years ago
// SetUserConversationsMinSeqs forwards to the cache layer's
// SetUserConversationsMinSeqs, passing the seqs map through unchanged.
func (db *commonMsgDatabase) SetUserConversationsMinSeqs(ctx context.Context, userID string, seqs map[string]int64) error {
	return db.cache.SetUserConversationsMinSeqs(ctx, userID, seqs)
}
2 years ago
// SetSendMsgStatus forwards to the cache layer's SetSendMsgStatus for id.
func (db *commonMsgDatabase) SetSendMsgStatus(ctx context.Context, id string, status int32) error {
	return db.cache.SetSendMsgStatus(ctx, id, status)
}
// GetSendMsgStatus forwards to the cache layer's GetSendMsgStatus for id.
func (db *commonMsgDatabase) GetSendMsgStatus(ctx context.Context, id string) (int32, error) {
	return db.cache.GetSendMsgStatus(ctx, id)
}
// GetConversationMinMaxSeqInMongoAndCache reads the conversation's min and max
// seq from the document store (via GetMinMaxSeqMongo) and then from the cache.
// It stops at the first failing lookup and returns that error together with
// whatever values were obtained up to that point.
func (db *commonMsgDatabase) GetConversationMinMaxSeqInMongoAndCache(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error) {
	if minSeqMongo, maxSeqMongo, err = db.GetMinMaxSeqMongo(ctx, conversationID); err != nil {
		return minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache, err
	}
	if minSeqCache, err = db.cache.GetMinSeq(ctx, conversationID); err != nil {
		return minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache, err
	}
	maxSeqCache, err = db.cache.GetMaxSeq(ctx, conversationID)
	return minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache, err
}
2 years ago
// GetMongoMaxAndMinSeq forwards to GetMinMaxSeqMongo.
//
// Despite the method name, the values come back in the order GetMinMaxSeqMongo
// produces them: minimum seq first, maximum seq second. The result names were
// previously (maxSeq, minSeq), which contradicted the returned order; they now
// match it. The returned values are unchanged.
func (db *commonMsgDatabase) GetMongoMaxAndMinSeq(ctx context.Context, conversationID string) (minSeq, maxSeq int64, err error) {
	return db.GetMinMaxSeqMongo(ctx, conversationID)
}
2 years ago
func (db *commonMsgDatabase) GetMinMaxSeqMongo(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo int64, err error) {
oldestMsgMongo, err := db.msgDocDatabase.GetOldestMsg(ctx, conversationID)
if err != nil {
return
}
2 years ago
minSeqMongo = oldestMsgMongo.Msg.Seq
2 years ago
newestMsgMongo, err := db.msgDocDatabase.GetNewestMsg(ctx, conversationID)
if err != nil {
return
}
2 years ago
maxSeqMongo = newestMsgMongo.Msg.Seq
2 years ago
return
}
// JudgeMessageReactionExist forwards to the cache layer's
// JudgeMessageReactionExist for the given client message ID and session type.
func (db *commonMsgDatabase) JudgeMessageReactionExist(ctx context.Context, clientMsgID string, sessionType int32) (bool, error) {
	return db.cache.JudgeMessageReactionExist(ctx, clientMsgID, sessionType)
}
// SetMessageTypeKeyValue forwards to the cache layer's SetMessageTypeKeyValue
// with the given type key and value.
func (db *commonMsgDatabase) SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error {
	return db.cache.SetMessageTypeKeyValue(ctx, clientMsgID, sessionType, typeKey, value)
}
// SetMessageReactionExpire forwards to the cache layer's
// SetMessageReactionExpire with the given expiration.
func (db *commonMsgDatabase) SetMessageReactionExpire(ctx context.Context, clientMsgID string, sessionType int32, expiration time.Duration) (bool, error) {
	return db.cache.SetMessageReactionExpire(ctx, clientMsgID, sessionType, expiration)
}
// GetMessageTypeKeyValue forwards to the cache layer's GetMessageTypeKeyValue
// for the given type key.
func (db *commonMsgDatabase) GetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey string) (string, error) {
	return db.cache.GetMessageTypeKeyValue(ctx, clientMsgID, sessionType, typeKey)
}
// GetOneMessageAllReactionList forwards to the cache layer's
// GetOneMessageAllReactionList for the given client message ID and session type.
func (db *commonMsgDatabase) GetOneMessageAllReactionList(ctx context.Context, clientMsgID string, sessionType int32) (map[string]string, error) {
	return db.cache.GetOneMessageAllReactionList(ctx, clientMsgID, sessionType)
}
// DeleteOneMessageKey forwards to the cache layer's DeleteOneMessageKey for
// the given sub key.
func (db *commonMsgDatabase) DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error {
	return db.cache.DeleteOneMessageKey(ctx, clientMsgID, sessionType, subKey)
}
// InsertOrUpdateReactionExtendMsgSet converts reactionExtensions with Pb2Model
// and passes the result to the extend-message store's
// InsertOrUpdateReactionExtendMsgSet.
func (db *commonMsgDatabase) InsertOrUpdateReactionExtendMsgSet(ctx context.Context, conversationID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensions map[string]*sdkws.KeyValue) error {
	extensions := db.extendMsgSetModel.Pb2Model(reactionExtensions)
	return db.extendMsgDatabase.InsertOrUpdateReactionExtendMsgSet(ctx, conversationID, sessionType, clientMsgID, msgFirstModifyTime, extensions)
}
// GetExtendMsg loads the extend-message set for the conversation and returns
// the entry for clientMsgID converted to its protobuf form. It returns an
// ErrRecordNotFound-wrapped error when the set has no entry for clientMsgID.
func (db *commonMsgDatabase) GetExtendMsg(ctx context.Context, conversationID string, sessionType int32, clientMsgID string, maxMsgUpdateTime int64) (*pbMsg.ExtendMsg, error) {
	set, err := db.extendMsgDatabase.GetExtendMsgSet(ctx, conversationID, sessionType, maxMsgUpdateTime)
	if err != nil {
		return nil, err
	}
	stored, found := set.ExtendMsgs[clientMsgID]
	if !found {
		return nil, errs.ErrRecordNotFound.Wrap(fmt.Sprintf("cant find client msg id: %s", clientMsgID))
	}

	// Convert every stored reaction extension to its response form.
	reactions := make(map[string]*pbMsg.KeyValueResp)
	for name, ext := range stored.ReactionExtensionList {
		kv := &sdkws.KeyValue{
			TypeKey:          ext.TypeKey,
			Value:            ext.Value,
			LatestUpdateTime: ext.LatestUpdateTime,
		}
		reactions[name] = &pbMsg.KeyValueResp{KeyValue: kv}
	}

	result := &pbMsg.ExtendMsg{
		ReactionExtensions: reactions,
		ClientMsgID:        stored.ClientMsgID,
		MsgFirstModifyTime: stored.MsgFirstModifyTime,
		AttachedInfo:       stored.AttachedInfo,
		Ex:                 stored.Ex,
	}
	return result, nil
}
// DeleteReactionExtendMsgSet converts reactionExtensions with Pb2Model and
// passes the result to the extend-message store's DeleteReactionExtendMsgSet.
func (db *commonMsgDatabase) DeleteReactionExtendMsgSet(ctx context.Context, conversationID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensions map[string]*sdkws.KeyValue) error {
	extensions := db.extendMsgSetModel.Pb2Model(reactionExtensions)
	return db.extendMsgDatabase.DeleteReactionExtendMsgSet(ctx, conversationID, sessionType, clientMsgID, msgFirstModifyTime, extensions)
}