diff --git a/internal/tools/conversation.go b/internal/tools/conversation.go index 4adec99ee..5d568cac5 100644 --- a/internal/tools/conversation.go +++ b/internal/tools/conversation.go @@ -16,49 +16,126 @@ package tools import ( "context" - "time" - "github.com/OpenIMSDK/tools/log" "github.com/OpenIMSDK/tools/mcontext" "github.com/OpenIMSDK/tools/utils" + "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation" + "math/rand" + "time" ) +//func (c *MsgTool) ConversationsDestructMsgs() { +// log.ZInfo(context.Background(), "start msg destruct cron task") +// ctx := mcontext.NewCtx(utils.GetSelfFuncName()) +// conversations, err := c.conversationDatabase.GetConversationIDsNeedDestruct(ctx) +// if err != nil { +// log.ZError(ctx, "get conversation id need destruct failed", err) +// return +// } +// log.ZDebug(context.Background(), "nums conversations need destruct", "nums", len(conversations)) +// for _, conversation := range conversations { +// ctx = mcontext.NewCtx(utils.GetSelfFuncName() + "-" + utils.OperationIDGenerator() + "-" + conversation.ConversationID + "-" + conversation.OwnerUserID) +// log.ZDebug( +// ctx, +// "UserMsgsDestruct", +// "conversationID", +// conversation.ConversationID, +// "ownerUserID", +// conversation.OwnerUserID, +// "msgDestructTime", +// conversation.MsgDestructTime, +// "lastMsgDestructTime", +// conversation.LatestMsgDestructTime, +// ) +// now := time.Now() +// seqs, err := c.msgDatabase.UserMsgsDestruct(ctx, conversation.OwnerUserID, conversation.ConversationID, conversation.MsgDestructTime, conversation.LatestMsgDestructTime) +// if err != nil { +// log.ZError(ctx, "user msg destruct failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID) +// continue +// } +// if len(seqs) > 0 { +// if err := c.conversationDatabase.UpdateUsersConversationFiled(ctx, []string{conversation.OwnerUserID}, conversation.ConversationID, map[string]interface{}{"latest_msg_destruct_time": now}); err != nil { +// log.ZError(ctx, "updateUsersConversationFiled failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID) +// continue +// } +// if err := c.msgNotificationSender.UserDeleteMsgsNotification(ctx, conversation.OwnerUserID, conversation.ConversationID, seqs); err != nil { +// log.ZError(ctx, "userDeleteMsgsNotification failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID) +// } +// } +// } +//} + func (c *MsgTool) ConversationsDestructMsgs() { log.ZInfo(context.Background(), "start msg destruct cron task") ctx := mcontext.NewCtx(utils.GetSelfFuncName()) - conversations, err := c.conversationDatabase.GetConversationIDsNeedDestruct(ctx) + num, err := c.conversationDatabase.GetAllConversationIDsNumber(ctx) if err != nil { - log.ZError(ctx, "get conversation id need destruct failed", err) + log.ZError(ctx, "GetAllConversationIDsNumber failed", err) return } - log.ZDebug(context.Background(), "nums conversations need destruct", "nums", len(conversations)) - for _, conversation := range conversations { - ctx = mcontext.NewCtx(utils.GetSelfFuncName() + "-" + utils.OperationIDGenerator() + "-" + conversation.ConversationID + "-" + conversation.OwnerUserID) - log.ZDebug( - ctx, - "UserMsgsDestruct", - "conversationID", - conversation.ConversationID, - "ownerUserID", - conversation.OwnerUserID, - "msgDestructTime", - conversation.MsgDestructTime, - "lastMsgDestructTime", - conversation.LatestMsgDestructTime, - ) - now := time.Now() - seqs, err := c.msgDatabase.UserMsgsDestruct(ctx, conversation.OwnerUserID, conversation.ConversationID, conversation.MsgDestructTime, conversation.LatestMsgDestructTime) + const batchNum = 50 + log.ZDebug(ctx, "GetAllConversationIDsNumber", "num", num) + if num == 0 { + return + } + count := int(num/batchNum + num/batchNum/2) + if count < 1 { + count = 1 + } + maxPage := 1 + num/batchNum + if num%batchNum != 0 { + maxPage++ + } + for i := 0; i < count; i++ { + pageNumber := rand.Int63() % maxPage + conversationIDs, err := c.conversationDatabase.PageConversationIDs(ctx, int32(pageNumber), batchNum) if err != nil { - log.ZError(ctx, "user msg destruct failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID) + log.ZError(ctx, "PageConversationIDs failed", err, "pageNumber", pageNumber) continue } - if len(seqs) > 0 { - if err := c.conversationDatabase.UpdateUsersConversationFiled(ctx, []string{conversation.OwnerUserID}, conversation.ConversationID, map[string]interface{}{"latest_msg_destruct_time": now}); err != nil { - log.ZError(ctx, "updateUsersConversationFiled failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID) + log.ZError(ctx, "PageConversationIDs failed", err, "pageNumber", pageNumber, "conversationIDsNum", len(conversationIDs), "conversationIDs", conversationIDs) + if len(conversationIDs) == 0 { + continue + } + conversations, err := c.conversationDatabase.GetConversationsByConversationID(ctx, conversationIDs) + if err != nil { + log.ZError(ctx, "GetConversationsByConversationID failed", err, "conversationIDs", conversationIDs) + continue + } + temp := make([]*relation.ConversationModel, 0, len(conversations)) + for i, conversation := range conversations { + if conversation.IsMsgDestruct && conversation.MsgDestructTime != 0 && (time.Now().Unix() > (conversation.MsgDestructTime+conversation.LatestMsgDestructTime.Unix()+8*60*60)) || conversation.LatestMsgDestructTime.IsZero() { + temp = append(temp, conversations[i]) + } + } + for _, conversation := range temp { + ctx = mcontext.NewCtx(utils.GetSelfFuncName() + "-" + utils.OperationIDGenerator() + "-" + conversation.ConversationID + "-" + conversation.OwnerUserID) + log.ZDebug( + ctx, + "UserMsgsDestruct", + "conversationID", + conversation.ConversationID, + "ownerUserID", + conversation.OwnerUserID, + "msgDestructTime", + conversation.MsgDestructTime, + "lastMsgDestructTime", + conversation.LatestMsgDestructTime, + ) + now := time.Now() + seqs, err := c.msgDatabase.UserMsgsDestruct(ctx, conversation.OwnerUserID, conversation.ConversationID, conversation.MsgDestructTime, conversation.LatestMsgDestructTime) + if err != nil { + log.ZError(ctx, "user msg destruct failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID) continue } - if err := c.msgNotificationSender.UserDeleteMsgsNotification(ctx, conversation.OwnerUserID, conversation.ConversationID, seqs); err != nil { - log.ZError(ctx, "userDeleteMsgsNotification failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID) + if len(seqs) > 0 { + if err := c.conversationDatabase.UpdateUsersConversationFiled(ctx, []string{conversation.OwnerUserID}, conversation.ConversationID, map[string]interface{}{"latest_msg_destruct_time": now}); err != nil { + log.ZError(ctx, "updateUsersConversationFiled failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID) + continue + } + if err := c.msgNotificationSender.UserDeleteMsgsNotification(ctx, conversation.OwnerUserID, conversation.ConversationID, seqs); err != nil { + log.ZError(ctx, "userDeleteMsgsNotification failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID) + } } } } diff --git a/internal/tools/cron_task.go b/internal/tools/cron_task.go index 6702bc6c7..6f4803628 100644 --- a/internal/tools/cron_task.go +++ b/internal/tools/cron_task.go @@ -39,13 +39,13 @@ func StartTask() error { log.ZInfo(context.Background(), "start chatRecordsClearTime cron task", "cron config", config.Config.ChatRecordsClearTime) _, err = c.AddFunc(config.Config.ChatRecordsClearTime, msgTool.AllConversationClearMsgAndFixSeq) if err != nil { - fmt.Println("start allConversationClearMsgAndFixSeq cron failed", err.Error(), config.Config.ChatRecordsClearTime) + log.ZError(context.Background(), "start allConversationClearMsgAndFixSeq cron failed", err) panic(err) } log.ZInfo(context.Background(), "start msgDestruct cron task", "cron config", config.Config.MsgDestructTime) _, err = c.AddFunc(config.Config.MsgDestructTime, msgTool.ConversationsDestructMsgs) if err != nil { - fmt.Println("start conversationsDestructMsgs cron failed", err.Error(), config.Config.ChatRecordsClearTime) + log.ZError(context.Background(), "start conversationsDestructMsgs cron failed", err) panic(err) } c.Start() diff --git a/internal/tools/msg.go b/internal/tools/msg.go index ca095051c..7e06fda4a 100644 --- a/internal/tools/msg.go +++ b/internal/tools/msg.go @@ -17,13 +17,11 @@ package tools import ( "context" "fmt" - "math" - + "github.com/openimsdk/open-im-server/v3/pkg/common/discovery_register" "github.com/redis/go-redis/v9" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" - - "github.com/openimsdk/open-im-server/v3/pkg/common/discovery_register" + "math" "github.com/OpenIMSDK/tools/errs" "github.com/OpenIMSDK/tools/log" @@ -31,6 +29,7 @@ import ( "github.com/OpenIMSDK/tools/mw" "github.com/OpenIMSDK/tools/tx" "github.com/OpenIMSDK/tools/utils" + "math/rand" "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" @@ -104,18 +103,55 @@ func InitMsgTool() (*MsgTool, error) { return msgTool, nil } +//func (c *MsgTool) AllConversationClearMsgAndFixSeq() { +// ctx := mcontext.NewCtx(utils.GetSelfFuncName()) +// log.ZInfo(ctx, "============================ start del cron task ============================") +// conversationIDs, err := c.conversationDatabase.GetAllConversationIDs(ctx) +// if err != nil { +// log.ZError(ctx, "GetAllConversationIDs failed", err) +// return +// } +// for _, conversationID := range conversationIDs { +// conversationIDs = append(conversationIDs, utils.GetNotificationConversationIDByConversationID(conversationID)) +// } +// c.ClearConversationsMsg(ctx, conversationIDs) +// log.ZInfo(ctx, "============================ start del cron finished ============================") +//} + func (c *MsgTool) AllConversationClearMsgAndFixSeq() { ctx := mcontext.NewCtx(utils.GetSelfFuncName()) log.ZInfo(ctx, "============================ start del cron task ============================") - conversationIDs, err := c.conversationDatabase.GetAllConversationIDs(ctx) + num, err := c.conversationDatabase.GetAllConversationIDsNumber(ctx) if err != nil { - log.ZError(ctx, "GetAllConversationIDs failed", err) + log.ZError(ctx, "GetAllConversationIDsNumber failed", err) return } - for _, conversationID := range conversationIDs { - conversationIDs = append(conversationIDs, utils.GetNotificationConversationIDByConversationID(conversationID)) + const batchNum = 50 + log.ZDebug(ctx, "GetAllConversationIDsNumber", "num", num) + if num == 0 { + return + } + count := int(num/batchNum + num/batchNum/2) + if count < 1 { + count = 1 + } + maxPage := 1 + num/batchNum + if num%batchNum != 0 { + maxPage++ + } + for i := 0; i < count; i++ { + pageNumber := rand.Int63() % maxPage + conversationIDs, err := c.conversationDatabase.PageConversationIDs(ctx, int32(pageNumber), batchNum) + if err != nil { + log.ZError(ctx, "PageConversationIDs failed", err, "pageNumber", pageNumber) + continue + } + log.ZDebug(ctx, "PageConversationIDs failed", "pageNumber", pageNumber, "conversationIDsNum", len(conversationIDs), "conversationIDs", conversationIDs) + if len(conversationIDs) == 0 { + continue + } + c.ClearConversationsMsg(ctx, conversationIDs) } - c.ClearConversationsMsg(ctx, conversationIDs) log.ZInfo(ctx, "============================ start del cron finished ============================") } diff --git a/pkg/common/db/controller/conversation.go b/pkg/common/db/controller/conversation.go index b93f0bf06..4f7de5ee0 100644 --- a/pkg/common/db/controller/conversation.go +++ b/pkg/common/db/controller/conversation.go @@ -50,6 +50,8 @@ type ConversationDatabase interface { GetConversationIDs(ctx context.Context, userID string) ([]string, error) GetUserConversationIDsHash(ctx context.Context, ownerUserID string) (hash uint64, err error) GetAllConversationIDs(ctx context.Context) ([]string, error) + GetAllConversationIDsNumber(ctx context.Context) (int64, error) + PageConversationIDs(ctx context.Context, pageNumber, showNumber int32) (conversationIDs []string, err error) //GetUserAllHasReadSeqs(ctx context.Context, ownerUserID string) (map[string]int64, error) GetConversationsByConversationID(ctx context.Context, conversationIDs []string) ([]*relationtb.ConversationModel, error) GetConversationIDsNeedDestruct(ctx context.Context) ([]*relationtb.ConversationModel, error) @@ -295,6 +297,14 @@ func (c *conversationDatabase) GetAllConversationIDs(ctx context.Context) ([]str return c.conversationDB.GetAllConversationIDs(ctx) } +func (c *conversationDatabase) GetAllConversationIDsNumber(ctx context.Context) (int64, error) { + return c.conversationDB.GetAllConversationIDsNumber(ctx) +} + +func (c *conversationDatabase) PageConversationIDs(ctx context.Context, pageNumber, showNumber int32) ([]string, error) { + return c.conversationDB.PageConversationIDs(ctx, pageNumber, showNumber) +} + //func (c *conversationDatabase) GetUserAllHasReadSeqs(ctx context.Context, ownerUserID string) (map[string]int64, error) { // return c.cache.GetUserAllHasReadSeqs(ctx, ownerUserID) //} diff --git a/pkg/common/db/relation/conversation_model.go b/pkg/common/db/relation/conversation_model.go index d5ca92ec2..b6c554864 100644 --- a/pkg/common/db/relation/conversation_model.go +++ b/pkg/common/db/relation/conversation_model.go @@ -16,9 +16,7 @@ package relation import ( "context" - "github.com/OpenIMSDK/tools/errs" - "gorm.io/gorm" "github.com/OpenIMSDK/protocol/constant" @@ -188,6 +186,18 @@ func (c *ConversationGorm) GetAllConversationIDs(ctx context.Context) (conversat ) } +func (c *ConversationGorm) GetAllConversationIDsNumber(ctx context.Context) (int64, error) { + var num int64 + err := c.db(ctx).Select("COUNT(DISTINCT conversation_id)").Model(&relation.ConversationModel{}).Count(&num).Error + return num, errs.Wrap(err) +} + +func (c *ConversationGorm) PageConversationIDs(ctx context.Context, pageNumber, showNumber int32) (conversationIDs []string, err error) { + err = c.db(ctx).Distinct("conversation_id").Limit(int(showNumber)).Offset(int((pageNumber-1)*showNumber)).Pluck("conversation_id", &conversationIDs).Error + err = errs.Wrap(err) + return +} + func (c *ConversationGorm) GetUserAllHasReadSeqs( ctx context.Context, ownerUserID string, diff --git a/pkg/common/db/table/relation/conversation.go b/pkg/common/db/table/relation/conversation.go index 7e6c6bdf8..e9680873f 100644 --- a/pkg/common/db/table/relation/conversation.go +++ b/pkg/common/db/table/relation/conversation.go @@ -63,6 +63,8 @@ type ConversationModelInterface interface { GetUserRecvMsgOpt(ctx context.Context, ownerUserID, conversationID string) (opt int, err error) FindSuperGroupRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) ([]string, error) GetAllConversationIDs(ctx context.Context) ([]string, error) + GetAllConversationIDsNumber(ctx context.Context) (int64, error) + PageConversationIDs(ctx context.Context, pageNumber, showNumber int32) (conversationIDs []string, err error) GetUserAllHasReadSeqs(ctx context.Context, ownerUserID string) (hashReadSeqs map[string]int64, err error) GetConversationsByConversationID(ctx context.Context, conversationIDs []string) ([]*ConversationModel, error) GetConversationIDsNeedDestruct(ctx context.Context) ([]*ConversationModel, error)