package cronTask
import (
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db/controller"
"Open_IM/pkg/common/log"
"Open_IM/pkg/common/tracelog"
"Open_IM/pkg/utils"
"context"
"math"
)
type ClearMsgTool struct {
msgInterface controller . MsgInterface
userInterface controller . UserInterface
groupInterface controller . GroupInterface
}
func ( c * ClearMsgTool ) getCronTaskOperationID ( ) string {
return cronTaskOperationID + utils . OperationIDGenerator ( )
}
func ( c * ClearMsgTool ) ClearAll ( ) {
operationID := c . getCronTaskOperationID ( )
ctx := context . Background ( )
tracelog . SetOperationID ( ctx , operationID )
log . NewInfo ( operationID , "============================ start del cron task ============================" )
var err error
userIDList , err := c . userInterface . GetAllUserID ( ctx )
if err == nil {
c . ClearUsersMsg ( ctx , userIDList )
} else {
log . NewError ( operationID , utils . GetSelfFuncName ( ) , err . Error ( ) )
}
// working group msg clear
workingGroupIDList , err := c . groupInterface . GetGroupIDsByGroupType ( ctx , constant . WorkingGroup )
if err == nil {
c . ClearSuperGroupMsg ( ctx , workingGroupIDList )
} else {
log . NewError ( operationID , utils . GetSelfFuncName ( ) , err . Error ( ) )
}
log . NewInfo ( operationID , "============================ start del cron finished ============================" )
}
func ( c * ClearMsgTool ) ClearUsersMsg ( ctx context . Context , userIDList [ ] string ) {
for _ , userID := range userIDList {
if err := c . msgInterface . DeleteUserMsgsAndSetMinSeq ( ctx , userID , int64 ( config . Config . Mongo . DBRetainChatRecords * 24 * 60 * 60 ) ) ; err != nil {
log . NewError ( tracelog . GetOperationID ( ctx ) , utils . GetSelfFuncName ( ) , err . Error ( ) , userID )
}
minSeqMongo , maxSeqMongo , minSeqCache , maxSeqCache , err := c . msgInterface . GetUserMinMaxSeqInMongoAndCache ( ctx , userID )
if err != nil {
log . NewError ( tracelog . GetOperationID ( ctx ) , utils . GetSelfFuncName ( ) , err . Error ( ) , "GetUserMinMaxSeqInMongoAndCache failed" , userID )
continue
}
if
}
}
func ( c * ClearMsgTool ) ClearSuperGroupMsg ( ctx context . Context , workingGroupIDList [ ] string ) {
for _ , groupID := range workingGroupIDList {
userIDs , err := c . groupInterface . FindGroupMemberUserID ( ctx , groupID )
if err != nil {
log . NewError ( tracelog . GetOperationID ( ctx ) , utils . GetSelfFuncName ( ) , "FindGroupMemberUserID" , err . Error ( ) , groupID )
continue
}
if err := c . msgInterface . DeleteUserSuperGroupMsgsAndSetMinSeq ( ctx , groupID , userIDs , int64 ( config . Config . Mongo . DBRetainChatRecords * 24 * 60 * 60 ) ) ; err != nil {
//log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), groupID, userIDList)
}
minSeqMongo , maxSeqMongo , minSeqCache , maxSeqCache , err := c . msgInterface . GetSuperGroupMinMaxSeqInMongoAndCache ( ctx , groupID )
}
}
func ( c * ClearMsgTool ) checkMaxSeqWithMongo ( ctx context . Context , sourceID string , diffusionType int ) error {
var seqRedis uint64
var err error
if diffusionType == constant . WriteDiffusion {
seqRedis , err = db . DB . GetUserMaxSeq ( sourceID )
} else {
seqRedis , err = db . DB . GetGroupMaxSeq ( sourceID )
}
if err != nil {
if err == goRedis . Nil {
return nil
}
return utils . Wrap ( err , "GetUserMaxSeq failed" )
}
msg , err := db . DB . GetNewestMsg ( sourceID )
if err != nil {
return utils . Wrap ( err , "GetNewestMsg failed" )
}
if msg == nil {
return nil
}
if math . Abs ( float64 ( msg . Seq - uint32 ( seqRedis ) ) ) > 10 {
log . NewWarn ( operationID , utils . GetSelfFuncName ( ) , "seqMongo, seqRedis" , msg . Seq , seqRedis , sourceID , "redis maxSeq is different with msg.Seq > 10" , "status: " , msg . Status , msg . SendTime )
} else {
log . NewInfo ( operationID , utils . GetSelfFuncName ( ) , "seqMongo, seqRedis" , msg . Seq , seqRedis , sourceID , "seq and msg OK" , "status:" , msg . Status , msg . SendTime )
}
return nil
}