From 2039b760b11528998bce77dce88aa62fd3fadb84 Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Tue, 23 May 2023 11:54:10 +0800 Subject: [PATCH 1/2] cron --- internal/tools/msg.go | 14 ++++++----- pkg/common/db/controller/conversation.go | 32 ++++++++++++------------ 2 files changed, 24 insertions(+), 22 deletions(-) diff --git a/internal/tools/msg.go b/internal/tools/msg.go index 83c4b9c90..a7d935af4 100644 --- a/internal/tools/msg.go +++ b/internal/tools/msg.go @@ -19,18 +19,19 @@ import ( type MsgTool struct { msgDatabase controller.CommonMsgDatabase - conversationDatabase controller.ConversationDataBase + conversationDatabase controller.ConversationDatabase userDatabase controller.UserDatabase groupDatabase controller.GroupDatabase } var errSeq = errors.New("cache max seq and mongo max seq is diff > 10") -func NewMsgTool(msgDatabase controller.CommonMsgDatabase, userDatabase controller.UserDatabase, groupDatabase controller.GroupDatabase) *MsgTool { +func NewMsgTool(msgDatabase controller.CommonMsgDatabase, userDatabase controller.UserDatabase, groupDatabase controller.GroupDatabase, conversationDatabase controller.ConversationDatabase) *MsgTool { return &MsgTool{ - msgDatabase: msgDatabase, - userDatabase: userDatabase, - groupDatabase: groupDatabase, + msgDatabase: msgDatabase, + userDatabase: userDatabase, + groupDatabase: groupDatabase, + conversationDatabase: conversationDatabase, } } @@ -51,7 +52,8 @@ func InitMsgTool() (*MsgTool, error) { msgDatabase := controller.InitCommonMsgDatabase(rdb, mongo.GetDatabase()) userDatabase := controller.NewUserDatabase(userDB, cache.NewUserCacheRedis(rdb, relation.NewUserGorm(db), cache.GetDefaultOpt()), tx.NewGorm(db)) groupDatabase := controller.InitGroupDatabase(db, rdb, mongo.GetDatabase()) - msgTool := NewMsgTool(msgDatabase, userDatabase, groupDatabase) + conversationDatabase := controller.NewConversationDatabase(relation.NewConversationGorm(db), cache.NewConversationRedis(rdb, cache.GetDefaultOpt(), relation.NewConversationGorm(db)), tx.NewGorm(db)) + msgTool := NewMsgTool(msgDatabase, userDatabase, groupDatabase, conversationDatabase) return msgTool, nil } diff --git a/pkg/common/db/controller/conversation.go b/pkg/common/db/controller/conversation.go index 22e4b8dff..dccb9f4b8 100644 --- a/pkg/common/db/controller/conversation.go +++ b/pkg/common/db/controller/conversation.go @@ -36,20 +36,20 @@ type ConversationDatabase interface { } func NewConversationDatabase(conversation relationTb.ConversationModelInterface, cache cache.ConversationCache, tx tx.Tx) ConversationDatabase { - return &ConversationDataBase{ + return &conversationDatabase{ conversationDB: conversation, cache: cache, tx: tx, } } -type ConversationDataBase struct { +type conversationDatabase struct { conversationDB relationTb.ConversationModelInterface cache cache.ConversationCache tx tx.Tx } -func (c *ConversationDataBase) SetUsersConversationFiledTx(ctx context.Context, userIDs []string, conversation *relationTb.ConversationModel, filedMap map[string]interface{}) (err error) { +func (c *conversationDatabase) SetUsersConversationFiledTx(ctx context.Context, userIDs []string, conversation *relationTb.ConversationModel, filedMap map[string]interface{}) (err error) { cache := c.cache.NewCache() if err := c.tx.Transaction(func(tx any) error { conversationTx := c.conversationDB.NewTx(tx) @@ -95,7 +95,7 @@ func (c *ConversationDataBase) SetUsersConversationFiledTx(ctx context.Context, return cache.ExecDel(ctx) } -func (c *ConversationDataBase) UpdateUsersConversationFiled(ctx context.Context, userIDs []string, conversationID string, args map[string]interface{}) error { +func (c *conversationDatabase) UpdateUsersConversationFiled(ctx context.Context, userIDs []string, conversationID string, args map[string]interface{}) error { _, err := c.conversationDB.UpdateByMap(ctx, userIDs, conversationID, args) if err != nil { return err @@ -103,7 +103,7 @@ func (c *ConversationDataBase) UpdateUsersConversationFiled(ctx context.Context, return c.cache.DelUsersConversation(conversationID, userIDs...).ExecDel(ctx) } -func (c *ConversationDataBase) CreateConversation(ctx context.Context, conversations []*relationTb.ConversationModel) error { +func (c *conversationDatabase) CreateConversation(ctx context.Context, conversations []*relationTb.ConversationModel) error { if err := c.conversationDB.Create(ctx, conversations); err != nil { return err } @@ -114,7 +114,7 @@ func (c *ConversationDataBase) CreateConversation(ctx context.Context, conversat return c.cache.DelConversationIDs(userIDs...).DelUserConversationIDsHash(userIDs...).ExecDel(ctx) } -func (c *ConversationDataBase) SyncPeerUserPrivateConversationTx(ctx context.Context, conversations []*relationTb.ConversationModel) error { +func (c *conversationDatabase) SyncPeerUserPrivateConversationTx(ctx context.Context, conversations []*relationTb.ConversationModel) error { cache := c.cache.NewCache() if err := c.tx.Transaction(func(tx any) error { conversationTx := c.conversationDB.NewTx(tx) @@ -150,19 +150,19 @@ func (c *ConversationDataBase) SyncPeerUserPrivateConversationTx(ctx context.Con return c.cache.ExecDel(ctx) } -func (c *ConversationDataBase) FindConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ([]*relationTb.ConversationModel, error) { +func (c *conversationDatabase) FindConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ([]*relationTb.ConversationModel, error) { return c.cache.GetConversations(ctx, ownerUserID, conversationIDs) } -func (c *ConversationDataBase) GetConversation(ctx context.Context, ownerUserID string, conversationID string) (*relationTb.ConversationModel, error) { +func (c *conversationDatabase) GetConversation(ctx context.Context, ownerUserID string, conversationID string) (*relationTb.ConversationModel, error) { return c.cache.GetConversation(ctx, ownerUserID, conversationID) } -func (c *ConversationDataBase) GetUserAllConversation(ctx context.Context, ownerUserID string) ([]*relationTb.ConversationModel, error) { +func (c *conversationDatabase) GetUserAllConversation(ctx context.Context, ownerUserID string) ([]*relationTb.ConversationModel, error) { return c.cache.GetUserAllConversations(ctx, ownerUserID) } -func (c *ConversationDataBase) SetUserConversations(ctx context.Context, ownerUserID string, conversations []*relationTb.ConversationModel) error { +func (c *conversationDatabase) SetUserConversations(ctx context.Context, ownerUserID string, conversations []*relationTb.ConversationModel) error { cache := c.cache.NewCache() if err := c.tx.Transaction(func(tx any) error { var conversationIDs []string @@ -208,11 +208,11 @@ func (c *ConversationDataBase) SetUserConversations(ctx context.Context, ownerUs return cache.ExecDel(ctx) } -func (c *ConversationDataBase) FindRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) ([]string, error) { +func (c *conversationDatabase) FindRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) ([]string, error) { return c.cache.GetSuperGroupRecvMsgNotNotifyUserIDs(ctx, groupID) } -func (c *ConversationDataBase) CreateGroupChatConversation(ctx context.Context, groupID string, userIDs []string) error { +func (c *conversationDatabase) CreateGroupChatConversation(ctx context.Context, groupID string, userIDs []string) error { cache := c.cache.NewCache() conversationID := utils.GetConversationIDBySessionType(constant.SuperGroupChatType, groupID) if err := c.tx.Transaction(func(tx any) error { @@ -247,18 +247,18 @@ func (c *ConversationDataBase) CreateGroupChatConversation(ctx context.Context, return cache.ExecDel(ctx) } -func (c *ConversationDataBase) GetConversationIDs(ctx context.Context, userID string) ([]string, error) { +func (c *conversationDatabase) GetConversationIDs(ctx context.Context, userID string) ([]string, error) { return c.cache.GetUserConversationIDs(ctx, userID) } -func (c *ConversationDataBase) GetUserConversationIDsHash(ctx context.Context, ownerUserID string) (hash uint64, err error) { +func (c *conversationDatabase) GetUserConversationIDsHash(ctx context.Context, ownerUserID string) (hash uint64, err error) { return c.cache.GetUserConversationIDsHash(ctx, ownerUserID) } -func (c *ConversationDataBase) GetAllConversationIDs(ctx context.Context) ([]string, error) { +func (c *conversationDatabase) GetAllConversationIDs(ctx context.Context) ([]string, error) { return c.conversationDB.GetAllConversationIDs(ctx) } -func (c *ConversationDataBase) GetUserAllHasReadSeqs(ctx context.Context, ownerUserID string) (map[string]int64, error) { +func (c *conversationDatabase) GetUserAllHasReadSeqs(ctx context.Context, ownerUserID string) (map[string]int64, error) { return c.cache.GetUserAllHasReadSeqs(ctx, ownerUserID) } From ba3aa74469e4a4756e373f6fac35688e46695ab9 Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Tue, 23 May 2023 14:52:44 +0800 Subject: [PATCH 2/2] cron --- internal/tools/cron_task.go | 2 -- pkg/common/db/controller/msg.go | 2 +- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/internal/tools/cron_task.go b/internal/tools/cron_task.go index fa19bffe0..c55a0d046 100644 --- a/internal/tools/cron_task.go +++ b/internal/tools/cron_task.go @@ -10,8 +10,6 @@ import ( "github.com/robfig/cron/v3" ) -const moduleName = "cron" - func StartCronTask() error { log.ZInfo(context.Background(), "start cron task", "cron config", config.Config.Mongo.ChatRecordsClearTime) fmt.Println("cron task start, config", config.Config.Mongo.ChatRecordsClearTime) diff --git a/pkg/common/db/controller/msg.go b/pkg/common/db/controller/msg.go index 5fbf6fedb..1c6b03643 100644 --- a/pkg/common/db/controller/msg.go +++ b/pkg/common/db/controller/msg.go @@ -568,7 +568,7 @@ func (db *commonMsgDatabase) deleteMsgRecursion(ctx context.Context, conversatio } return delStruct.getSetMinSeq() + 1, nil } - log.ZDebug(ctx, "conversationID", conversationID, "index:", index, "docID", msgs.DocID, "len", len(msgs.Msg)) + log.ZDebug(ctx, "doc info", "conversationID", conversationID, "index", index, "docID", msgs.DocID, "len", len(msgs.Msg)) if int64(len(msgs.Msg)) > db.msg.GetSingleGocMsgNum() { log.ZWarn(ctx, "msgs too large", nil, "lenth", len(msgs.Msg), "docID:", msgs.DocID) }