pull/3549/merge
Monet Lee 2 weeks ago committed by GitHub
commit 166a292263
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -12,7 +12,7 @@ require (
github.com/gorilla/websocket v1.5.1
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/mitchellh/mapstructure v1.5.0
github.com/openimsdk/protocol v0.0.73-alpha.14
github.com/openimsdk/protocol v0.0.73-alpha.17
github.com/openimsdk/tools v0.0.50-alpha.97
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.18.0

@ -347,8 +347,8 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y=
github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM=
github.com/openimsdk/gomake v0.0.15-alpha.11 h1:PQudYDRESYeYlUYrrLLJhYIlUPO5x7FAx+o5El9U/Bw=
github.com/openimsdk/gomake v0.0.15-alpha.11/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
github.com/openimsdk/protocol v0.0.73-alpha.14 h1:lv9wNiPRm6G7q74TfpMobKrSfeTaBlZ+Ps3O6UFPmaE=
github.com/openimsdk/protocol v0.0.73-alpha.14/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw=
github.com/openimsdk/protocol v0.0.73-alpha.17 h1:ddo0QMns1GVwAmrPIPlAQ7uKmThAYLnOt+CIOgLsJyE=
github.com/openimsdk/protocol v0.0.73-alpha.17/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw=
github.com/openimsdk/tools v0.0.50-alpha.97 h1:6ik5w3PpgDG6VjSo3nb3FT/fxN3JX7iIARVxVu9g7VY=
github.com/openimsdk/tools v0.0.50-alpha.97/go.mod h1:n2poR3asX1e1XZce4O+MOWAp+X02QJRFvhcLCXZdzRo=
github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM=

@ -76,3 +76,7 @@ func (o *ConversationApi) GetPinnedConversationIDs(c *gin.Context) {
func (o *ConversationApi) UpdateConversationsByUser(c *gin.Context) {
a2r.Call(c, conversation.ConversationClient.UpdateConversationsByUser, o.Client)
}
func (o *ConversationApi) DeleteConversations(c *gin.Context) {
a2r.Call(c, conversation.ConversationClient.DeleteConversations, o.Client)
}

@ -277,6 +277,7 @@ func newGinRouter(ctx context.Context, client discovery.SvcDiscoveryRegistry, cf
conversationGroup.POST("/get_owner_conversation", c.GetOwnerConversation)
conversationGroup.POST("/get_not_notify_conversation_ids", c.GetNotNotifyConversationIDs)
conversationGroup.POST("/get_pinned_conversation_ids", c.GetPinnedConversationIDs)
conversationGroup.POST("/delete_conversations", c.DeleteConversations)
}
{

@ -18,6 +18,7 @@ import (
"context"
"encoding/json"
"errors"
"github.com/openimsdk/tools/mq"
"sync"

@ -37,6 +37,7 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
"github.com/openimsdk/protocol/constant"
pbconversation "github.com/openimsdk/protocol/conversation"
"github.com/openimsdk/protocol/msg"
"github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/tools/discovery"
"github.com/openimsdk/tools/errs"
@ -795,7 +796,7 @@ func (c *conversationServer) ClearUserConversationMsg(ctx context.Context, req *
}
latestMsgDestructTime := time.UnixMilli(req.Timestamp)
for i, conversation := range conversations {
if conversation.IsMsgDestruct == false || conversation.MsgDestructTime == 0 {
if !conversation.IsMsgDestruct || conversation.MsgDestructTime == 0 {
continue
}
seq, err := c.msgClient.GetLastMessageSeqByTime(ctx, conversation.ConversationID, req.Timestamp-(conversation.MsgDestructTime*1000))
@ -835,3 +836,53 @@ func (c *conversationServer) setConversationMinSeqAndLatestMsgDestructTime(ctx c
c.conversationNotificationSender.ConversationChangeNotification(ctx, ownerUserID, []string{conversationID})
return nil
}
func (c *conversationServer) DeleteConversations(ctx context.Context, req *pbconversation.DeleteConversationsReq) (resp *pbconversation.DeleteConversationsResp, err error) {
if err := authverify.CheckAccess(ctx, req.OwnerUserID); err != nil {
return nil, err
}
if req.NeedDeleteTime == 0 && len(req.ConversationIDs) == 0 {
return nil, errs.ErrArgs.WrapMsg("need_delete_time or conversationIDs need be set")
}
if req.NeedDeleteTime != 0 && len(req.ConversationIDs) != 0 {
return nil, errs.ErrArgs.WrapMsg("need_delete_time and conversationIDs cannot both be set")
}
var needDeleteConversationIDs []string
if len(req.ConversationIDs) == 0 {
deleteTimeThreshold := time.Now().AddDate(0, 0, -int(req.NeedDeleteTime)).UnixMilli()
conversationIDs, err := c.conversationDatabase.GetConversationIDs(ctx, req.OwnerUserID)
if err != nil {
return nil, err
}
latestMsgs, err := c.msgClient.GetLastMessage(ctx, &msg.GetLastMessageReq{
UserID: req.OwnerUserID,
ConversationIDs: conversationIDs,
})
if err != nil {
return nil, err
}
for conversationID, msg := range latestMsgs.Msgs {
if msg.SendTime < deleteTimeThreshold {
needDeleteConversationIDs = append(needDeleteConversationIDs, conversationID)
}
}
if len(needDeleteConversationIDs) == 0 {
return &pbconversation.DeleteConversationsResp{}, nil
}
} else {
needDeleteConversationIDs = req.ConversationIDs
}
if err := c.conversationDatabase.DeleteUsersConversations(ctx, req.OwnerUserID, needDeleteConversationIDs); err != nil {
return nil, err
}
// c.conversationNotificationSender.ConversationDeleteNotification(ctx, req.OwnerUserID, needDeleteConversationIDs)
return &pbconversation.DeleteConversationsResp{}, nil
}

@ -73,3 +73,12 @@ func (c *ConversationNotificationSender) ConversationUnreadChangeNotification(
c.Notification(ctx, userID, userID, constant.ConversationUnreadNotification, tips)
}
func (c *ConversationNotificationSender) ConversationDeleteNotification(ctx context.Context, userID string, conversationIDs []string) {
tips := &sdkws.ConversationDeleteTips{
UserID: userID,
ConversationIDs: conversationIDs,
}
c.Notification(ctx, userID, userID, constant.ConversationDeleteNotification, tips)
}

@ -16,6 +16,7 @@ package cache
import (
"context"
relationtb "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
)
@ -57,7 +58,7 @@ type ConversationCache interface {
GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error)
DelConversationNotReceiveMessageUserIDs(conversationIDs ...string) ConversationCache
DelConversationNotNotifyMessageUserIDs(userIDs ...string) ConversationCache
DelConversationPinnedMessageUserIDs(userIDs ...string) ConversationCache
DelUserPinnedConversations(userIDs ...string) ConversationCache
DelConversationVersionUserIDs(userIDs ...string) ConversationCache
FindMaxConversationUserVersion(ctx context.Context, userID string) (*relationtb.VersionLog, error)

@ -253,7 +253,7 @@ func (c *ConversationRedisCache) DelConversationNotNotifyMessageUserIDs(userIDs
return cache
}
func (c *ConversationRedisCache) DelConversationPinnedMessageUserIDs(userIDs ...string) cache.ConversationCache {
func (c *ConversationRedisCache) DelUserPinnedConversations(userIDs ...string) cache.ConversationCache {
cache := c.CloneConversationCache()
for _, userID := range userIDs {
cache.AddKeys(c.getPinnedConversationIDsKey(userID))

@ -78,6 +78,8 @@ type ConversationDatabase interface {
GetPinnedConversationIDs(ctx context.Context, userID string) ([]string, error)
// FindRandConversation finds random conversations based on the specified timestamp and limit.
FindRandConversation(ctx context.Context, ts int64, limit int) ([]*relationtb.Conversation, error)
DeleteUsersConversations(ctx context.Context, userID string, conversationIDs []string) (err error)
}
func NewConversationDatabase(conversation database.Conversation, cache cache.ConversationCache, tx tx.Tx) ConversationDatabase {
@ -120,7 +122,7 @@ func (c *conversationDatabase) SetUsersConversationFieldTx(ctx context.Context,
cache = cache.DelConversationNotNotifyMessageUserIDs(userIDs...)
}
if _, ok := fieldMap["is_pinned"]; ok {
cache = cache.DelConversationPinnedMessageUserIDs(userIDs...)
cache = cache.DelUserPinnedConversations(userIDs...)
}
cache = cache.DelConversationVersionUserIDs(haveUserIDs...)
}
@ -172,7 +174,7 @@ func (c *conversationDatabase) UpdateUsersConversationField(ctx context.Context,
cache = cache.DelConversationNotNotifyMessageUserIDs(userIDs...)
}
if _, ok := args["is_pinned"]; ok {
cache = cache.DelConversationPinnedMessageUserIDs(userIDs...)
cache = cache.DelUserPinnedConversations(userIDs...)
}
return cache.ChainExecDel(ctx)
}
@ -203,7 +205,7 @@ func (c *conversationDatabase) CreateConversation(ctx context.Context, conversat
DelUserConversationIDsHash(userIDs...).
DelConversationVersionUserIDs(userIDs...).
DelConversationNotNotifyMessageUserIDs(notNotifyUserIDs...).
DelConversationPinnedMessageUserIDs(pinnedUserIDs...).
DelUserPinnedConversations(pinnedUserIDs...).
ChainExecDel(ctx)
}
@ -259,7 +261,7 @@ func (c *conversationDatabase) SetUserConversations(ctx context.Context, ownerUs
cache := c.cache.CloneConversationCache()
cache = cache.DelConversationVersionUserIDs(ownerUserID).
DelConversationNotNotifyMessageUserIDs(ownerUserID).
DelConversationPinnedMessageUserIDs(ownerUserID)
DelUserPinnedConversations(ownerUserID)
groupIDs := datautil.Distinct(datautil.Filter(conversations, func(e *relationtb.Conversation) (string, bool) {
return e.GroupID, e.GroupID != ""
@ -429,3 +431,21 @@ func (c *conversationDatabase) GetPinnedConversationIDs(ctx context.Context, use
func (c *conversationDatabase) FindRandConversation(ctx context.Context, ts int64, limit int) ([]*relationtb.Conversation, error) {
return c.conversationDB.FindRandConversation(ctx, ts, limit)
}
func (c *conversationDatabase) DeleteUsersConversations(ctx context.Context, userID string, conversationIDs []string) (err error) {
return c.tx.Transaction(ctx, func(ctx context.Context) error {
err = c.conversationDB.DeleteUsersConversations(ctx, userID, conversationIDs)
if err != nil {
return err
}
cache := c.cache.CloneConversationCache()
cache = cache.DelConversations(userID, conversationIDs...).
DelConversationVersionUserIDs(userID).
DelConversationIDs(userID).
DelUserConversationIDsHash(userID).
DelConversationNotNotifyMessageUserIDs(userID).
DelUserPinnedConversations(userID)
return cache.ChainExecDel(ctx)
})
}

@ -44,4 +44,5 @@ type Conversation interface {
GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error)
FindConversationUserVersion(ctx context.Context, userID string, version uint, limit int) (*model.VersionLog, error)
FindRandConversation(ctx context.Context, ts int64, limit int) ([]*model.Conversation, error)
DeleteUsersConversations(ctx context.Context, userID string, conversationIDs []string) (err error)
}

@ -308,3 +308,20 @@ func (c *ConversationMgo) FindRandConversation(ctx context.Context, ts int64, li
}
return mongoutil.Aggregate[*model.Conversation](ctx, c.coll, pipeline)
}
func (c *ConversationMgo) DeleteUsersConversations(ctx context.Context, userID string, conversationIDs []string) (err error) {
if len(conversationIDs) == 0 {
return nil
}
return mongoutil.IncrVersion(func() error {
err := mongoutil.DeleteMany(ctx, c.coll, bson.M{"owner_user_id": userID, "conversation_id": bson.M{"$in": conversationIDs}})
return err
}, func() error {
for _, conversationID := range conversationIDs {
if err := c.version.IncrVersion(ctx, userID, []string{conversationID}, model.VersionStateDelete); err != nil {
return err
}
}
return nil
})
}

Loading…
Cancel
Save