package cronTask import ( "Open_IM/pkg/common/config" "Open_IM/pkg/common/constant" "Open_IM/pkg/common/db/controller" "Open_IM/pkg/common/log" "Open_IM/pkg/common/tracelog" "Open_IM/pkg/utils" "context" "math" ) type ClearMsgTool struct { msgInterface controller.MsgInterface userInterface controller.UserInterface groupInterface controller.GroupInterface } func (c *ClearMsgTool) getCronTaskOperationID() string { return cronTaskOperationID + utils.OperationIDGenerator() } func (c *ClearMsgTool) ClearAll() { operationID := c.getCronTaskOperationID() ctx := context.Background() tracelog.SetOperationID(ctx, operationID) log.NewInfo(operationID, "============================ start del cron task ============================") var err error userIDList, err := c.userInterface.GetAllUserID(ctx) if err == nil { c.ClearUsersMsg(ctx, userIDList) } else { log.NewError(operationID, utils.GetSelfFuncName(), err.Error()) } // working group msg clear workingGroupIDList, err := c.groupInterface.GetGroupIDsByGroupType(ctx, constant.WorkingGroup) if err == nil { c.ClearSuperGroupMsg(ctx, workingGroupIDList) } else { log.NewError(operationID, utils.GetSelfFuncName(), err.Error()) } log.NewInfo(operationID, "============================ start del cron finished ============================") } func (c *ClearMsgTool) ClearUsersMsg(ctx context.Context, userIDList []string) { for _, userID := range userIDList { if err := c.msgInterface.DeleteUserMsgsAndSetMinSeq(ctx, userID, int64(config.Config.Mongo.DBRetainChatRecords * 24 *60 *60)); err != nil { log.NewError(tracelog.GetOperationID(ctx), utils.GetSelfFuncName(), err.Error(), userID) } minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache, err := c.msgInterface.GetUserMinMaxSeqInMongoAndCache(ctx, userID) if err != nil { log.NewError(tracelog.GetOperationID(ctx), utils.GetSelfFuncName(), err.Error(), "GetUserMinMaxSeqInMongoAndCache failed", userID) continue } if } } func (c *ClearMsgTool) ClearSuperGroupMsg(ctx context.Context, workingGroupIDList []string) { for _, groupID := range workingGroupIDList { userIDs, err := c.groupInterface.FindGroupMemberUserID(ctx, groupID) if err != nil { log.NewError(tracelog.GetOperationID(ctx), utils.GetSelfFuncName(), "FindGroupMemberUserID", err.Error(), groupID) continue } if err := c.msgInterface.DeleteUserSuperGroupMsgsAndSetMinSeq(ctx, groupID, userIDs, int64(config.Config.Mongo.DBRetainChatRecords * 24 *60 *60)); err != nil { //log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), groupID, userIDList) } minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache, err := c.msgInterface.GetSuperGroupMinMaxSeqInMongoAndCache(ctx, groupID) } } func (c *ClearMsgTool) checkMaxSeqWithMongo(ctx context.Context, sourceID string, diffusionType int) error { var seqRedis uint64 var err error if diffusionType == constant.WriteDiffusion { seqRedis, err = db.DB.GetUserMaxSeq(sourceID) } else { seqRedis, err = db.DB.GetGroupMaxSeq(sourceID) } if err != nil { if err == goRedis.Nil { return nil } return utils.Wrap(err, "GetUserMaxSeq failed") } msg, err := db.DB.GetNewestMsg(sourceID) if err != nil { return utils.Wrap(err, "GetNewestMsg failed") } if msg == nil { return nil } if math.Abs(float64(msg.Seq-uint32(seqRedis))) > 10 { log.NewWarn(operationID, utils.GetSelfFuncName(), "seqMongo, seqRedis", msg.Seq, seqRedis, sourceID, "redis maxSeq is different with msg.Seq > 10", "status: ", msg.Status, msg.SendTime) } else { log.NewInfo(operationID, utils.GetSelfFuncName(), "seqMongo, seqRedis", msg.Seq, seqRedis, sourceID, "seq and msg OK", "status:", msg.Status, msg.SendTime) } return nil }