@ -1,3 +1,17 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package controller
import (
@ -21,11 +35,10 @@ import (
"context"
"errors"
"go.mongodb.org/mongo-driver/mongo"
pbMsg "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg"
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
"go.mongodb.org/mongo-driver/mongo"
)
const (
@ -44,36 +57,16 @@ type CommonMsgDatabase interface {
DeleteMessagesFromCache ( ctx context . Context , conversationID string , seqs [ ] int64 ) error
DelUserDeleteMsgsList ( ctx context . Context , conversationID string , seqs [ ] int64 )
// incrSeq然后批量插入缓存
BatchInsertChat2Cache (
ctx context . Context ,
conversationID string ,
msgs [ ] * sdkws . MsgData ,
) ( seq int64 , isNewConversation bool , err error )
BatchInsertChat2Cache ( ctx context . Context , conversationID string , msgs [ ] * sdkws . MsgData ) ( seq int64 , isNewConversation bool , err error )
// 通过seqList获取mongo中写扩散消息
GetMsgBySeqsRange (
ctx context . Context ,
userID string ,
conversationID string ,
begin , end , num , userMaxSeq int64 ,
) ( minSeq int64 , maxSeq int64 , seqMsg [ ] * sdkws . MsgData , err error )
GetMsgBySeqsRange ( ctx context . Context , userID string , conversationID string , begin , end , num , userMaxSeq int64 ) ( minSeq int64 , maxSeq int64 , seqMsg [ ] * sdkws . MsgData , err error )
// 通过seqList获取大群在 mongo里面的消息
GetMsgBySeqs (
ctx context . Context ,
userID string ,
conversationID string ,
seqs [ ] int64 ,
) ( minSeq int64 , maxSeq int64 , seqMsg [ ] * sdkws . MsgData , err error )
GetMsgBySeqs ( ctx context . Context , userID string , conversationID string , seqs [ ] int64 ) ( minSeq int64 , maxSeq int64 , seqMsg [ ] * sdkws . MsgData , err error )
// 删除会话消息重置最小seq, remainTime为消息保留的时间单位秒,超时消息删除, 传0删除所有消息(此方法不删除redis cache)
DeleteConversationMsgsAndSetMinSeq ( ctx context . Context , conversationID string , remainTime int64 ) error
// 用户标记删除过期消息返回标记删除的seq列表
UserMsgsDestruct (
cte context . Context ,
userID string ,
conversationID string ,
destructTime int64 ,
lastMsgDestructTime time . Time ,
) ( seqs [ ] int64 , err error )
UserMsgsDestruct ( cte context . Context , userID string , conversationID string , destructTime int64 , lastMsgDestructTime time . Time ) ( seqs [ ] int64 , err error )
// 用户根据seq删除消息
DeleteUserMsgsBySeqs ( ctx context . Context , userID string , conversationID string , seqs [ ] int64 ) error
@ -99,10 +92,7 @@ type CommonMsgDatabase interface {
UserSetHasReadSeqs ( ctx context . Context , userID string , hasReadSeqs map [ string ] int64 ) error
GetMongoMaxAndMinSeq ( ctx context . Context , conversationID string ) ( minSeqMongo , maxSeqMongo int64 , err error )
GetConversationMinMaxSeqInMongoAndCache (
ctx context . Context ,
conversationID string ,
) ( minSeqMongo , maxSeqMongo , minSeqCache , maxSeqCache int64 , err error )
GetConversationMinMaxSeqInMongoAndCache ( ctx context . Context , conversationID string ) ( minSeqMongo , maxSeqMongo , minSeqCache , maxSeqCache int64 , err error )
SetSendMsgStatus ( ctx context . Context , id string , status int32 ) error
GetSendMsgStatus ( ctx context . Context , id string ) ( int32 , error )
SearchMessage ( ctx context . Context , req * pbMsg . SearchMessageReq ) ( msgData [ ] * sdkws . MsgData , err error )
@ -113,23 +103,8 @@ type CommonMsgDatabase interface {
MsgToPushMQ ( ctx context . Context , key , conversarionID string , msg2mq * sdkws . MsgData ) ( int32 , int64 , error )
MsgToMongoMQ ( ctx context . Context , key , conversarionID string , msgs [ ] * sdkws . MsgData , lastSeq int64 ) error
RangeUserSendCount (
ctx context . Context ,
start time . Time ,
end time . Time ,
group bool ,
ase bool ,
pageNumber int32 ,
showNumber int32 ,
) ( msgCount int64 , userCount int64 , users [ ] * unRelationTb . UserCount , dateCount map [ string ] int64 , err error )
RangeGroupSendCount (
ctx context . Context ,
start time . Time ,
end time . Time ,
ase bool ,
pageNumber int32 ,
showNumber int32 ,
) ( msgCount int64 , userCount int64 , groups [ ] * unRelationTb . GroupCount , dateCount map [ string ] int64 , err error )
RangeUserSendCount ( ctx context . Context , start time . Time , end time . Time , group bool , ase bool , pageNumber int32 , showNumber int32 ) ( msgCount int64 , userCount int64 , users [ ] * unRelationTb . UserCount , dateCount map [ string ] int64 , err error )
RangeGroupSendCount ( ctx context . Context , start time . Time , end time . Time , ase bool , pageNumber int32 , showNumber int32 ) ( msgCount int64 , userCount int64 , groups [ ] * unRelationTb . GroupCount , dateCount map [ string ] int64 , err error )
}
func NewCommonMsgDatabase ( msgDocModel unRelationTb . MsgDocModelInterface , cacheModel cache . MsgModel , msgMyqModel relation . ChatLogModelInterface ) CommonMsgDatabase {
@ -167,32 +142,16 @@ func (db *commonMsgDatabase) MsgToMQ(ctx context.Context, key string, msg2mq *sd
return err
}
func ( db * commonMsgDatabase ) MsgToModifyMQ (
ctx context . Context ,
key , conversationID string ,
messages [ ] * sdkws . MsgData ,
) error {
func ( db * commonMsgDatabase ) MsgToModifyMQ ( ctx context . Context , key , conversationID string , messages [ ] * sdkws . MsgData ) error {
if len ( messages ) > 0 {
_ , _ , err := db . producerToModify . SendMessage (
ctx ,
key ,
& pbMsg . MsgDataToModifyByMQ { ConversationID : conversationID , Messages : messages } ,
)
_ , _ , err := db . producerToModify . SendMessage ( ctx , key , & pbMsg . MsgDataToModifyByMQ { ConversationID : conversationID , Messages : messages } )
return err
}
return nil
}
func ( db * commonMsgDatabase ) MsgToPushMQ (
ctx context . Context ,
key , conversationID string ,
msg2mq * sdkws . MsgData ,
) ( int32 , int64 , error ) {
partition , offset , err := db . producerToPush . SendMessage (
ctx ,
key ,
& pbMsg . PushMsgDataToMQ { MsgData : msg2mq , ConversationID : conversationID } ,
)
func ( db * commonMsgDatabase ) MsgToPushMQ ( ctx context . Context , key , conversationID string , msg2mq * sdkws . MsgData ) ( int32 , int64 , error ) {
partition , offset , err := db . producerToPush . SendMessage ( ctx , key , & pbMsg . PushMsgDataToMQ { MsgData : msg2mq , ConversationID : conversationID } )
if err != nil {
log . ZError ( ctx , "MsgToPushMQ" , err , "key" , key , "msg2mq" , msg2mq )
return 0 , 0 , err
@ -200,30 +159,15 @@ func (db *commonMsgDatabase) MsgToPushMQ(
return partition , offset , nil
}
func ( db * commonMsgDatabase ) MsgToMongoMQ (
ctx context . Context ,
key , conversationID string ,
messages [ ] * sdkws . MsgData ,
lastSeq int64 ,
) error {
func ( db * commonMsgDatabase ) MsgToMongoMQ ( ctx context . Context , key , conversationID string , messages [ ] * sdkws . MsgData , lastSeq int64 ) error {
if len ( messages ) > 0 {
_ , _ , err := db . producerToMongo . SendMessage (
ctx ,
key ,
& pbMsg . MsgDataToMongoByMQ { LastSeq : lastSeq , ConversationID : conversationID , MsgData : messages } ,
)
_ , _ , err := db . producerToMongo . SendMessage ( ctx , key , & pbMsg . MsgDataToMongoByMQ { LastSeq : lastSeq , ConversationID : conversationID , MsgData : messages } )
return err
}
return nil
}
func ( db * commonMsgDatabase ) BatchInsertBlock (
ctx context . Context ,
conversationID string ,
fields [ ] any ,
key int8 ,
firstSeq int64 ,
) error {
func ( db * commonMsgDatabase ) BatchInsertBlock ( ctx context . Context , conversationID string , fields [ ] any , key int8 , firstSeq int64 ) error {
if len ( fields ) == 0 {
return nil
}
@ -324,12 +268,7 @@ func (db *commonMsgDatabase) BatchInsertBlock(
return nil
}
func ( db * commonMsgDatabase ) BatchInsertChat2DB (
ctx context . Context ,
conversationID string ,
msgList [ ] * sdkws . MsgData ,
currentMaxSeq int64 ,
) error {
func ( db * commonMsgDatabase ) BatchInsertChat2DB ( ctx context . Context , conversationID string , msgList [ ] * sdkws . MsgData , currentMaxSeq int64 ) error {
if len ( msgList ) == 0 {
return errs . ErrArgs . Wrap ( "msgList is empty" )
}
@ -375,21 +314,11 @@ func (db *commonMsgDatabase) BatchInsertChat2DB(
return db . BatchInsertBlock ( ctx , conversationID , msgs , updateKeyMsg , msgList [ 0 ] . Seq )
}
func ( db * commonMsgDatabase ) RevokeMsg (
ctx context . Context ,
conversationID string ,
seq int64 ,
revoke * unRelationTb . RevokeModel ,
) error {
func ( db * commonMsgDatabase ) RevokeMsg ( ctx context . Context , conversationID string , seq int64 , revoke * unRelationTb . RevokeModel ) error {
return db . BatchInsertBlock ( ctx , conversationID , [ ] any { revoke } , updateKeyRevoke , seq )
}
func ( db * commonMsgDatabase ) MarkSingleChatMsgsAsRead (
ctx context . Context ,
userID string ,
conversationID string ,
totalSeqs [ ] int64 ,
) error {
func ( db * commonMsgDatabase ) MarkSingleChatMsgsAsRead ( ctx context . Context , userID string , conversationID string , totalSeqs [ ] int64 ) error {
for docID , seqs := range db . msg . GetDocIDSeqsMap ( conversationID , totalSeqs ) {
var indexes [ ] int64
for _ , seq := range seqs {
@ -412,11 +341,7 @@ func (db *commonMsgDatabase) DelUserDeleteMsgsList(ctx context.Context, conversa
db . cache . DelUserDeleteMsgsList ( ctx , conversationID , seqs )
}
func ( db * commonMsgDatabase ) BatchInsertChat2Cache (
ctx context . Context ,
conversationID string ,
msgs [ ] * sdkws . MsgData ,
) ( seq int64 , isNew bool , err error ) {
func ( db * commonMsgDatabase ) BatchInsertChat2Cache ( ctx context . Context , conversationID string , msgs [ ] * sdkws . MsgData ) ( seq int64 , isNew bool , err error ) {
currentMaxSeq , err := db . cache . GetMaxSeq ( ctx , conversationID )
if err != nil && errs . Unwrap ( err ) != redis . Nil {
prome . Inc ( prome . SeqGetFailedCounter )
@ -463,11 +388,7 @@ func (db *commonMsgDatabase) BatchInsertChat2Cache(
return lastMaxSeq , isNew , utils . Wrap ( err , "" )
}
func ( db * commonMsgDatabase ) getMsgBySeqs (
ctx context . Context ,
userID , conversationID string ,
seqs [ ] int64 ,
) ( totalMsgs [ ] * sdkws . MsgData , err error ) {
func ( db * commonMsgDatabase ) getMsgBySeqs ( ctx context . Context , userID , conversationID string , seqs [ ] int64 ) ( totalMsgs [ ] * sdkws . MsgData , err error ) {
for docID , seqs := range db . msg . GetDocIDSeqsMap ( conversationID , seqs ) {
//log.ZDebug(ctx, "getMsgBySeqs", "docID", docID, "seqs", seqs)
msgs , err := db . findMsgInfoBySeq ( ctx , userID , docID , seqs )
@ -481,11 +402,7 @@ func (db *commonMsgDatabase) getMsgBySeqs(
return totalMsgs , nil
}
func ( db * commonMsgDatabase ) findMsgInfoBySeq (
ctx context . Context ,
userID , docID string ,
seqs [ ] int64 ,
) ( totalMsgs [ ] * unRelationTb . MsgInfoModel , err error ) {
func ( db * commonMsgDatabase ) findMsgInfoBySeq ( ctx context . Context , userID , docID string , seqs [ ] int64 ) ( totalMsgs [ ] * unRelationTb . MsgInfoModel , err error ) {
msgs , err := db . msgDocDatabase . GetMsgBySeqIndexIn1Doc ( ctx , userID , docID , seqs )
for _ , msg := range msgs {
if msg . IsRead {
@ -495,25 +412,8 @@ func (db *commonMsgDatabase) findMsgInfoBySeq(
return msgs , err
}
func ( db * commonMsgDatabase ) getMsgBySeqsRange (
ctx context . Context ,
userID string ,
conversationID string ,
allSeqs [ ] int64 ,
begin , end int64 ,
) ( seqMsgs [ ] * sdkws . MsgData , err error ) {
log . ZDebug (
ctx ,
"getMsgBySeqsRange" ,
"conversationID" ,
conversationID ,
"allSeqs" ,
allSeqs ,
"begin" ,
begin ,
"end" ,
end ,
)
func ( db * commonMsgDatabase ) getMsgBySeqsRange ( ctx context . Context , userID string , conversationID string , allSeqs [ ] int64 , begin , end int64 ) ( seqMsgs [ ] * sdkws . MsgData , err error ) {
log . ZDebug ( ctx , "getMsgBySeqsRange" , "conversationID" , conversationID , "allSeqs" , allSeqs , "begin" , begin , "end" , end )
for docID , seqs := range db . msg . GetDocIDSeqsMap ( conversationID , allSeqs ) {
log . ZDebug ( ctx , "getMsgBySeqsRange" , "docID" , docID , "seqs" , seqs )
msgs , err := db . findMsgInfoBySeq ( ctx , userID , docID , seqs )
@ -530,12 +430,7 @@ func (db *commonMsgDatabase) getMsgBySeqsRange(
return seqMsgs , nil
}
func ( db * commonMsgDatabase ) GetMsgBySeqsRange (
ctx context . Context ,
userID string ,
conversationID string ,
begin , end , num , userMaxSeq int64 ,
) ( int64 , int64 , [ ] * sdkws . MsgData , error ) {
func ( db * commonMsgDatabase ) GetMsgBySeqsRange ( ctx context . Context , userID string , conversationID string , begin , end , num , userMaxSeq int64 ) ( int64 , int64 , [ ] * sdkws . MsgData , error ) {
userMinSeq , err := db . cache . GetConversationUserMinSeq ( ctx , conversationID , userID )
if err != nil && errs . Unwrap ( err ) != redis . Nil {
return 0 , 0 , nil , err
@ -555,18 +450,7 @@ func (db *commonMsgDatabase) GetMsgBySeqsRange(
if err != nil && errs . Unwrap ( err ) != redis . Nil {
return 0 , 0 , nil , err
}
log . ZDebug (
ctx ,
"GetMsgBySeqsRange" ,
"userMinSeq" ,
userMinSeq ,
"conMinSeq" ,
minSeq ,
"conMaxSeq" ,
maxSeq ,
"userMaxSeq" ,
userMaxSeq ,
)
log . ZDebug ( ctx , "GetMsgBySeqsRange" , "userMinSeq" , userMinSeq , "conMinSeq" , minSeq , "conMaxSeq" , maxSeq , "userMaxSeq" , userMaxSeq )
if userMaxSeq != 0 {
if userMaxSeq < maxSeq {
maxSeq = userMaxSeq
@ -616,18 +500,7 @@ func (db *commonMsgDatabase) GetMsgBySeqsRange(
cacheDelNum += 1
}
}
log . ZDebug (
ctx ,
"get delSeqs from redis" ,
"delSeqs" ,
delSeqs ,
"userID" ,
userID ,
"conversationID" ,
conversationID ,
"cacheDelNum" ,
cacheDelNum ,
)
log . ZDebug ( ctx , "get delSeqs from redis" , "delSeqs" , delSeqs , "userID" , userID , "conversationID" , conversationID , "cacheDelNum" , cacheDelNum )
var reGetSeqsCache [ ] int64
for i := 1 ; i <= cacheDelNum ; {
newSeq := newBegin - int64 ( i )
@ -647,15 +520,7 @@ func (db *commonMsgDatabase) GetMsgBySeqsRange(
if err != nil {
if err != redis . Nil {
prome . Add ( prome . MsgPullFromRedisFailedCounter , len ( failedSeqs2 ) )
log . ZError (
ctx ,
"get message from redis exception" ,
err ,
"conversationID" ,
conversationID ,
"seqs" ,
reGetSeqsCache ,
)
log . ZError ( ctx , "get message from redis exception" , err , "conversationID" , conversationID , "seqs" , reGetSeqsCache )
}
}
failedSeqs = append ( failedSeqs , failedSeqs2 ... )
@ -681,12 +546,7 @@ func (db *commonMsgDatabase) GetMsgBySeqsRange(
return minSeq , maxSeq , successMsgs , nil
}
func ( db * commonMsgDatabase ) GetMsgBySeqs (
ctx context . Context ,
userID string ,
conversationID string ,
seqs [ ] int64 ,
) ( int64 , int64 , [ ] * sdkws . MsgData , error ) {
func ( db * commonMsgDatabase ) GetMsgBySeqs ( ctx context . Context , userID string , conversationID string , seqs [ ] int64 ) ( int64 , int64 , [ ] * sdkws . MsgData , error ) {
userMinSeq , err := db . cache . GetConversationUserMinSeq ( ctx , conversationID , userID )
if err != nil && errs . Unwrap ( err ) != redis . Nil {
return 0 , 0 , nil , err
@ -712,33 +572,10 @@ func (db *commonMsgDatabase) GetMsgBySeqs(
if err != nil {
if err != redis . Nil {
prome . Add ( prome . MsgPullFromRedisFailedCounter , len ( failedSeqs ) )
log . ZError (
ctx ,
"get message from redis exception" ,
err ,
"failedSeqs" ,
failedSeqs ,
"conversationID" ,
conversationID ,
)
log . ZError ( ctx , "get message from redis exception" , err , "failedSeqs" , failedSeqs , "conversationID" , conversationID )
}
}
log . ZInfo (
ctx ,
"db.cache.GetMessagesBySeq" ,
"userID" ,
userID ,
"conversationID" ,
conversationID ,
"seqs" ,
seqs ,
"successMsgs" ,
len ( successMsgs ) ,
"failedSeqs" ,
failedSeqs ,
"conversationID" ,
conversationID ,
)
log . ZInfo ( ctx , "db.cache.GetMessagesBySeq" , "userID" , userID , "conversationID" , conversationID , "seqs" , seqs , "successMsgs" , len ( successMsgs ) , "failedSeqs" , failedSeqs , "conversationID" , conversationID )
prome . Add ( prome . MsgPullFromRedisSuccessCounter , len ( successMsgs ) )
if len ( failedSeqs ) > 0 {
mongoMsgs , err := db . getMsgBySeqs ( ctx , userID , conversationID , failedSeqs )
@ -752,11 +589,7 @@ func (db *commonMsgDatabase) GetMsgBySeqs(
return minSeq , maxSeq , successMsgs , nil
}
func ( db * commonMsgDatabase ) DeleteConversationMsgsAndSetMinSeq (
ctx context . Context ,
conversationID string ,
remainTime int64 ,
) error {
func ( db * commonMsgDatabase ) DeleteConversationMsgsAndSetMinSeq ( ctx context . Context , conversationID string , remainTime int64 ) error {
var delStruct delMsgRecursionStruct
var skip int64
minSeq , err := db . deleteMsgRecursion ( ctx , conversationID , skip , & delStruct , remainTime )
@ -776,13 +609,7 @@ func (db *commonMsgDatabase) DeleteConversationMsgsAndSetMinSeq(
return db . cache . SetMinSeq ( ctx , conversationID , minSeq )
}
func ( db * commonMsgDatabase ) UserMsgsDestruct (
ctx context . Context ,
userID string ,
conversationID string ,
destructTime int64 ,
lastMsgDestructTime time . Time ,
) ( seqs [ ] int64 , err error ) {
func ( db * commonMsgDatabase ) UserMsgsDestruct ( ctx context . Context , userID string , conversationID string , destructTime int64 , lastMsgDestructTime time . Time ) ( seqs [ ] int64 , err error ) {
var index int64
for {
// from oldest 2 newest
@ -790,16 +617,7 @@ func (db *commonMsgDatabase) UserMsgsDestruct(
if err != nil || msgDocModel . DocID == "" {
if err != nil {
if err == unrelation . ErrMsgListNotExist {
log . ZDebug (
ctx ,
"deleteMsgRecursion finished" ,
"conversationID" ,
conversationID ,
"userID" ,
userID ,
"index" ,
index ,
)
log . ZDebug ( ctx , "deleteMsgRecursion finished" , "conversationID" , conversationID , "userID" , userID , "index" , index )
} else {
log . ZError ( ctx , "deleteMsgRecursion GetUserMsgListByIndex failed" , err , "conversationID" , conversationID , "index" , index )
}
@ -848,26 +666,13 @@ func (d *delMsgRecursionStruct) getSetMinSeq() int64 {
// seq 70
// set minSeq 21
// recursion 删除list并且返回设置的最小seq
func ( db * commonMsgDatabase ) deleteMsgRecursion (
ctx context . Context ,
conversationID string ,
index int64 ,
delStruct * delMsgRecursionStruct ,
remainTime int64 ,
) ( int64 , error ) {
func ( db * commonMsgDatabase ) deleteMsgRecursion ( ctx context . Context , conversationID string , index int64 , delStruct * delMsgRecursionStruct , remainTime int64 ) ( int64 , error ) {
// find from oldest list
msgDocModel , err := db . msgDocDatabase . GetMsgDocModelByIndex ( ctx , conversationID , index , 1 )
if err != nil || msgDocModel . DocID == "" {
if err != nil {
if err == unrelation . ErrMsgListNotExist {
log . ZDebug (
ctx ,
"deleteMsgRecursion ErrMsgListNotExist" ,
"conversationID" ,
conversationID ,
"index:" ,
index ,
)
log . ZDebug ( ctx , "deleteMsgRecursion ErrMsgListNotExist" , "conversationID" , conversationID , "index:" , index )
} else {
log . ZError ( ctx , "deleteMsgRecursion GetUserMsgListByIndex failed" , err , "conversationID" , conversationID , "index" , index )
}
@ -879,23 +684,11 @@ func (db *commonMsgDatabase) deleteMsgRecursion(
}
return delStruct . getSetMinSeq ( ) + 1 , nil
}
log . ZDebug (
ctx ,
"doc info" ,
"conversationID" ,
conversationID ,
"index" ,
index ,
"docID" ,
msgDocModel . DocID ,
"len" ,
len ( msgDocModel . Msg ) ,
)
log . ZDebug ( ctx , "doc info" , "conversationID" , conversationID , "index" , index , "docID" , msgDocModel . DocID , "len" , len ( msgDocModel . Msg ) )
if int64 ( len ( msgDocModel . Msg ) ) > db . msg . GetSingleGocMsgNum ( ) {
log . ZWarn ( ctx , "msgs too large" , nil , "lenth" , len ( msgDocModel . Msg ) , "docID:" , msgDocModel . DocID )
}
if msgDocModel . IsFull ( ) &&
msgDocModel . Msg [ len ( msgDocModel . Msg ) - 1 ] . Msg . SendTime + ( remainTime * 1000 ) < utils . GetCurrentTimestampByMill ( ) {
if msgDocModel . IsFull ( ) && msgDocModel . Msg [ len ( msgDocModel . Msg ) - 1 ] . Msg . SendTime + ( remainTime * 1000 ) < utils . GetCurrentTimestampByMill ( ) {
log . ZDebug ( ctx , "doc is full and all msg is expired" , "docID" , msgDocModel . DocID )
delStruct . delDocIDs = append ( delStruct . delDocIDs , msgDocModel . DocID )
delStruct . minSeq = msgDocModel . Msg [ len ( msgDocModel . Msg ) - 1 ] . Msg . Seq
@ -932,11 +725,7 @@ func (db *commonMsgDatabase) deleteMsgRecursion(
return seq , err
}
func ( db * commonMsgDatabase ) DeleteMsgsPhysicalBySeqs (
ctx context . Context ,
conversationID string ,
allSeqs [ ] int64 ,
) error {
func ( db * commonMsgDatabase ) DeleteMsgsPhysicalBySeqs ( ctx context . Context , conversationID string , allSeqs [ ] int64 ) error {
if err := db . cache . DeleteMessages ( ctx , conversationID , allSeqs ) ; err != nil {
return err
}
@ -952,12 +741,7 @@ func (db *commonMsgDatabase) DeleteMsgsPhysicalBySeqs(
return nil
}
func ( db * commonMsgDatabase ) DeleteUserMsgsBySeqs (
ctx context . Context ,
userID string ,
conversationID string ,
seqs [ ] int64 ,
) error {
func ( db * commonMsgDatabase ) DeleteUserMsgsBySeqs ( ctx context . Context , userID string , conversationID string , seqs [ ] int64 ) error {
cachedMsgs , _ , err := db . cache . GetMessagesBySeq ( ctx , conversationID , seqs )
if err != nil && errs . Unwrap ( err ) != redis . Nil {
log . ZWarn ( ctx , "DeleteUserMsgsBySeqs" , err , "conversationID" , conversationID , "seqs" , seqs )
@ -1026,70 +810,31 @@ func (db *commonMsgDatabase) GetMinSeqs(ctx context.Context, conversationIDs []s
func ( db * commonMsgDatabase ) GetMinSeq ( ctx context . Context , conversationID string ) ( int64 , error ) {
return db . cache . GetMinSeq ( ctx , conversationID )
}
func ( db * commonMsgDatabase ) GetConversationUserMinSeq (
ctx context . Context ,
conversationID string ,
userID string ,
) ( int64 , error ) {
func ( db * commonMsgDatabase ) GetConversationUserMinSeq ( ctx context . Context , conversationID string , userID string ) ( int64 , error ) {
return db . cache . GetConversationUserMinSeq ( ctx , conversationID , userID )
}
func ( db * commonMsgDatabase ) GetConversationUserMinSeqs (
ctx context . Context ,
conversationID string ,
userIDs [ ] string ,
) ( map [ string ] int64 , error ) {
func ( db * commonMsgDatabase ) GetConversationUserMinSeqs ( ctx context . Context , conversationID string , userIDs [ ] string ) ( map [ string ] int64 , error ) {
return db . cache . GetConversationUserMinSeqs ( ctx , conversationID , userIDs )
}
func ( db * commonMsgDatabase ) SetConversationUserMinSeq (
ctx context . Context ,
conversationID string ,
userID string ,
minSeq int64 ,
) error {
func ( db * commonMsgDatabase ) SetConversationUserMinSeq ( ctx context . Context , conversationID string , userID string , minSeq int64 ) error {
return db . cache . SetConversationUserMinSeq ( ctx , conversationID , userID , minSeq )
}
func ( db * commonMsgDatabase ) SetConversationUserMinSeqs (
ctx context . Context ,
conversationID string ,
seqs map [ string ] int64 ,
) ( err error ) {
func ( db * commonMsgDatabase ) SetConversationUserMinSeqs ( ctx context . Context , conversationID string , seqs map [ string ] int64 ) ( err error ) {
return db . cache . SetConversationUserMinSeqs ( ctx , conversationID , seqs )
}
func ( db * commonMsgDatabase ) SetUserConversationsMinSeqs (
ctx context . Context ,
userID string ,
seqs map [ string ] int64 ,
) error {
func ( db * commonMsgDatabase ) SetUserConversationsMinSeqs ( ctx context . Context , userID string , seqs map [ string ] int64 ) error {
return db . cache . SetUserConversationsMinSeqs ( ctx , userID , seqs )
}
func ( db * commonMsgDatabase ) UserSetHasReadSeqs (
ctx context . Context ,
userID string ,
hasReadSeqs map [ string ] int64 ,
) error {
func ( db * commonMsgDatabase ) UserSetHasReadSeqs ( ctx context . Context , userID string , hasReadSeqs map [ string ] int64 ) error {
return db . cache . UserSetHasReadSeqs ( ctx , userID , hasReadSeqs )
}
func ( db * commonMsgDatabase ) SetHasReadSeq (
ctx context . Context ,
userID string ,
conversationID string ,
hasReadSeq int64 ,
) error {
func ( db * commonMsgDatabase ) SetHasReadSeq ( ctx context . Context , userID string , conversationID string , hasReadSeq int64 ) error {
return db . cache . SetHasReadSeq ( ctx , userID , conversationID , hasReadSeq )
}
func ( db * commonMsgDatabase ) GetHasReadSeqs (
ctx context . Context ,
userID string ,
conversationIDs [ ] string ,
) ( map [ string ] int64 , error ) {
func ( db * commonMsgDatabase ) GetHasReadSeqs ( ctx context . Context , userID string , conversationIDs [ ] string ) ( map [ string ] int64 , error ) {
return db . cache . GetHasReadSeqs ( ctx , userID , conversationIDs )
}
@ -1105,10 +850,7 @@ func (db *commonMsgDatabase) GetSendMsgStatus(ctx context.Context, id string) (i
return db . cache . GetSendMsgStatus ( ctx , id )
}
func ( db * commonMsgDatabase ) GetConversationMinMaxSeqInMongoAndCache (
ctx context . Context ,
conversationID string ,
) ( minSeqMongo , maxSeqMongo , minSeqCache , maxSeqCache int64 , err error ) {
func ( db * commonMsgDatabase ) GetConversationMinMaxSeqInMongoAndCache ( ctx context . Context , conversationID string ) ( minSeqMongo , maxSeqMongo , minSeqCache , maxSeqCache int64 , err error ) {
minSeqMongo , maxSeqMongo , err = db . GetMinMaxSeqMongo ( ctx , conversationID )
if err != nil {
return
@ -1124,17 +866,11 @@ func (db *commonMsgDatabase) GetConversationMinMaxSeqInMongoAndCache(
return
}
func ( db * commonMsgDatabase ) GetMongoMaxAndMinSeq (
ctx context . Context ,
conversationID string ,
) ( minSeqMongo , maxSeqMongo int64 , err error ) {
func ( db * commonMsgDatabase ) GetMongoMaxAndMinSeq ( ctx context . Context , conversationID string ) ( minSeqMongo , maxSeqMongo int64 , err error ) {
return db . GetMinMaxSeqMongo ( ctx , conversationID )
}
func ( db * commonMsgDatabase ) GetMinMaxSeqMongo (
ctx context . Context ,
conversationID string ,
) ( minSeqMongo , maxSeqMongo int64 , err error ) {
func ( db * commonMsgDatabase ) GetMinMaxSeqMongo ( ctx context . Context , conversationID string ) ( minSeqMongo , maxSeqMongo int64 , err error ) {
oldestMsgMongo , err := db . msgDocDatabase . GetOldestMsg ( ctx , conversationID )
if err != nil {
return
@ -1148,26 +884,11 @@ func (db *commonMsgDatabase) GetMinMaxSeqMongo(
return
}
func ( db * commonMsgDatabase ) RangeUserSendCount (
ctx context . Context ,
start time . Time ,
end time . Time ,
group bool ,
ase bool ,
pageNumber int32 ,
showNumber int32 ,
) ( msgCount int64 , userCount int64 , users [ ] * unRelationTb . UserCount , dateCount map [ string ] int64 , err error ) {
func ( db * commonMsgDatabase ) RangeUserSendCount ( ctx context . Context , start time . Time , end time . Time , group bool , ase bool , pageNumber int32 , showNumber int32 ) ( msgCount int64 , userCount int64 , users [ ] * unRelationTb . UserCount , dateCount map [ string ] int64 , err error ) {
return db . msgDocDatabase . RangeUserSendCount ( ctx , start , end , group , ase , pageNumber , showNumber )
}
func ( db * commonMsgDatabase ) RangeGroupSendCount (
ctx context . Context ,
start time . Time ,
end time . Time ,
ase bool ,
pageNumber int32 ,
showNumber int32 ,
) ( msgCount int64 , userCount int64 , groups [ ] * unRelationTb . GroupCount , dateCount map [ string ] int64 , err error ) {
func ( db * commonMsgDatabase ) RangeGroupSendCount ( ctx context . Context , start time . Time , end time . Time , ase bool , pageNumber int32 , showNumber int32 ) ( msgCount int64 , userCount int64 , groups [ ] * unRelationTb . GroupCount , dateCount map [ string ] int64 , err error ) {
return db . msgDocDatabase . RangeGroupSendCount ( ctx , start , end , ase , pageNumber , showNumber )
}