@ -135,7 +135,6 @@ func InitCommonMsgDatabase(rdb redis.UniversalClient, database *mongo.Database)
cacheModel := cache . NewMsgCacheModel ( rdb )
msgDocModel := unrelation . NewMsgMongoDriver ( database )
CommonMsgDatabase := NewCommonMsgDatabase ( msgDocModel , cacheModel )
return CommonMsgDatabase
}
@ -151,17 +150,14 @@ type commonMsgDatabase struct {
func ( db * commonMsgDatabase ) MsgToMQ ( ctx context . Context , key string , msg2mq * sdkws . MsgData ) error {
_ , _ , err := db . producer . SendMessage ( ctx , key , msg2mq )
return err
}
func ( db * commonMsgDatabase ) MsgToModifyMQ ( ctx context . Context , key , conversationID string , messages [ ] * sdkws . MsgData ) error {
if len ( messages ) > 0 {
_ , _ , err := db . producerToModify . SendMessage ( ctx , key , & pbmsg . MsgDataToModifyByMQ { ConversationID : conversationID , Messages : messages } )
return err
}
return nil
}
@ -169,26 +165,26 @@ func (db *commonMsgDatabase) MsgToPushMQ(ctx context.Context, key, conversationI
partition , offset , err := db . producerToPush . SendMessage ( ctx , key , & pbmsg . PushMsgDataToMQ { MsgData : msg2mq , ConversationID : conversationID } )
if err != nil {
log . ZError ( ctx , "MsgToPushMQ" , err , "key" , key , "msg2mq" , msg2mq )
return 0 , 0 , err
}
return partition , offset , nil
}
func ( db * commonMsgDatabase ) MsgToMongoMQ ( ctx context . Context , key , conversationID string , messages [ ] * sdkws . MsgData , lastSeq int64 ) error {
if len ( messages ) > 0 {
_ , _ , err := db . producerToMongo . SendMessage ( ctx , key , & pbmsg . MsgDataToMongoByMQ { LastSeq : lastSeq , ConversationID : conversationID , MsgData : messages } )
return err
}
return nil
}
func checkTypeForBatchInsertBlock ( fields [ ] any , key int8 , firstSeq int64 ) error {
func ( db * commonMsgDatabase ) BatchInsertBlock ( ctx context . Context , conversationID string , fields [ ] any , key int8 , firstSeq int64 ) error {
if len ( fields ) == 0 {
return nil
}
num := db . msg . GetSingleGocMsgNum ( )
// num = 100
for i , field := range fields { // check type
for i , field := range fields { // 检查类型
var ok bool
switch key {
case updateKeyMsg :
@ -206,11 +202,8 @@ func checkTypeForBatchInsertBlock(fields []any, key int8, firstSeq int64) error
return errs . ErrInternalServer . Wrap ( "field type is invalid" )
}
}
return nil
}
func ( db * commonMsgDatabase ) updateMsgModelForBatchInsertBlock ( ctx context . Context , conversationID string , fields [ ] any , key int8 , seq int64 , i int ) ( bool , error ) {
// 返回值为true表示数据库存在该文档, false表示数据库不存在该文档
updateMsgModel := func ( seq int64 , i int ) ( bool , error ) {
var (
res * mongo . UpdateResult
err error
@ -227,16 +220,25 @@ func (db *commonMsgDatabase) updateMsgModelForBatchInsertBlock(ctx context.Conte
if err != nil {
return false , err
}
return res . MatchedCount > 0 , nil
}
func ( db * commonMsgDatabase ) newDocForBatchInsertBlock ( conversationID string , fields [ ] any , key int8 , seq , firstSeq , num int64 , i int ) ( unrelationtb . MsgDocModel , int ) {
}
tryUpdate := true
for i := 0 ; i < len ( fields ) ; i ++ {
seq := firstSeq + int64 ( i ) // 当前seq
if tryUpdate {
matched , err := updateMsgModel ( seq , i )
if err != nil {
return err
}
if matched {
continue // 匹配到了,继续下一个(不一定修改)
}
}
doc := unrelationtb . MsgDocModel {
DocID : db . msg . GetDocID ( conversationID , seq ) ,
Msg : make ( [ ] * unrelationtb . MsgInfoModel , num ) ,
}
var insert int // number of inserted
var insert int // 插入的数量
for j := i ; j < len ( fields ) ; j ++ {
seq = firstSeq + int64 ( j )
if db . msg . GetDocID ( conversationID , seq ) != doc . DocID {
@ -263,49 +265,17 @@ func (db *commonMsgDatabase) newDocForBatchInsertBlock(conversationID string, fi
doc . Msg [ i ] . DelList = [ ] string { }
}
}
return doc , insert
}
func ( db * commonMsgDatabase ) BatchInsertBlock ( ctx context . Context , conversationID string , fields [ ] any , key int8 , firstSeq int64 ) error {
if len ( fields ) == 0 {
return nil
}
num := db . msg . GetSingleGocMsgNum ( )
// num = 100
err := checkTypeForBatchInsertBlock ( fields , key , firstSeq )
if err != nil {
return err
}
tryUpdate := true
for i := 0 ; i < len ( fields ) ; i ++ {
seq := firstSeq + int64 ( i ) // current seq
// try update
if tryUpdate {
matched , err := db . updateMsgModelForBatchInsertBlock ( ctx , conversationID , fields , key , seq , i )
if err != nil {
return err
}
if matched {
continue // if matched,skip
}
}
doc , insert := db . newDocForBatchInsertBlock ( conversationID , fields , key , seq , firstSeq , num , i )
// insert doc into db
if err := db . msgDocDatabase . Create ( ctx , & doc ) ; err != nil {
if mongo . IsDuplicateKeyError ( err ) {
i -- // exists concurrent,
tryUpdate = true // try update
i -- // 存在并发,重试当前数据
tryUpdate = true // 以修改模式
continue
}
return err
}
tryUpdate = false // if insert success,change to insert mode
i += insert - 1 // skip inserted data
tryUpdate = false // 当前以插入成功,下一块优先插入模式
i += insert - 1 // 跳过已插入的数据
}
return nil
}
@ -352,7 +322,6 @@ func (db *commonMsgDatabase) BatchInsertChat2DB(ctx context.Context, conversatio
Ex : msg . Ex ,
}
}
return db . BatchInsertBlock ( ctx , conversationID , msgs , updateKeyMsg , msgList [ 0 ] . Seq )
}
@ -369,11 +338,9 @@ func (db *commonMsgDatabase) MarkSingleChatMsgsAsRead(ctx context.Context, userI
log . ZDebug ( ctx , "MarkSingleChatMsgsAsRead" , "userID" , userID , "docID" , docID , "indexes" , indexes )
if err := db . msgDocDatabase . MarkSingleChatMsgsAsRead ( ctx , userID , docID , indexes ) ; err != nil {
log . ZError ( ctx , "MarkSingleChatMsgsAsRead" , err , "userID" , userID , "docID" , docID , "indexes" , indexes )
return err
}
}
return nil
}
@ -387,9 +354,8 @@ func (db *commonMsgDatabase) DelUserDeleteMsgsList(ctx context.Context, conversa
func ( db * commonMsgDatabase ) BatchInsertChat2Cache ( ctx context . Context , conversationID string , msgs [ ] * sdkws . MsgData ) ( seq int64 , isNew bool , err error ) {
currentMaxSeq , err := db . cache . GetMaxSeq ( ctx , conversationID )
if err != nil && ! errors . Is ( err , redis . Nil ) {
if err != nil && errs . Unwrap ( err ) != redis . Nil {
prome . Inc ( prome . SeqGetFailedCounter )
return 0 , false , err
}
prome . Inc ( prome . SeqGetSuccessCounter )
@ -400,7 +366,7 @@ func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversa
if lenList < 1 {
return 0 , false , errors . New ( "too short as 0" )
}
if err ors. Is ( err , redis . Nil ) {
if err s. Unwrap ( err ) == redis . Nil {
isNew = true
}
lastMaxSeq := currentMaxSeq
@ -430,7 +396,6 @@ func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversa
} else {
prome . Inc ( prome . SeqSetSuccessCounter )
}
return lastMaxSeq , isNew , utils . Wrap ( err , "" )
}
@ -445,7 +410,6 @@ func (db *commonMsgDatabase) getMsgBySeqs(ctx context.Context, userID, conversat
totalMsgs = append ( totalMsgs , convert . MsgDB2Pb ( msg . Msg ) )
}
}
return totalMsgs , nil
}
@ -456,7 +420,6 @@ func (db *commonMsgDatabase) findMsgInfoBySeq(ctx context.Context, userID, docID
msg . Msg . IsRead = true
}
}
return msgs , err
}
@ -475,76 +438,16 @@ func (db *commonMsgDatabase) getMsgBySeqsRange(ctx context.Context, userID strin
seqMsgs = append ( seqMsgs , convert . MsgDB2Pb ( msg . Msg ) )
}
}
return seqMsgs , nil
}
func ( db * commonMsgDatabase ) getCacheMsgForGetMsgBySeqsRange ( ctx context . Context , userID string , conversationID string , begin int64 , seqs [ ] int64 ) ( [ ] * sdkws . MsgData , [ ] int64 , error ) {
newBegin := seqs [ 0 ]
newEnd := seqs [ len ( seqs ) - 1 ]
log . ZDebug ( ctx , "GetMsgBySeqsRange" , "first seqs" , seqs , "newBegin" , newBegin , "newEnd" , newEnd )
cachedMsgs , failedSeqs , err := db . cache . GetMessagesBySeq ( ctx , conversationID , seqs )
if err != nil {
if ! errors . Is ( err , redis . Nil ) {
prome . Add ( prome . MsgPullFromRedisFailedCounter , len ( failedSeqs ) )
log . ZError ( ctx , "get message from redis exception" , err , "conversationID" , conversationID , "seqs" , seqs )
}
}
var successMsgs [ ] * sdkws . MsgData
if len ( cachedMsgs ) == 0 {
return successMsgs , failedSeqs , err
}
// if len(cachedMsgs) > 0
delSeqs , err2 := db . cache . GetUserDelList ( ctx , userID , conversationID )
if err2 != nil && ! errors . Is ( err2 , redis . Nil ) {
return nil , nil , err2
}
var cacheDelNum int
for _ , msg := range cachedMsgs {
if ! utils . Contain ( msg . Seq , delSeqs ... ) {
successMsgs = append ( successMsgs , msg )
} else {
cacheDelNum += 1
}
}
log . ZDebug ( ctx , "get delSeqs from redis" , "delSeqs" , delSeqs , "userID" , userID , "conversationID" , conversationID , "cacheDelNum" , cacheDelNum )
var reGetSeqsCache [ ] int64
for i := 1 ; i <= cacheDelNum ; {
newSeq := newBegin - int64 ( i )
if newSeq >= begin {
if ! utils . Contain ( newSeq , delSeqs ... ) {
log . ZDebug ( ctx , "seq del in cache, a new seq in range append" , "new seq" , newSeq )
reGetSeqsCache = append ( reGetSeqsCache , newSeq )
i ++
}
} else {
break
}
}
if len ( reGetSeqsCache ) > 0 {
log . ZDebug ( ctx , "reGetSeqsCache" , "reGetSeqsCache" , reGetSeqsCache )
cachedMsgs , failedSeqs2 , err2 := db . cache . GetMessagesBySeq ( ctx , conversationID , reGetSeqsCache )
if err2 != nil {
if ! errors . Is ( err2 , redis . Nil ) {
prome . Add ( prome . MsgPullFromRedisFailedCounter , len ( failedSeqs2 ) )
log . ZError ( ctx , "get message from redis exception" , err2 , "conversationID" , conversationID , "seqs" , reGetSeqsCache )
}
}
failedSeqs = append ( failedSeqs , failedSeqs2 ... )
successMsgs = append ( successMsgs , cachedMsgs ... )
}
return successMsgs , failedSeqs , err
}
func ( db * commonMsgDatabase ) GetMsgBySeqsRange ( ctx context . Context , userID string , conversationID string , begin , end , num , userMaxSeq int64 ) ( int64 , int64 , [ ] * sdkws . MsgData , error ) {
// 从缓存中获取最小和最大序列号,并根据给定的范围值进行调整
userMinSeq , err := db . cache . GetConversationUserMinSeq ( ctx , conversationID , userID )
if err != nil && ! errors . Is ( err , redis . Nil ) {
if err != nil && errs . Unwrap ( err ) != redis . Nil {
return 0 , 0 , nil , err
}
minSeq , err := db . cache . GetMinSeq ( ctx , conversationID )
if err != nil && ! errors . Is ( err , redis . Nil ) {
if err != nil && errs . Unwrap ( err ) != redis . Nil {
return 0 , 0 , nil , err
}
if userMinSeq > minSeq {
@ -552,25 +455,18 @@ func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID strin
}
if minSeq > end {
log . ZInfo ( ctx , "minSeq > end" , "minSeq" , minSeq , "end" , end )
return 0 , 0 , nil , nil
}
maxSeq , err := db . cache . GetMaxSeq ( ctx , conversationID )
if err != nil && ! errors . Is ( err , redis . Nil ) {
if err != nil && errs . Unwrap ( err ) != redis . Nil {
return 0 , 0 , nil , err
}
// log out debug info
log . ZDebug ( ctx , "GetMsgBySeqsRange" , "userMinSeq" , userMinSeq , "conMinSeq" , minSeq , "conMaxSeq" , maxSeq , "userMaxSeq" , userMaxSeq )
// adjust maxSeq according to userMaxSeq
if userMaxSeq != 0 {
if userMaxSeq < maxSeq {
maxSeq = userMaxSeq
}
}
// adjust begin and end according to minSeq and maxSeq
if begin < minSeq {
begin = minSeq
}
@ -580,8 +476,6 @@ func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID strin
if end < begin {
return 0 , 0 , nil , errs . ErrArgs . Wrap ( "seq end < begin" )
}
// get seqs to search
var seqs [ ] int64
for i := end ; i > end - num ; i -- {
if i >= begin {
@ -593,24 +487,67 @@ func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID strin
if len ( seqs ) == 0 {
return 0 , 0 , nil , nil
}
// get info from cache,and filter deleted msg
successMsgs , failedSeqs , err := db . getCacheMsgForGetMsgBySeqsRange ( ctx , userID , conversationID , begin , seqs )
newBegin := seqs [ 0 ]
newEnd := seqs [ len ( seqs ) - 1 ]
log . ZDebug ( ctx , "GetMsgBySeqsRange" , "first seqs" , seqs , "newBegin" , newBegin , "newEnd" , newEnd )
cachedMsgs , failedSeqs , err := db . cache . GetMessagesBySeq ( ctx , conversationID , seqs )
if err != nil {
if err != redis . Nil {
prome . Add ( prome . MsgPullFromRedisFailedCounter , len ( failedSeqs ) )
log . ZError ( ctx , "get message from redis exception" , err , "conversationID" , conversationID , "seqs" , seqs )
}
}
var successMsgs [ ] * sdkws . MsgData
if len ( cachedMsgs ) > 0 {
delSeqs , err := db . cache . GetUserDelList ( ctx , userID , conversationID )
if err != nil && errs . Unwrap ( err ) != redis . Nil {
return 0 , 0 , nil , err
}
// log out debug info
var cacheDelNum int
for _ , msg := range cachedMsgs {
if ! utils . Contain ( msg . Seq , delSeqs ... ) {
successMsgs = append ( successMsgs , msg )
} else {
cacheDelNum += 1
}
}
log . ZDebug ( ctx , "get delSeqs from redis" , "delSeqs" , delSeqs , "userID" , userID , "conversationID" , conversationID , "cacheDelNum" , cacheDelNum )
var reGetSeqsCache [ ] int64
for i := 1 ; i <= cacheDelNum ; {
newSeq := newBegin - int64 ( i )
if newSeq >= begin {
if ! utils . Contain ( newSeq , delSeqs ... ) {
log . ZDebug ( ctx , "seq del in cache, a new seq in range append" , "new seq" , newSeq )
reGetSeqsCache = append ( reGetSeqsCache , newSeq )
i ++
}
} else {
break
}
}
if len ( reGetSeqsCache ) > 0 {
log . ZDebug ( ctx , "reGetSeqsCache" , "reGetSeqsCache" , reGetSeqsCache )
cachedMsgs , failedSeqs2 , err := db . cache . GetMessagesBySeq ( ctx , conversationID , reGetSeqsCache )
if err != nil {
if err != redis . Nil {
prome . Add ( prome . MsgPullFromRedisFailedCounter , len ( failedSeqs2 ) )
log . ZError ( ctx , "get message from redis exception" , err , "conversationID" , conversationID , "seqs" , reGetSeqsCache )
}
}
failedSeqs = append ( failedSeqs , failedSeqs2 ... )
successMsgs = append ( successMsgs , cachedMsgs ... )
}
}
log . ZDebug ( ctx , "get msgs from cache" , "successMsgs" , successMsgs )
if len ( failedSeqs ) != 0 {
log . ZDebug ( ctx , "msgs not exist in redis" , "seqs" , failedSeqs )
}
// if not found in cache,find in mongo
// get from cache or db
prome . Add ( prome . MsgPullFromRedisSuccessCounter , len ( successMsgs ) )
if len ( failedSeqs ) > 0 {
mongoMsgs , err := db . getMsgBySeqsRange ( ctx , userID , conversationID , failedSeqs , begin , end )
if err != nil {
prome . Add ( prome . MsgPullFromMongoFailedCounter , len ( failedSeqs ) )
return 0 , 0 , nil , err
}
prome . Add ( prome . MsgPullFromMongoSuccessCounter , len ( mongoMsgs ) )
@ -622,15 +559,15 @@ func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID strin
func ( db * commonMsgDatabase ) GetMsgBySeqs ( ctx context . Context , userID string , conversationID string , seqs [ ] int64 ) ( int64 , int64 , [ ] * sdkws . MsgData , error ) {
userMinSeq , err := db . cache . GetConversationUserMinSeq ( ctx , conversationID , userID )
if err != nil && ! errors . Is ( err , redis . Nil ) {
if err != nil && errs . Unwrap ( err ) != redis . Nil {
return 0 , 0 , nil , err
}
minSeq , err := db . cache . GetMinSeq ( ctx , conversationID )
if err != nil && ! errors . Is ( err , redis . Nil ) {
if err != nil && errs . Unwrap ( err ) != redis . Nil {
return 0 , 0 , nil , err
}
maxSeq , err := db . cache . GetMaxSeq ( ctx , conversationID )
if err != nil && ! errors . Is ( err , redis . Nil ) {
if err != nil && errs . Unwrap ( err ) != redis . Nil {
return 0 , 0 , nil , err
}
if userMinSeq < minSeq {
@ -644,7 +581,7 @@ func (db *commonMsgDatabase) GetMsgBySeqs(ctx context.Context, userID string, co
}
successMsgs , failedSeqs , err := db . cache . GetMessagesBySeq ( ctx , conversationID , newSeqs )
if err != nil {
if ! errors . Is ( err , redis . Nil ) {
if err != redis . Nil {
prome . Add ( prome . MsgPullFromRedisFailedCounter , len ( failedSeqs ) )
log . ZError ( ctx , "get message from redis exception" , err , "failedSeqs" , failedSeqs , "conversationID" , conversationID )
}
@ -670,13 +607,11 @@ func (db *commonMsgDatabase) GetMsgBySeqs(ctx context.Context, userID string, co
mongoMsgs , err := db . getMsgBySeqs ( ctx , userID , conversationID , failedSeqs )
if err != nil {
prome . Add ( prome . MsgPullFromMongoFailedCounter , len ( failedSeqs ) )
return 0 , 0 , nil , err
}
prome . Add ( prome . MsgPullFromMongoSuccessCounter , len ( mongoMsgs ) )
successMsgs = append ( successMsgs , mongoMsgs ... )
}
return minSeq , maxSeq , successMsgs , nil
}
@ -697,20 +632,30 @@ func (db *commonMsgDatabase) DeleteConversationMsgsAndSetMinSeq(ctx context.Cont
log . ZWarn ( ctx , "CleanUpOneUserAllMsg" , err , "conversationID" , conversationID )
}
}
return db . cache . SetMinSeq ( ctx , conversationID , minSeq )
}
func processMsgDocModel (
ctx context . Context ,
msgDocModel * unrelationtb . MsgDocModel ,
userID , conversationID string ,
index int64 ,
destructTime int64 ,
lastMsgDestructTime time . Time ,
) ( seqs [ ] int64 , over bool ) {
func ( db * commonMsgDatabase ) UserMsgsDestruct ( ctx context . Context , userID string , conversationID string , destructTime int64 , lastMsgDestructTime time . Time ) ( seqs [ ] int64 , err error ) {
var index int64
for {
// from oldest 2 newest
msgDocModel , err := db . msgDocDatabase . GetMsgDocModelByIndex ( ctx , conversationID , index , 1 )
if err != nil || msgDocModel . DocID == "" {
if err != nil {
if err == unrelation . ErrMsgListNotExist {
log . ZDebug ( ctx , "not doc find" , "conversationID" , conversationID , "userID" , userID , "index" , index )
} else {
log . ZError ( ctx , "deleteMsgRecursion GetUserMsgListByIndex failed" , err , "conversationID" , conversationID , "index" , index )
}
}
// 获取报错, 或者获取不到了, 物理删除并且返回seq delMongoMsgsPhysical(delStruct.delDocIDList), 结束递归
break
}
index ++
//&& msgDocModel.Msg[0].Msg.SendTime > lastMsgDestructTime.UnixMilli()
if len ( msgDocModel . Msg ) > 0 {
i := 0
var over bool
for _ , msg := range msgDocModel . Msg {
i ++
if msg != nil && msg . Msg != nil && msg . Msg . SendTime + destructTime * 1000 <= time . Now ( ) . UnixMilli ( ) {
@ -720,50 +665,20 @@ func processMsgDocModel(
} else {
log . ZDebug ( ctx , "all msg need destruct is found" , "conversationID" , conversationID , "userID" , userID , "index" , index , "stop index" , i )
over = true
return seqs , over
}
}
}
return seqs , over
}
func ( db * commonMsgDatabase ) UserMsgsDestruct ( ctx context . Context , userID , conversationID string , destructTime int64 , lastMsgDestructTime time . Time ) ( seqs [ ] int64 , err error ) {
var index int64
// refresh msg list
for {
// from oldest to newest
msgDocModel , err2 := db . msgDocDatabase . GetMsgDocModelByIndex ( ctx , conversationID , index , 1 )
if err2 != nil || msgDocModel . DocID == "" {
if err2 != nil {
if errors . Is ( err2 , unrelation . ErrMsgListNotExist ) {
log . ZDebug ( ctx , "not doc find" , "conversationID" , conversationID , "userID" , userID , "index" , index )
} else {
log . ZError ( ctx , "deleteMsgRecursion GetUserMsgListByIndex failed" , err2 , "conversationID" , conversationID , "index" , index )
}
}
// If there is an error or no message document is found, delete the message physically and return the sequence number, then end the recursion.
break
}
index ++
//&& msgDocModel.Msg[0].Msg.SendTime > lastMsgDestructTime.UnixMilli()
curSeqs , over := processMsgDocModel ( ctx , msgDocModel , userID , conversationID , index , destructTime , lastMsgDestructTime )
seqs = append ( seqs , curSeqs ... )
}
if over {
break
}
}
// Log the result of the function call.
log . ZDebug ( ctx , "UserMsgsDestruct" , "conversationID" , conversationID , "userID" , userID , "seqs" , seqs )
if len ( seqs ) == 0 {
return seqs , nil
}
// if len(seqs) > 0
log . ZDebug ( ctx , "UserMsgsDestruct" , "conversationID" , conversationID , "userID" , userID , "seqs" , seqs )
if len ( seqs ) > 0 {
userMinSeq := seqs [ len ( seqs ) - 1 ] + 1
currentUserMinSeq , err := db . cache . GetConversationUserMinSeq ( ctx , conversationID , userID )
if err != nil && ! errors . Is ( err , redis . Nil ) {
if err != nil && errs . Unwrap ( err ) != redis . Nil {
return nil , err
}
if currentUserMinSeq < userMinSeq {
@ -771,7 +686,7 @@ func (db *commonMsgDatabase) UserMsgsDestruct(ctx context.Context, userID, conve
return nil , err
}
}
}
return seqs , nil
}
@ -794,50 +709,28 @@ func (db *commonMsgDatabase) deleteMsgRecursion(ctx context.Context, conversatio
msgDocModel , err := db . msgDocDatabase . GetMsgDocModelByIndex ( ctx , conversationID , index , 1 )
if err != nil || msgDocModel . DocID == "" {
if err != nil {
if err ors. Is ( err , unrelation . ErrMsgListNotExist ) {
if err == unrelation . ErrMsgListNotExist {
log . ZDebug ( ctx , "deleteMsgRecursion ErrMsgListNotExist" , "conversationID" , conversationID , "index:" , index )
} else {
log . ZError ( ctx , "deleteMsgRecursion GetUserMsgListByIndex failed" , err , "conversationID" , conversationID , "index" , index )
}
}
// get error or miss content, delete physically and return minSeq,delMongoMsgsPhysical(delStruct.delDocIDList), end recursion
// 获取报错, 或者获取不到了, 物理删除并且返回seq delMongoMsgsPhysical(delStruct.delDocIDList), 结束递归
err = db . msgDocDatabase . DeleteDocs ( ctx , delStruct . delDocIDs )
if err != nil {
return 0 , err
}
return delStruct . getSetMinSeq ( ) + 1 , nil
}
log . ZDebug ( ctx , "doc info" , "conversationID" , conversationID , "index" , index , "docID" , msgDocModel . DocID , "len" , len ( msgDocModel . Msg ) )
if int64 ( len ( msgDocModel . Msg ) ) > db . msg . GetSingleGocMsgNum ( ) {
log . ZWarn ( ctx , "msgs too large" , nil , "lenth" , len ( msgDocModel . Msg ) , "docID:" , msgDocModel . DocID )
}
fullAndExpired := msgDocModel . IsFull ( ) && msgDocModel . Msg [ len ( msgDocModel . Msg ) - 1 ] . Msg . SendTime + ( remainTime * 1000 ) < utils . GetCurrentTimestampByMill ( )
if fullAndExpired {
handleFullAndExpiredForDeleteMsgRecursion ( ctx , msgDocModel , delStruct )
} else {
handleNotFullAndExpiredForDeleteMsgRecursion ( ctx , msgDocModel , remainTime , index , conversationID , delStruct , db )
}
seq , err := db . deleteMsgRecursion ( ctx , conversationID , index + 1 , delStruct , remainTime )
return seq , err
}
func handleFullAndExpiredForDeleteMsgRecursion ( ctx context . Context , msgDocModel * unrelationtb . MsgDocModel , delStruct * delMsgRecursionStruct ) {
if msgDocModel . IsFull ( ) && msgDocModel . Msg [ len ( msgDocModel . Msg ) - 1 ] . Msg . SendTime + ( remainTime * 1000 ) < utils . GetCurrentTimestampByMill ( ) {
log . ZDebug ( ctx , "doc is full and all msg is expired" , "docID" , msgDocModel . DocID )
delStruct . delDocIDs = append ( delStruct . delDocIDs , msgDocModel . DocID )
delStruct . minSeq = msgDocModel . Msg [ len ( msgDocModel . Msg ) - 1 ] . Msg . Seq
}
func handleNotFullAndExpiredForDeleteMsgRecursion (
ctx context . Context ,
msgDocModel * unrelationtb . MsgDocModel ,
remainTime , index int64 ,
conversationID string ,
delStruct * delMsgRecursionStruct ,
db * commonMsgDatabase ,
) {
} else {
var delMsgIndexs [ ] int
for i , MsgInfoModel := range msgDocModel . Msg {
if MsgInfoModel != nil && MsgInfoModel . Msg != nil {
@ -847,12 +740,14 @@ func handleNotFullAndExpiredForDeleteMsgRecursion(
}
}
if len ( delMsgIndexs ) > 0 {
err2 := db . msgDocDatabase . DeleteMsgsInOneDocByIndex ( ctx , msgDocModel . DocID , delMsgIndexs )
if err2 != nil {
log . ZError ( ctx , "deleteMsgRecursion DeleteMsgsInOneDocByIndex failed" , err2 , "conversationID" , conversationID , "index" , index )
if err := db . msgDocDatabase . DeleteMsgsInOneDocByIndex ( ctx , msgDocModel . DocID , delMsgIndexs ) ; err != nil {
log . ZError ( ctx , "deleteMsgRecursion DeleteMsgsInOneDocByIndex failed" , err , "conversationID" , conversationID , "index" , index )
}
delStruct . minSeq = int64 ( msgDocModel . Msg [ delMsgIndexs [ len ( delMsgIndexs ) - 1 ] ] . Msg . Seq )
}
delStruct . minSeq = msgDocModel . Msg [ delMsgIndexs [ len ( delMsgIndexs ) - 1 ] ] . Msg . Seq
}
seq , err := db . deleteMsgRecursion ( ctx , conversationID , index + 1 , delStruct , remainTime )
return seq , err
}
func ( db * commonMsgDatabase ) DeleteMsgsPhysicalBySeqs ( ctx context . Context , conversationID string , allSeqs [ ] int64 ) error {
@ -868,15 +763,13 @@ func (db *commonMsgDatabase) DeleteMsgsPhysicalBySeqs(ctx context.Context, conve
return err
}
}
return nil
}
func ( db * commonMsgDatabase ) DeleteUserMsgsBySeqs ( ctx context . Context , userID string , conversationID string , seqs [ ] int64 ) error {
cachedMsgs , _ , err := db . cache . GetMessagesBySeq ( ctx , conversationID , seqs )
if err != nil && err ors. Is ( err , redis . Nil ) {
if err != nil && err s. Unwrap ( err ) != redis . Nil {
log . ZWarn ( ctx , "DeleteUserMsgsBySeqs" , err , "conversationID" , conversationID , "seqs" , seqs )
return err
}
if len ( cachedMsgs ) > 0 {
@ -896,7 +789,6 @@ func (db *commonMsgDatabase) DeleteUserMsgsBySeqs(ctx context.Context, userID st
}
}
}
return nil
}
@ -908,12 +800,11 @@ func (db *commonMsgDatabase) CleanUpUserConversationsMsgs(ctx context.Context, u
for _ , conversationID := range conversationIDs {
maxSeq , err := db . cache . GetMaxSeq ( ctx , conversationID )
if err != nil {
if err ors. Is ( err , redis . Nil ) {
if err == redis . Nil {
log . ZInfo ( ctx , "max seq is nil" , "conversationID" , conversationID )
} else {
log . ZError ( ctx , "get max seq failed" , err , "conversationID" , conversationID )
}
continue
}
if err := db . cache . SetMinSeq ( ctx , conversationID , maxSeq + 1 ) ; err != nil {
@ -1007,7 +898,6 @@ func (db *commonMsgDatabase) GetConversationMinMaxSeqInMongoAndCache(ctx context
if err != nil {
return
}
return
}
@ -1026,7 +916,6 @@ func (db *commonMsgDatabase) GetMinMaxSeqMongo(ctx context.Context, conversation
return
}
maxSeqMongo = newestMsgMongo . Msg . Seq
return
}
@ -1054,7 +943,7 @@ func (db *commonMsgDatabase) RangeGroupSendCount(
}
func ( db * commonMsgDatabase ) SearchMessage ( ctx context . Context , req * pbmsg . SearchMessageReq ) ( total int32 , msgData [ ] * sdkws . MsgData , err error ) {
totalMsgs := make ( [ ] * sdkws . MsgData , 0 )
var totalMsgs [ ] * sdkws . MsgData
total , msgs , err := db . msgDocDatabase . SearchMessage ( ctx , req )
if err != nil {
return 0 , nil , err
@ -1065,7 +954,6 @@ func (db *commonMsgDatabase) SearchMessage(ctx context.Context, req *pbmsg.Searc
}
totalMsgs = append ( totalMsgs , convert . MsgDB2Pb ( msg . Msg ) )
}
return total , totalMsgs , nil
}