From c4a1649aaefd0908c02b34243e25e179b42ee50b Mon Sep 17 00:00:00 2001 From: Monet Lee Date: Sat, 30 Aug 2025 16:46:00 +0800 Subject: [PATCH] feat: implement DeleteConversations interface. --- go.mod | 2 +- go.sum | 4 +- internal/api/conversation.go | 4 ++ internal/api/router.go | 1 + .../msgtransfer/online_history_msg_handler.go | 2 + internal/rpc/conversation/conversation.go | 69 ++++++++++++++++++- internal/rpc/conversation/notification.go | 9 +++ pkg/common/storage/controller/conversation.go | 20 ++++++ pkg/common/storage/database/conversation.go | 1 + .../storage/database/mgo/conversation.go | 17 +++++ 10 files changed, 125 insertions(+), 4 deletions(-) diff --git a/go.mod b/go.mod index 775765706..dac5cbf35 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ require ( github.com/gorilla/websocket v1.5.1 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/mitchellh/mapstructure v1.5.0 - github.com/openimsdk/protocol v0.0.73-alpha.12 + github.com/openimsdk/protocol v0.0.73-alpha.17 github.com/openimsdk/tools v0.0.50-alpha.97 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 diff --git a/go.sum b/go.sum index 329a916ec..42cb83c6d 100644 --- a/go.sum +++ b/go.sum @@ -347,8 +347,8 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y= github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= github.com/openimsdk/gomake v0.0.15-alpha.11 h1:PQudYDRESYeYlUYrrLLJhYIlUPO5x7FAx+o5El9U/Bw= github.com/openimsdk/gomake v0.0.15-alpha.11/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= -github.com/openimsdk/protocol v0.0.73-alpha.12 h1:2NYawXeHChYUeSme6QJ9pOLh+Empce2WmwEtbP4JvKk= -github.com/openimsdk/protocol v0.0.73-alpha.12/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw= +github.com/openimsdk/protocol v0.0.73-alpha.17 h1:ddo0QMns1GVwAmrPIPlAQ7uKmThAYLnOt+CIOgLsJyE= +github.com/openimsdk/protocol v0.0.73-alpha.17/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw= github.com/openimsdk/tools v0.0.50-alpha.97 h1:6ik5w3PpgDG6VjSo3nb3FT/fxN3JX7iIARVxVu9g7VY= github.com/openimsdk/tools v0.0.50-alpha.97/go.mod h1:n2poR3asX1e1XZce4O+MOWAp+X02QJRFvhcLCXZdzRo= github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= diff --git a/internal/api/conversation.go b/internal/api/conversation.go index 5a191c0ec..78affee44 100644 --- a/internal/api/conversation.go +++ b/internal/api/conversation.go @@ -76,3 +76,7 @@ func (o *ConversationApi) GetPinnedConversationIDs(c *gin.Context) { func (o *ConversationApi) UpdateConversationsByUser(c *gin.Context) { a2r.Call(c, conversation.ConversationClient.UpdateConversationsByUser, o.Client) } + +func (o *ConversationApi) DeleteConversations(c *gin.Context) { + a2r.Call(c, conversation.ConversationClient.DeleteConversations, o.Client) +} diff --git a/internal/api/router.go b/internal/api/router.go index 1d3a92dd7..15bbeb053 100644 --- a/internal/api/router.go +++ b/internal/api/router.go @@ -277,6 +277,7 @@ func newGinRouter(ctx context.Context, client discovery.SvcDiscoveryRegistry, cf conversationGroup.POST("/get_owner_conversation", c.GetOwnerConversation) conversationGroup.POST("/get_not_notify_conversation_ids", c.GetNotNotifyConversationIDs) conversationGroup.POST("/get_pinned_conversation_ids", c.GetPinnedConversationIDs) + conversationGroup.POST("/delete_conversations", c.DeleteConversations) } { diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go index 8b212774a..ec182ea73 100644 --- a/internal/msgtransfer/online_history_msg_handler.go +++ b/internal/msgtransfer/online_history_msg_handler.go @@ -280,6 +280,8 @@ func (och *OnlineHistoryRedisConsumerHandler) handleMsg(ctx context.Context, key } if isNewConversation { + // 需要每一条消息 都去检查了 有没有会话 + switch msg.SessionType { case constant.ReadGroupChatType: log.ZDebug(ctx, "group chat first create conversation", "conversationID", diff --git a/internal/rpc/conversation/conversation.go b/internal/rpc/conversation/conversation.go index c9bfd3c56..303a4ecca 100644 --- a/internal/rpc/conversation/conversation.go +++ b/internal/rpc/conversation/conversation.go @@ -37,6 +37,7 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" "github.com/openimsdk/protocol/constant" pbconversation "github.com/openimsdk/protocol/conversation" + "github.com/openimsdk/protocol/msg" "github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/tools/discovery" "github.com/openimsdk/tools/errs" @@ -795,7 +796,7 @@ func (c *conversationServer) ClearUserConversationMsg(ctx context.Context, req * } latestMsgDestructTime := time.UnixMilli(req.Timestamp) for i, conversation := range conversations { - if conversation.IsMsgDestruct == false || conversation.MsgDestructTime == 0 { + if !conversation.IsMsgDestruct || conversation.MsgDestructTime == 0 { continue } seq, err := c.msgClient.GetLastMessageSeqByTime(ctx, conversation.ConversationID, req.Timestamp-(conversation.MsgDestructTime*1000)) @@ -835,3 +836,69 @@ func (c *conversationServer) setConversationMinSeqAndLatestMsgDestructTime(ctx c c.conversationNotificationSender.ConversationChangeNotification(ctx, ownerUserID, []string{conversationID}) return nil } + +func (c *conversationServer) DeleteConversations(ctx context.Context, req *pbconversation.DeleteConversationsReq) (resp *pbconversation.DeleteConversationsResp, err error) { + if err := authverify.CheckAccess(ctx, req.OwnerUserID); err != nil { + return nil, err + } + if req.NeedDeleteTime == 0 { + return nil, errs.ErrArgs.WrapMsg("need_delete_time need be set") + } + + deleteTimeThreshold := time.Now().AddDate(0, 0, -int(req.NeedDeleteTime)).UnixMilli() + + var conversationIDs []string + if len(req.ConversationIDs) == 0 { + conversationIDs, err = c.conversationDatabase.GetConversationIDs(ctx, req.OwnerUserID) + if err != nil { + return nil, err + } + } else { + conversationIDs = req.ConversationIDs + } + + // Check Conversation have a specific status + conversations, err := c.conversationDatabase.FindConversations(ctx, req.OwnerUserID, conversationIDs) + if err != nil { + return nil, err + } + + if len(conversations) == 0 { + return nil, errs.ErrRecordNotFound.Wrap() + } + + needCheckConversationIDs := make([]string, 0, len(conversations)) + + for _, conversation := range conversations { + if conversation.IsPinned { + continue + } + needCheckConversationIDs = append(needCheckConversationIDs, conversation.ConversationID) + } + + latestMsgs, err := c.msgClient.GetLastMessage(ctx, &msg.GetLastMessageReq{ + UserID: req.OwnerUserID, + ConversationIDs: needCheckConversationIDs, + }) + if err != nil { + return nil, err + } + + var needDeleteConversationIDs []string + for conversationID, msg := range latestMsgs.Msgs { + if msg.SendTime < deleteTimeThreshold { + needDeleteConversationIDs = append(needDeleteConversationIDs, conversationID) + } + } + + if len(needDeleteConversationIDs) == 0 { + return &pbconversation.DeleteConversationsResp{}, nil + } + + if err := c.conversationDatabase.DeleteUsersConversations(ctx, req.OwnerUserID, needDeleteConversationIDs); err != nil { + return nil, err + } + c.conversationNotificationSender.ConversationDeleteNotification(ctx, req.OwnerUserID, needDeleteConversationIDs) + + return &pbconversation.DeleteConversationsResp{}, nil +} diff --git a/internal/rpc/conversation/notification.go b/internal/rpc/conversation/notification.go index 6e865ba42..aa7b7d227 100644 --- a/internal/rpc/conversation/notification.go +++ b/internal/rpc/conversation/notification.go @@ -73,3 +73,12 @@ func (c *ConversationNotificationSender) ConversationUnreadChangeNotification( c.Notification(ctx, userID, userID, constant.ConversationUnreadNotification, tips) } + +func (c *ConversationNotificationSender) ConversationDeleteNotification(ctx context.Context, userID string, conversationIDs []string) { + tips := &sdkws.ConversationDeleteTips{ + UserID: userID, + ConversationIDs: conversationIDs, + } + + c.Notification(ctx, userID, userID, constant.ConversationDeleteNotification, tips) +} diff --git a/pkg/common/storage/controller/conversation.go b/pkg/common/storage/controller/conversation.go index 27442ca66..b71d5dbbc 100644 --- a/pkg/common/storage/controller/conversation.go +++ b/pkg/common/storage/controller/conversation.go @@ -78,6 +78,8 @@ type ConversationDatabase interface { GetPinnedConversationIDs(ctx context.Context, userID string) ([]string, error) // FindRandConversation finds random conversations based on the specified timestamp and limit. FindRandConversation(ctx context.Context, ts int64, limit int) ([]*relationtb.Conversation, error) + + DeleteUsersConversations(ctx context.Context, userID string, conversationIDs []string) (err error) } func NewConversationDatabase(conversation database.Conversation, cache cache.ConversationCache, tx tx.Tx) ConversationDatabase { @@ -429,3 +431,21 @@ func (c *conversationDatabase) GetPinnedConversationIDs(ctx context.Context, use func (c *conversationDatabase) FindRandConversation(ctx context.Context, ts int64, limit int) ([]*relationtb.Conversation, error) { return c.conversationDB.FindRandConversation(ctx, ts, limit) } + +func (c *conversationDatabase) DeleteUsersConversations(ctx context.Context, userID string, conversationIDs []string) (err error) { + return c.tx.Transaction(ctx, func(ctx context.Context) error { + err = c.conversationDB.DeleteUsersConversations(ctx, userID, conversationIDs) + if err != nil { + return err + } + cache := c.cache.CloneConversationCache() + cache = cache.DelConversations(userID, conversationIDs...). + DelConversationVersionUserIDs(userID). + DelConversationIDs(userID). + DelUserConversationIDsHash(userID). + DelConversationNotNotifyMessageUserIDs(userID). + DelConversationPinnedMessageUserIDs(userID) + + return cache.ChainExecDel(ctx) + }) +} diff --git a/pkg/common/storage/database/conversation.go b/pkg/common/storage/database/conversation.go index d612dfc2d..562782346 100644 --- a/pkg/common/storage/database/conversation.go +++ b/pkg/common/storage/database/conversation.go @@ -44,4 +44,5 @@ type Conversation interface { GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error) FindConversationUserVersion(ctx context.Context, userID string, version uint, limit int) (*model.VersionLog, error) FindRandConversation(ctx context.Context, ts int64, limit int) ([]*model.Conversation, error) + DeleteUsersConversations(ctx context.Context, userID string, conversationIDs []string) (err error) } diff --git a/pkg/common/storage/database/mgo/conversation.go b/pkg/common/storage/database/mgo/conversation.go index 89f13ea3d..d5ab046aa 100644 --- a/pkg/common/storage/database/mgo/conversation.go +++ b/pkg/common/storage/database/mgo/conversation.go @@ -308,3 +308,20 @@ func (c *ConversationMgo) FindRandConversation(ctx context.Context, ts int64, li } return mongoutil.Aggregate[*model.Conversation](ctx, c.coll, pipeline) } + +func (c *ConversationMgo) DeleteUsersConversations(ctx context.Context, userID string, conversationIDs []string) (err error) { + if len(conversationIDs) == 0 { + return nil + } + return mongoutil.IncrVersion(func() error { + err := mongoutil.DeleteMany(ctx, c.coll, bson.M{"owner_user_id": userID, "conversation_id": bson.M{"$in": conversationIDs}}) + return err + }, func() error { + for _, conversationID := range conversationIDs { + if err := c.version.IncrVersion(ctx, userID, []string{conversationID}, model.VersionStateDelete); err != nil { + return err + } + } + return nil + }) +}