From 9c76722c900a35bd88d54999d1b64f7381022cc6 Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Thu, 23 Feb 2023 18:20:45 +0800 Subject: [PATCH] push --- cmd/cmdutils/main.go | 17 ++++ cmd/crontask/main.go | 12 ++- internal/crontask/cron_task.go | 43 --------- internal/{crontask => task}/clear_msg.go | 89 ++++++++++++++++--- internal/{crontask => task}/clear_msg_test.go | 2 +- internal/task/cron_task.go | 56 ++++++++++++ pkg/common/db/cache/redis.go | 26 +++--- pkg/common/db/controller/msg.go | 86 +++++++++--------- 8 files changed, 211 insertions(+), 120 deletions(-) create mode 100644 cmd/cmdutils/main.go delete mode 100644 internal/crontask/cron_task.go rename internal/{crontask => task}/clear_msg.go (58%) rename internal/{crontask => task}/clear_msg_test.go (99%) create mode 100644 internal/task/cron_task.go diff --git a/cmd/cmdutils/main.go b/cmd/cmdutils/main.go new file mode 100644 index 000000000..ded252795 --- /dev/null +++ b/cmd/cmdutils/main.go @@ -0,0 +1,17 @@ +package main + +import ( + "Open_IM/internal/task" + "flag" + "fmt" + "time" +) + +func main() { + var userID = flag.String("userID", "", "userID to clear msg and reset seq") + var workingGroupID = flag.String("workingGroupID", "", "workingGroupID to clear msg and reset seq") + var fixAllSeq = flag.Bool("fixAllSeq", false, "fix seq") + flag.Parse() + fmt.Println(time.Now(), "start cronTask", *userID, *workingGroupID) + task.FixSeq(*userID, *workingGroupID, *fixAllSeq) +} diff --git a/cmd/crontask/main.go b/cmd/crontask/main.go index 6dcc33ddc..c4ca2aab9 100644 --- a/cmd/crontask/main.go +++ b/cmd/crontask/main.go @@ -1,16 +1,14 @@ package main import ( - "Open_IM/internal/crontask" - "flag" + "Open_IM/internal/task" "fmt" "time" ) func main() { - var userID = flag.String("userID", "", "userID to clear msg and reset seq") - var workingGroupID = flag.String("workingGroupID", "", "workingGroupID to clear msg and reset seq") - flag.Parse() - fmt.Println(time.Now(), "start cronTask", *userID, *workingGroupID) - cronTask.StartCronTask(*userID, *workingGroupID) + fmt.Println(time.Now(), "start cronTask") + if err := task.StartCronTask(); err != nil { + panic(err.Error()) + } } diff --git a/internal/crontask/cron_task.go b/internal/crontask/cron_task.go deleted file mode 100644 index be1094661..000000000 --- a/internal/crontask/cron_task.go +++ /dev/null @@ -1,43 +0,0 @@ -package cronTask - -import ( - "Open_IM/pkg/common/config" - "Open_IM/pkg/common/log" - "Open_IM/pkg/utils" - "fmt" - "time" - - "github.com/robfig/cron/v3" -) - -const cronTaskOperationID = "cronTaskOperationID-" -const moduleName = "cron" - -func StartCronTask(userID, workingGroupID string) { - log.NewPrivateLog(moduleName) - log.NewInfo(utils.OperationIDGenerator(), "start cron task", "cron config", config.Config.Mongo.ChatRecordsClearTime) - fmt.Println("cron task start, config", config.Config.Mongo.ChatRecordsClearTime) - if userID != "" { - operationID := getCronTaskOperationID() - ClearUsersMsg(operationID, []string{userID}) - } - if workingGroupID != "" { - operationID := getCronTaskOperationID() - ClearSuperGroupMsg(operationID, []string{workingGroupID}) - } - if userID != "" || workingGroupID != "" { - fmt.Println("clear msg finished") - return - } - c := cron.New() - _, err := c.AddFunc(config.Config.Mongo.ChatRecordsClearTime, ClearAll) - if err != nil { - fmt.Println("start cron failed", err.Error(), config.Config.Mongo.ChatRecordsClearTime) - panic(err) - } - c.Start() - fmt.Println("start cron task success") - for { - time.Sleep(10 * time.Second) - } -} diff --git a/internal/crontask/clear_msg.go b/internal/task/clear_msg.go similarity index 58% rename from internal/crontask/clear_msg.go rename to internal/task/clear_msg.go index d6e30f49b..7dfdb0a7b 100644 --- a/internal/crontask/clear_msg.go +++ b/internal/task/clear_msg.go @@ -1,4 +1,4 @@ -package cronTask +package task import ( "Open_IM/pkg/common/config" @@ -8,20 +8,22 @@ import ( "Open_IM/pkg/common/tracelog" "Open_IM/pkg/utils" "context" + "fmt" + "github.com/go-redis/redis/v8" "math" ) -type ClearMsgTool struct { +type msgTool struct { msgInterface controller.MsgDatabase userInterface controller.UserDatabase groupInterface controller.GroupDatabase } -func (c *ClearMsgTool) getCronTaskOperationID() string { +func (c *msgTool) getCronTaskOperationID() string { return cronTaskOperationID + utils.OperationIDGenerator() } -func (c *ClearMsgTool) ClearAll() { +func (c *msgTool) ClearAll() { operationID := c.getCronTaskOperationID() ctx := context.Background() tracelog.SetOperationID(ctx, operationID) @@ -43,7 +45,7 @@ func (c *ClearMsgTool) ClearAll() { log.NewInfo(operationID, "============================ start del cron finished ============================") } -func (c *ClearMsgTool) ClearUsersMsg(ctx context.Context, userIDList []string) { +func (c *msgTool) ClearUsersMsg(ctx context.Context, userIDList []string) { for _, userID := range userIDList { if err := c.msgInterface.DeleteUserMsgsAndSetMinSeq(ctx, userID, int64(config.Config.Mongo.DBRetainChatRecords*24*60*60)); err != nil { log.NewError(tracelog.GetOperationID(ctx), utils.GetSelfFuncName(), err.Error(), userID) @@ -58,7 +60,7 @@ func (c *ClearMsgTool) ClearUsersMsg(ctx context.Context, userIDList []string) { } } -func (c *ClearMsgTool) ClearSuperGroupMsg(ctx context.Context, workingGroupIDList []string) { +func (c *msgTool) ClearSuperGroupMsg(ctx context.Context, workingGroupIDList []string) { for _, groupID := range workingGroupIDList { userIDs, err := c.groupInterface.FindGroupMemberUserID(ctx, groupID) if err != nil { @@ -73,16 +75,20 @@ func (c *ClearMsgTool) ClearSuperGroupMsg(ctx context.Context, workingGroupIDLis log.NewError(tracelog.GetOperationID(ctx), utils.GetSelfFuncName(), err.Error(), "GetUserMinMaxSeqInMongoAndCache failed", groupID) continue } - //for _, userID := range userIDs { - // c.msgInterface.getgroup - // c.FixGroupUserSeq(ctx, userID, groupID, ) - // - //} + for _, userID := range userIDs { + minSeqCache, err := c.msgInterface.GetGroupUserMinSeq(ctx, groupID, userID) + if err != nil { + log.NewError(tracelog.GetOperationID(ctx), "GetGroupUserMinSeq failed", groupID, userID) + continue + } + c.FixGroupUserSeq(ctx, userID, groupID, minSeqCache, maxSeqCache) + + } c.CheckMaxSeqWithMongo(ctx, groupID, maxSeqCache, maxSeqMongo, constant.WriteDiffusion) } } -func (c *ClearMsgTool) FixUserSeq(ctx context.Context, userID string, minSeqCache, maxSeqCache int64) { +func (c *msgTool) FixUserSeq(ctx context.Context, userID string, minSeqCache, maxSeqCache int64) { if minSeqCache > maxSeqCache { if err := c.msgInterface.SetUserMinSeq(ctx, userID, maxSeqCache); err != nil { log.NewError(tracelog.GetOperationID(ctx), "SetUserMinSeq failed", userID, minSeqCache, maxSeqCache) @@ -92,7 +98,7 @@ func (c *ClearMsgTool) FixUserSeq(ctx context.Context, userID string, minSeqCach } } -func (c *ClearMsgTool) FixGroupUserSeq(ctx context.Context, userID string, groupID string, minSeqCache, maxSeqCache int64) { +func (c *msgTool) FixGroupUserSeq(ctx context.Context, userID string, groupID string, minSeqCache, maxSeqCache int64) { if minSeqCache > maxSeqCache { if err := c.msgInterface.SetGroupUserMinSeq(ctx, groupID, userID, maxSeqCache); err != nil { log.NewError(tracelog.GetOperationID(ctx), "SetGroupUserMinSeq failed", userID, minSeqCache, maxSeqCache) @@ -102,8 +108,63 @@ func (c *ClearMsgTool) FixGroupUserSeq(ctx context.Context, userID string, group } } -func (c *ClearMsgTool) CheckMaxSeqWithMongo(ctx context.Context, sourceID string, maxSeqCache, maxSeqMongo int64, diffusionType int) { +func (c *msgTool) CheckMaxSeqWithMongo(ctx context.Context, sourceID string, maxSeqCache, maxSeqMongo int64, diffusionType int) { if math.Abs(float64(maxSeqMongo-maxSeqCache)) > 10 { log.NewWarn(tracelog.GetOperationID(ctx), "cache max seq and mongo max seq is diff > 10", sourceID, maxSeqCache, maxSeqMongo, diffusionType) } } + +func (c *msgTool) FixAllSeq(ctx context.Context) { + userIDs, err := c.userInterface.GetAllUserID(ctx) + if err != nil { + panic(err.Error()) + } + for _, userID := range userIDs { + userCurrentMinSeq, err := c.msgInterface.GetUserMinSeq(ctx, userID) + if err != nil && err != redis.Nil { + continue + } + userCurrentMaxSeq, err := c.msgInterface.GetUserMaxSeq(ctx, userID) + if err != nil && err != redis.Nil { + continue + } + if userCurrentMinSeq > userCurrentMaxSeq { + if err = c.msgInterface.SetUserMinSeq(ctx, userID, userCurrentMaxSeq); err != nil { + fmt.Println("SetUserMinSeq failed", userID, userCurrentMaxSeq) + } + fmt.Println("fix", userID, userCurrentMaxSeq) + } + } + fmt.Println("fix users seq success") + + groupIDs, err := c.groupInterface.GetGroupIDsByGroupType(ctx, constant.WorkingGroup) + if err != nil { + panic(err.Error()) + } + for _, groupID := range groupIDs { + maxSeq, err := c.msgInterface.GetGroupMaxSeq(ctx, groupID) + if err != nil { + fmt.Println("GetGroupMaxSeq failed", groupID) + continue + } + userIDs, err := c.groupInterface.FindGroupMemberUserID(ctx, groupID) + if err != nil { + fmt.Println("get groupID", groupID, "failed, try again later") + continue + } + for _, userID := range userIDs { + userMinSeq, err := c.msgInterface.GetGroupUserMinSeq(ctx, groupID, userID) + if err != nil && err != redis.Nil { + fmt.Println("GetGroupUserMinSeq failed", groupID, userID) + continue + } + if userMinSeq > maxSeq { + if err = c.msgInterface.SetGroupUserMinSeq(ctx, groupID, userID, maxSeq); err != nil { + fmt.Println("SetGroupUserMinSeq failed", err.Error(), groupID, userID, maxSeq) + } + fmt.Println("fix", groupID, userID, maxSeq, userMinSeq) + } + } + } + fmt.Println("fix all seq finished") +} diff --git a/internal/crontask/clear_msg_test.go b/internal/task/clear_msg_test.go similarity index 99% rename from internal/crontask/clear_msg_test.go rename to internal/task/clear_msg_test.go index be0be8197..fc91c2e82 100644 --- a/internal/crontask/clear_msg_test.go +++ b/internal/task/clear_msg_test.go @@ -1,4 +1,4 @@ -package cronTask +package task import ( "Open_IM/pkg/common/constant" diff --git a/internal/task/cron_task.go b/internal/task/cron_task.go new file mode 100644 index 000000000..0e2f4f6d8 --- /dev/null +++ b/internal/task/cron_task.go @@ -0,0 +1,56 @@ +package task + +import ( + "Open_IM/pkg/common/config" + "Open_IM/pkg/common/log" + "Open_IM/pkg/common/tracelog" + "Open_IM/pkg/utils" + "context" + "fmt" + "time" + + "github.com/robfig/cron/v3" +) + +const cronTaskOperationID = "cronTaskOperationID-" +const moduleName = "cron" + +func StartCronTask() error { + log.NewPrivateLog(moduleName) + log.NewInfo(utils.OperationIDGenerator(), "start cron task", "cron config", config.Config.Mongo.ChatRecordsClearTime) + fmt.Println("cron task start, config", config.Config.Mongo.ChatRecordsClearTime) + clearCronTask := msgTool{} + ctx := context.Background() + operationID := clearCronTask.getCronTaskOperationID() + tracelog.SetOperationID(ctx, operationID) + c := cron.New() + _, err := c.AddFunc(config.Config.Mongo.ChatRecordsClearTime, clearCronTask.ClearAll) + if err != nil { + fmt.Println("start cron failed", err.Error(), config.Config.Mongo.ChatRecordsClearTime) + return err + } + c.Start() + fmt.Println("start cron task success") + for { + time.Sleep(10 * time.Second) + } +} + +func FixSeq(userID, workingGroupID string, fixAllSeq bool) { + log.NewPrivateLog(moduleName) + log.NewInfo(utils.OperationIDGenerator(), "start cron task", "cron config", config.Config.Mongo.ChatRecordsClearTime) + clearCronTask := msgTool{} + ctx := context.Background() + operationID := clearCronTask.getCronTaskOperationID() + tracelog.SetOperationID(ctx, operationID) + if userID != "" { + clearCronTask.ClearUsersMsg(ctx, []string{userID}) + } + if workingGroupID != "" { + clearCronTask.ClearSuperGroupMsg(ctx, []string{workingGroupID}) + } + if fixAllSeq { + clearCronTask.FixAllSeq(ctx) + } + fmt.Println("fix seq finished") +} diff --git a/pkg/common/db/cache/redis.go b/pkg/common/db/cache/redis.go index 6448ff024..d6d3bc110 100644 --- a/pkg/common/db/cache/redis.go +++ b/pkg/common/db/cache/redis.go @@ -256,7 +256,7 @@ func (r *RedisClient) DeleteTokenByUidPid(ctx context.Context, userID string, pl return r.rdb.HDel(context.Background(), key, fields...).Err() } -func (r *RedisClient) GetMessagesBySeq(ctx context.Context, userID string, seqList []int64, operationID string) (seqMsg []*sdkws.MsgData, failedSeqList []int64, err2 error) { +func (r *RedisClient) GetMessagesBySeq(ctx context.Context, userID string, seqList []int64, operationID string) (seqMsgs []*sdkws.MsgData, failedSeqs []int64, err2 error) { for _, v := range seqList { //MESSAGE_CACHE:169.254.225.224_reliability1653387820_0_1 key := messageCache + userID + "_" + strconv.Itoa(int(v)) @@ -265,25 +265,25 @@ func (r *RedisClient) GetMessagesBySeq(ctx context.Context, userID string, seqLi if err != redis.Nil { err2 = err } - failedSeqList = append(failedSeqList, v) + failedSeqs = append(failedSeqs, v) } else { msg := sdkws.MsgData{} err = jsonpb.UnmarshalString(result, &msg) if err != nil { err2 = err - failedSeqList = append(failedSeqList, v) + failedSeqs = append(failedSeqs, v) } else { - seqMsg = append(seqMsg, &msg) + seqMsgs = append(seqMsgs, &msg) } } } - return seqMsg, failedSeqList, err2 + return seqMsgs, failedSeqs, err2 } -func (r *RedisClient) SetMessageToCache(ctx context.Context, userID string, msgList []*pbChat.MsgDataToMQ, uid string) (int, error) { +func (r *RedisClient) SetMessageToCache(ctx context.Context, userID string, msgs []*pbChat.MsgDataToMQ, uid string) (int, error) { pipe := r.rdb.Pipeline() - var failedList []pbChat.MsgDataToMQ - for _, msg := range msgList { + var failedMsgs []pbChat.MsgDataToMQ + for _, msg := range msgs { key := messageCache + uid + "_" + strconv.Itoa(int(msg.MsgData.Seq)) s, err := utils.Pb2String(msg.MsgData) if err != nil { @@ -292,17 +292,17 @@ func (r *RedisClient) SetMessageToCache(ctx context.Context, userID string, msgL err = pipe.Set(ctx, key, s, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err() //err = r.rdb.HMSet(context.Background(), "12", map[string]interface{}{"1": 2, "343": false}).Err() if err != nil { - failedList = append(failedList, *msg) + failedMsgs = append(failedMsgs, *msg) } } - if len(failedList) != 0 { - return len(failedList), errors.New(fmt.Sprintf("set msg to cache failed, failed lists: %q,%s", failedList)) + if len(failedMsgs) != 0 { + return len(failedMsgs), errors.New(fmt.Sprintf("set msg to cache failed, failed lists: %q,%s", failedList)) } _, err := pipe.Exec(ctx) return 0, err } -func (r *RedisClient) DeleteMessageFromCache(ctx context.Context, userID string, msgList []*pbChat.MsgDataToMQ) error { - for _, msg := range msgList { +func (r *RedisClient) DeleteMessageFromCache(ctx context.Context, userID string, msgs []*pbChat.MsgDataToMQ) error { + for _, msg := range msgs { key := messageCache + userID + "_" + strconv.Itoa(int(msg.MsgData.Seq)) err := r.rdb.Del(ctx, key).Err() if err != nil { diff --git a/pkg/common/db/controller/msg.go b/pkg/common/db/controller/msg.go index 5c584e18b..997aca61c 100644 --- a/pkg/common/db/controller/msg.go +++ b/pkg/common/db/controller/msg.go @@ -114,7 +114,7 @@ import ( // return m.database.SetUserMinSeq(ctx, userID, minSeq) //} -type MsgDatabaseInterface interface { +type MsgDatabase interface { // 批量插入消息 BatchInsertChat2DB(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ, currentMaxSeq int64) error // 刪除redis中消息缓存 @@ -140,6 +140,8 @@ type MsgDatabaseInterface interface { GetSuperGroupMinMaxSeqInMongoAndCache(ctx context.Context, groupID string) (minSeqMongo, maxSeqMongo, maxSeqCache int64, err error) // 设置群用户最小seq 直接调用cache SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error) + // + GetGroupUserMinSeq(ctx context.Context, groupID, userID string) (int64, error) // 设置用户最小seq 直接调用cache SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error) @@ -162,97 +164,97 @@ type MsgDatabaseInterface interface { GetGroupMaxSeq(ctx context.Context, groupID string) (int64, error) GetGroupMinSeq(ctx context.Context, groupID string) (int64, error) } -type MsgDatabase struct { +type msgDatabase struct { mgo unRelationTb.MsgDocModelInterface cache cache.Cache msg unRelationTb.MsgDocModel } -func (db *MsgDatabase) JudgeMessageReactionEXISTS(ctx context.Context, clientMsgID string, sessionType int32) (bool, error) { +func (db *msgDatabase) JudgeMessageReactionEXISTS(ctx context.Context, clientMsgID string, sessionType int32) (bool, error) { //TODO implement me panic("implement me") } -func (db *MsgDatabase) SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error { +func (db *msgDatabase) SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error { //TODO implement me panic("implement me") } -func (db *MsgDatabase) SetMessageReactionExpire(ctx context.Context, clientMsgID string, sessionType int32, expiration time.Duration) (bool, error) { +func (db *msgDatabase) SetMessageReactionExpire(ctx context.Context, clientMsgID string, sessionType int32, expiration time.Duration) (bool, error) { //TODO implement me panic("implement me") } -func (db *MsgDatabase) GetExtendMsg(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, maxMsgUpdateTime int64) (*pbMsg.ExtendMsg, error) { +func (db *msgDatabase) GetExtendMsg(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, maxMsgUpdateTime int64) (*pbMsg.ExtendMsg, error) { //TODO implement me panic("implement me") } -func (db *MsgDatabase) InsertOrUpdateReactionExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*sdkws.KeyValue) error { +func (db *msgDatabase) InsertOrUpdateReactionExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*sdkws.KeyValue) error { //TODO implement me panic("implement me") } -func (db *MsgDatabase) GetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey string) (string, error) { +func (db *msgDatabase) GetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey string) (string, error) { //TODO implement me panic("implement me") } -func (db *MsgDatabase) GetOneMessageAllReactionList(ctx context.Context, clientMsgID string, sessionType int32) (map[string]string, error) { +func (db *msgDatabase) GetOneMessageAllReactionList(ctx context.Context, clientMsgID string, sessionType int32) (map[string]string, error) { //TODO implement me panic("implement me") } -func (db *MsgDatabase) DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error { +func (db *msgDatabase) DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error { //TODO implement me panic("implement me") } -func (db *MsgDatabase) DeleteReactionExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*sdkws.KeyValue) error { +func (db *msgDatabase) DeleteReactionExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*sdkws.KeyValue) error { //TODO implement me panic("implement me") } -func (db *MsgDatabase) SetSendMsgStatus(ctx context.Context, userID string, status int32) error { +func (db *msgDatabase) SetSendMsgStatus(ctx context.Context, userID string, status int32) error { //TODO implement me panic("implement me") } -func (db *MsgDatabase) GetSendMsgStatus(ctx context.Context, userID string) (int32, error) { +func (db *msgDatabase) GetSendMsgStatus(ctx context.Context, userID string) (int32, error) { //TODO implement me panic("implement me") } -func (db *MsgDatabase) MsgToMQ(ctx context.Context, key string, mq *pbMsg.MsgDataToMQ) error { +func (db *msgDatabase) MsgToMQ(ctx context.Context, key string, mq *pbMsg.MsgDataToMQ) error { //TODO implement me panic("implement me") } -func (db *MsgDatabase) GetUserMaxSeq(ctx context.Context, userID string) (int64, error) { +func (db *msgDatabase) GetUserMaxSeq(ctx context.Context, userID string) (int64, error) { //TODO implement me panic("implement me") } -func (db *MsgDatabase) GetUserMinSeq(ctx context.Context, userID string) (int64, error) { +func (db *msgDatabase) GetUserMinSeq(ctx context.Context, userID string) (int64, error) { //TODO implement me panic("implement me") } -func (db *MsgDatabase) GetGroupMaxSeq(ctx context.Context, groupID string) (int64, error) { +func (db *msgDatabase) GetGroupMaxSeq(ctx context.Context, groupID string) (int64, error) { //TODO implement me panic("implement me") } -func (db *MsgDatabase) GetGroupMinSeq(ctx context.Context, groupID string) (int64, error) { +func (db *msgDatabase) GetGroupMinSeq(ctx context.Context, groupID string) (int64, error) { //TODO implement me panic("implement me") } -func NewMsgDatabase(mgo *mongo.Client, rdb redis.UniversalClient) MsgDatabaseInterface { - return &MsgDatabase{} +func NewMsgDatabase(mgo *mongo.Client, rdb redis.UniversalClient) MsgDatabase { + return &msgDatabase{} } -func (db *MsgDatabase) BatchInsertChat2DB(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ, currentMaxSeq int64) error { +func (db *msgDatabase) BatchInsertChat2DB(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ, currentMaxSeq int64) error { //newTime := utils.GetCurrentTimestampByMill() if int64(len(msgList)) > db.msg.GetSingleGocMsgNum() { return errors.New("too large") @@ -336,11 +338,11 @@ func (db *MsgDatabase) BatchInsertChat2DB(ctx context.Context, sourceID string, return nil } -func (db *MsgDatabase) DeleteMessageFromCache(ctx context.Context, userID string, msgs []*pbMsg.MsgDataToMQ) error { +func (db *msgDatabase) DeleteMessageFromCache(ctx context.Context, userID string, msgs []*pbMsg.MsgDataToMQ) error { return db.cache.DeleteMessageFromCache(ctx, userID, msgs) } -func (db *MsgDatabase) BatchInsertChat2Cache(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) (int64, error) { +func (db *msgDatabase) BatchInsertChat2Cache(ctx context.Context, sourceID string, msgList []*pbMsg.MsgDataToMQ) (int64, error) { //newTime := utils.GetCurrentTimestampByMill() lenList := len(msgList) if int64(lenList) > db.msg.GetSingleGocMsgNum() { @@ -392,7 +394,7 @@ func (db *MsgDatabase) BatchInsertChat2Cache(ctx context.Context, sourceID strin return lastMaxSeq, utils.Wrap(err, "") } -func (db *MsgDatabase) DelMsgBySeqs(ctx context.Context, userID string, seqs []int64) (totalUnExistSeqs []int64, err error) { +func (db *msgDatabase) DelMsgBySeqs(ctx context.Context, userID string, seqs []int64) (totalUnExistSeqs []int64, err error) { sortkeys.Int64s(seqs) docIDSeqsMap := db.msg.GetDocIDSeqsMap(userID, seqs) lock := sync.Mutex{} @@ -413,7 +415,7 @@ func (db *MsgDatabase) DelMsgBySeqs(ctx context.Context, userID string, seqs []i return totalUnExistSeqs, nil } -func (db *MsgDatabase) DelMsgBySeqsInOneDoc(ctx context.Context, docID string, seqs []int64) (unExistSeqs []int64, err error) { +func (db *msgDatabase) DelMsgBySeqsInOneDoc(ctx context.Context, docID string, seqs []int64) (unExistSeqs []int64, err error) { seqMsgs, indexes, unExistSeqs, err := db.GetMsgAndIndexBySeqsInOneDoc(ctx, docID, seqs) if err != nil { return nil, err @@ -426,7 +428,7 @@ func (db *MsgDatabase) DelMsgBySeqsInOneDoc(ctx context.Context, docID string, s return unExistSeqs, nil } -func (db *MsgDatabase) GetMsgAndIndexBySeqsInOneDoc(ctx context.Context, docID string, seqs []int64) (seqMsgs []*sdkws.MsgData, indexes []int, unExistSeqs []int64, err error) { +func (db *msgDatabase) GetMsgAndIndexBySeqsInOneDoc(ctx context.Context, docID string, seqs []int64) (seqMsgs []*sdkws.MsgData, indexes []int, unExistSeqs []int64, err error) { doc, err := db.mgo.FindOneByDocID(ctx, docID) if err != nil { return nil, nil, nil, err @@ -457,7 +459,7 @@ func (db *MsgDatabase) GetMsgAndIndexBySeqsInOneDoc(ctx context.Context, docID s return seqMsgs, indexes, unExistSeqs, nil } -func (db *MsgDatabase) GetNewestMsg(ctx context.Context, sourceID string) (msgPb *sdkws.MsgData, err error) { +func (db *msgDatabase) GetNewestMsg(ctx context.Context, sourceID string) (msgPb *sdkws.MsgData, err error) { msgInfo, err := db.mgo.GetNewestMsg(ctx, sourceID) if err != nil { return nil, err @@ -465,7 +467,7 @@ func (db *MsgDatabase) GetNewestMsg(ctx context.Context, sourceID string) (msgPb return db.unmarshalMsg(msgInfo) } -func (db *MsgDatabase) GetOldestMsg(ctx context.Context, sourceID string) (msgPb *sdkws.MsgData, err error) { +func (db *msgDatabase) GetOldestMsg(ctx context.Context, sourceID string) (msgPb *sdkws.MsgData, err error) { msgInfo, err := db.mgo.GetOldestMsg(ctx, sourceID) if err != nil { return nil, err @@ -473,7 +475,7 @@ func (db *MsgDatabase) GetOldestMsg(ctx context.Context, sourceID string) (msgPb return db.unmarshalMsg(msgInfo) } -func (db *MsgDatabase) unmarshalMsg(msgInfo *unRelationTb.MsgInfoModel) (msgPb *sdkws.MsgData, err error) { +func (db *msgDatabase) unmarshalMsg(msgInfo *unRelationTb.MsgInfoModel) (msgPb *sdkws.MsgData, err error) { msgPb = &sdkws.MsgData{} err = proto.Unmarshal(msgInfo.Msg, msgPb) if err != nil { @@ -482,7 +484,7 @@ func (db *MsgDatabase) unmarshalMsg(msgInfo *unRelationTb.MsgInfoModel) (msgPb * return msgPb, nil } -func (db *MsgDatabase) getMsgBySeqs(ctx context.Context, sourceID string, seqs []int64, diffusionType int) (seqMsg []*sdkws.MsgData, err error) { +func (db *msgDatabase) getMsgBySeqs(ctx context.Context, sourceID string, seqs []int64, diffusionType int) (seqMsg []*sdkws.MsgData, err error) { var hasSeqs []int64 singleCount := 0 m := db.msg.GetDocIDSeqsMap(sourceID, seqs) @@ -523,7 +525,7 @@ func (db *MsgDatabase) getMsgBySeqs(ctx context.Context, sourceID string, seqs [ return seqMsg, nil } -func (db *MsgDatabase) GetMsgBySeqs(ctx context.Context, userID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error) { +func (db *msgDatabase) GetMsgBySeqs(ctx context.Context, userID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error) { successMsgs, failedSeqs, err := db.cache.GetMessagesBySeq(ctx, userID, seqs) if err != nil { if err != redis.Nil { @@ -544,7 +546,7 @@ func (db *MsgDatabase) GetMsgBySeqs(ctx context.Context, userID string, seqs []i return successMsgs, nil } -func (db *MsgDatabase) GetSuperGroupMsgBySeqs(ctx context.Context, groupID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error) { +func (db *msgDatabase) GetSuperGroupMsgBySeqs(ctx context.Context, groupID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error) { successMsgs, failedSeqs, err := db.cache.GetMessagesBySeq(ctx, groupID, seqs) if err != nil { if err != redis.Nil { @@ -565,7 +567,7 @@ func (db *MsgDatabase) GetSuperGroupMsgBySeqs(ctx context.Context, groupID strin return successMsgs, nil } -func (db *MsgDatabase) CleanUpUserMsg(ctx context.Context, userID string) error { +func (db *msgDatabase) CleanUpUserMsg(ctx context.Context, userID string) error { err := db.DeleteUserMsgsAndSetMinSeq(ctx, userID, 0) if err != nil { return err @@ -574,7 +576,7 @@ func (db *MsgDatabase) CleanUpUserMsg(ctx context.Context, userID string) error return utils.Wrap(err, "") } -func (db *MsgDatabase) DeleteUserSuperGroupMsgsAndSetMinSeq(ctx context.Context, groupID string, userIDs []string, remainTime int64) error { +func (db *msgDatabase) DeleteUserSuperGroupMsgsAndSetMinSeq(ctx context.Context, groupID string, userIDs []string, remainTime int64) error { var delStruct delMsgRecursionStruct minSeq, err := db.deleteMsgRecursion(ctx, groupID, unRelationTb.OldestList, &delStruct, remainTime) if err != nil { @@ -602,7 +604,7 @@ func (db *MsgDatabase) DeleteUserSuperGroupMsgsAndSetMinSeq(ctx context.Context, return nil } -func (db *MsgDatabase) DeleteUserMsgsAndSetMinSeq(ctx context.Context, userID string, remainTime int64) error { +func (db *msgDatabase) DeleteUserMsgsAndSetMinSeq(ctx context.Context, userID string, remainTime int64) error { var delStruct delMsgRecursionStruct minSeq, err := db.deleteMsgRecursion(ctx, userID, unRelationTb.OldestList, &delStruct, remainTime) if err != nil { @@ -628,7 +630,7 @@ func (d *delMsgRecursionStruct) getSetMinSeq() int64 { // seq 70 // set minSeq 21 // recursion 删除list并且返回设置的最小seq -func (db *MsgDatabase) deleteMsgRecursion(ctx context.Context, sourceID string, index int64, delStruct *delMsgRecursionStruct, remainTime int64) (int64, error) { +func (db *msgDatabase) deleteMsgRecursion(ctx context.Context, sourceID string, index int64, delStruct *delMsgRecursionStruct, remainTime int64) (int64, error) { // find from oldest list msgs, err := db.mgo.GetMsgsByIndex(ctx, sourceID, index) if err != nil || msgs.DocID == "" { @@ -690,10 +692,10 @@ func (db *MsgDatabase) deleteMsgRecursion(ctx context.Context, sourceID string, //log.NewDebug(operationID, sourceID, "continue to", delStruct) // 继续递归 index+1 seq, err := db.deleteMsgRecursion(ctx, sourceID, index+1, delStruct, remainTime) - return seq, utils.Wrap(err, "deleteMsg failed") + return seq, err } -func (db *MsgDatabase) GetUserMinMaxSeqInMongoAndCache(ctx context.Context, userID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error) { +func (db *msgDatabase) GetUserMinMaxSeqInMongoAndCache(ctx context.Context, userID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error) { minSeqMongo, maxSeqMongo, err = db.GetMinMaxSeqMongo(ctx, userID) if err != nil { return 0, 0, 0, 0, err @@ -710,7 +712,7 @@ func (db *MsgDatabase) GetUserMinMaxSeqInMongoAndCache(ctx context.Context, user return } -func (db *MsgDatabase) GetSuperGroupMinMaxSeqInMongoAndCache(ctx context.Context, groupID string) (minSeqMongo, maxSeqMongo, maxSeqCache int64, err error) { +func (db *msgDatabase) GetSuperGroupMinMaxSeqInMongoAndCache(ctx context.Context, groupID string) (minSeqMongo, maxSeqMongo, maxSeqCache int64, err error) { minSeqMongo, maxSeqMongo, err = db.GetMinMaxSeqMongo(ctx, groupID) if err != nil { return 0, 0, 0, err @@ -722,7 +724,7 @@ func (db *MsgDatabase) GetSuperGroupMinMaxSeqInMongoAndCache(ctx context.Context return } -func (db *MsgDatabase) GetMinMaxSeqMongo(ctx context.Context, sourceID string) (minSeqMongo, maxSeqMongo int64, err error) { +func (db *msgDatabase) GetMinMaxSeqMongo(ctx context.Context, sourceID string) (minSeqMongo, maxSeqMongo int64, err error) { oldestMsgMongo, err := db.mgo.GetOldestMsg(ctx, sourceID) if err != nil { return 0, 0, err @@ -744,10 +746,10 @@ func (db *MsgDatabase) GetMinMaxSeqMongo(ctx context.Context, sourceID string) ( return } -func (db *MsgDatabase) SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error) { +func (db *msgDatabase) SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error) { return db.cache.SetGroupUserMinSeq(ctx, groupID, userID, minSeq) } -func (db *MsgDatabase) SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error) { +func (db *msgDatabase) SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error) { return db.cache.SetUserMinSeq(ctx, userID, minSeq) }