From c4a1649aaefd0908c02b34243e25e179b42ee50b Mon Sep 17 00:00:00 2001 From: Monet Lee Date: Sat, 30 Aug 2025 16:46:00 +0800 Subject: [PATCH 1/3] feat: implement DeleteConversations interface. --- go.mod | 2 +- go.sum | 4 +- internal/api/conversation.go | 4 ++ internal/api/router.go | 1 + .../msgtransfer/online_history_msg_handler.go | 2 + internal/rpc/conversation/conversation.go | 69 ++++++++++++++++++- internal/rpc/conversation/notification.go | 9 +++ pkg/common/storage/controller/conversation.go | 20 ++++++ pkg/common/storage/database/conversation.go | 1 + .../storage/database/mgo/conversation.go | 17 +++++ 10 files changed, 125 insertions(+), 4 deletions(-) diff --git a/go.mod b/go.mod index 775765706..dac5cbf35 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ require ( github.com/gorilla/websocket v1.5.1 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/mitchellh/mapstructure v1.5.0 - github.com/openimsdk/protocol v0.0.73-alpha.12 + github.com/openimsdk/protocol v0.0.73-alpha.17 github.com/openimsdk/tools v0.0.50-alpha.97 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 diff --git a/go.sum b/go.sum index 329a916ec..42cb83c6d 100644 --- a/go.sum +++ b/go.sum @@ -347,8 +347,8 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y= github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= github.com/openimsdk/gomake v0.0.15-alpha.11 h1:PQudYDRESYeYlUYrrLLJhYIlUPO5x7FAx+o5El9U/Bw= github.com/openimsdk/gomake v0.0.15-alpha.11/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= -github.com/openimsdk/protocol v0.0.73-alpha.12 h1:2NYawXeHChYUeSme6QJ9pOLh+Empce2WmwEtbP4JvKk= -github.com/openimsdk/protocol v0.0.73-alpha.12/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw= +github.com/openimsdk/protocol v0.0.73-alpha.17 h1:ddo0QMns1GVwAmrPIPlAQ7uKmThAYLnOt+CIOgLsJyE= +github.com/openimsdk/protocol v0.0.73-alpha.17/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw= github.com/openimsdk/tools v0.0.50-alpha.97 h1:6ik5w3PpgDG6VjSo3nb3FT/fxN3JX7iIARVxVu9g7VY= github.com/openimsdk/tools v0.0.50-alpha.97/go.mod h1:n2poR3asX1e1XZce4O+MOWAp+X02QJRFvhcLCXZdzRo= github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= diff --git a/internal/api/conversation.go b/internal/api/conversation.go index 5a191c0ec..78affee44 100644 --- a/internal/api/conversation.go +++ b/internal/api/conversation.go @@ -76,3 +76,7 @@ func (o *ConversationApi) GetPinnedConversationIDs(c *gin.Context) { func (o *ConversationApi) UpdateConversationsByUser(c *gin.Context) { a2r.Call(c, conversation.ConversationClient.UpdateConversationsByUser, o.Client) } + +func (o *ConversationApi) DeleteConversations(c *gin.Context) { + a2r.Call(c, conversation.ConversationClient.DeleteConversations, o.Client) +} diff --git a/internal/api/router.go b/internal/api/router.go index 1d3a92dd7..15bbeb053 100644 --- a/internal/api/router.go +++ b/internal/api/router.go @@ -277,6 +277,7 @@ func newGinRouter(ctx context.Context, client discovery.SvcDiscoveryRegistry, cf conversationGroup.POST("/get_owner_conversation", c.GetOwnerConversation) conversationGroup.POST("/get_not_notify_conversation_ids", c.GetNotNotifyConversationIDs) conversationGroup.POST("/get_pinned_conversation_ids", c.GetPinnedConversationIDs) + conversationGroup.POST("/delete_conversations", c.DeleteConversations) } { diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go index 8b212774a..ec182ea73 100644 --- a/internal/msgtransfer/online_history_msg_handler.go +++ b/internal/msgtransfer/online_history_msg_handler.go @@ -280,6 +280,8 @@ func (och *OnlineHistoryRedisConsumerHandler) handleMsg(ctx context.Context, key } if isNewConversation { + // 需要每一条消息 都去检查了 有没有会话 + switch msg.SessionType { case constant.ReadGroupChatType: log.ZDebug(ctx, "group chat first create conversation", "conversationID", diff --git a/internal/rpc/conversation/conversation.go b/internal/rpc/conversation/conversation.go index c9bfd3c56..303a4ecca 100644 --- a/internal/rpc/conversation/conversation.go +++ b/internal/rpc/conversation/conversation.go @@ -37,6 +37,7 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" "github.com/openimsdk/protocol/constant" pbconversation "github.com/openimsdk/protocol/conversation" + "github.com/openimsdk/protocol/msg" "github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/tools/discovery" "github.com/openimsdk/tools/errs" @@ -795,7 +796,7 @@ func (c *conversationServer) ClearUserConversationMsg(ctx context.Context, req * } latestMsgDestructTime := time.UnixMilli(req.Timestamp) for i, conversation := range conversations { - if conversation.IsMsgDestruct == false || conversation.MsgDestructTime == 0 { + if !conversation.IsMsgDestruct || conversation.MsgDestructTime == 0 { continue } seq, err := c.msgClient.GetLastMessageSeqByTime(ctx, conversation.ConversationID, req.Timestamp-(conversation.MsgDestructTime*1000)) @@ -835,3 +836,69 @@ func (c *conversationServer) setConversationMinSeqAndLatestMsgDestructTime(ctx c c.conversationNotificationSender.ConversationChangeNotification(ctx, ownerUserID, []string{conversationID}) return nil } + +func (c *conversationServer) DeleteConversations(ctx context.Context, req *pbconversation.DeleteConversationsReq) (resp *pbconversation.DeleteConversationsResp, err error) { + if err := authverify.CheckAccess(ctx, req.OwnerUserID); err != nil { + return nil, err + } + if req.NeedDeleteTime == 0 { + return nil, errs.ErrArgs.WrapMsg("need_delete_time need be set") + } + + deleteTimeThreshold := time.Now().AddDate(0, 0, -int(req.NeedDeleteTime)).UnixMilli() + + var conversationIDs []string + if len(req.ConversationIDs) == 0 { + conversationIDs, err = c.conversationDatabase.GetConversationIDs(ctx, req.OwnerUserID) + if err != nil { + return nil, err + } + } else { + conversationIDs = req.ConversationIDs + } + + // Check Conversation have a specific status + conversations, err := c.conversationDatabase.FindConversations(ctx, req.OwnerUserID, conversationIDs) + if err != nil { + return nil, err + } + + if len(conversations) == 0 { + return nil, errs.ErrRecordNotFound.Wrap() + } + + needCheckConversationIDs := make([]string, 0, len(conversations)) + + for _, conversation := range conversations { + if conversation.IsPinned { + continue + } + needCheckConversationIDs = append(needCheckConversationIDs, conversation.ConversationID) + } + + latestMsgs, err := c.msgClient.GetLastMessage(ctx, &msg.GetLastMessageReq{ + UserID: req.OwnerUserID, + ConversationIDs: needCheckConversationIDs, + }) + if err != nil { + return nil, err + } + + var needDeleteConversationIDs []string + for conversationID, msg := range latestMsgs.Msgs { + if msg.SendTime < deleteTimeThreshold { + needDeleteConversationIDs = append(needDeleteConversationIDs, conversationID) + } + } + + if len(needDeleteConversationIDs) == 0 { + return &pbconversation.DeleteConversationsResp{}, nil + } + + if err := c.conversationDatabase.DeleteUsersConversations(ctx, req.OwnerUserID, needDeleteConversationIDs); err != nil { + return nil, err + } + c.conversationNotificationSender.ConversationDeleteNotification(ctx, req.OwnerUserID, needDeleteConversationIDs) + + return &pbconversation.DeleteConversationsResp{}, nil +} diff --git a/internal/rpc/conversation/notification.go b/internal/rpc/conversation/notification.go index 6e865ba42..aa7b7d227 100644 --- a/internal/rpc/conversation/notification.go +++ b/internal/rpc/conversation/notification.go @@ -73,3 +73,12 @@ func (c *ConversationNotificationSender) ConversationUnreadChangeNotification( c.Notification(ctx, userID, userID, constant.ConversationUnreadNotification, tips) } + +func (c *ConversationNotificationSender) ConversationDeleteNotification(ctx context.Context, userID string, conversationIDs []string) { + tips := &sdkws.ConversationDeleteTips{ + UserID: userID, + ConversationIDs: conversationIDs, + } + + c.Notification(ctx, userID, userID, constant.ConversationDeleteNotification, tips) +} diff --git a/pkg/common/storage/controller/conversation.go b/pkg/common/storage/controller/conversation.go index 27442ca66..b71d5dbbc 100644 --- a/pkg/common/storage/controller/conversation.go +++ b/pkg/common/storage/controller/conversation.go @@ -78,6 +78,8 @@ type ConversationDatabase interface { GetPinnedConversationIDs(ctx context.Context, userID string) ([]string, error) // FindRandConversation finds random conversations based on the specified timestamp and limit. FindRandConversation(ctx context.Context, ts int64, limit int) ([]*relationtb.Conversation, error) + + DeleteUsersConversations(ctx context.Context, userID string, conversationIDs []string) (err error) } func NewConversationDatabase(conversation database.Conversation, cache cache.ConversationCache, tx tx.Tx) ConversationDatabase { @@ -429,3 +431,21 @@ func (c *conversationDatabase) GetPinnedConversationIDs(ctx context.Context, use func (c *conversationDatabase) FindRandConversation(ctx context.Context, ts int64, limit int) ([]*relationtb.Conversation, error) { return c.conversationDB.FindRandConversation(ctx, ts, limit) } + +func (c *conversationDatabase) DeleteUsersConversations(ctx context.Context, userID string, conversationIDs []string) (err error) { + return c.tx.Transaction(ctx, func(ctx context.Context) error { + err = c.conversationDB.DeleteUsersConversations(ctx, userID, conversationIDs) + if err != nil { + return err + } + cache := c.cache.CloneConversationCache() + cache = cache.DelConversations(userID, conversationIDs...). + DelConversationVersionUserIDs(userID). + DelConversationIDs(userID). + DelUserConversationIDsHash(userID). + DelConversationNotNotifyMessageUserIDs(userID). + DelConversationPinnedMessageUserIDs(userID) + + return cache.ChainExecDel(ctx) + }) +} diff --git a/pkg/common/storage/database/conversation.go b/pkg/common/storage/database/conversation.go index d612dfc2d..562782346 100644 --- a/pkg/common/storage/database/conversation.go +++ b/pkg/common/storage/database/conversation.go @@ -44,4 +44,5 @@ type Conversation interface { GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error) FindConversationUserVersion(ctx context.Context, userID string, version uint, limit int) (*model.VersionLog, error) FindRandConversation(ctx context.Context, ts int64, limit int) ([]*model.Conversation, error) + DeleteUsersConversations(ctx context.Context, userID string, conversationIDs []string) (err error) } diff --git a/pkg/common/storage/database/mgo/conversation.go b/pkg/common/storage/database/mgo/conversation.go index 89f13ea3d..d5ab046aa 100644 --- a/pkg/common/storage/database/mgo/conversation.go +++ b/pkg/common/storage/database/mgo/conversation.go @@ -308,3 +308,20 @@ func (c *ConversationMgo) FindRandConversation(ctx context.Context, ts int64, li } return mongoutil.Aggregate[*model.Conversation](ctx, c.coll, pipeline) } + +func (c *ConversationMgo) DeleteUsersConversations(ctx context.Context, userID string, conversationIDs []string) (err error) { + if len(conversationIDs) == 0 { + return nil + } + return mongoutil.IncrVersion(func() error { + err := mongoutil.DeleteMany(ctx, c.coll, bson.M{"owner_user_id": userID, "conversation_id": bson.M{"$in": conversationIDs}}) + return err + }, func() error { + for _, conversationID := range conversationIDs { + if err := c.version.IncrVersion(ctx, userID, []string{conversationID}, model.VersionStateDelete); err != nil { + return err + } + } + return nil + }) +} From cf8d373a8cf6567f9fe27752670939e95e72e9b7 Mon Sep 17 00:00:00 2001 From: Monet Lee Date: Mon, 1 Sep 2025 09:58:50 +0800 Subject: [PATCH 2/3] remove unused comment. --- internal/msgtransfer/online_history_msg_handler.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go index ec182ea73..8af585ce3 100644 --- a/internal/msgtransfer/online_history_msg_handler.go +++ b/internal/msgtransfer/online_history_msg_handler.go @@ -18,6 +18,7 @@ import ( "context" "encoding/json" "errors" + "github.com/openimsdk/tools/mq" "sync" @@ -280,8 +281,6 @@ func (och *OnlineHistoryRedisConsumerHandler) handleMsg(ctx context.Context, key } if isNewConversation { - // 需要每一条消息 都去检查了 有没有会话 - switch msg.SessionType { case constant.ReadGroupChatType: log.ZDebug(ctx, "group chat first create conversation", "conversationID", From bcee8dff5a0570e8ff062831bae86427da3a34d0 Mon Sep 17 00:00:00 2001 From: Monet Lee Date: Mon, 1 Sep 2025 17:03:15 +0800 Subject: [PATCH 3/3] update logic and rename method. --- internal/rpc/conversation/conversation.go | 68 +++++++------------ pkg/common/storage/cache/conversation.go | 3 +- .../storage/cache/redis/conversation.go | 2 +- pkg/common/storage/controller/conversation.go | 10 +-- 4 files changed, 34 insertions(+), 49 deletions(-) diff --git a/internal/rpc/conversation/conversation.go b/internal/rpc/conversation/conversation.go index 303a4ecca..562a76d82 100644 --- a/internal/rpc/conversation/conversation.go +++ b/internal/rpc/conversation/conversation.go @@ -841,64 +841,48 @@ func (c *conversationServer) DeleteConversations(ctx context.Context, req *pbcon if err := authverify.CheckAccess(ctx, req.OwnerUserID); err != nil { return nil, err } - if req.NeedDeleteTime == 0 { - return nil, errs.ErrArgs.WrapMsg("need_delete_time need be set") + if req.NeedDeleteTime == 0 && len(req.ConversationIDs) == 0 { + return nil, errs.ErrArgs.WrapMsg("need_delete_time or conversationIDs need be set") } - deleteTimeThreshold := time.Now().AddDate(0, 0, -int(req.NeedDeleteTime)).UnixMilli() + if req.NeedDeleteTime != 0 && len(req.ConversationIDs) != 0 { + return nil, errs.ErrArgs.WrapMsg("need_delete_time and conversationIDs cannot both be set") + } + + var needDeleteConversationIDs []string - var conversationIDs []string if len(req.ConversationIDs) == 0 { - conversationIDs, err = c.conversationDatabase.GetConversationIDs(ctx, req.OwnerUserID) + deleteTimeThreshold := time.Now().AddDate(0, 0, -int(req.NeedDeleteTime)).UnixMilli() + conversationIDs, err := c.conversationDatabase.GetConversationIDs(ctx, req.OwnerUserID) if err != nil { return nil, err } - } else { - conversationIDs = req.ConversationIDs - } - - // Check Conversation have a specific status - conversations, err := c.conversationDatabase.FindConversations(ctx, req.OwnerUserID, conversationIDs) - if err != nil { - return nil, err - } - - if len(conversations) == 0 { - return nil, errs.ErrRecordNotFound.Wrap() - } - - needCheckConversationIDs := make([]string, 0, len(conversations)) - - for _, conversation := range conversations { - if conversation.IsPinned { - continue + latestMsgs, err := c.msgClient.GetLastMessage(ctx, &msg.GetLastMessageReq{ + UserID: req.OwnerUserID, + ConversationIDs: conversationIDs, + }) + if err != nil { + return nil, err } - needCheckConversationIDs = append(needCheckConversationIDs, conversation.ConversationID) - } - - latestMsgs, err := c.msgClient.GetLastMessage(ctx, &msg.GetLastMessageReq{ - UserID: req.OwnerUserID, - ConversationIDs: needCheckConversationIDs, - }) - if err != nil { - return nil, err - } - var needDeleteConversationIDs []string - for conversationID, msg := range latestMsgs.Msgs { - if msg.SendTime < deleteTimeThreshold { - needDeleteConversationIDs = append(needDeleteConversationIDs, conversationID) + for conversationID, msg := range latestMsgs.Msgs { + if msg.SendTime < deleteTimeThreshold { + needDeleteConversationIDs = append(needDeleteConversationIDs, conversationID) + } } - } - if len(needDeleteConversationIDs) == 0 { - return &pbconversation.DeleteConversationsResp{}, nil + if len(needDeleteConversationIDs) == 0 { + return &pbconversation.DeleteConversationsResp{}, nil + } + } else { + needDeleteConversationIDs = req.ConversationIDs } if err := c.conversationDatabase.DeleteUsersConversations(ctx, req.OwnerUserID, needDeleteConversationIDs); err != nil { return nil, err } - c.conversationNotificationSender.ConversationDeleteNotification(ctx, req.OwnerUserID, needDeleteConversationIDs) + + // c.conversationNotificationSender.ConversationDeleteNotification(ctx, req.OwnerUserID, needDeleteConversationIDs) return &pbconversation.DeleteConversationsResp{}, nil } diff --git a/pkg/common/storage/cache/conversation.go b/pkg/common/storage/cache/conversation.go index ac3011107..d5ab2264f 100644 --- a/pkg/common/storage/cache/conversation.go +++ b/pkg/common/storage/cache/conversation.go @@ -16,6 +16,7 @@ package cache import ( "context" + relationtb "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" ) @@ -57,7 +58,7 @@ type ConversationCache interface { GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error) DelConversationNotReceiveMessageUserIDs(conversationIDs ...string) ConversationCache DelConversationNotNotifyMessageUserIDs(userIDs ...string) ConversationCache - DelConversationPinnedMessageUserIDs(userIDs ...string) ConversationCache + DelUserPinnedConversations(userIDs ...string) ConversationCache DelConversationVersionUserIDs(userIDs ...string) ConversationCache FindMaxConversationUserVersion(ctx context.Context, userID string) (*relationtb.VersionLog, error) diff --git a/pkg/common/storage/cache/redis/conversation.go b/pkg/common/storage/cache/redis/conversation.go index 3453b570d..16d67322b 100644 --- a/pkg/common/storage/cache/redis/conversation.go +++ b/pkg/common/storage/cache/redis/conversation.go @@ -253,7 +253,7 @@ func (c *ConversationRedisCache) DelConversationNotNotifyMessageUserIDs(userIDs return cache } -func (c *ConversationRedisCache) DelConversationPinnedMessageUserIDs(userIDs ...string) cache.ConversationCache { +func (c *ConversationRedisCache) DelUserPinnedConversations(userIDs ...string) cache.ConversationCache { cache := c.CloneConversationCache() for _, userID := range userIDs { cache.AddKeys(c.getPinnedConversationIDsKey(userID)) diff --git a/pkg/common/storage/controller/conversation.go b/pkg/common/storage/controller/conversation.go index b71d5dbbc..d085c7466 100644 --- a/pkg/common/storage/controller/conversation.go +++ b/pkg/common/storage/controller/conversation.go @@ -122,7 +122,7 @@ func (c *conversationDatabase) SetUsersConversationFieldTx(ctx context.Context, cache = cache.DelConversationNotNotifyMessageUserIDs(userIDs...) } if _, ok := fieldMap["is_pinned"]; ok { - cache = cache.DelConversationPinnedMessageUserIDs(userIDs...) + cache = cache.DelUserPinnedConversations(userIDs...) } cache = cache.DelConversationVersionUserIDs(haveUserIDs...) } @@ -174,7 +174,7 @@ func (c *conversationDatabase) UpdateUsersConversationField(ctx context.Context, cache = cache.DelConversationNotNotifyMessageUserIDs(userIDs...) } if _, ok := args["is_pinned"]; ok { - cache = cache.DelConversationPinnedMessageUserIDs(userIDs...) + cache = cache.DelUserPinnedConversations(userIDs...) } return cache.ChainExecDel(ctx) } @@ -205,7 +205,7 @@ func (c *conversationDatabase) CreateConversation(ctx context.Context, conversat DelUserConversationIDsHash(userIDs...). DelConversationVersionUserIDs(userIDs...). DelConversationNotNotifyMessageUserIDs(notNotifyUserIDs...). - DelConversationPinnedMessageUserIDs(pinnedUserIDs...). + DelUserPinnedConversations(pinnedUserIDs...). ChainExecDel(ctx) } @@ -261,7 +261,7 @@ func (c *conversationDatabase) SetUserConversations(ctx context.Context, ownerUs cache := c.cache.CloneConversationCache() cache = cache.DelConversationVersionUserIDs(ownerUserID). DelConversationNotNotifyMessageUserIDs(ownerUserID). - DelConversationPinnedMessageUserIDs(ownerUserID) + DelUserPinnedConversations(ownerUserID) groupIDs := datautil.Distinct(datautil.Filter(conversations, func(e *relationtb.Conversation) (string, bool) { return e.GroupID, e.GroupID != "" @@ -444,7 +444,7 @@ func (c *conversationDatabase) DeleteUsersConversations(ctx context.Context, use DelConversationIDs(userID). DelUserConversationIDsHash(userID). DelConversationNotNotifyMessageUserIDs(userID). - DelConversationPinnedMessageUserIDs(userID) + DelUserPinnedConversations(userID) return cache.ChainExecDel(ctx) })