@ -1,9 +1,6 @@
package controller
package controller
import (
import (
relation2 "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/relation"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/relation"
"gorm.io/gorm"
"time"
"time"
"github.com/redis/go-redis/v9"
"github.com/redis/go-redis/v9"
@ -21,11 +18,10 @@ import (
"context"
"context"
"errors"
"errors"
"go.mongodb.org/mongo-driver/mongo"
pbMsg "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg"
pbMsg "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg"
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
"go.mongodb.org/mongo-driver/mongo"
)
)
const (
const (
@ -44,36 +40,16 @@ type CommonMsgDatabase interface {
DeleteMessagesFromCache ( ctx context . Context , conversationID string , seqs [ ] int64 ) error
DeleteMessagesFromCache ( ctx context . Context , conversationID string , seqs [ ] int64 ) error
DelUserDeleteMsgsList ( ctx context . Context , conversationID string , seqs [ ] int64 )
DelUserDeleteMsgsList ( ctx context . Context , conversationID string , seqs [ ] int64 )
// incrSeq然后批量插入缓存
// incrSeq然后批量插入缓存
BatchInsertChat2Cache (
BatchInsertChat2Cache ( ctx context . Context , conversationID string , msgs [ ] * sdkws . MsgData ) ( seq int64 , isNewConversation bool , err error )
ctx context . Context ,
conversationID string ,
msgs [ ] * sdkws . MsgData ,
) ( seq int64 , isNewConversation bool , err error )
// 通过seqList获取mongo中写扩散消息
// 通过seqList获取mongo中写扩散消息
GetMsgBySeqsRange (
GetMsgBySeqsRange ( ctx context . Context , userID string , conversationID string , begin , end , num , userMaxSeq int64 ) ( minSeq int64 , maxSeq int64 , seqMsg [ ] * sdkws . MsgData , err error )
ctx context . Context ,
userID string ,
conversationID string ,
begin , end , num , userMaxSeq int64 ,
) ( minSeq int64 , maxSeq int64 , seqMsg [ ] * sdkws . MsgData , err error )
// 通过seqList获取大群在 mongo里面的消息
// 通过seqList获取大群在 mongo里面的消息
GetMsgBySeqs (
GetMsgBySeqs ( ctx context . Context , userID string , conversationID string , seqs [ ] int64 ) ( minSeq int64 , maxSeq int64 , seqMsg [ ] * sdkws . MsgData , err error )
ctx context . Context ,
userID string ,
conversationID string ,
seqs [ ] int64 ,
) ( minSeq int64 , maxSeq int64 , seqMsg [ ] * sdkws . MsgData , err error )
// 删除会话消息重置最小seq, remainTime为消息保留的时间单位秒,超时消息删除, 传0删除所有消息(此方法不删除redis cache)
// 删除会话消息重置最小seq, remainTime为消息保留的时间单位秒,超时消息删除, 传0删除所有消息(此方法不删除redis cache)
DeleteConversationMsgsAndSetMinSeq ( ctx context . Context , conversationID string , remainTime int64 ) error
DeleteConversationMsgsAndSetMinSeq ( ctx context . Context , conversationID string , remainTime int64 ) error
// 用户标记删除过期消息返回标记删除的seq列表
// 用户标记删除过期消息返回标记删除的seq列表
UserMsgsDestruct (
UserMsgsDestruct ( cte context . Context , userID string , conversationID string , destructTime int64 , lastMsgDestructTime time . Time ) ( seqs [ ] int64 , err error )
cte context . Context ,
userID string ,
conversationID string ,
destructTime int64 ,
lastMsgDestructTime time . Time ,
) ( seqs [ ] int64 , err error )
// 用户根据seq删除消息
// 用户根据seq删除消息
DeleteUserMsgsBySeqs ( ctx context . Context , userID string , conversationID string , seqs [ ] int64 ) error
DeleteUserMsgsBySeqs ( ctx context . Context , userID string , conversationID string , seqs [ ] int64 ) error
@ -99,13 +75,9 @@ type CommonMsgDatabase interface {
UserSetHasReadSeqs ( ctx context . Context , userID string , hasReadSeqs map [ string ] int64 ) error
UserSetHasReadSeqs ( ctx context . Context , userID string , hasReadSeqs map [ string ] int64 ) error
GetMongoMaxAndMinSeq ( ctx context . Context , conversationID string ) ( minSeqMongo , maxSeqMongo int64 , err error )
GetMongoMaxAndMinSeq ( ctx context . Context , conversationID string ) ( minSeqMongo , maxSeqMongo int64 , err error )
GetConversationMinMaxSeqInMongoAndCache (
GetConversationMinMaxSeqInMongoAndCache ( ctx context . Context , conversationID string ) ( minSeqMongo , maxSeqMongo , minSeqCache , maxSeqCache int64 , err error )
ctx context . Context ,
conversationID string ,
) ( minSeqMongo , maxSeqMongo , minSeqCache , maxSeqCache int64 , err error )
SetSendMsgStatus ( ctx context . Context , id string , status int32 ) error
SetSendMsgStatus ( ctx context . Context , id string , status int32 ) error
GetSendMsgStatus ( ctx context . Context , id string ) ( int32 , error )
GetSendMsgStatus ( ctx context . Context , id string ) ( int32 , error )
SearchMessage ( ctx context . Context , req * pbMsg . SearchMessageReq ) ( msgData [ ] * sdkws . MsgData , err error )
// to mq
// to mq
MsgToMQ ( ctx context . Context , key string , msg2mq * sdkws . MsgData ) error
MsgToMQ ( ctx context . Context , key string , msg2mq * sdkws . MsgData ) error
@ -113,28 +85,12 @@ type CommonMsgDatabase interface {
MsgToPushMQ ( ctx context . Context , key , conversarionID string , msg2mq * sdkws . MsgData ) ( int32 , int64 , error )
MsgToPushMQ ( ctx context . Context , key , conversarionID string , msg2mq * sdkws . MsgData ) ( int32 , int64 , error )
MsgToMongoMQ ( ctx context . Context , key , conversarionID string , msgs [ ] * sdkws . MsgData , lastSeq int64 ) error
MsgToMongoMQ ( ctx context . Context , key , conversarionID string , msgs [ ] * sdkws . MsgData , lastSeq int64 ) error
RangeUserSendCount (
RangeUserSendCount ( ctx context . Context , start time . Time , end time . Time , group bool , ase bool , pageNumber int32 , showNumber int32 ) ( msgCount int64 , userCount int64 , users [ ] * unRelationTb . UserCount , dateCount map [ string ] int64 , err error )
ctx context . Context ,
RangeGroupSendCount ( ctx context . Context , start time . Time , end time . Time , ase bool , pageNumber int32 , showNumber int32 ) ( msgCount int64 , userCount int64 , groups [ ] * unRelationTb . GroupCount , dateCount map [ string ] int64 , err error )
start time . Time ,
end time . Time ,
group bool ,
ase bool ,
pageNumber int32 ,
showNumber int32 ,
) ( msgCount int64 , userCount int64 , users [ ] * unRelationTb . UserCount , dateCount map [ string ] int64 , err error )
RangeGroupSendCount (
ctx context . Context ,
start time . Time ,
end time . Time ,
ase bool ,
pageNumber int32 ,
showNumber int32 ,
) ( msgCount int64 , userCount int64 , groups [ ] * unRelationTb . GroupCount , dateCount map [ string ] int64 , err error )
}
}
func NewCommonMsgDatabase ( msgDocModel unRelationTb . MsgDocModelInterface , cacheModel cache . MsgModel , msgMyqModel relation . ChatLogModelInterface ) CommonMsgDatabase {
func NewCommonMsgDatabase ( msgDocModel unRelationTb . MsgDocModelInterface , cacheModel cache . MsgModel ) CommonMsgDatabase {
return & commonMsgDatabase {
return & commonMsgDatabase {
msgMyq : msgMyqModel ,
msgDocDatabase : msgDocModel ,
msgDocDatabase : msgDocModel ,
cache : cacheModel ,
cache : cacheModel ,
producer : kafka . NewKafkaProducer ( config . Config . Kafka . Addr , config . Config . Kafka . LatestMsgToRedis . Topic ) ,
producer : kafka . NewKafkaProducer ( config . Config . Kafka . Addr , config . Config . Kafka . LatestMsgToRedis . Topic ) ,
@ -143,18 +99,16 @@ func NewCommonMsgDatabase(msgDocModel unRelationTb.MsgDocModelInterface, cacheMo
}
}
}
}
func InitCommonMsgDatabase ( rdb redis . UniversalClient , database * mongo . Database , dbGrom * gorm . DB ) CommonMsgDatabase {
func InitCommonMsgDatabase ( rdb redis . UniversalClient , database * mongo . Database ) CommonMsgDatabase {
cacheModel := cache . NewMsgCacheModel ( rdb )
cacheModel := cache . NewMsgCacheModel ( rdb )
msgDocModel := unrelation . NewMsgMongoDriver ( database )
msgDocModel := unrelation . NewMsgMongoDriver ( database )
msgMyqModel := relation2 . NewChatLogGorm ( dbGrom )
CommonMsgDatabase := NewCommonMsgDatabase ( msgDocModel , cacheModel )
CommonMsgDatabase := NewCommonMsgDatabase ( msgDocModel , cacheModel , msgMyqModel )
return CommonMsgDatabase
return CommonMsgDatabase
}
}
type commonMsgDatabase struct {
type commonMsgDatabase struct {
msgDocDatabase unRelationTb . MsgDocModelInterface
msgDocDatabase unRelationTb . MsgDocModelInterface
msg unRelationTb . MsgDocModel
msg unRelationTb . MsgDocModel
msgMyq relation . ChatLogModelInterface
cache cache . MsgModel
cache cache . MsgModel
producer * kafka . Producer
producer * kafka . Producer
producerToMongo * kafka . Producer
producerToMongo * kafka . Producer
@ -167,32 +121,16 @@ func (db *commonMsgDatabase) MsgToMQ(ctx context.Context, key string, msg2mq *sd
return err
return err
}
}
func ( db * commonMsgDatabase ) MsgToModifyMQ (
func ( db * commonMsgDatabase ) MsgToModifyMQ ( ctx context . Context , key , conversationID string , messages [ ] * sdkws . MsgData ) error {
ctx context . Context ,
key , conversationID string ,
messages [ ] * sdkws . MsgData ,
) error {
if len ( messages ) > 0 {
if len ( messages ) > 0 {
_ , _ , err := db . producerToModify . SendMessage (
_ , _ , err := db . producerToModify . SendMessage ( ctx , key , & pbMsg . MsgDataToModifyByMQ { ConversationID : conversationID , Messages : messages } )
ctx ,
key ,
& pbMsg . MsgDataToModifyByMQ { ConversationID : conversationID , Messages : messages } ,
)
return err
return err
}
}
return nil
return nil
}
}
func ( db * commonMsgDatabase ) MsgToPushMQ (
func ( db * commonMsgDatabase ) MsgToPushMQ ( ctx context . Context , key , conversationID string , msg2mq * sdkws . MsgData ) ( int32 , int64 , error ) {
ctx context . Context ,
partition , offset , err := db . producerToPush . SendMessage ( ctx , key , & pbMsg . PushMsgDataToMQ { MsgData : msg2mq , ConversationID : conversationID } )
key , conversationID string ,
msg2mq * sdkws . MsgData ,
) ( int32 , int64 , error ) {
partition , offset , err := db . producerToPush . SendMessage (
ctx ,
key ,
& pbMsg . PushMsgDataToMQ { MsgData : msg2mq , ConversationID : conversationID } ,
)
if err != nil {
if err != nil {
log . ZError ( ctx , "MsgToPushMQ" , err , "key" , key , "msg2mq" , msg2mq )
log . ZError ( ctx , "MsgToPushMQ" , err , "key" , key , "msg2mq" , msg2mq )
return 0 , 0 , err
return 0 , 0 , err
@ -200,30 +138,15 @@ func (db *commonMsgDatabase) MsgToPushMQ(
return partition , offset , nil
return partition , offset , nil
}
}
func ( db * commonMsgDatabase ) MsgToMongoMQ (
func ( db * commonMsgDatabase ) MsgToMongoMQ ( ctx context . Context , key , conversationID string , messages [ ] * sdkws . MsgData , lastSeq int64 ) error {
ctx context . Context ,
key , conversationID string ,
messages [ ] * sdkws . MsgData ,
lastSeq int64 ,
) error {
if len ( messages ) > 0 {
if len ( messages ) > 0 {
_ , _ , err := db . producerToMongo . SendMessage (
_ , _ , err := db . producerToMongo . SendMessage ( ctx , key , & pbMsg . MsgDataToMongoByMQ { LastSeq : lastSeq , ConversationID : conversationID , MsgData : messages } )
ctx ,
key ,
& pbMsg . MsgDataToMongoByMQ { LastSeq : lastSeq , ConversationID : conversationID , MsgData : messages } ,
)
return err
return err
}
}
return nil
return nil
}
}
func ( db * commonMsgDatabase ) BatchInsertBlock (
func ( db * commonMsgDatabase ) BatchInsertBlock ( ctx context . Context , conversationID string , fields [ ] any , key int8 , firstSeq int64 ) error {
ctx context . Context ,
conversationID string ,
fields [ ] any ,
key int8 ,
firstSeq int64 ,
) error {
if len ( fields ) == 0 {
if len ( fields ) == 0 {
return nil
return nil
}
}
@ -324,12 +247,7 @@ func (db *commonMsgDatabase) BatchInsertBlock(
return nil
return nil
}
}
func ( db * commonMsgDatabase ) BatchInsertChat2DB (
func ( db * commonMsgDatabase ) BatchInsertChat2DB ( ctx context . Context , conversationID string , msgList [ ] * sdkws . MsgData , currentMaxSeq int64 ) error {
ctx context . Context ,
conversationID string ,
msgList [ ] * sdkws . MsgData ,
currentMaxSeq int64 ,
) error {
if len ( msgList ) == 0 {
if len ( msgList ) == 0 {
return errs . ErrArgs . Wrap ( "msgList is empty" )
return errs . ErrArgs . Wrap ( "msgList is empty" )
}
}
@ -375,21 +293,11 @@ func (db *commonMsgDatabase) BatchInsertChat2DB(
return db . BatchInsertBlock ( ctx , conversationID , msgs , updateKeyMsg , msgList [ 0 ] . Seq )
return db . BatchInsertBlock ( ctx , conversationID , msgs , updateKeyMsg , msgList [ 0 ] . Seq )
}
}
func ( db * commonMsgDatabase ) RevokeMsg (
func ( db * commonMsgDatabase ) RevokeMsg ( ctx context . Context , conversationID string , seq int64 , revoke * unRelationTb . RevokeModel ) error {
ctx context . Context ,
conversationID string ,
seq int64 ,
revoke * unRelationTb . RevokeModel ,
) error {
return db . BatchInsertBlock ( ctx , conversationID , [ ] any { revoke } , updateKeyRevoke , seq )
return db . BatchInsertBlock ( ctx , conversationID , [ ] any { revoke } , updateKeyRevoke , seq )
}
}
func ( db * commonMsgDatabase ) MarkSingleChatMsgsAsRead (
func ( db * commonMsgDatabase ) MarkSingleChatMsgsAsRead ( ctx context . Context , userID string , conversationID string , totalSeqs [ ] int64 ) error {
ctx context . Context ,
userID string ,
conversationID string ,
totalSeqs [ ] int64 ,
) error {
for docID , seqs := range db . msg . GetDocIDSeqsMap ( conversationID , totalSeqs ) {
for docID , seqs := range db . msg . GetDocIDSeqsMap ( conversationID , totalSeqs ) {
var indexes [ ] int64
var indexes [ ] int64
for _ , seq := range seqs {
for _ , seq := range seqs {
@ -412,11 +320,7 @@ func (db *commonMsgDatabase) DelUserDeleteMsgsList(ctx context.Context, conversa
db . cache . DelUserDeleteMsgsList ( ctx , conversationID , seqs )
db . cache . DelUserDeleteMsgsList ( ctx , conversationID , seqs )
}
}
func ( db * commonMsgDatabase ) BatchInsertChat2Cache (
func ( db * commonMsgDatabase ) BatchInsertChat2Cache ( ctx context . Context , conversationID string , msgs [ ] * sdkws . MsgData ) ( seq int64 , isNew bool , err error ) {
ctx context . Context ,
conversationID string ,
msgs [ ] * sdkws . MsgData ,
) ( seq int64 , isNew bool , err error ) {
currentMaxSeq , err := db . cache . GetMaxSeq ( ctx , conversationID )
currentMaxSeq , err := db . cache . GetMaxSeq ( ctx , conversationID )
if err != nil && errs . Unwrap ( err ) != redis . Nil {
if err != nil && errs . Unwrap ( err ) != redis . Nil {
prome . Inc ( prome . SeqGetFailedCounter )
prome . Inc ( prome . SeqGetFailedCounter )
@ -463,11 +367,7 @@ func (db *commonMsgDatabase) BatchInsertChat2Cache(
return lastMaxSeq , isNew , utils . Wrap ( err , "" )
return lastMaxSeq , isNew , utils . Wrap ( err , "" )
}
}
func ( db * commonMsgDatabase ) getMsgBySeqs (
func ( db * commonMsgDatabase ) getMsgBySeqs ( ctx context . Context , userID , conversationID string , seqs [ ] int64 ) ( totalMsgs [ ] * sdkws . MsgData , err error ) {
ctx context . Context ,
userID , conversationID string ,
seqs [ ] int64 ,
) ( totalMsgs [ ] * sdkws . MsgData , err error ) {
for docID , seqs := range db . msg . GetDocIDSeqsMap ( conversationID , seqs ) {
for docID , seqs := range db . msg . GetDocIDSeqsMap ( conversationID , seqs ) {
//log.ZDebug(ctx, "getMsgBySeqs", "docID", docID, "seqs", seqs)
//log.ZDebug(ctx, "getMsgBySeqs", "docID", docID, "seqs", seqs)
msgs , err := db . findMsgInfoBySeq ( ctx , userID , docID , seqs )
msgs , err := db . findMsgInfoBySeq ( ctx , userID , docID , seqs )
@ -481,11 +381,7 @@ func (db *commonMsgDatabase) getMsgBySeqs(
return totalMsgs , nil
return totalMsgs , nil
}
}
func ( db * commonMsgDatabase ) findMsgInfoBySeq (
func ( db * commonMsgDatabase ) findMsgInfoBySeq ( ctx context . Context , userID , docID string , seqs [ ] int64 ) ( totalMsgs [ ] * unRelationTb . MsgInfoModel , err error ) {
ctx context . Context ,
userID , docID string ,
seqs [ ] int64 ,
) ( totalMsgs [ ] * unRelationTb . MsgInfoModel , err error ) {
msgs , err := db . msgDocDatabase . GetMsgBySeqIndexIn1Doc ( ctx , userID , docID , seqs )
msgs , err := db . msgDocDatabase . GetMsgBySeqIndexIn1Doc ( ctx , userID , docID , seqs )
for _ , msg := range msgs {
for _ , msg := range msgs {
if msg . IsRead {
if msg . IsRead {
@ -495,25 +391,8 @@ func (db *commonMsgDatabase) findMsgInfoBySeq(
return msgs , err
return msgs , err
}
}
func ( db * commonMsgDatabase ) getMsgBySeqsRange (
func ( db * commonMsgDatabase ) getMsgBySeqsRange ( ctx context . Context , userID string , conversationID string , allSeqs [ ] int64 , begin , end int64 ) ( seqMsgs [ ] * sdkws . MsgData , err error ) {
ctx context . Context ,
log . ZDebug ( ctx , "getMsgBySeqsRange" , "conversationID" , conversationID , "allSeqs" , allSeqs , "begin" , begin , "end" , end )
userID string ,
conversationID string ,
allSeqs [ ] int64 ,
begin , end int64 ,
) ( seqMsgs [ ] * sdkws . MsgData , err error ) {
log . ZDebug (
ctx ,
"getMsgBySeqsRange" ,
"conversationID" ,
conversationID ,
"allSeqs" ,
allSeqs ,
"begin" ,
begin ,
"end" ,
end ,
)
for docID , seqs := range db . msg . GetDocIDSeqsMap ( conversationID , allSeqs ) {
for docID , seqs := range db . msg . GetDocIDSeqsMap ( conversationID , allSeqs ) {
log . ZDebug ( ctx , "getMsgBySeqsRange" , "docID" , docID , "seqs" , seqs )
log . ZDebug ( ctx , "getMsgBySeqsRange" , "docID" , docID , "seqs" , seqs )
msgs , err := db . findMsgInfoBySeq ( ctx , userID , docID , seqs )
msgs , err := db . findMsgInfoBySeq ( ctx , userID , docID , seqs )
@ -530,12 +409,7 @@ func (db *commonMsgDatabase) getMsgBySeqsRange(
return seqMsgs , nil
return seqMsgs , nil
}
}
func ( db * commonMsgDatabase ) GetMsgBySeqsRange (
func ( db * commonMsgDatabase ) GetMsgBySeqsRange ( ctx context . Context , userID string , conversationID string , begin , end , num , userMaxSeq int64 ) ( int64 , int64 , [ ] * sdkws . MsgData , error ) {
ctx context . Context ,
userID string ,
conversationID string ,
begin , end , num , userMaxSeq int64 ,
) ( int64 , int64 , [ ] * sdkws . MsgData , error ) {
userMinSeq , err := db . cache . GetConversationUserMinSeq ( ctx , conversationID , userID )
userMinSeq , err := db . cache . GetConversationUserMinSeq ( ctx , conversationID , userID )
if err != nil && errs . Unwrap ( err ) != redis . Nil {
if err != nil && errs . Unwrap ( err ) != redis . Nil {
return 0 , 0 , nil , err
return 0 , 0 , nil , err
@ -555,18 +429,7 @@ func (db *commonMsgDatabase) GetMsgBySeqsRange(
if err != nil && errs . Unwrap ( err ) != redis . Nil {
if err != nil && errs . Unwrap ( err ) != redis . Nil {
return 0 , 0 , nil , err
return 0 , 0 , nil , err
}
}
log . ZDebug (
log . ZDebug ( ctx , "GetMsgBySeqsRange" , "userMinSeq" , userMinSeq , "conMinSeq" , minSeq , "conMaxSeq" , maxSeq , "userMaxSeq" , userMaxSeq )
ctx ,
"GetMsgBySeqsRange" ,
"userMinSeq" ,
userMinSeq ,
"conMinSeq" ,
minSeq ,
"conMaxSeq" ,
maxSeq ,
"userMaxSeq" ,
userMaxSeq ,
)
if userMaxSeq != 0 {
if userMaxSeq != 0 {
if userMaxSeq < maxSeq {
if userMaxSeq < maxSeq {
maxSeq = userMaxSeq
maxSeq = userMaxSeq
@ -616,18 +479,7 @@ func (db *commonMsgDatabase) GetMsgBySeqsRange(
cacheDelNum += 1
cacheDelNum += 1
}
}
}
}
log . ZDebug (
log . ZDebug ( ctx , "get delSeqs from redis" , "delSeqs" , delSeqs , "userID" , userID , "conversationID" , conversationID , "cacheDelNum" , cacheDelNum )
ctx ,
"get delSeqs from redis" ,
"delSeqs" ,
delSeqs ,
"userID" ,
userID ,
"conversationID" ,
conversationID ,
"cacheDelNum" ,
cacheDelNum ,
)
var reGetSeqsCache [ ] int64
var reGetSeqsCache [ ] int64
for i := 1 ; i <= cacheDelNum ; {
for i := 1 ; i <= cacheDelNum ; {
newSeq := newBegin - int64 ( i )
newSeq := newBegin - int64 ( i )
@ -647,15 +499,7 @@ func (db *commonMsgDatabase) GetMsgBySeqsRange(
if err != nil {
if err != nil {
if err != redis . Nil {
if err != redis . Nil {
prome . Add ( prome . MsgPullFromRedisFailedCounter , len ( failedSeqs2 ) )
prome . Add ( prome . MsgPullFromRedisFailedCounter , len ( failedSeqs2 ) )
log . ZError (
log . ZError ( ctx , "get message from redis exception" , err , "conversationID" , conversationID , "seqs" , reGetSeqsCache )
ctx ,
"get message from redis exception" ,
err ,
"conversationID" ,
conversationID ,
"seqs" ,
reGetSeqsCache ,
)
}
}
}
}
failedSeqs = append ( failedSeqs , failedSeqs2 ... )
failedSeqs = append ( failedSeqs , failedSeqs2 ... )
@ -681,12 +525,7 @@ func (db *commonMsgDatabase) GetMsgBySeqsRange(
return minSeq , maxSeq , successMsgs , nil
return minSeq , maxSeq , successMsgs , nil
}
}
func ( db * commonMsgDatabase ) GetMsgBySeqs (
func ( db * commonMsgDatabase ) GetMsgBySeqs ( ctx context . Context , userID string , conversationID string , seqs [ ] int64 ) ( int64 , int64 , [ ] * sdkws . MsgData , error ) {
ctx context . Context ,
userID string ,
conversationID string ,
seqs [ ] int64 ,
) ( int64 , int64 , [ ] * sdkws . MsgData , error ) {
userMinSeq , err := db . cache . GetConversationUserMinSeq ( ctx , conversationID , userID )
userMinSeq , err := db . cache . GetConversationUserMinSeq ( ctx , conversationID , userID )
if err != nil && errs . Unwrap ( err ) != redis . Nil {
if err != nil && errs . Unwrap ( err ) != redis . Nil {
return 0 , 0 , nil , err
return 0 , 0 , nil , err
@ -712,33 +551,10 @@ func (db *commonMsgDatabase) GetMsgBySeqs(
if err != nil {
if err != nil {
if err != redis . Nil {
if err != redis . Nil {
prome . Add ( prome . MsgPullFromRedisFailedCounter , len ( failedSeqs ) )
prome . Add ( prome . MsgPullFromRedisFailedCounter , len ( failedSeqs ) )
log . ZError (
log . ZError ( ctx , "get message from redis exception" , err , "failedSeqs" , failedSeqs , "conversationID" , conversationID )
ctx ,
"get message from redis exception" ,
err ,
"failedSeqs" ,
failedSeqs ,
"conversationID" ,
conversationID ,
)
}
}
}
}
log . ZInfo (
log . ZInfo ( ctx , "db.cache.GetMessagesBySeq" , "userID" , userID , "conversationID" , conversationID , "seqs" , seqs , "successMsgs" , len ( successMsgs ) , "failedSeqs" , failedSeqs , "conversationID" , conversationID )
ctx ,
"db.cache.GetMessagesBySeq" ,
"userID" ,
userID ,
"conversationID" ,
conversationID ,
"seqs" ,
seqs ,
"successMsgs" ,
len ( successMsgs ) ,
"failedSeqs" ,
failedSeqs ,
"conversationID" ,
conversationID ,
)
prome . Add ( prome . MsgPullFromRedisSuccessCounter , len ( successMsgs ) )
prome . Add ( prome . MsgPullFromRedisSuccessCounter , len ( successMsgs ) )
if len ( failedSeqs ) > 0 {
if len ( failedSeqs ) > 0 {
mongoMsgs , err := db . getMsgBySeqs ( ctx , userID , conversationID , failedSeqs )
mongoMsgs , err := db . getMsgBySeqs ( ctx , userID , conversationID , failedSeqs )
@ -752,11 +568,7 @@ func (db *commonMsgDatabase) GetMsgBySeqs(
return minSeq , maxSeq , successMsgs , nil
return minSeq , maxSeq , successMsgs , nil
}
}
func ( db * commonMsgDatabase ) DeleteConversationMsgsAndSetMinSeq (
func ( db * commonMsgDatabase ) DeleteConversationMsgsAndSetMinSeq ( ctx context . Context , conversationID string , remainTime int64 ) error {
ctx context . Context ,
conversationID string ,
remainTime int64 ,
) error {
var delStruct delMsgRecursionStruct
var delStruct delMsgRecursionStruct
var skip int64
var skip int64
minSeq , err := db . deleteMsgRecursion ( ctx , conversationID , skip , & delStruct , remainTime )
minSeq , err := db . deleteMsgRecursion ( ctx , conversationID , skip , & delStruct , remainTime )
@ -776,13 +588,7 @@ func (db *commonMsgDatabase) DeleteConversationMsgsAndSetMinSeq(
return db . cache . SetMinSeq ( ctx , conversationID , minSeq )
return db . cache . SetMinSeq ( ctx , conversationID , minSeq )
}
}
func ( db * commonMsgDatabase ) UserMsgsDestruct (
func ( db * commonMsgDatabase ) UserMsgsDestruct ( ctx context . Context , userID string , conversationID string , destructTime int64 , lastMsgDestructTime time . Time ) ( seqs [ ] int64 , err error ) {
ctx context . Context ,
userID string ,
conversationID string ,
destructTime int64 ,
lastMsgDestructTime time . Time ,
) ( seqs [ ] int64 , err error ) {
var index int64
var index int64
for {
for {
// from oldest 2 newest
// from oldest 2 newest
@ -790,16 +596,7 @@ func (db *commonMsgDatabase) UserMsgsDestruct(
if err != nil || msgDocModel . DocID == "" {
if err != nil || msgDocModel . DocID == "" {
if err != nil {
if err != nil {
if err == unrelation . ErrMsgListNotExist {
if err == unrelation . ErrMsgListNotExist {
log . ZDebug (
log . ZDebug ( ctx , "deleteMsgRecursion finished" , "conversationID" , conversationID , "userID" , userID , "index" , index )
ctx ,
"deleteMsgRecursion finished" ,
"conversationID" ,
conversationID ,
"userID" ,
userID ,
"index" ,
index ,
)
} else {
} else {
log . ZError ( ctx , "deleteMsgRecursion GetUserMsgListByIndex failed" , err , "conversationID" , conversationID , "index" , index )
log . ZError ( ctx , "deleteMsgRecursion GetUserMsgListByIndex failed" , err , "conversationID" , conversationID , "index" , index )
}
}
@ -848,26 +645,13 @@ func (d *delMsgRecursionStruct) getSetMinSeq() int64 {
// seq 70
// seq 70
// set minSeq 21
// set minSeq 21
// recursion 删除list并且返回设置的最小seq
// recursion 删除list并且返回设置的最小seq
func ( db * commonMsgDatabase ) deleteMsgRecursion (
func ( db * commonMsgDatabase ) deleteMsgRecursion ( ctx context . Context , conversationID string , index int64 , delStruct * delMsgRecursionStruct , remainTime int64 ) ( int64 , error ) {
ctx context . Context ,
conversationID string ,
index int64 ,
delStruct * delMsgRecursionStruct ,
remainTime int64 ,
) ( int64 , error ) {
// find from oldest list
// find from oldest list
msgDocModel , err := db . msgDocDatabase . GetMsgDocModelByIndex ( ctx , conversationID , index , 1 )
msgDocModel , err := db . msgDocDatabase . GetMsgDocModelByIndex ( ctx , conversationID , index , 1 )
if err != nil || msgDocModel . DocID == "" {
if err != nil || msgDocModel . DocID == "" {
if err != nil {
if err != nil {
if err == unrelation . ErrMsgListNotExist {
if err == unrelation . ErrMsgListNotExist {
log . ZDebug (
log . ZDebug ( ctx , "deleteMsgRecursion ErrMsgListNotExist" , "conversationID" , conversationID , "index:" , index )
ctx ,
"deleteMsgRecursion ErrMsgListNotExist" ,
"conversationID" ,
conversationID ,
"index:" ,
index ,
)
} else {
} else {
log . ZError ( ctx , "deleteMsgRecursion GetUserMsgListByIndex failed" , err , "conversationID" , conversationID , "index" , index )
log . ZError ( ctx , "deleteMsgRecursion GetUserMsgListByIndex failed" , err , "conversationID" , conversationID , "index" , index )
}
}
@ -879,23 +663,11 @@ func (db *commonMsgDatabase) deleteMsgRecursion(
}
}
return delStruct . getSetMinSeq ( ) + 1 , nil
return delStruct . getSetMinSeq ( ) + 1 , nil
}
}
log . ZDebug (
log . ZDebug ( ctx , "doc info" , "conversationID" , conversationID , "index" , index , "docID" , msgDocModel . DocID , "len" , len ( msgDocModel . Msg ) )
ctx ,
"doc info" ,
"conversationID" ,
conversationID ,
"index" ,
index ,
"docID" ,
msgDocModel . DocID ,
"len" ,
len ( msgDocModel . Msg ) ,
)
if int64 ( len ( msgDocModel . Msg ) ) > db . msg . GetSingleGocMsgNum ( ) {
if int64 ( len ( msgDocModel . Msg ) ) > db . msg . GetSingleGocMsgNum ( ) {
log . ZWarn ( ctx , "msgs too large" , nil , "lenth" , len ( msgDocModel . Msg ) , "docID:" , msgDocModel . DocID )
log . ZWarn ( ctx , "msgs too large" , nil , "lenth" , len ( msgDocModel . Msg ) , "docID:" , msgDocModel . DocID )
}
}
if msgDocModel . IsFull ( ) &&
if msgDocModel . IsFull ( ) && msgDocModel . Msg [ len ( msgDocModel . Msg ) - 1 ] . Msg . SendTime + ( remainTime * 1000 ) < utils . GetCurrentTimestampByMill ( ) {
msgDocModel . Msg [ len ( msgDocModel . Msg ) - 1 ] . Msg . SendTime + ( remainTime * 1000 ) < utils . GetCurrentTimestampByMill ( ) {
log . ZDebug ( ctx , "doc is full and all msg is expired" , "docID" , msgDocModel . DocID )
log . ZDebug ( ctx , "doc is full and all msg is expired" , "docID" , msgDocModel . DocID )
delStruct . delDocIDs = append ( delStruct . delDocIDs , msgDocModel . DocID )
delStruct . delDocIDs = append ( delStruct . delDocIDs , msgDocModel . DocID )
delStruct . minSeq = msgDocModel . Msg [ len ( msgDocModel . Msg ) - 1 ] . Msg . Seq
delStruct . minSeq = msgDocModel . Msg [ len ( msgDocModel . Msg ) - 1 ] . Msg . Seq
@ -932,11 +704,7 @@ func (db *commonMsgDatabase) deleteMsgRecursion(
return seq , err
return seq , err
}
}
func ( db * commonMsgDatabase ) DeleteMsgsPhysicalBySeqs (
func ( db * commonMsgDatabase ) DeleteMsgsPhysicalBySeqs ( ctx context . Context , conversationID string , allSeqs [ ] int64 ) error {
ctx context . Context ,
conversationID string ,
allSeqs [ ] int64 ,
) error {
if err := db . cache . DeleteMessages ( ctx , conversationID , allSeqs ) ; err != nil {
if err := db . cache . DeleteMessages ( ctx , conversationID , allSeqs ) ; err != nil {
return err
return err
}
}
@ -952,12 +720,7 @@ func (db *commonMsgDatabase) DeleteMsgsPhysicalBySeqs(
return nil
return nil
}
}
func ( db * commonMsgDatabase ) DeleteUserMsgsBySeqs (
func ( db * commonMsgDatabase ) DeleteUserMsgsBySeqs ( ctx context . Context , userID string , conversationID string , seqs [ ] int64 ) error {
ctx context . Context ,
userID string ,
conversationID string ,
seqs [ ] int64 ,
) error {
cachedMsgs , _ , err := db . cache . GetMessagesBySeq ( ctx , conversationID , seqs )
cachedMsgs , _ , err := db . cache . GetMessagesBySeq ( ctx , conversationID , seqs )
if err != nil && errs . Unwrap ( err ) != redis . Nil {
if err != nil && errs . Unwrap ( err ) != redis . Nil {
log . ZWarn ( ctx , "DeleteUserMsgsBySeqs" , err , "conversationID" , conversationID , "seqs" , seqs )
log . ZWarn ( ctx , "DeleteUserMsgsBySeqs" , err , "conversationID" , conversationID , "seqs" , seqs )
@ -1026,70 +789,31 @@ func (db *commonMsgDatabase) GetMinSeqs(ctx context.Context, conversationIDs []s
func ( db * commonMsgDatabase ) GetMinSeq ( ctx context . Context , conversationID string ) ( int64 , error ) {
func ( db * commonMsgDatabase ) GetMinSeq ( ctx context . Context , conversationID string ) ( int64 , error ) {
return db . cache . GetMinSeq ( ctx , conversationID )
return db . cache . GetMinSeq ( ctx , conversationID )
}
}
func ( db * commonMsgDatabase ) GetConversationUserMinSeq ( ctx context . Context , conversationID string , userID string ) ( int64 , error ) {
func ( db * commonMsgDatabase ) GetConversationUserMinSeq (
ctx context . Context ,
conversationID string ,
userID string ,
) ( int64 , error ) {
return db . cache . GetConversationUserMinSeq ( ctx , conversationID , userID )
return db . cache . GetConversationUserMinSeq ( ctx , conversationID , userID )
}
}
func ( db * commonMsgDatabase ) GetConversationUserMinSeqs ( ctx context . Context , conversationID string , userIDs [ ] string ) ( map [ string ] int64 , error ) {
func ( db * commonMsgDatabase ) GetConversationUserMinSeqs (
ctx context . Context ,
conversationID string ,
userIDs [ ] string ,
) ( map [ string ] int64 , error ) {
return db . cache . GetConversationUserMinSeqs ( ctx , conversationID , userIDs )
return db . cache . GetConversationUserMinSeqs ( ctx , conversationID , userIDs )
}
}
func ( db * commonMsgDatabase ) SetConversationUserMinSeq ( ctx context . Context , conversationID string , userID string , minSeq int64 ) error {
func ( db * commonMsgDatabase ) SetConversationUserMinSeq (
ctx context . Context ,
conversationID string ,
userID string ,
minSeq int64 ,
) error {
return db . cache . SetConversationUserMinSeq ( ctx , conversationID , userID , minSeq )
return db . cache . SetConversationUserMinSeq ( ctx , conversationID , userID , minSeq )
}
}
func ( db * commonMsgDatabase ) SetConversationUserMinSeqs ( ctx context . Context , conversationID string , seqs map [ string ] int64 ) ( err error ) {
func ( db * commonMsgDatabase ) SetConversationUserMinSeqs (
ctx context . Context ,
conversationID string ,
seqs map [ string ] int64 ,
) ( err error ) {
return db . cache . SetConversationUserMinSeqs ( ctx , conversationID , seqs )
return db . cache . SetConversationUserMinSeqs ( ctx , conversationID , seqs )
}
}
func ( db * commonMsgDatabase ) SetUserConversationsMinSeqs (
func ( db * commonMsgDatabase ) SetUserConversationsMinSeqs ( ctx context . Context , userID string , seqs map [ string ] int64 ) error {
ctx context . Context ,
userID string ,
seqs map [ string ] int64 ,
) error {
return db . cache . SetUserConversationsMinSeqs ( ctx , userID , seqs )
return db . cache . SetUserConversationsMinSeqs ( ctx , userID , seqs )
}
}
func ( db * commonMsgDatabase ) UserSetHasReadSeqs (
func ( db * commonMsgDatabase ) UserSetHasReadSeqs ( ctx context . Context , userID string , hasReadSeqs map [ string ] int64 ) error {
ctx context . Context ,
userID string ,
hasReadSeqs map [ string ] int64 ,
) error {
return db . cache . UserSetHasReadSeqs ( ctx , userID , hasReadSeqs )
return db . cache . UserSetHasReadSeqs ( ctx , userID , hasReadSeqs )
}
}
func ( db * commonMsgDatabase ) SetHasReadSeq (
func ( db * commonMsgDatabase ) SetHasReadSeq ( ctx context . Context , userID string , conversationID string , hasReadSeq int64 ) error {
ctx context . Context ,
userID string ,
conversationID string ,
hasReadSeq int64 ,
) error {
return db . cache . SetHasReadSeq ( ctx , userID , conversationID , hasReadSeq )
return db . cache . SetHasReadSeq ( ctx , userID , conversationID , hasReadSeq )
}
}
func ( db * commonMsgDatabase ) GetHasReadSeqs ( ctx context . Context , userID string , conversationIDs [ ] string ) ( map [ string ] int64 , error ) {
func ( db * commonMsgDatabase ) GetHasReadSeqs (
ctx context . Context ,
userID string ,
conversationIDs [ ] string ,
) ( map [ string ] int64 , error ) {
return db . cache . GetHasReadSeqs ( ctx , userID , conversationIDs )
return db . cache . GetHasReadSeqs ( ctx , userID , conversationIDs )
}
}
@ -1105,10 +829,7 @@ func (db *commonMsgDatabase) GetSendMsgStatus(ctx context.Context, id string) (i
return db . cache . GetSendMsgStatus ( ctx , id )
return db . cache . GetSendMsgStatus ( ctx , id )
}
}
func ( db * commonMsgDatabase ) GetConversationMinMaxSeqInMongoAndCache (
func ( db * commonMsgDatabase ) GetConversationMinMaxSeqInMongoAndCache ( ctx context . Context , conversationID string ) ( minSeqMongo , maxSeqMongo , minSeqCache , maxSeqCache int64 , err error ) {
ctx context . Context ,
conversationID string ,
) ( minSeqMongo , maxSeqMongo , minSeqCache , maxSeqCache int64 , err error ) {
minSeqMongo , maxSeqMongo , err = db . GetMinMaxSeqMongo ( ctx , conversationID )
minSeqMongo , maxSeqMongo , err = db . GetMinMaxSeqMongo ( ctx , conversationID )
if err != nil {
if err != nil {
return
return
@ -1124,17 +845,11 @@ func (db *commonMsgDatabase) GetConversationMinMaxSeqInMongoAndCache(
return
return
}
}
func ( db * commonMsgDatabase ) GetMongoMaxAndMinSeq (
func ( db * commonMsgDatabase ) GetMongoMaxAndMinSeq ( ctx context . Context , conversationID string ) ( minSeqMongo , maxSeqMongo int64 , err error ) {
ctx context . Context ,
conversationID string ,
) ( minSeqMongo , maxSeqMongo int64 , err error ) {
return db . GetMinMaxSeqMongo ( ctx , conversationID )
return db . GetMinMaxSeqMongo ( ctx , conversationID )
}
}
func ( db * commonMsgDatabase ) GetMinMaxSeqMongo (
func ( db * commonMsgDatabase ) GetMinMaxSeqMongo ( ctx context . Context , conversationID string ) ( minSeqMongo , maxSeqMongo int64 , err error ) {
ctx context . Context ,
conversationID string ,
) ( minSeqMongo , maxSeqMongo int64 , err error ) {
oldestMsgMongo , err := db . msgDocDatabase . GetOldestMsg ( ctx , conversationID )
oldestMsgMongo , err := db . msgDocDatabase . GetOldestMsg ( ctx , conversationID )
if err != nil {
if err != nil {
return
return
@ -1148,37 +863,10 @@ func (db *commonMsgDatabase) GetMinMaxSeqMongo(
return
return
}
}
func ( db * commonMsgDatabase ) RangeUserSendCount (
func ( db * commonMsgDatabase ) RangeUserSendCount ( ctx context . Context , start time . Time , end time . Time , group bool , ase bool , pageNumber int32 , showNumber int32 ) ( msgCount int64 , userCount int64 , users [ ] * unRelationTb . UserCount , dateCount map [ string ] int64 , err error ) {
ctx context . Context ,
start time . Time ,
end time . Time ,
group bool ,
ase bool ,
pageNumber int32 ,
showNumber int32 ,
) ( msgCount int64 , userCount int64 , users [ ] * unRelationTb . UserCount , dateCount map [ string ] int64 , err error ) {
return db . msgDocDatabase . RangeUserSendCount ( ctx , start , end , group , ase , pageNumber , showNumber )
return db . msgDocDatabase . RangeUserSendCount ( ctx , start , end , group , ase , pageNumber , showNumber )
}
}
func ( db * commonMsgDatabase ) RangeGroupSendCount (
func ( db * commonMsgDatabase ) RangeGroupSendCount ( ctx context . Context , start time . Time , end time . Time , ase bool , pageNumber int32 , showNumber int32 ) ( msgCount int64 , userCount int64 , groups [ ] * unRelationTb . GroupCount , dateCount map [ string ] int64 , err error ) {
ctx context . Context ,
start time . Time ,
end time . Time ,
ase bool ,
pageNumber int32 ,
showNumber int32 ,
) ( msgCount int64 , userCount int64 , groups [ ] * unRelationTb . GroupCount , dateCount map [ string ] int64 , err error ) {
return db . msgDocDatabase . RangeGroupSendCount ( ctx , start , end , ase , pageNumber , showNumber )
return db . msgDocDatabase . RangeGroupSendCount ( ctx , start , end , ase , pageNumber , showNumber )
}
}
func ( db * commonMsgDatabase ) SearchMessage ( ctx context . Context , req * pbMsg . SearchMessageReq ) ( msgData [ ] * sdkws . MsgData , err error ) {
var totalMsgs [ ] * sdkws . MsgData
msgs , err := db . msgDocDatabase . SearchMessage ( ctx , req )
if err != nil {
return nil , err
}
for _ , msg := range msgs {
totalMsgs = append ( totalMsgs , convert . MsgDB2Pb ( msg . Msg ) )
}
return totalMsgs , nil
}