@ -4,6 +4,8 @@ import (
"context"
"errors"
"fmt"
"math"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache"
@ -14,7 +16,6 @@ import (
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext"
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
"github.com/go-redis/redis/v8"
"math"
)
type MsgTool struct {
@ -53,35 +54,30 @@ func InitMsgTool() (*MsgTool, error) {
return msgTool , nil
}
func ( c * MsgTool ) getCronTaskOperationID ( ) string {
return cronTaskOperationID + utils . OperationIDGenerator ( )
}
func ( c * MsgTool ) AllUserClearMsgAndFixSeq ( ) {
operationID := "AllUserAndGroupClearMsgAndFixSeq"
ctx := mcontext . NewCtx ( utils . GetSelfFuncName ( ) )
log . NewInfo( operationID , "============================ start del cron task ============================" )
log . ZInfo ( ctx , "============================ start del cron task ============================" )
var err error
userID Li st , err := c . userDatabase . GetAllUserID ( ctx )
userID s, err := c . userDatabase . GetAllUserID ( ctx )
if err == nil {
c . ClearUsersMsg ( ctx , userID Li st )
c . ClearUsersMsg ( ctx , userID s)
} else {
log . NewError( operationID , utils . GetSelfFuncName ( ) , err . Error ( ) )
log . ZError( ctx , "ClearUsersMsg failed" , err )
}
// working group msg clear
superGroupID Li st , err := c . groupDatabase . GetGroupIDsByGroupType ( ctx , constant . WorkingGroup )
superGroupID s, err := c . groupDatabase . GetGroupIDsByGroupType ( ctx , constant . WorkingGroup )
if err == nil {
c . ClearSuperGroupMsg ( ctx , superGroupID Li st )
c . ClearSuperGroupMsg ( ctx , superGroupID s)
} else {
log . NewError( operationID , utils . GetSelfFuncName ( ) , err . Error ( ) )
log . ZError( ctx , "ClearSuperGroupMsg failed" , err )
}
log . NewInfo( operationID , "============================ start del cron finished ============================" )
log . ZInfo( ctx , "============================ start del cron finished ============================" )
}
func ( c * MsgTool ) ClearUsersMsg ( ctx context . Context , userIDs [ ] string ) {
for _ , userID := range userIDs {
if err := c . msgDatabase . DeleteUserMsgsAndSetMinSeq ( ctx , userID , int64 ( config . Config . Mongo . DBRetainChatRecords * 24 * 60 * 60 ) ) ; err != nil {
log . NewError( mcontext . GetOperationID ( ctx ) , utils . GetSelfFuncName ( ) , err . Error ( ) , userID )
log . ZError( ctx , "DeleteUserMsgsAndSetMinSeq failed" , err , "userID" , userID , "DBRetainChatRecords" , config . Config . Mongo . DBRetainChatRecords )
}
maxSeqCache , maxSeqMongo , err := c . GetAndFixUserSeqs ( ctx , userID )
if err != nil {
@ -95,14 +91,14 @@ func (c *MsgTool) ClearSuperGroupMsg(ctx context.Context, superGroupIDs []string
for _ , groupID := range superGroupIDs {
userIDs , err := c . groupDatabase . FindGroupMemberUserID ( ctx , groupID )
if err != nil {
log . NewError( mcontext . GetOperationID ( ctx ) , utils . GetSelfFuncName ( ) , "FindGroupMemberUserID" , err . Error ( ) , groupID )
log . ZError( ctx , "ClearSuperGroupMsg failed" , err , "groupID" , groupID )
continue
}
if err := c . msgDatabase . DeleteUserSuperGroupMsgsAndSetMinSeq ( ctx , groupID , userIDs , int64 ( config . Config . Mongo . DBRetainChatRecords * 24 * 60 * 60 ) ) ; err != nil {
log . NewError( mcontext . GetOperationID ( ctx ) , utils . GetSelfFuncName ( ) , err . Error ( ) , "DeleteUserSuperGroupMsgsAndSetMinSeq failed" , groupID, userIDs , config . Config . Mongo . DBRetainChatRecords )
log . ZError( ctx , "DeleteUserSuperGroupMsgsAndSetMinSeq failed" , err, " groupID", groupID , "userID" , userIDs , "DBRetainChatRecords" , config . Config . Mongo . DBRetainChatRecords )
}
if err := c . fixGroupSeq ( ctx , groupID , userIDs ) ; err != nil {
log . NewError( mcontext . GetOperationID ( ctx ) , utils . GetSelfFuncName ( ) , err . Error ( ) , groupID , userIDs )
log . ZError( ctx , "fixGroupSeq failed" , err , "groupID" , groupID , "userID" , userIDs )
}
}
}
@ -129,7 +125,7 @@ func (c *MsgTool) fixGroupSeq(ctx context.Context, groupID string, userIDs []str
}
}
if err := c . CheckMaxSeqWithMongo ( ctx , groupID , maxSeqCache , maxSeqMongo , constant . WriteDiffusion ) ; err != nil {
log . NewWarn( mcontext . GetOperationID ( ctx ) , "cache max seq and mongo max seq is diff > 10" , groupID, maxSeqCache, maxSeq Mongo, constant . WriteDiffusion )
log . ZWarn( ctx , "cache max seq and mongo max seq is diff > 10" , err, " groupID" , groupID, " maxSeqCache" , maxSeq Cache, "maxSeq Mongo", maxSeqMongo , "constant.WriteDiffusion" , constant . WriteDiffusion )
}
return nil
}
@ -138,16 +134,16 @@ func (c *MsgTool) GetAndFixUserSeqs(ctx context.Context, userID string) (maxSeqC
minSeqMongo , maxSeqMongo , minSeqCache , maxSeqCache , err := c . msgDatabase . GetUserMinMaxSeqInMongoAndCache ( ctx , userID )
if err != nil {
if err != unrelation . ErrMsgNotFound {
log . NewError( mcontext . GetOperationID ( ctx ) , utils . GetSelfFuncName ( ) , err . Error ( ) , "GetUserMinMaxSeqInMongoAndCache failed ", userID )
log . ZError( ctx , "GetUserMinMaxSeqInMongoAndCache failed ", err , "userID ", userID )
}
return 0 , 0 , err
}
log . NewDebug( mcontext . GetOperationID ( ctx ) , userID , minSeqMongo , maxSeqMongo , minSeqCache , maxSeqCache )
log . ZDebug( ctx , "userID" , userID , "minSeqMongo" , minSeqMongo , "maxSeqMongo" , maxSeqMongo , "minSeqCache" , minSeqCache , "maxSeqCache" , maxSeqCache )
if minSeqCache > maxSeqCache {
if err := c . msgDatabase . SetUserMinSeq ( ctx , userID , maxSeqCache ) ; err != nil {
log . NewError( mcontext . GetOperationID ( ctx ) , "SetUserMinSeq failed" , userID, minSeqCache, maxSeqCache )
log . ZError( ctx , "SetUserMinSeq failed" , err, " userID" , userID, " minSeqCache", minSeqCache , "maxSeqCache" , maxSeqCache )
} else {
log . NewWarn( mcontext . GetOperationID ( ctx ) , "SetUserMinSeq success" , userID , minSeqCache , maxSeqCache )
log . ZInfo( ctx , "SetUserMinSeq success" , "userID" , userID , "minSeqCache" , minSeqCache , "maxSeqCache" , maxSeqCache )
}
}
return maxSeqCache , maxSeqMongo , nil
@ -156,14 +152,14 @@ func (c *MsgTool) GetAndFixUserSeqs(ctx context.Context, userID string) (maxSeqC
func ( c * MsgTool ) GetAndFixGroupUserSeq ( ctx context . Context , userID string , groupID string , maxSeqCache int64 ) ( minSeqCache int64 , err error ) {
minSeqCache , err = c . msgDatabase . GetGroupUserMinSeq ( ctx , groupID , userID )
if err != nil {
log . NewError( mcontext . GetOperationID ( ctx ) , "GetGroupUserMinSeq failed" , groupID, userID )
log . ZError( ctx , "GetGroupUserMinSeq failed" , err, " groupID", groupID , "userID" , userID )
return 0 , err
}
if minSeqCache > maxSeqCache {
if err := c . msgDatabase . SetGroupUserMinSeq ( ctx , groupID , userID , maxSeqCache ) ; err != nil {
log . NewError( mcontext . GetOperationID ( ctx ) , "SetGroupUserMinSeq failed" , userID, minSeqCache, maxSeqCache )
log . ZError( ctx , "SetGroupUserMinSeq failed" , err, "groupID" , groupID , " userID" , userID, " minSeqCache", minSeqCache , "maxSeqCache" , maxSeqCache )
} else {
log . NewWarn( mcontext . GetOperationID ( ctx ) , "SetGroupUserMinSeq success" , userID, minSeqCache, maxSeqCache )
log . ZInfo( ctx , "SetGroupUserMinSeq success" , "groupID" , groupID, " userID" , userID, " minSeqCache", minSeqCache , "maxSeqCache" , maxSeqCache )
}
}
return minSeqCache , nil