From 0654ecc9bbe3bf4ceb1db3703c71241bd07e0125 Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Thu, 24 Nov 2022 19:17:29 +0800 Subject: [PATCH] cron --- cmd/open_im_cron_task/main.go | 8 ++- internal/cron_task/clear_msg.go | 1 + internal/cron_task/cron_task.go | 116 +++++++++++++++++++------------- 3 files changed, 77 insertions(+), 48 deletions(-) diff --git a/cmd/open_im_cron_task/main.go b/cmd/open_im_cron_task/main.go index 30ed7aea7..a3cb60a25 100644 --- a/cmd/open_im_cron_task/main.go +++ b/cmd/open_im_cron_task/main.go @@ -2,11 +2,15 @@ package main import ( "Open_IM/internal/cron_task" + "flag" "fmt" "time" ) func main() { - fmt.Println(time.Now(), "start cronTask") - cronTask.StartCronTask() + var userID = flag.String("userID", "", "userID to clear msg and reset seq") + var workingGroupID = flag.String("workingGroupID", "", "workingGroupID to clear msg and reset seq") + flag.Parse() + fmt.Println(time.Now(), "start cronTask", userID, workingGroupID) + cronTask.StartCronTask(*userID, *workingGroupID) } diff --git a/internal/cron_task/clear_msg.go b/internal/cron_task/clear_msg.go index 1db1c0b6c..f752d3f59 100644 --- a/internal/cron_task/clear_msg.go +++ b/internal/cron_task/clear_msg.go @@ -139,6 +139,7 @@ func deleteMongoMsg(operationID string, ID string, index int64, delStruct *delMs } delStruct.minSeq = lastMsgPb.Seq if msgListIsFull(msgs) { + log.NewDebug(operationID, "msg list is full", msgs.UID) delStruct.delUidList = append(delStruct.delUidList, msgs.UID) } log.NewDebug(operationID, ID, "continue", delStruct) diff --git a/internal/cron_task/cron_task.go b/internal/cron_task/cron_task.go index 87c2fbb46..1fdef997e 100644 --- a/internal/cron_task/cron_task.go +++ b/internal/cron_task/cron_task.go @@ -14,60 +14,31 @@ import ( const cronTaskOperationID = "cronTaskOperationID-" -func StartCronTask() { +func StartCronTask(userID, workingGroupID string) { log.NewPrivateLog("cron") log.NewInfo(utils.OperationIDGenerator(), "start cron task", "cron config", config.Config.Mongo.ChatRecordsClearTime) - c := cron.New() fmt.Println("cron config", config.Config.Mongo.ChatRecordsClearTime) - _, err := c.AddFunc(config.Config.Mongo.ChatRecordsClearTime, func() { - // user msg clear + if userID != "" { operationID := getCronTaskOperationID() - log.NewInfo(operationID, "====================== start del cron task ======================") - userIDList, err := im_mysql_model.SelectAllUserID() - if err == nil { - log.NewDebug(operationID, utils.GetSelfFuncName(), "userIDList: ", userIDList) - userIDList = []string{"4158779020"} - for _, userID := range userIDList { - if err := DeleteMongoMsgAndResetRedisSeq(operationID, userID); err != nil { - log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), userID) - } - if err := checkMaxSeqWithMongo(operationID, userID, constant.WriteDiffusion); err != nil { - log.NewError(operationID, utils.GetSelfFuncName(), userID, err) - } - } - } else { - log.NewError(operationID, utils.GetSelfFuncName(), err.Error()) - } - //return - // working group msg clear - workingGroupIDList, err := im_mysql_model.GetGroupIDListByGroupType(constant.WorkingGroup) - if err == nil { - log.NewDebug(operationID, utils.GetSelfFuncName(), "workingGroupIDList: ", workingGroupIDList) - for _, groupID := range workingGroupIDList { - userIDList, err = rocksCache.GetGroupMemberIDListFromCache(groupID) - if err != nil { - log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), groupID) - continue - } - log.NewDebug(operationID, utils.GetSelfFuncName(), "groupID:", groupID, "userIDList:", userIDList) - if err := ResetUserGroupMinSeq(operationID, groupID, userIDList); err != nil { - log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), groupID, userIDList) - } - if err := checkMaxSeqWithMongo(operationID, groupID, constant.ReadDiffusion); err != nil { - log.NewError(operationID, utils.GetSelfFuncName(), groupID, err) - } - } - } else { - log.NewError(operationID, utils.GetSelfFuncName(), err.Error()) - } - - log.NewInfo(operationID, "====================== start del cron finished ======================") - }) + StartClearMsg(operationID, []string{userID}) + } + if workingGroupID != "" { + operationID := getCronTaskOperationID() + StartClearWorkingGroupMsg(operationID, []string{workingGroupID}) + } + if userID != "" || workingGroupID != "" { + fmt.Println("clear msg finished") + return + } + clearFunc := func() { + ClearAll() + } + c := cron.New() + _, err := c.AddFunc(config.Config.Mongo.ChatRecordsClearTime, clearFunc) if err != nil { fmt.Println("start cron failed", err.Error(), config.Config.Mongo.ChatRecordsClearTime) panic(err) } - c.Start() fmt.Println("start cron task success") for { @@ -78,3 +49,56 @@ func StartCronTask() { func getCronTaskOperationID() string { return cronTaskOperationID + utils.OperationIDGenerator() } + +func ClearAll() { + operationID := getCronTaskOperationID() + log.NewInfo(operationID, "====================== start del cron task ======================") + //var userIDList []string + var err error + userIDList, err := im_mysql_model.SelectAllUserID() + if err == nil { + StartClearMsg(operationID, userIDList) + } else { + log.NewError(operationID, utils.GetSelfFuncName(), err.Error()) + } + //return + // working group msg clear + workingGroupIDList, err := im_mysql_model.GetGroupIDListByGroupType(constant.WorkingGroup) + if err == nil { + StartClearWorkingGroupMsg(operationID, workingGroupIDList) + } else { + log.NewError(operationID, utils.GetSelfFuncName(), err.Error()) + } + + log.NewInfo(operationID, "====================== start del cron finished ======================") +} + +func StartClearMsg(operationID string, userIDList []string) { + log.NewDebug(operationID, utils.GetSelfFuncName(), "userIDList: ", userIDList) + for _, userID := range userIDList { + if err := DeleteMongoMsgAndResetRedisSeq(operationID, userID); err != nil { + log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), userID) + } + if err := checkMaxSeqWithMongo(operationID, userID, constant.WriteDiffusion); err != nil { + log.NewError(operationID, utils.GetSelfFuncName(), userID, err) + } + } +} + +func StartClearWorkingGroupMsg(operationID string, workingGroupIDList []string) { + log.NewDebug(operationID, utils.GetSelfFuncName(), "workingGroupIDList: ", workingGroupIDList) + for _, groupID := range workingGroupIDList { + userIDList, err := rocksCache.GetGroupMemberIDListFromCache(groupID) + if err != nil { + log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), groupID) + continue + } + log.NewDebug(operationID, utils.GetSelfFuncName(), "groupID:", groupID, "workingGroupIDList:", userIDList) + if err := ResetUserGroupMinSeq(operationID, groupID, userIDList); err != nil { + log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), groupID, userIDList) + } + if err := checkMaxSeqWithMongo(operationID, groupID, constant.ReadDiffusion); err != nil { + log.NewError(operationID, utils.GetSelfFuncName(), groupID, err) + } + } +}