@ -15,10 +15,14 @@
package controller
package controller
import (
import (
"context"
"errors"
"time"
"gorm.io/gorm"
relation2 "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/relation"
relation2 "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/relation"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/relation"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/relation"
"gorm.io/gorm"
"time"
"github.com/redis/go-redis/v9"
"github.com/redis/go-redis/v9"
@ -32,13 +36,11 @@ import (
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/prome"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/prome"
"github.com/OpenIMSDK/Open-IM-Server/pkg/errs"
"github.com/OpenIMSDK/Open-IM-Server/pkg/errs"
"context"
"go.mongodb.org/mongo-driver/mongo"
"errors"
pbMsg "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg"
pbMsg "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg"
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
"go.mongodb.org/mongo-driver/mongo"
)
)
const (
const (
@ -103,8 +105,23 @@ type CommonMsgDatabase interface {
MsgToPushMQ ( ctx context . Context , key , conversarionID string , msg2mq * sdkws . MsgData ) ( int32 , int64 , error )
MsgToPushMQ ( ctx context . Context , key , conversarionID string , msg2mq * sdkws . MsgData ) ( int32 , int64 , error )
MsgToMongoMQ ( ctx context . Context , key , conversarionID string , msgs [ ] * sdkws . MsgData , lastSeq int64 ) error
MsgToMongoMQ ( ctx context . Context , key , conversarionID string , msgs [ ] * sdkws . MsgData , lastSeq int64 ) error
RangeUserSendCount ( ctx context . Context , start time . Time , end time . Time , group bool , ase bool , pageNumber int32 , showNumber int32 ) ( msgCount int64 , userCount int64 , users [ ] * unRelationTb . UserCount , dateCount map [ string ] int64 , err error )
RangeUserSendCount (
RangeGroupSendCount ( ctx context . Context , start time . Time , end time . Time , ase bool , pageNumber int32 , showNumber int32 ) ( msgCount int64 , userCount int64 , groups [ ] * unRelationTb . GroupCount , dateCount map [ string ] int64 , err error )
ctx context . Context ,
start time . Time ,
end time . Time ,
group bool ,
ase bool ,
pageNumber int32 ,
showNumber int32 ,
) ( msgCount int64 , userCount int64 , users [ ] * unRelationTb . UserCount , dateCount map [ string ] int64 , err error )
RangeGroupSendCount (
ctx context . Context ,
start time . Time ,
end time . Time ,
ase bool ,
pageNumber int32 ,
showNumber int32 ,
) ( msgCount int64 , userCount int64 , groups [ ] * unRelationTb . GroupCount , dateCount map [ string ] int64 , err error )
}
}
func NewCommonMsgDatabase ( msgDocModel unRelationTb . MsgDocModelInterface , cacheModel cache . MsgModel , msgMyqModel relation . ChatLogModelInterface ) CommonMsgDatabase {
func NewCommonMsgDatabase ( msgDocModel unRelationTb . MsgDocModelInterface , cacheModel cache . MsgModel , msgMyqModel relation . ChatLogModelInterface ) CommonMsgDatabase {
@ -172,7 +189,7 @@ func (db *commonMsgDatabase) BatchInsertBlock(ctx context.Context, conversationI
return nil
return nil
}
}
num := db . msg . GetSingleGocMsgNum ( )
num := db . msg . GetSingleGocMsgNum ( )
// num = 100
// num = 100
for i , field := range fields { // 检查类型
for i , field := range fields { // 检查类型
var ok bool
var ok bool
switch key {
switch key {
@ -390,7 +407,7 @@ func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversa
func ( db * commonMsgDatabase ) getMsgBySeqs ( ctx context . Context , userID , conversationID string , seqs [ ] int64 ) ( totalMsgs [ ] * sdkws . MsgData , err error ) {
func ( db * commonMsgDatabase ) getMsgBySeqs ( ctx context . Context , userID , conversationID string , seqs [ ] int64 ) ( totalMsgs [ ] * sdkws . MsgData , err error ) {
for docID , seqs := range db . msg . GetDocIDSeqsMap ( conversationID , seqs ) {
for docID , seqs := range db . msg . GetDocIDSeqsMap ( conversationID , seqs ) {
// log.ZDebug(ctx, "getMsgBySeqs", "docID", docID, "seqs", seqs)
// log.ZDebug(ctx, "getMsgBySeqs", "docID", docID, "seqs", seqs)
msgs , err := db . findMsgInfoBySeq ( ctx , userID , docID , seqs )
msgs , err := db . findMsgInfoBySeq ( ctx , userID , docID , seqs )
if err != nil {
if err != nil {
return nil , err
return nil , err
@ -575,7 +592,22 @@ func (db *commonMsgDatabase) GetMsgBySeqs(ctx context.Context, userID string, co
log . ZError ( ctx , "get message from redis exception" , err , "failedSeqs" , failedSeqs , "conversationID" , conversationID )
log . ZError ( ctx , "get message from redis exception" , err , "failedSeqs" , failedSeqs , "conversationID" , conversationID )
}
}
}
}
log . ZInfo ( ctx , "db.cache.GetMessagesBySeq" , "userID" , userID , "conversationID" , conversationID , "seqs" , seqs , "successMsgs" , len ( successMsgs ) , "failedSeqs" , failedSeqs , "conversationID" , conversationID )
log . ZInfo (
ctx ,
"db.cache.GetMessagesBySeq" ,
"userID" ,
userID ,
"conversationID" ,
conversationID ,
"seqs" ,
seqs ,
"successMsgs" ,
len ( successMsgs ) ,
"failedSeqs" ,
failedSeqs ,
"conversationID" ,
conversationID ,
)
prome . Add ( prome . MsgPullFromRedisSuccessCounter , len ( successMsgs ) )
prome . Add ( prome . MsgPullFromRedisSuccessCounter , len ( successMsgs ) )
if len ( failedSeqs ) > 0 {
if len ( failedSeqs ) > 0 {
mongoMsgs , err := db . getMsgBySeqs ( ctx , userID , conversationID , failedSeqs )
mongoMsgs , err := db . getMsgBySeqs ( ctx , userID , conversationID , failedSeqs )
@ -637,7 +669,6 @@ func (db *commonMsgDatabase) UserMsgsDestruct(ctx context.Context, userID string
log . ZDebug ( ctx , "deleteMsgRecursion finished" , "conversationID" , conversationID , "userID" , userID , "index" , index )
log . ZDebug ( ctx , "deleteMsgRecursion finished" , "conversationID" , conversationID , "userID" , userID , "index" , index )
break
break
}
}
}
}
}
}
}
}
@ -652,7 +683,7 @@ func (db *commonMsgDatabase) UserMsgsDestruct(ctx context.Context, userID string
return seqs , nil
return seqs , nil
}
}
// this is struct for recursion
// this is struct for recursion .
type delMsgRecursionStruct struct {
type delMsgRecursionStruct struct {
minSeq int64
minSeq int64
delDocIDs [ ] string
delDocIDs [ ] string
@ -665,7 +696,7 @@ func (d *delMsgRecursionStruct) getSetMinSeq() int64 {
// index 0....19(del) 20...69
// index 0....19(del) 20...69
// seq 70
// seq 70
// set minSeq 21
// set minSeq 21
// recursion 删除list并且返回设置的最小seq
// recursion 删除list并且返回设置的最小seq .
func ( db * commonMsgDatabase ) deleteMsgRecursion ( ctx context . Context , conversationID string , index int64 , delStruct * delMsgRecursionStruct , remainTime int64 ) ( int64 , error ) {
func ( db * commonMsgDatabase ) deleteMsgRecursion ( ctx context . Context , conversationID string , index int64 , delStruct * delMsgRecursionStruct , remainTime int64 ) ( int64 , error ) {
// find from oldest list
// find from oldest list
msgDocModel , err := db . msgDocDatabase . GetMsgDocModelByIndex ( ctx , conversationID , index , 1 )
msgDocModel , err := db . msgDocDatabase . GetMsgDocModelByIndex ( ctx , conversationID , index , 1 )
@ -791,15 +822,19 @@ func (db *commonMsgDatabase) CleanUpUserConversationsMsgs(ctx context.Context, u
func ( db * commonMsgDatabase ) SetMaxSeq ( ctx context . Context , conversationID string , maxSeq int64 ) error {
func ( db * commonMsgDatabase ) SetMaxSeq ( ctx context . Context , conversationID string , maxSeq int64 ) error {
return db . cache . SetMaxSeq ( ctx , conversationID , maxSeq )
return db . cache . SetMaxSeq ( ctx , conversationID , maxSeq )
}
}
func ( db * commonMsgDatabase ) GetMaxSeqs ( ctx context . Context , conversationIDs [ ] string ) ( map [ string ] int64 , error ) {
func ( db * commonMsgDatabase ) GetMaxSeqs ( ctx context . Context , conversationIDs [ ] string ) ( map [ string ] int64 , error ) {
return db . cache . GetMaxSeqs ( ctx , conversationIDs )
return db . cache . GetMaxSeqs ( ctx , conversationIDs )
}
}
func ( db * commonMsgDatabase ) GetMaxSeq ( ctx context . Context , conversationID string ) ( int64 , error ) {
func ( db * commonMsgDatabase ) GetMaxSeq ( ctx context . Context , conversationID string ) ( int64 , error ) {
return db . cache . GetMaxSeq ( ctx , conversationID )
return db . cache . GetMaxSeq ( ctx , conversationID )
}
}
func ( db * commonMsgDatabase ) SetMinSeq ( ctx context . Context , conversationID string , minSeq int64 ) error {
func ( db * commonMsgDatabase ) SetMinSeq ( ctx context . Context , conversationID string , minSeq int64 ) error {
return db . cache . SetMinSeq ( ctx , conversationID , minSeq )
return db . cache . SetMinSeq ( ctx , conversationID , minSeq )
}
}
func ( db * commonMsgDatabase ) SetMinSeqs ( ctx context . Context , seqs map [ string ] int64 ) error {
func ( db * commonMsgDatabase ) SetMinSeqs ( ctx context . Context , seqs map [ string ] int64 ) error {
return db . cache . SetMinSeqs ( ctx , seqs )
return db . cache . SetMinSeqs ( ctx , seqs )
}
}
@ -807,18 +842,23 @@ func (db *commonMsgDatabase) SetMinSeqs(ctx context.Context, seqs map[string]int
func ( db * commonMsgDatabase ) GetMinSeqs ( ctx context . Context , conversationIDs [ ] string ) ( map [ string ] int64 , error ) {
func ( db * commonMsgDatabase ) GetMinSeqs ( ctx context . Context , conversationIDs [ ] string ) ( map [ string ] int64 , error ) {
return db . cache . GetMinSeqs ( ctx , conversationIDs )
return db . cache . GetMinSeqs ( ctx , conversationIDs )
}
}
func ( db * commonMsgDatabase ) GetMinSeq ( ctx context . Context , conversationID string ) ( int64 , error ) {
func ( db * commonMsgDatabase ) GetMinSeq ( ctx context . Context , conversationID string ) ( int64 , error ) {
return db . cache . GetMinSeq ( ctx , conversationID )
return db . cache . GetMinSeq ( ctx , conversationID )
}
}
func ( db * commonMsgDatabase ) GetConversationUserMinSeq ( ctx context . Context , conversationID string , userID string ) ( int64 , error ) {
func ( db * commonMsgDatabase ) GetConversationUserMinSeq ( ctx context . Context , conversationID string , userID string ) ( int64 , error ) {
return db . cache . GetConversationUserMinSeq ( ctx , conversationID , userID )
return db . cache . GetConversationUserMinSeq ( ctx , conversationID , userID )
}
}
func ( db * commonMsgDatabase ) GetConversationUserMinSeqs ( ctx context . Context , conversationID string , userIDs [ ] string ) ( map [ string ] int64 , error ) {
func ( db * commonMsgDatabase ) GetConversationUserMinSeqs ( ctx context . Context , conversationID string , userIDs [ ] string ) ( map [ string ] int64 , error ) {
return db . cache . GetConversationUserMinSeqs ( ctx , conversationID , userIDs )
return db . cache . GetConversationUserMinSeqs ( ctx , conversationID , userIDs )
}
}
func ( db * commonMsgDatabase ) SetConversationUserMinSeq ( ctx context . Context , conversationID string , userID string , minSeq int64 ) error {
func ( db * commonMsgDatabase ) SetConversationUserMinSeq ( ctx context . Context , conversationID string , userID string , minSeq int64 ) error {
return db . cache . SetConversationUserMinSeq ( ctx , conversationID , userID , minSeq )
return db . cache . SetConversationUserMinSeq ( ctx , conversationID , userID , minSeq )
}
}
func ( db * commonMsgDatabase ) SetConversationUserMinSeqs ( ctx context . Context , conversationID string , seqs map [ string ] int64 ) ( err error ) {
func ( db * commonMsgDatabase ) SetConversationUserMinSeqs ( ctx context . Context , conversationID string , seqs map [ string ] int64 ) ( err error ) {
return db . cache . SetConversationUserMinSeqs ( ctx , conversationID , seqs )
return db . cache . SetConversationUserMinSeqs ( ctx , conversationID , seqs )
}
}
@ -834,6 +874,7 @@ func (db *commonMsgDatabase) UserSetHasReadSeqs(ctx context.Context, userID stri
func ( db * commonMsgDatabase ) SetHasReadSeq ( ctx context . Context , userID string , conversationID string , hasReadSeq int64 ) error {
func ( db * commonMsgDatabase ) SetHasReadSeq ( ctx context . Context , userID string , conversationID string , hasReadSeq int64 ) error {
return db . cache . SetHasReadSeq ( ctx , userID , conversationID , hasReadSeq )
return db . cache . SetHasReadSeq ( ctx , userID , conversationID , hasReadSeq )
}
}
func ( db * commonMsgDatabase ) GetHasReadSeqs ( ctx context . Context , userID string , conversationIDs [ ] string ) ( map [ string ] int64 , error ) {
func ( db * commonMsgDatabase ) GetHasReadSeqs ( ctx context . Context , userID string , conversationIDs [ ] string ) ( map [ string ] int64 , error ) {
return db . cache . GetHasReadSeqs ( ctx , userID , conversationIDs )
return db . cache . GetHasReadSeqs ( ctx , userID , conversationIDs )
}
}
@ -884,11 +925,26 @@ func (db *commonMsgDatabase) GetMinMaxSeqMongo(ctx context.Context, conversation
return
return
}
}
func ( db * commonMsgDatabase ) RangeUserSendCount ( ctx context . Context , start time . Time , end time . Time , group bool , ase bool , pageNumber int32 , showNumber int32 ) ( msgCount int64 , userCount int64 , users [ ] * unRelationTb . UserCount , dateCount map [ string ] int64 , err error ) {
func ( db * commonMsgDatabase ) RangeUserSendCount (
ctx context . Context ,
start time . Time ,
end time . Time ,
group bool ,
ase bool ,
pageNumber int32 ,
showNumber int32 ,
) ( msgCount int64 , userCount int64 , users [ ] * unRelationTb . UserCount , dateCount map [ string ] int64 , err error ) {
return db . msgDocDatabase . RangeUserSendCount ( ctx , start , end , group , ase , pageNumber , showNumber )
return db . msgDocDatabase . RangeUserSendCount ( ctx , start , end , group , ase , pageNumber , showNumber )
}
}
func ( db * commonMsgDatabase ) RangeGroupSendCount ( ctx context . Context , start time . Time , end time . Time , ase bool , pageNumber int32 , showNumber int32 ) ( msgCount int64 , userCount int64 , groups [ ] * unRelationTb . GroupCount , dateCount map [ string ] int64 , err error ) {
func ( db * commonMsgDatabase ) RangeGroupSendCount (
ctx context . Context ,
start time . Time ,
end time . Time ,
ase bool ,
pageNumber int32 ,
showNumber int32 ,
) ( msgCount int64 , userCount int64 , groups [ ] * unRelationTb . GroupCount , dateCount map [ string ] int64 , err error ) {
return db . msgDocDatabase . RangeGroupSendCount ( ctx , start , end , ase , pageNumber , showNumber )
return db . msgDocDatabase . RangeGroupSendCount ( ctx , start , end , ase , pageNumber , showNumber )
}
}