@ -24,6 +24,9 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/redis/go-redis/v9"
"go.mongodb.org/mongo-driver/mongo"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/convert"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
@ -35,8 +38,6 @@ import (
"github.com/openimsdk/tools/mq/kafka"
"github.com/openimsdk/tools/utils/datautil"
"github.com/openimsdk/tools/utils/timeutil"
"github.com/redis/go-redis/v9"
"go.mongodb.org/mongo-driver/mongo"
)
const (
@ -56,9 +57,10 @@ type CommonMsgDatabase interface {
GetMsgBySeqs ( ctx context . Context , userID string , conversationID string , seqs [ ] int64 ) ( minSeq int64 , maxSeq int64 , seqMsg [ ] * sdkws . MsgData , err error )
// DeleteConversationMsgsAndSetMinSeq deletes conversation messages and resets the minimum sequence number. If `remainTime` is 0, all messages are deleted (this method does not delete Redis
// cache).
GetMessagesBySeqWithBounds ( ctx context . Context , userID string , conversationID string , seqs [ ] int64 , pullOrder sdkws . PullOrder ) ( bool , int64 , [ ] * sdkws . MsgData , error )
DeleteConversationMsgsAndSetMinSeq ( ctx context . Context , conversationID string , remainTime int64 ) error
// UserMsgsDestruct marks messages for deletion based on destruct time and returns a list of sequence numbers for marked messages.
UserMsgsDestruct ( ctx context . Context , userID string , conversationID string , destructTime int64 , lastMsgDestruct Time time . Time ) ( seqs [ ] int64 , err error )
// ClearUserMsgs marks messages for deletion based on clear time and returns a list of sequence numbers for marked messages.
Clear UserMsgs( ctx context . Context , userID string , conversationID string , clearTime int64 , lastMsgClear Time time . Time ) ( seqs [ ] int64 , err error )
// DeleteUserMsgsBySeqs allows a user to delete messages based on sequence numbers.
DeleteUserMsgsBySeqs ( ctx context . Context , userID string , conversationID string , seqs [ ] int64 ) error
// DeleteMsgsPhysicalBySeqs physically deletes messages by emptying them based on sequence numbers.
@ -92,11 +94,14 @@ type CommonMsgDatabase interface {
RangeGroupSendCount ( ctx context . Context , start time . Time , end time . Time , ase bool , pageNumber int32 , showNumber int32 ) ( msgCount int64 , userCount int64 , groups [ ] * model . GroupCount , dateCount map [ string ] int64 , err error )
ConvertMsgsDocLen ( ctx context . Context , conversationIDs [ ] string )
// clear msg
// get Msg when destruct msg before
GetBeforeMsg ( ctx context . Context , ts int64 , docIds [ ] string , limit int ) ( [ ] * model . MsgDocModel , error )
DeleteDocMsgBefore ( ctx context . Context , ts int64 , doc * model . MsgDocModel ) ( [ ] int , error )
GetDocIDs ( ctx context . Context ) ( [ ] string , error )
SetUserConversationsMaxSeq ( ctx context . Context , conversationID string , userID string , seq int64 ) error
SetUserConversationsMinSeq ( ctx context . Context , conversationID string , userID string , seq int64 ) error
}
func NewCommonMsgDatabase ( msgDocModel database . Msg , msg cache . MsgCache , seqUser cache . SeqUser , seqConversation cache . SeqConversationCache , kafkaConf * config . Kafka ) ( CommonMsgDatabase , error ) {
@ -490,8 +495,8 @@ func (db *commonMsgDatabase) GetMsgBySeqs(ctx context.Context, userID string, co
}
successMsgs , failedSeqs , err := db . msg . GetMessagesBySeq ( ctx , conversationID , newSeqs )
if err != nil {
if errors . Is ( err , redis . Nil ) {
log . Z Error ( ctx , "get message from redis exception" , err , "failedSeqs" , failedSeqs , "conversationID" , conversationID )
if ! errors . Is ( err , redis . Nil ) {
log . Z Warn ( ctx , "get message from redis exception" , err , "failedSeqs" , failedSeqs , "conversationID" , conversationID )
}
}
log . ZDebug ( ctx , "db.seq.GetMessagesBySeq" , "userID" , userID , "conversationID" , conversationID , "seqs" ,
@ -514,6 +519,81 @@ func (db *commonMsgDatabase) GetMsgBySeqs(ctx context.Context, userID string, co
return minSeq , maxSeq , successMsgs , nil
}
func ( db * commonMsgDatabase ) GetMessagesBySeqWithBounds ( ctx context . Context , userID string , conversationID string , seqs [ ] int64 , pullOrder sdkws . PullOrder ) ( bool , int64 , [ ] * sdkws . MsgData , error ) {
var endSeq int64
var isEnd bool
userMinSeq , err := db . seqUser . GetUserMinSeq ( ctx , conversationID , userID )
if err != nil {
return false , 0 , nil , err
}
minSeq , err := db . seqConversation . GetMinSeq ( ctx , conversationID )
if err != nil {
return false , 0 , nil , err
}
maxSeq , err := db . seqConversation . GetMaxSeq ( ctx , conversationID )
if err != nil {
return false , 0 , nil , err
}
userMaxSeq , err := db . seqUser . GetUserMaxSeq ( ctx , conversationID , userID )
if err != nil {
return false , 0 , nil , err
}
if userMinSeq > minSeq {
minSeq = userMinSeq
}
if userMaxSeq > 0 && userMaxSeq < maxSeq {
maxSeq = userMaxSeq
}
newSeqs := make ( [ ] int64 , 0 , len ( seqs ) )
for _ , seq := range seqs {
if seq <= 0 {
continue
}
// The normal range and can fetch messages
if seq >= minSeq && seq <= maxSeq {
newSeqs = append ( newSeqs , seq )
continue
}
// If the requested seq is smaller than the minimum seq and the pull order is descending (pulling older messages)
if seq < minSeq && pullOrder == sdkws . PullOrder_PullOrderDesc {
isEnd = true
endSeq = minSeq
}
// If the requested seq is larger than the maximum seq and the pull order is ascending (pulling newer messages)
if seq > maxSeq && pullOrder == sdkws . PullOrder_PullOrderAsc {
isEnd = true
endSeq = maxSeq
}
}
if len ( newSeqs ) == 0 {
return isEnd , endSeq , nil , nil
}
successMsgs , failedSeqs , err := db . msg . GetMessagesBySeq ( ctx , conversationID , newSeqs )
if err != nil {
if ! errors . Is ( err , redis . Nil ) {
log . ZWarn ( ctx , "get message from redis exception" , err , "failedSeqs" , failedSeqs , "conversationID" , conversationID )
}
}
log . ZDebug ( ctx , "db.seq.GetMessagesBySeq" , "userID" , userID , "conversationID" , conversationID , "seqs" ,
seqs , "len(successMsgs)" , len ( successMsgs ) , "failedSeqs" , failedSeqs )
if len ( failedSeqs ) > 0 {
mongoMsgs , err := db . getMsgBySeqs ( ctx , userID , conversationID , failedSeqs )
if err != nil {
return false , 0 , nil , err
}
successMsgs = append ( successMsgs , mongoMsgs ... )
//_, err = db.msg.SetMessagesToCache(ctx, conversationID, mongoMsgs)
//if err != nil {
// return 0, 0, nil, err
//}
}
return isEnd , endSeq , successMsgs , nil
}
func ( db * commonMsgDatabase ) DeleteConversationMsgsAndSetMinSeq ( ctx context . Context , conversationID string , remainTime int64 ) error {
var delStruct delMsgRecursionStruct
var skip int64
@ -528,10 +608,10 @@ func (db *commonMsgDatabase) DeleteConversationMsgsAndSetMinSeq(ctx context.Cont
return db . seqConversation . SetMinSeq ( ctx , conversationID , minSeq )
}
func ( db * commonMsgDatabase ) UserMsgsDestruct ( ctx context . Context , userID string , conversationID string , destructTime int64 , lastMsgDestruct Time time . Time ) ( seqs [ ] int64 , err error ) {
func ( db * commonMsgDatabase ) Clear UserMsgs( ctx context . Context , userID string , conversationID string , clearTime int64 , lastMsgClear Time time . Time ) ( seqs [ ] int64 , err error ) {
var index int64
for {
// from oldest 2 newest
// from oldest 2 newest , ASC
msgDocModel , err := db . msgDocDatabase . GetMsgDocModelByIndex ( ctx , conversationID , index , 1 )
if err != nil || msgDocModel . DocID == "" {
if err != nil {
@ -544,15 +624,19 @@ func (db *commonMsgDatabase) UserMsgsDestruct(ctx context.Context, userID string
// If an error is reported, or the error cannot be obtained, it is physically deleted and seq delMongoMsgsPhysical(delStruct.delDocIDList) is returned to end the recursion
break
}
index ++
// && msgDocModel.Msg[0].Msg.SendTime > lastMsgDestructTime.UnixMilli()
// && msgDocModel.Msg[0].Msg.SendTime > lastMsgClearTime.UnixMilli()
if len ( msgDocModel . Msg ) > 0 {
i := 0
var over bool
for _ , msg := range msgDocModel . Msg {
i ++
if msg != nil && msg . Msg != nil && msg . Msg . SendTime + destructTime * 1000 <= time . Now ( ) . UnixMilli ( ) {
if msg . Msg . SendTime + destructTime * 1000 > lastMsgDestructTime . UnixMilli ( ) && ! datautil . Contain ( userID , msg . DelList ... ) {
// over clear time, need to clear
if msg != nil && msg . Msg != nil && msg . Msg . SendTime + clearTime * 1000 <= time . Now ( ) . UnixMilli ( ) {
// if msg is not in del list, add to del list
if msg . Msg . SendTime + clearTime * 1000 > lastMsgClearTime . UnixMilli ( ) && ! datautil . Contain ( userID , msg . DelList ... ) {
seqs = append ( seqs , msg . Msg . Seq )
}
} else {
@ -567,13 +651,18 @@ func (db *commonMsgDatabase) UserMsgsDestruct(ctx context.Context, userID string
}
}
log . ZDebug ( ctx , "UserMsgsDestruct" , "conversationID" , conversationID , "userID" , userID , "seqs" , seqs )
log . ZDebug ( ctx , "ClearUserMsgs" , "conversationID" , conversationID , "userID" , userID , "seqs" , seqs )
// have msg need to destruct
if len ( seqs ) > 0 {
userMinSeq := seqs [ len ( seqs ) - 1 ] + 1
currentUserMinSeq , err := db . seqUser . GetUserMinSeq ( ctx , conversationID , userID )
// update min seq to clear after
userMinSeq := seqs [ len ( seqs ) - 1 ] + 1 // user min seq when clear after
currentUserMinSeq , err := db . seqUser . GetUserMinSeq ( ctx , conversationID , userID ) // user min seq when clear before
if err != nil {
return nil , err
}
// if before < after, update min seq
if currentUserMinSeq < userMinSeq {
if err := db . seqUser . SetUserMinSeq ( ctx , conversationID , userID , userMinSeq ) ; err != nil {
return nil , err
@ -693,6 +782,14 @@ func (db *commonMsgDatabase) SetUserConversationsMinSeqs(ctx context.Context, us
return db . seqUser . SetUserMinSeqs ( ctx , userID , seqs )
}
func ( db * commonMsgDatabase ) SetUserConversationsMaxSeq ( ctx context . Context , conversationID string , userID string , seq int64 ) error {
return db . seqUser . SetUserMaxSeq ( ctx , conversationID , userID , seq )
}
func ( db * commonMsgDatabase ) SetUserConversationsMinSeq ( ctx context . Context , conversationID string , userID string , seq int64 ) error {
return db . seqUser . SetUserMinSeq ( ctx , conversationID , userID , seq )
}
func ( db * commonMsgDatabase ) UserSetHasReadSeqs ( ctx context . Context , userID string , hasReadSeqs map [ string ] int64 ) error {
return db . seqUser . SetUserReadSeqs ( ctx , userID , hasReadSeqs )
}