diff --git a/internal/rpc/conversation/conversation.go b/internal/rpc/conversation/conversation.go index ce720e640..ccae5b40b 100644 --- a/internal/rpc/conversation/conversation.go +++ b/internal/rpc/conversation/conversation.go @@ -436,19 +436,37 @@ func (c *conversationServer) CreateGroupChatConversations(ctx context.Context, r return &pbconversation.CreateGroupChatConversationsResp{}, nil } +// func (c *conversationServer) SetConversationMaxSeq(ctx context.Context, req *pbconversation.SetConversationMaxSeqReq) (*pbconversation.SetConversationMaxSeqResp, error) { +// if err := c.conversationDatabase.UpdateUsersConversationField(ctx, req.OwnerUserID, req.ConversationID, +// map[string]any{"max_seq": req.MaxSeq}); err != nil { +// return nil, err +// } + +// return &pbconversation.SetConversationMaxSeqResp{}, nil +// } + func (c *conversationServer) SetConversationMaxSeq(ctx context.Context, req *pbconversation.SetConversationMaxSeqReq) (*pbconversation.SetConversationMaxSeqResp, error) { - if err := c.conversationDatabase.UpdateUsersConversationField(ctx, req.OwnerUserID, req.ConversationID, - map[string]any{"max_seq": req.MaxSeq}); err != nil { - return nil, err + + if err := c.conversationDatabase.SetUserConversationMaxSeq(ctx, req.ConversationID, req.OwnerUserID[0], req.MaxSeq); err != nil { + return nil, errs.Wrap(err) } + return &pbconversation.SetConversationMaxSeqResp{}, nil } +// func (c *conversationServer) SetConversationMinSeq(ctx context.Context, req *pbconversation.SetConversationMinSeqReq) (*pbconversation.SetConversationMinSeqResp, error) { +// if err := c.conversationDatabase.UpdateUsersConversationField(ctx, req.OwnerUserID, req.ConversationID, +// map[string]any{"min_seq": req.MinSeq}); err != nil { +// return nil, err +// } +// return &pbconversation.SetConversationMinSeqResp{}, nil +// } + func (c *conversationServer) SetConversationMinSeq(ctx context.Context, req *pbconversation.SetConversationMinSeqReq) (*pbconversation.SetConversationMinSeqResp, error) { - if err := c.conversationDatabase.UpdateUsersConversationField(ctx, req.OwnerUserID, req.ConversationID, - map[string]any{"min_seq": req.MinSeq}); err != nil { - return nil, err + if err := c.conversationDatabase.SetUserConversationMinSeq(ctx, req.ConversationID, req.OwnerUserID[0], req.MinSeq); err != nil { + return nil, errs.Wrap(err) } + return &pbconversation.SetConversationMinSeqResp{}, nil } diff --git a/internal/rpc/msg/clear.go b/internal/rpc/msg/clear.go index c5bd36b44..0caab3625 100644 --- a/internal/rpc/msg/clear.go +++ b/internal/rpc/msg/clear.go @@ -12,6 +12,7 @@ import ( "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/mcontext" + "github.com/openimsdk/tools/utils/datautil" "github.com/openimsdk/tools/utils/idutil" "github.com/openimsdk/tools/utils/stringutil" "golang.org/x/sync/errgroup" @@ -100,6 +101,8 @@ func (m *msgServer) DestructMsgs(ctx context.Context, req *msg.DestructMsgsReq) } if len(seqs) > 0 { + minseq := datautil.Max(seqs...) + if err := m.Conversation.UpdateConversation(handleCtx, &pbconversation.UpdateConversationReq{ UserIDs: []string{conversation.OwnerUserID}, @@ -109,6 +112,10 @@ func (m *msgServer) DestructMsgs(ctx context.Context, req *msg.DestructMsgsReq) continue } + if err := m.Conversation.SetConversationMinSeq(handleCtx, []string{conversation.OwnerUserID}, conversation.ConversationID, minseq); err != nil { + return err + } + // if you need Notify SDK client userseq is update. // m.msgNotificationSender.UserDeleteMsgsNotification(handleCtx, conversation.OwnerUserID, conversation.ConversationID, seqs) } diff --git a/pkg/common/storage/controller/conversation.go b/pkg/common/storage/controller/conversation.go index f0b7d70db..16abf103f 100644 --- a/pkg/common/storage/controller/conversation.go +++ b/pkg/common/storage/controller/conversation.go @@ -74,6 +74,15 @@ type ConversationDatabase interface { GetNotNotifyConversationIDs(ctx context.Context, userID string) ([]string, error) // GetPinnedConversationIDs gets pinned conversationIDs by userID GetPinnedConversationIDs(ctx context.Context, userID string) ([]string, error) + + // GetUserConversationMinSeq is get user specific conversation min seq + GetUserConversationMinSeq(ctx context.Context, conversationID, userID string) (int64, error) + // SetUserConversationMinSeq is set user specific conversation min seq + SetUserConversationMinSeq(ctx context.Context, conversationID, userID string, seq int64) error + // GetUserConversationMaxSeq is get user specific conversation max seq + GetUserConversationMaxSeq(ctx context.Context, conversationID, userID string) (int64, error) + // SetUserConversationMaxSeq is set user specific conversation max seq + SetUserConversationMaxSeq(ctx context.Context, conversationID, userID string, seq int64) error } func NewConversationDatabase(conversation database.Conversation, cache cache.ConversationCache, tx tx.Tx) ConversationDatabase { @@ -401,3 +410,28 @@ func (c *conversationDatabase) GetPinnedConversationIDs(ctx context.Context, use } return conversationIDs, nil } + +func (c *conversationDatabase) GetUserConversationMinSeq(ctx context.Context, conversationID, userID string) (int64, error) { + seq, err := c.conversationDB.GetUserConversationMinSeq(ctx, conversationID, userID) + if err != nil { + return 0, err + } + + return seq, nil +} +func (c *conversationDatabase) SetUserConversationMinSeq(ctx context.Context, conversationID, userID string, seq int64) error { + return c.conversationDB.SetUserConversationMinSeq(ctx, conversationID, userID, seq) +} + +func (c *conversationDatabase) GetUserConversationMaxSeq(ctx context.Context, conversationID, userID string) (int64, error) { + seq, err := c.conversationDB.GetUserConversationMaxSeq(ctx, conversationID, userID) + if err != nil { + return 0, err + } + + return seq, nil +} + +func (c *conversationDatabase) SetUserConversationMaxSeq(ctx context.Context, conversationID, userID string, seq int64) error { + return c.conversationDB.SetUserConversationMaxSeq(ctx, conversationID, userID, seq) +} diff --git a/pkg/common/storage/database/conversation.go b/pkg/common/storage/database/conversation.go index 5a9b19035..5288da8b9 100644 --- a/pkg/common/storage/database/conversation.go +++ b/pkg/common/storage/database/conversation.go @@ -16,6 +16,7 @@ package database import ( "context" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "github.com/openimsdk/tools/db/pagination" ) @@ -41,4 +42,9 @@ type Conversation interface { GetConversationIDsNeedDestruct(ctx context.Context) ([]*model.Conversation, error) GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error) FindConversationUserVersion(ctx context.Context, userID string, version uint, limit int) (*model.VersionLog, error) + + GetUserConversationMinSeq(ctx context.Context, conversationID, userID string) (int64, error) + SetUserConversationMinSeq(ctx context.Context, conversationID, userID string, seq int64) error + GetUserConversationMaxSeq(ctx context.Context, conversationID, userID string) (int64, error) + SetUserConversationMaxSeq(ctx context.Context, conversationID, userID string, seq int64) error } diff --git a/pkg/common/storage/database/mgo/conversation.go b/pkg/common/storage/database/mgo/conversation.go index f7ced1c2c..cd29f6e02 100644 --- a/pkg/common/storage/database/mgo/conversation.go +++ b/pkg/common/storage/database/mgo/conversation.go @@ -16,9 +16,11 @@ package mgo import ( "context" + "errors" + "time" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" - "time" "github.com/openimsdk/protocol/constant" "github.com/openimsdk/tools/db/mongoutil" @@ -227,3 +229,49 @@ func (c *ConversationMgo) GetConversationNotReceiveMessageUserIDs(ctx context.Co func (c *ConversationMgo) FindConversationUserVersion(ctx context.Context, userID string, version uint, limit int) (*model.VersionLog, error) { return c.version.FindChangeLog(ctx, userID, version, limit) } + +func (s *ConversationMgo) setSeq(ctx context.Context, conversationID string, userID string, seq int64, field string) error { + filter := map[string]any{ + "owner_user_id": userID, + "conversation_id": conversationID, + } + update := map[string]any{ + "$set": bson.M{ + field: seq, + }, + } + return mongoutil.UpdateOne(ctx, s.coll, filter, update, false, nil) +} + +func (s *ConversationMgo) getSeq(ctx context.Context, conversationID string, userID string, failed string) (int64, error) { + filter := map[string]any{ + "owner_user_id": userID, + "conversation_id": conversationID, + } + + opt := options.FindOne().SetProjection(bson.M{"_id": 0, failed: 1}) + seq, err := mongoutil.FindOne[int64](ctx, s.coll, filter, opt) + if err == nil { + return seq, nil + } else if errors.Is(err, mongo.ErrNoDocuments) { + return 0, nil + } else { + return 0, err + } +} + +func (s *ConversationMgo) GetUserConversationMinSeq(ctx context.Context, conversationID string, userID string) (int64, error) { + return s.getSeq(ctx, conversationID, userID, "min_seq") +} + +func (s *ConversationMgo) SetUserConversationMinSeq(ctx context.Context, conversationID string, userID string, seq int64) error { + return s.setSeq(ctx, conversationID, userID, seq, "min_seq") +} + +func (s *ConversationMgo) GetUserConversationMaxSeq(ctx context.Context, conversationID string, userID string) (int64, error) { + return s.getSeq(ctx, conversationID, userID, "max_seq") +} + +func (s *ConversationMgo) SetUserConversationMaxSeq(ctx context.Context, conversationID string, userID string, seq int64) error { + return s.setSeq(ctx, conversationID, userID, seq, "max_seq") +}