From bead363df14ee92afd775fd995c5dc56980bbd5b Mon Sep 17 00:00:00 2001 From: Monet Lee Date: Thu, 1 Aug 2024 14:25:07 +0800 Subject: [PATCH] feat: implement scheduled destruct msgs feature in cron task. --- go.mod | 2 +- go.sum | 4 +- internal/rpc/conversation/conversaion.go | 90 +++++++++++++++++++----- internal/rpc/msg/clear.go | 72 +++++++++++++++++-- internal/rpc/msg/server.go | 5 ++ internal/tools/cron_task.go | 54 +++++++++++--- pkg/common/convert/conversation.go | 4 +- pkg/rpcclient/conversation.go | 10 ++- 8 files changed, 206 insertions(+), 35 deletions(-) diff --git a/go.mod b/go.mod index 1a1cf36d2..49254097a 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ require ( github.com/gorilla/websocket v1.5.1 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/mitchellh/mapstructure v1.5.0 - github.com/openimsdk/protocol v0.0.69-alpha.47 + github.com/openimsdk/protocol v0.0.69-alpha.50 github.com/openimsdk/tools v0.0.49-alpha.55 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 diff --git a/go.sum b/go.sum index 815afe8d2..ac525b2db 100644 --- a/go.sum +++ b/go.sum @@ -319,8 +319,8 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y= github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y= github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= -github.com/openimsdk/protocol v0.0.69-alpha.47 h1:WEpU7dHSzcpiyPoUkgSt1mC9HfQ6xSDNNZf4KWbZiFI= -github.com/openimsdk/protocol v0.0.69-alpha.47/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= +github.com/openimsdk/protocol v0.0.69-alpha.50 h1:4r6vY9LsjFrR8AAwORFhijOGmq2vzDH3XTX4wBiw+2M= +github.com/openimsdk/protocol v0.0.69-alpha.50/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= github.com/openimsdk/tools v0.0.49-alpha.55 h1:KPgC53oqiwZYssLKljhtXbWXifMlTj2SSQEusj4Uf4k= github.com/openimsdk/tools v0.0.49-alpha.55/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4= github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4= diff --git a/internal/rpc/conversation/conversaion.go b/internal/rpc/conversation/conversaion.go index 3047c376b..0d21b3299 100644 --- a/internal/rpc/conversation/conversaion.go +++ b/internal/rpc/conversation/conversaion.go @@ -16,13 +16,16 @@ package conversation import ( "context" + "sort" + "time" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo" - tablerelation "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" + dbModel "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "github.com/openimsdk/open-im-server/v3/pkg/localcache" "github.com/openimsdk/tools/db/redisutil" - "sort" "github.com/openimsdk/open-im-server/v3/pkg/common/convert" "github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" @@ -40,10 +43,11 @@ import ( ) type conversationServer struct { - msgRpcClient *rpcclient.MessageRpcClient - user *rpcclient.UserRpcClient - groupRpcClient *rpcclient.GroupRpcClient - conversationDatabase controller.ConversationDatabase + msgRpcClient *rpcclient.MessageRpcClient + user *rpcclient.UserRpcClient + groupRpcClient *rpcclient.GroupRpcClient + conversationDatabase controller.ConversationDatabase + conversationNotificationSender *ConversationNotificationSender config *Config } @@ -204,11 +208,11 @@ func (c *conversationServer) getConversations(ctx context.Context, ownerUserID s } func (c *conversationServer) SetConversation(ctx context.Context, req *pbconversation.SetConversationReq) (*pbconversation.SetConversationResp, error) { - var conversation tablerelation.Conversation + var conversation dbModel.Conversation if err := datautil.CopyStructFields(&conversation, req.Conversation); err != nil { return nil, err } - err := c.conversationDatabase.SetUserConversations(ctx, req.Conversation.OwnerUserID, []*tablerelation.Conversation{&conversation}) + err := c.conversationDatabase.SetUserConversations(ctx, req.Conversation.OwnerUserID, []*dbModel.Conversation{&conversation}) if err != nil { return nil, err } @@ -232,7 +236,7 @@ func (c *conversationServer) SetConversations(ctx context.Context, req *pbconver } } var unequal int - var conv tablerelation.Conversation + var conv dbModel.Conversation if len(req.UserIDs) == 1 { cs, err := c.conversationDatabase.FindConversations(ctx, req.UserIDs[0], []string{req.Conversation.ConversationID}) if err != nil { @@ -243,7 +247,7 @@ func (c *conversationServer) SetConversations(ctx context.Context, req *pbconver } conv = *cs[0] } - var conversation tablerelation.Conversation + var conversation dbModel.Conversation conversation.ConversationID = req.Conversation.ConversationID conversation.ConversationType = req.Conversation.ConversationType conversation.UserID = req.Conversation.UserID @@ -292,7 +296,7 @@ func (c *conversationServer) SetConversations(ctx context.Context, req *pbconver } } if req.Conversation.IsPrivateChat != nil && req.Conversation.ConversationType != constant.ReadGroupChatType { - var conversations []*tablerelation.Conversation + var conversations []*dbModel.Conversation for _, ownerUserID := range req.UserIDs { conversation2 := conversation conversation2.OwnerUserID = ownerUserID @@ -340,12 +344,12 @@ func (c *conversationServer) CreateSingleChatConversations(ctx context.Context, ) (*pbconversation.CreateSingleChatConversationsResp, error) { switch req.ConversationType { case constant.SingleChatType: - var conversation tablerelation.Conversation + var conversation dbModel.Conversation conversation.ConversationID = req.ConversationID conversation.ConversationType = req.ConversationType conversation.OwnerUserID = req.SendID conversation.UserID = req.RecvID - err := c.conversationDatabase.CreateConversation(ctx, []*tablerelation.Conversation{&conversation}) + err := c.conversationDatabase.CreateConversation(ctx, []*dbModel.Conversation{&conversation}) if err != nil { log.ZWarn(ctx, "create conversation failed", err, "conversation", conversation) } @@ -353,17 +357,17 @@ func (c *conversationServer) CreateSingleChatConversations(ctx context.Context, conversation2 := conversation conversation2.OwnerUserID = req.RecvID conversation2.UserID = req.SendID - err = c.conversationDatabase.CreateConversation(ctx, []*tablerelation.Conversation{&conversation2}) + err = c.conversationDatabase.CreateConversation(ctx, []*dbModel.Conversation{&conversation2}) if err != nil { log.ZWarn(ctx, "create conversation failed", err, "conversation2", conversation) } case constant.NotificationChatType: - var conversation tablerelation.Conversation + var conversation dbModel.Conversation conversation.ConversationID = req.ConversationID conversation.ConversationType = req.ConversationType conversation.OwnerUserID = req.RecvID conversation.UserID = req.SendID - err := c.conversationDatabase.CreateConversation(ctx, []*tablerelation.Conversation{&conversation}) + err := c.conversationDatabase.CreateConversation(ctx, []*dbModel.Conversation{&conversation}) if err != nil { log.ZWarn(ctx, "create conversation failed", err, "conversation2", conversation) } @@ -584,6 +588,9 @@ func (c *conversationServer) UpdateConversation(ctx context.Context, req *pbconv if req.MaxSeq != nil { m["max_seq"] = req.MaxSeq.Value } + if req.LatestMsgDestructTime != nil { + m["latest_msg_destruct_time"] = time.UnixMilli(req.LatestMsgDestructTime.Value) + } if len(m) > 0 { if err := c.conversationDatabase.UpdateUsersConversationField(ctx, req.UserIDs, req.ConversationID, m); err != nil { return nil, err @@ -602,3 +609,54 @@ func (c *conversationServer) GetOwnerConversation(ctx context.Context, req *pbco Conversations: convert.ConversationsDB2Pb(conversations), }, nil } + +func (c *conversationServer) GetConversationsNeedDestructMsgs(ctx context.Context, _ *pbconversation.GetConversationsNeedDestructMsgsReq) (*pbconversation.GetConversationsNeedDestructMsgsResp, error) { + log.ZInfo(ctx, "ConversationDestructMsgs cron start") + num, err := c.conversationDatabase.GetAllConversationIDsNumber(ctx) + if err != nil { + log.ZError(ctx, "GetAllConversationIDsNumber failed", err) + return nil, err + } + const batchNum = 100 + + if num == 0 { + return nil, errs.New("Need Destruct Msg is nil").Wrap() + } + + maxPage := (num + batchNum - 1) / batchNum + + temp := make([]*model.Conversation, 0, maxPage*batchNum) + + for pageNumber := 0; pageNumber < int(maxPage); pageNumber++ { + pagination := &sdkws.RequestPagination{ + PageNumber: int32(pageNumber), + ShowNumber: batchNum, + } + + conversationIDs, err := c.conversationDatabase.PageConversationIDs(ctx, pagination) + if err != nil { + log.ZError(ctx, "PageConversationIDs failed", err, "pageNumber", pageNumber) + continue + } + + log.ZDebug(ctx, "PageConversationIDs success", "pageNumber", pageNumber, "conversationIDsNum", len(conversationIDs), "conversationIDs", conversationIDs) + if len(conversationIDs) == 0 { + continue + } + + conversations, err := c.conversationDatabase.GetConversationsByConversationID(ctx, conversationIDs) + if err != nil { + log.ZError(ctx, "GetConversationsByConversationID failed", err, "conversationIDs", conversationIDs) + continue + } + + for _, conversation := range conversations { + if conversation.IsMsgDestruct && conversation.MsgDestructTime != 0 && ((time.Now().UnixMilli() > (conversation.MsgDestructTime + conversation.LatestMsgDestructTime.UnixMilli() + 8*60*60)) || // 8*60*60 is UTC+8 + conversation.LatestMsgDestructTime.IsZero()) { + temp = append(temp, conversation) + } + } + } + + return &pbconversation.GetConversationsNeedDestructMsgsResp{Conversations: convert.ConversationsDB2Pb(temp)}, nil +} diff --git a/internal/rpc/msg/clear.go b/internal/rpc/msg/clear.go index 774eae32c..151dab861 100644 --- a/internal/rpc/msg/clear.go +++ b/internal/rpc/msg/clear.go @@ -2,16 +2,25 @@ package msg import ( "context" + "strings" + "sync" + "time" + "github.com/openimsdk/open-im-server/v3/pkg/authverify" + "github.com/openimsdk/open-im-server/v3/pkg/common/convert" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "github.com/openimsdk/protocol/conversation" + pbconversation "github.com/openimsdk/protocol/conversation" "github.com/openimsdk/protocol/msg" "github.com/openimsdk/protocol/wrapperspb" "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" - "strings" - "time" + "github.com/openimsdk/tools/mcontext" + "github.com/openimsdk/tools/utils/idutil" + "github.com/openimsdk/tools/utils/stringutil" ) +// hard delete in Database. func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (_ *msg.ClearMsgResp, err error) { if err := authverify.CheckAdmin(ctx, m.config.Share.IMAdminUserID); err != nil { return nil, err @@ -26,17 +35,20 @@ func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (_ *msg. ) clearMsg := func(ctx context.Context) (bool, error) { conversationSeqs := make(map[string]struct{}) + + // update latest msg destruct time in conversation DB. defer func() { req := &conversation.UpdateConversationReq{ - MsgDestructTime: wrapperspb.Int64(time.Now().UnixMilli()), + LatestMsgDestructTime: wrapperspb.Int64(time.Now().UnixMilli()), } for conversationID := range conversationSeqs { req.ConversationID = conversationID - if err := m.Conversation.UpdateConversations(ctx, req); err != nil { + if err := m.Conversation.UpdateConversation(ctx, req); err != nil { log.ZError(ctx, "update conversation max seq failed", err, "conversationID", conversationID, "msgDestructTime", req.MsgDestructTime) } } }() + msgs, err := m.MsgDatabase.GetBeforeMsg(ctx, req.Timestamp, 100) if err != nil { return false, err @@ -61,6 +73,7 @@ func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (_ *msg. } return true, nil } + for { keep, err := clearMsg(ctx) if err != nil { @@ -75,3 +88,54 @@ func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (_ *msg. } return &msg.ClearMsgResp{}, nil } + +// soft delete for self +func (m *msgServer) DestructMsgs(ctx context.Context, req *msg.DestructMsgsReq) (_ *msg.DestructMsgsResp, err error) { + temp := convert.ConversationsPb2DB(req.Conversations) + + batchNum := 100 + var wg sync.WaitGroup + wg.Add((len(temp) + batchNum - 1) / batchNum) + + for i := 0; i < len(temp); i += batchNum { + batch := temp[i:min(i+batchNum, len(temp))] + + go func(batch []*model.Conversation) { + defer wg.Done() + + for _, conversation := range temp { + handleCtx := mcontext.NewCtx(stringutil.GetSelfFuncName() + "-" + idutil.OperationIDGenerator() + "-" + conversation.ConversationID + "-" + conversation.OwnerUserID) + log.ZDebug(handleCtx, "User MsgsDestruct", + "conversationID", conversation.ConversationID, + "ownerUserID", conversation.OwnerUserID, + "msgDestructTime", conversation.MsgDestructTime, + "lastMsgDestructTime", conversation.LatestMsgDestructTime) + + seqs, err := m.MsgDatabase.UserMsgsDestruct(handleCtx, conversation.OwnerUserID, conversation.ConversationID, conversation.MsgDestructTime, conversation.LatestMsgDestructTime) + if err != nil { + log.ZError(handleCtx, "user msg destruct failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID) + continue + } + + if len(seqs) > 0 { + if err := m.Conversation.UpdateConversation(handleCtx, + &pbconversation.UpdateConversationReq{ + UserIDs: []string{conversation.OwnerUserID}, + ConversationID: conversation.ConversationID, + LatestMsgDestructTime: wrapperspb.Int64(time.Now().UnixMilli())}); err != nil { + log.ZError(handleCtx, "updateUsersConversationField failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID) + continue + } + + // m.msgNotificationSender.UserDeleteMsgsNotification(handleCtx, conversation.OwnerUserID, conversation.ConversationID, seqs) + } + } + }(batch) + + } + wg.Wait() + + return nil, nil +} + + diff --git a/internal/rpc/msg/server.go b/internal/rpc/msg/server.go index de0f698ea..91f41f1b1 100644 --- a/internal/rpc/msg/server.go +++ b/internal/rpc/msg/server.go @@ -16,6 +16,7 @@ package msg import ( "context" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo" @@ -50,6 +51,7 @@ type ( ConversationLocalCache *rpccache.ConversationLocalCache // Local cache for conversation data. Handlers MessageInterceptorChain // Chain of handlers for processing messages. notificationSender *rpcclient.NotificationSender // RPC client for sending notifications. + msgNotificationSender *MsgNotificationSender // RPC client for sending msg notifications. config *Config // Global configuration settings. webhookClient *webhook.Client } @@ -117,7 +119,10 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg } s.notificationSender = rpcclient.NewNotificationSender(&config.NotificationConfig, rpcclient.WithLocalSendMsg(s.SendMsg)) + s.msgNotificationSender = NewMsgNotificationSender(config, rpcclient.WithLocalSendMsg(s.SendMsg)) + msg.RegisterMsgServer(server, s) + return nil } diff --git a/internal/tools/cron_task.go b/internal/tools/cron_task.go index 1ef4943cd..ffa54709d 100644 --- a/internal/tools/cron_task.go +++ b/internal/tools/cron_task.go @@ -17,16 +17,19 @@ package tools import ( "context" "fmt" + "os" + "time" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister" + pbconversation "github.com/openimsdk/protocol/conversation" "github.com/openimsdk/protocol/msg" + "github.com/openimsdk/protocol/third" "github.com/openimsdk/tools/mcontext" "github.com/openimsdk/tools/mw" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" - "os" - "time" "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" @@ -50,18 +53,34 @@ func Start(ctx context.Context, config *CronTaskConfig) error { } client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials())) ctx = mcontext.SetOpUserID(ctx, config.Share.IMAdminUserID[0]) - conn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Msg) + + msgConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Msg) if err != nil { return err } - cli := msg.NewMsgClient(conn) + + thirdConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Third) + if err != nil { + return err + } + + conversationConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Conversation) + if err != nil { + return err + } + + msgClient := msg.NewMsgClient(msgConn) + conversationClient := pbconversation.NewConversationClient(conversationConn) + thirdClient := third.NewThirdClient(thirdConn) + crontab := cron.New() + clearFunc := func() { now := time.Now() deltime := now.Add(-time.Hour * 24 * time.Duration(config.CronTask.RetainChatRecords)) ctx := mcontext.SetOperationID(ctx, fmt.Sprintf("cron_%d_%d", os.Getpid(), deltime.UnixMilli())) log.ZInfo(ctx, "clear chat records", "deltime", deltime, "timestamp", deltime.UnixMilli()) - if _, err := cli.ClearMsg(ctx, &msg.ClearMsgReq{Timestamp: deltime.UnixMilli()}); err != nil { + if _, err := msgClient.ClearMsg(ctx, &msg.ClearMsgReq{Timestamp: deltime.UnixMilli()}); err != nil { log.ZError(ctx, "cron clear chat records failed", err, "deltime", deltime, "cont", time.Since(now)) return } @@ -71,11 +90,27 @@ func Start(ctx context.Context, config *CronTaskConfig) error { return errs.Wrap(err) } - tConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Third) - if err != nil { - return err + msgDestructFunc := func() { + now := time.Now() + ctx := mcontext.SetOperationID(ctx, fmt.Sprintf("cron_%d_%d", os.Getpid(), now.UnixMilli())) + log.ZInfo(ctx, "msg destruct ", "now", now) + + conversations, err := conversationClient.GetConversationsNeedDestructMsgs(ctx, &pbconversation.GetConversationsNeedDestructMsgsReq{}) + if err != nil { + log.ZError(ctx, "Get conversation need Destruct msgs failed.", err) + return + } else { + _, err := msgClient.DestructMsgs(ctx, &msg.DestructMsgsReq{Conversations: conversations.Conversations}) + if err != nil { + log.ZError(ctx, "Destruct Msgs failed.", err) + return + } + } + log.ZInfo(ctx, "msg destruct cron task completed", "cont", time.Since(now)) + } + if _, err := crontab.AddFunc(config.CronTask.CronExecuteTime, msgDestructFunc); err != nil { + return errs.Wrap(err) } - thirdClient := third.NewThirdClient(tConn) deleteFunc := func() { now := time.Now() @@ -91,6 +126,7 @@ func Start(ctx context.Context, config *CronTaskConfig) error { if _, err := crontab.AddFunc(config.CronTask.CronExecuteTime, deleteFunc); err != nil { return errs.Wrap(err) } + log.ZInfo(ctx, "start cron task", "CronExecuteTime", config.CronTask.CronExecuteTime) crontab.Start() <-ctx.Done() diff --git a/pkg/common/convert/conversation.go b/pkg/common/convert/conversation.go index a76d7d9f6..9389b0252 100644 --- a/pkg/common/convert/conversation.go +++ b/pkg/common/convert/conversation.go @@ -22,7 +22,7 @@ import ( func ConversationDB2Pb(conversationDB *model.Conversation) *conversation.Conversation { conversationPB := &conversation.Conversation{} - conversationPB.LatestMsgDestructTime = conversationDB.LatestMsgDestructTime.Unix() + conversationPB.LatestMsgDestructTime = conversationDB.LatestMsgDestructTime.UnixMilli() if err := datautil.CopyStructFields(conversationPB, conversationDB); err != nil { return nil } @@ -35,7 +35,7 @@ func ConversationsDB2Pb(conversationsDB []*model.Conversation) (conversationsPB if err := datautil.CopyStructFields(conversationPB, conversationDB); err != nil { continue } - conversationPB.LatestMsgDestructTime = conversationDB.LatestMsgDestructTime.Unix() + conversationPB.LatestMsgDestructTime = conversationDB.LatestMsgDestructTime.UnixMilli() conversationsPB = append(conversationsPB, conversationPB) } return conversationsPB diff --git a/pkg/rpcclient/conversation.go b/pkg/rpcclient/conversation.go index e078f432b..8f95f86a6 100644 --- a/pkg/rpcclient/conversation.go +++ b/pkg/rpcclient/conversation.go @@ -82,7 +82,7 @@ func (c *ConversationRpcClient) SetConversations(ctx context.Context, userIDs [] return err } -func (c *ConversationRpcClient) UpdateConversations(ctx context.Context, conversation *pbconversation.UpdateConversationReq) error { +func (c *ConversationRpcClient) UpdateConversation(ctx context.Context, conversation *pbconversation.UpdateConversationReq) error { _, err := c.Client.UpdateConversation(ctx, conversation) return err } @@ -146,3 +146,11 @@ func (c *ConversationRpcClient) GetConversationNotReceiveMessageUserIDs(ctx cont } return resp.UserIDs, nil } + +func (c *ConversationRpcClient) GetConversationsNeedDestructMsgs(ctx context.Context) ([]*pbconversation.Conversation, error) { + resp, err := c.Client.GetConversationsNeedDestructMsgs(ctx, &pbconversation.GetConversationsNeedDestructMsgsReq{}) + if err != nil { + return nil, err + } + return resp.Conversations, nil +}