// Copyright © 2023 OpenIM. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package tools import ( "context" "fmt" "github.com/openimsdk/open-im-server/v3/internal/rpc/msg" "math" "math/rand" "github.com/openimsdk/open-im-server/v3/pkg/common/db/controller" "github.com/openimsdk/open-im-server/v3/pkg/util/conversationutil" "github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/mcontext" "github.com/openimsdk/tools/utils/stringutil" "github.com/redis/go-redis/v9" ) type MsgTool struct { msgDatabase controller.CommonMsgDatabase conversationDatabase controller.ConversationDatabase userDatabase controller.UserDatabase groupDatabase controller.GroupDatabase msgNotificationSender *msg.MsgNotificationSender config *CronTaskConfig } func NewMsgTool(msgDatabase controller.CommonMsgDatabase, userDatabase controller.UserDatabase, groupDatabase controller.GroupDatabase, conversationDatabase controller.ConversationDatabase, msgNotificationSender *msg.MsgNotificationSender, config *CronTaskConfig, ) *MsgTool { return &MsgTool{ msgDatabase: msgDatabase, userDatabase: userDatabase, groupDatabase: groupDatabase, conversationDatabase: conversationDatabase, msgNotificationSender: msgNotificationSender, config: config, } } func InitMsgTool(ctx context.Context, config *CronTaskConfig) (*MsgTool, error) { ch := make(chan int) <-ch //mgocli, err := mongoutil.NewMongoDB(ctx, config.MongodbConfig.Build()) //if err != nil { // return nil, err //} //rdb, err := redisutil.NewRedisClient(ctx, config.RedisConfig.Build()) //if err != nil { // return nil, err //} //discov, err := kdisc.NewDiscoveryRegister(&config.ZookeeperConfig, &config.Share) //if err != nil { // return nil, err //} //discov.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin"))) //userDB, err := mgo.NewUserMongo(mgocli.GetDB()) //if err != nil { // return nil, err //} ////msgDatabase, err := controller.InitCommonMsgDatabase(rdb, mgocli.GetDB(), config) //if err != nil { // return nil, err //} //userMongoDB := mgo.NewUserMongoDriver(mgocli.GetDB()) //userDatabase := controller.NewUserDatabase( // userDB, // cache.NewUserCacheRedis(rdb, userDB, cache.GetDefaultOpt()), // mgocli.GetTx(), // userMongoDB, //) //groupDB, err := mgo.NewGroupMongo(mgocli.GetDB()) //if err != nil { // return nil, err //} //groupMemberDB, err := mgo.NewGroupMember(mgocli.GetDB()) //if err != nil { // return nil, err //} //groupRequestDB, err := mgo.NewGroupRequestMgo(mgocli.GetDB()) //if err != nil { // return nil, err //} //conversationDB, err := mgo.NewConversationMongo(mgocli.GetDB()) //if err != nil { // return nil, err //} //groupDatabase := controller.NewGroupDatabase(rdb, groupDB, groupMemberDB, groupRequestDB, mgocli.GetTx(), nil) //conversationDatabase := controller.NewConversationDatabase( // conversationDB, // cache.NewConversationRedis(rdb, cache.GetDefaultOpt(), conversationDB), // mgocli.GetTx(), //) //msgRpcClient := rpcclient.NewMessageRpcClient(discov, config.Share.RpcRegisterName.Msg) //msgNotificationSender := notification.NewMsgNotificationSender(config, rpcclient.WithRpcClient(&msgRpcClient)) //msgTool := NewMsgTool(msgDatabase, userDatabase, groupDatabase, conversationDatabase, msgNotificationSender, config) //return msgTool, nil return nil, nil } // func (c *MsgTool) AllConversationClearMsgAndFixSeq() { // ctx := mcontext.NewCtx(utils.GetSelfFuncName()) // log.ZInfo(ctx, "============================ start del cron task ============================") // conversationIDs, err := c.conversationDatabase.GetAllConversationIDs(ctx) // if err != nil { // log.ZError(ctx, "GetAllConversationIDs failed", err) // return // } // for _, conversationID := range conversationIDs { // conversationIDs = append(conversationIDs, utils.GetNotificationConversationIDByConversationID(conversationID)) // } // c.ClearConversationsMsg(ctx, conversationIDs) // log.ZInfo(ctx, "============================ start del cron finished ============================") //} func (c *MsgTool) AllConversationClearMsgAndFixSeq() { ctx := mcontext.NewCtx(stringutil.GetSelfFuncName()) log.ZInfo(ctx, "============================ start del cron task ============================") num, err := c.conversationDatabase.GetAllConversationIDsNumber(ctx) if err != nil { log.ZError(ctx, "GetAllConversationIDsNumber failed", err) return } const batchNum = 50 log.ZDebug(ctx, "GetAllConversationIDsNumber", "num", num) if num == 0 { return } count := int(num/batchNum + num/batchNum/2) if count < 1 { count = 1 } maxPage := 1 + num/batchNum if num%batchNum != 0 { maxPage++ } for i := 0; i < count; i++ { pageNumber := rand.Int63() % maxPage pagination := &sdkws.RequestPagination{ PageNumber: int32(pageNumber), ShowNumber: batchNum, } conversationIDs, err := c.conversationDatabase.PageConversationIDs(ctx, pagination) if err != nil { log.ZError(ctx, "PageConversationIDs failed", err, "pageNumber", pageNumber) continue } log.ZDebug(ctx, "PageConversationIDs failed", "pageNumber", pageNumber, "conversationIDsNum", len(conversationIDs), "conversationIDs", conversationIDs) if len(conversationIDs) == 0 { continue } c.ClearConversationsMsg(ctx, conversationIDs) } log.ZInfo(ctx, "============================ start del cron finished ============================") } func (c *MsgTool) ClearConversationsMsg(ctx context.Context, conversationIDs []string) { for _, conversationID := range conversationIDs { if err := c.msgDatabase.DeleteConversationMsgsAndSetMinSeq(ctx, conversationID, int64(c.config.CronTask.RetainChatRecords*24*60*60)); err != nil { log.ZError(ctx, "DeleteUserSuperGroupMsgsAndSetMinSeq failed", err, "conversationID", conversationID, "DBRetainChatRecords", c.config.CronTask.RetainChatRecords) } if err := c.checkMaxSeq(ctx, conversationID); err != nil { log.ZError(ctx, "fixSeq failed", err, "conversationID", conversationID) } } } func (c *MsgTool) checkMaxSeqWithMongo(ctx context.Context, conversationID string, maxSeqCache int64) error { minSeqMongo, maxSeqMongo, err := c.msgDatabase.GetMongoMaxAndMinSeq(ctx, conversationID) if err != nil { return err } if math.Abs(float64(maxSeqMongo-maxSeqCache)) > 10 { err = fmt.Errorf("cache max seq and mongo max seq is diff > 10, maxSeqMongo:%d,minSeqMongo:%d,maxSeqCache:%d,conversationID:%s", maxSeqMongo, minSeqMongo, maxSeqCache, conversationID) return errs.Wrap(err) } return nil } func (c *MsgTool) checkMaxSeq(ctx context.Context, conversationID string) error { maxSeq, err := c.msgDatabase.GetMaxSeq(ctx, conversationID) if err != nil { if errs.Unwrap(err) == redis.Nil { return nil } return err } if err := c.checkMaxSeqWithMongo(ctx, conversationID, maxSeq); err != nil { return err } return nil } func (c *MsgTool) FixAllSeq(ctx context.Context) error { conversationIDs, err := c.conversationDatabase.GetAllConversationIDs(ctx) if err != nil { return err } for _, conversationID := range conversationIDs { conversationIDs = append(conversationIDs, conversationutil.GetNotificationConversationIDByConversationID(conversationID)) } for _, conversationID := range conversationIDs { if err := c.checkMaxSeq(ctx, conversationID); err != nil { log.ZWarn(ctx, "fixSeq failed", err, "conversationID", conversationID) } } fmt.Println("fix all seq finished") return nil }