@ -24,6 +24,9 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/redis/go-redis/v9"
"go.mongodb.org/mongo-driver/mongo"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/convert"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
@ -35,8 +38,6 @@ import (
"github.com/openimsdk/tools/mq/kafka"
"github.com/openimsdk/tools/utils/datautil"
"github.com/openimsdk/tools/utils/timeutil"
"github.com/redis/go-redis/v9"
"go.mongodb.org/mongo-driver/mongo"
)
const (
@ -56,6 +57,7 @@ type CommonMsgDatabase interface {
GetMsgBySeqs ( ctx context . Context , userID string , conversationID string , seqs [ ] int64 ) ( minSeq int64 , maxSeq int64 , seqMsg [ ] * sdkws . MsgData , err error )
// DeleteConversationMsgsAndSetMinSeq deletes conversation messages and resets the minimum sequence number. If `remainTime` is 0, all messages are deleted (this method does not delete Redis
// cache).
GetMessagesBySeqWithBounds ( ctx context . Context , userID string , conversationID string , seqs [ ] int64 , pullOrder sdkws . PullOrder ) ( bool , int64 , [ ] * sdkws . MsgData , error )
DeleteConversationMsgsAndSetMinSeq ( ctx context . Context , conversationID string , remainTime int64 ) error
// ClearUserMsgs marks messages for deletion based on clear time and returns a list of sequence numbers for marked messages.
ClearUserMsgs ( ctx context . Context , userID string , conversationID string , clearTime int64 , lastMsgClearTime time . Time ) ( seqs [ ] int64 , err error )
@ -517,6 +519,81 @@ func (db *commonMsgDatabase) GetMsgBySeqs(ctx context.Context, userID string, co
return minSeq , maxSeq , successMsgs , nil
}
func ( db * commonMsgDatabase ) GetMessagesBySeqWithBounds ( ctx context . Context , userID string , conversationID string , seqs [ ] int64 , pullOrder sdkws . PullOrder ) ( bool , int64 , [ ] * sdkws . MsgData , error ) {
var endSeq int64
var isEnd bool
userMinSeq , err := db . seqUser . GetUserMinSeq ( ctx , conversationID , userID )
if err != nil {
return false , 0 , nil , err
}
minSeq , err := db . seqConversation . GetMinSeq ( ctx , conversationID )
if err != nil {
return false , 0 , nil , err
}
maxSeq , err := db . seqConversation . GetMaxSeq ( ctx , conversationID )
if err != nil {
return false , 0 , nil , err
}
userMaxSeq , err := db . seqUser . GetUserMaxSeq ( ctx , conversationID , userID )
if err != nil {
return false , 0 , nil , err
}
if userMinSeq > minSeq {
minSeq = userMinSeq
}
if userMaxSeq > 0 && userMaxSeq < maxSeq {
maxSeq = userMaxSeq
}
newSeqs := make ( [ ] int64 , 0 , len ( seqs ) )
for _ , seq := range seqs {
if seq <= 0 {
continue
}
// The normal range and can fetch messages
if seq >= minSeq && seq <= maxSeq {
newSeqs = append ( newSeqs , seq )
continue
}
// If the requested seq is smaller than the minimum seq and the pull order is descending (pulling older messages)
if seq < minSeq && pullOrder == sdkws . PullOrder_PullOrderDesc {
isEnd = true
endSeq = minSeq
}
// If the requested seq is larger than the maximum seq and the pull order is ascending (pulling newer messages)
if seq > maxSeq && pullOrder == sdkws . PullOrder_PullOrderAsc {
isEnd = true
endSeq = maxSeq
}
}
if len ( newSeqs ) == 0 {
return isEnd , endSeq , nil , nil
}
successMsgs , failedSeqs , err := db . msg . GetMessagesBySeq ( ctx , conversationID , newSeqs )
if err != nil {
if ! errors . Is ( err , redis . Nil ) {
log . ZWarn ( ctx , "get message from redis exception" , err , "failedSeqs" , failedSeqs , "conversationID" , conversationID )
}
}
log . ZDebug ( ctx , "db.seq.GetMessagesBySeq" , "userID" , userID , "conversationID" , conversationID , "seqs" ,
seqs , "len(successMsgs)" , len ( successMsgs ) , "failedSeqs" , failedSeqs )
if len ( failedSeqs ) > 0 {
mongoMsgs , err := db . getMsgBySeqs ( ctx , userID , conversationID , failedSeqs )
if err != nil {
return false , 0 , nil , err
}
successMsgs = append ( successMsgs , mongoMsgs ... )
//_, err = db.msg.SetMessagesToCache(ctx, conversationID, mongoMsgs)
//if err != nil {
// return 0, 0, nil, err
//}
}
return isEnd , endSeq , successMsgs , nil
}
func ( db * commonMsgDatabase ) DeleteConversationMsgsAndSetMinSeq ( ctx context . Context , conversationID string , remainTime int64 ) error {
var delStruct delMsgRecursionStruct
var skip int64