diff --git a/go.mod b/go.mod index 2111208f2..9b3a0dfca 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ require ( github.com/gorilla/websocket v1.5.1 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/mitchellh/mapstructure v1.5.0 - github.com/openimsdk/protocol v0.0.69-alpha.17 + github.com/openimsdk/protocol v0.0.69-alpha.27 github.com/openimsdk/tools v0.0.49-alpha.45 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 diff --git a/go.sum b/go.sum index 5bc9f6d98..246315957 100644 --- a/go.sum +++ b/go.sum @@ -262,8 +262,8 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y= github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y= github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= -github.com/openimsdk/protocol v0.0.69-alpha.17 h1:pEag4ZdlovE+AyLsw1VYFU/3sk6ayvGdPzgufQfKf9M= -github.com/openimsdk/protocol v0.0.69-alpha.17/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= +github.com/openimsdk/protocol v0.0.69-alpha.27 h1:0Ctpu9VBXVCkKno6vVNBgUTyo9W9bG7SZuAhQr/4H8Y= +github.com/openimsdk/protocol v0.0.69-alpha.27/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= github.com/openimsdk/tools v0.0.49-alpha.45 h1:XIzCoef4myybOiIlGuRY9FTtGBisZFC4Uy4PhG0ZWQ0= github.com/openimsdk/tools v0.0.49-alpha.45/go.mod h1:HtSRjPTL8PsuZ+PhR5noqzrYBF0sdwW3/O/sWVucWg8= github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4= diff --git a/internal/api/conversation.go b/internal/api/conversation.go index f273eaa4a..cb514bb7d 100644 --- a/internal/api/conversation.go +++ b/internal/api/conversation.go @@ -50,3 +50,11 @@ func (o *ConversationApi) SetConversations(c *gin.Context) { func (o *ConversationApi) GetConversationOfflinePushUserIDs(c *gin.Context) { a2r.Call(conversation.ConversationClient.GetConversationOfflinePushUserIDs, o.Client, c) } + +func (o *ConversationApi) GetFullOwnerConversationIDs(c *gin.Context) { + a2r.Call(conversation.ConversationClient.GetFullOwnerConversationIDs, o.Client, c) +} + +func (o *ConversationApi) GetIncrementalConversation(c *gin.Context) { + a2r.Call(conversation.ConversationClient.GetIncrementalConversation, o.Client, c) +} diff --git a/internal/api/router.go b/internal/api/router.go index 0f46f26ba..76a7eeea0 100644 --- a/internal/api/router.go +++ b/internal/api/router.go @@ -192,6 +192,8 @@ func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config) *gin.En conversationGroup.POST("/get_conversations", c.GetConversations) conversationGroup.POST("/set_conversations", c.SetConversations) conversationGroup.POST("/get_conversation_offline_push_user_ids", c.GetConversationOfflinePushUserIDs) + conversationGroup.POST("/get_full_conversation_ids", c.GetFullOwnerConversationIDs) + conversationGroup.POST("/get_incremental_conversation", c.GetIncrementalConversation) } statisticsGroup := r.Group("/statistics") diff --git a/internal/rpc/conversation/conversaion.go b/internal/rpc/conversation/conversaion.go index df9267ae0..9571ed7b2 100644 --- a/internal/rpc/conversation/conversaion.go +++ b/internal/rpc/conversation/conversaion.go @@ -184,13 +184,23 @@ func (c *conversationServer) GetAllConversations(ctx context.Context, req *pbcon } func (c *conversationServer) GetConversations(ctx context.Context, req *pbconversation.GetConversationsReq) (*pbconversation.GetConversationsResp, error) { - conversations, err := c.conversationDatabase.FindConversations(ctx, req.OwnerUserID, req.ConversationIDs) + conversations, err := c.getConversations(ctx, req.OwnerUserID, req.ConversationIDs) + if err != nil { + return nil, err + } + return &pbconversation.GetConversationsResp{ + Conversations: conversations, + }, nil +} + +func (c *conversationServer) getConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ([]*pbconversation.Conversation, error) { + conversations, err := c.conversationDatabase.FindConversations(ctx, ownerUserID, conversationIDs) if err != nil { return nil, err } resp := &pbconversation.GetConversationsResp{Conversations: []*pbconversation.Conversation{}} resp.Conversations = convert.ConversationsDB2Pb(conversations) - return resp, nil + return convert.ConversationsDB2Pb(conversations), nil } func (c *conversationServer) SetConversation(ctx context.Context, req *pbconversation.SetConversationReq) (*pbconversation.SetConversationResp, error) { diff --git a/internal/rpc/conversation/sync.go b/internal/rpc/conversation/sync.go new file mode 100644 index 000000000..29c11c4a1 --- /dev/null +++ b/internal/rpc/conversation/sync.go @@ -0,0 +1,56 @@ +package conversation + +import ( + "context" + "github.com/openimsdk/open-im-server/v3/internal/rpc/incrversion" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" + "github.com/openimsdk/open-im-server/v3/pkg/util/hashutil" + "github.com/openimsdk/protocol/conversation" +) + +func (c *conversationServer) GetFullOwnerConversationIDs(ctx context.Context, req *conversation.GetFullOwnerConversationIDsReq) (*conversation.GetFullOwnerConversationIDsResp, error) { + vl, err := c.conversationDatabase.FindMaxConversationUserVersionCache(ctx, req.UserID) + if err != nil { + return nil, err + } + conversationIDs, err := c.conversationDatabase.GetConversationIDs(ctx, req.UserID) + if err != nil { + return nil, err + } + idHash := hashutil.IdHash(conversationIDs) + if req.IdHash == idHash { + conversationIDs = nil + } + return &conversation.GetFullOwnerConversationIDsResp{ + Version: idHash, + VersionID: vl.ID.Hex(), + Equal: req.IdHash == idHash, + ConversationIDs: conversationIDs, + }, nil +} + +func (c *conversationServer) GetIncrementalConversation(ctx context.Context, req *conversation.GetIncrementalConversationReq) (*conversation.GetIncrementalConversationResp, error) { + opt := incrversion.Option[*conversation.Conversation, conversation.GetIncrementalConversationResp]{ + Ctx: ctx, + VersionKey: req.UserID, + VersionID: req.VersionID, + VersionNumber: req.Version, + Version: c.conversationDatabase.FindConversationUserVersion, + CacheMaxVersion: c.conversationDatabase.FindMaxConversationUserVersionCache, + Find: func(ctx context.Context, conversationIDs []string) ([]*conversation.Conversation, error) { + return c.getConversations(ctx, req.UserID, conversationIDs) + }, + ID: func(elem *conversation.Conversation) string { return elem.GroupID }, + Resp: func(version *model.VersionLog, delIDs []string, insertList, updateList []*conversation.Conversation, full bool) *conversation.GetIncrementalConversationResp { + return &conversation.GetIncrementalConversationResp{ + VersionID: version.ID.Hex(), + Version: uint64(version.Version), + Full: full, + Delete: delIDs, + Insert: insertList, + Update: updateList, + } + }, + } + return opt.Build() +} diff --git a/internal/rpc/third/s3.go b/internal/rpc/third/s3.go index 4cb1b81d0..881a5a332 100644 --- a/internal/rpc/third/s3.go +++ b/internal/rpc/third/s3.go @@ -290,3 +290,8 @@ type FormDataMate struct { Group string `json:"group"` Key string `json:"key"` } + +func (t *thirdServer) DeleteOutdatedData(ctx context.Context, req *third.DeleteOutdatedDataReq) (*third.DeleteOutdatedDataResp, error) { + //TODO implement me + panic("implement me") +} diff --git a/internal/rpc/third/third.go b/internal/rpc/third/third.go index 7560486a0..d0d88b174 100644 --- a/internal/rpc/third/third.go +++ b/internal/rpc/third/third.go @@ -43,6 +43,7 @@ type thirdServer struct { defaultExpire time.Duration config *Config } + type Config struct { RpcConfig config.Third RedisConfig config.Redis diff --git a/internal/rpc/user/user.go b/internal/rpc/user/user.go index 211b360b7..144a37602 100644 --- a/internal/rpc/user/user.go +++ b/internal/rpc/user/user.go @@ -736,3 +736,8 @@ func (s *userServer) SortQuery(ctx context.Context, req *pbuser.SortQueryReq) (* } return &pbuser.SortQueryResp{Users: convert.UsersDB2Pb(users)}, nil } + +func (s *userServer) SetUserOnlineStatus(ctx context.Context, req *pbuser.SetUserOnlineStatusReq) (*pbuser.SetUserOnlineStatusResp, error) { + //TODO implement me + panic("implement me") +} diff --git a/pkg/common/storage/cache/cachekey/conversation.go b/pkg/common/storage/cache/cachekey/conversation.go index aea4ceec6..d19fcc576 100644 --- a/pkg/common/storage/cache/cachekey/conversation.go +++ b/pkg/common/storage/cache/cachekey/conversation.go @@ -23,6 +23,7 @@ const ( SuperGroupRecvMsgNotNotifyUserIDsKey = "SUPER_GROUP_RECV_MSG_NOT_NOTIFY_USER_IDS:" SuperGroupRecvMsgNotNotifyUserIDsHashKey = "SUPER_GROUP_RECV_MSG_NOT_NOTIFY_USER_IDS_HASH:" ConversationNotReceiveMessageUserIDsKey = "CONVERSATION_NOT_RECEIVE_MESSAGE_USER_IDS:" + ConversationUserMaxKey = "CONVERSATION_USER_MAX:" ) func GetConversationKey(ownerUserID, conversationID string) string { @@ -56,3 +57,7 @@ func GetConversationNotReceiveMessageUserIDsKey(conversationID string) string { func GetUserConversationIDsHashKey(ownerUserID string) string { return ConversationIDsHashKey + ownerUserID } + +func GetConversationUserMaxVersionKey(userID string) string { + return ConversationUserMaxKey + userID +} diff --git a/pkg/common/storage/cache/conversation.go b/pkg/common/storage/cache/conversation.go index f34fd599f..bc1761483 100644 --- a/pkg/common/storage/cache/conversation.go +++ b/pkg/common/storage/cache/conversation.go @@ -54,4 +54,8 @@ type ConversationCache interface { GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error) DelConversationNotReceiveMessageUserIDs(conversationIDs ...string) ConversationCache + + DelConversationVersionUserIDs(userIDs ...string) ConversationCache + + FindMaxConversationUserVersion(ctx context.Context, userID string) (*relationtb.VersionLog, error) } diff --git a/pkg/common/storage/cache/redis/conversation.go b/pkg/common/storage/cache/redis/conversation.go index 8c0393dd5..c491d1b94 100644 --- a/pkg/common/storage/cache/redis/conversation.go +++ b/pkg/common/storage/cache/redis/conversation.go @@ -95,6 +95,10 @@ func (c *ConversationRedisCache) getUserConversationIDsHashKey(ownerUserID strin return cachekey.GetUserConversationIDsHashKey(ownerUserID) } +func (c *ConversationRedisCache) getConversationUserMaxVersionKey(ownerUserID string) string { + return cachekey.GetConversationUserMaxVersionKey(ownerUserID) +} + func (c *ConversationRedisCache) GetUserConversationIDs(ctx context.Context, ownerUserID string) ([]string, error) { return getCache(ctx, c.rcClient, c.getConversationIDsKey(ownerUserID), c.expireTime, func(ctx context.Context) ([]string, error) { return c.conversationDB.FindUserIDAllConversationID(ctx, ownerUserID) @@ -233,6 +237,19 @@ func (c *ConversationRedisCache) DelConversationNotReceiveMessageUserIDs(convers for _, conversationID := range conversationIDs { cache.AddKeys(c.getConversationNotReceiveMessageUserIDsKey(conversationID)) } + return cache +} +func (c *ConversationRedisCache) DelConversationVersionUserIDs(userIDs ...string) cache.ConversationCache { + cache := c.CloneConversationCache() + for _, userID := range userIDs { + cache.AddKeys(c.getConversationUserMaxVersionKey(userID)) + } return cache } + +func (c *ConversationRedisCache) FindMaxConversationUserVersion(ctx context.Context, userID string) (*model.VersionLog, error) { + return getCache(ctx, c.rcClient, c.getConversationUserMaxVersionKey(userID), c.expireTime, func(ctx context.Context) (*model.VersionLog, error) { + return c.conversationDB.FindConversationUserVersion(ctx, userID, 0, 0) + }) +} diff --git a/pkg/common/storage/controller/conversation.go b/pkg/common/storage/controller/conversation.go index 18ef3f8ba..d4b39fb8b 100644 --- a/pkg/common/storage/controller/conversation.go +++ b/pkg/common/storage/controller/conversation.go @@ -66,6 +66,8 @@ type ConversationDatabase interface { GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error) // GetUserAllHasReadSeqs(ctx context.Context, ownerUserID string) (map[string]int64, error) // FindRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) ([]string, error) + FindConversationUserVersion(ctx context.Context, userID string, version uint, limit int) (*relationtb.VersionLog, error) + FindMaxConversationUserVersionCache(ctx context.Context, userID string) (*relationtb.VersionLog, error) } func NewConversationDatabase(conversation database.Conversation, cache cache.ConversationCache, tx tx.Tx) ConversationDatabase { @@ -106,6 +108,7 @@ func (c *conversationDatabase) SetUsersConversationFieldTx(ctx context.Context, if _, ok := fieldMap["recv_msg_opt"]; ok { cache = cache.DelConversationNotReceiveMessageUserIDs(conversation.ConversationID) } + cache = cache.DelConversationVersionUserIDs(haveUserIDs...) } NotUserIDs := stringutil.DifferenceString(haveUserIDs, userIDs) log.ZDebug(ctx, "SetUsersConversationFieldTx", "NotUserIDs", NotUserIDs, "haveUserIDs", haveUserIDs, "userIDs", userIDs) @@ -137,7 +140,7 @@ func (c *conversationDatabase) UpdateUsersConversationField(ctx context.Context, return err } cache := c.cache.CloneConversationCache() - cache = cache.DelUsersConversation(conversationID, userIDs...) + cache = cache.DelUsersConversation(conversationID, userIDs...).DelConversationVersionUserIDs(userIDs...) if _, ok := args["recv_msg_opt"]; ok { cache = cache.DelConversationNotReceiveMessageUserIDs(conversationID) } @@ -155,13 +158,14 @@ func (c *conversationDatabase) CreateConversation(ctx context.Context, conversat cache = cache.DelConversationNotReceiveMessageUserIDs(conversation.ConversationID) userIDs = append(userIDs, conversation.OwnerUserID) } - return cache.DelConversationIDs(userIDs...).DelUserConversationIDsHash(userIDs...).ChainExecDel(ctx) + return cache.DelConversationIDs(userIDs...).DelUserConversationIDsHash(userIDs...).DelConversationVersionUserIDs(userIDs...).ChainExecDel(ctx) } func (c *conversationDatabase) SyncPeerUserPrivateConversationTx(ctx context.Context, conversations []*relationtb.Conversation) error { return c.tx.Transaction(ctx, func(ctx context.Context) error { cache := c.cache.CloneConversationCache() for _, conversation := range conversations { + cache = cache.DelConversationVersionUserIDs(conversation.OwnerUserID) for _, v := range [][2]string{{conversation.OwnerUserID, conversation.UserID}, {conversation.UserID, conversation.OwnerUserID}} { ownerUserID := v[0] userID := v[1] @@ -207,6 +211,7 @@ func (c *conversationDatabase) GetUserAllConversation(ctx context.Context, owner func (c *conversationDatabase) SetUserConversations(ctx context.Context, ownerUserID string, conversations []*relationtb.Conversation) error { return c.tx.Transaction(ctx, func(ctx context.Context) error { cache := c.cache.CloneConversationCache() + cache = cache.DelConversationVersionUserIDs(ownerUserID) groupIDs := datautil.Distinct(datautil.Filter(conversations, func(e *relationtb.Conversation) (string, bool) { return e.GroupID, e.GroupID != "" })) @@ -322,3 +327,11 @@ func (c *conversationDatabase) GetConversationIDsNeedDestruct(ctx context.Contex func (c *conversationDatabase) GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error) { return c.cache.GetConversationNotReceiveMessageUserIDs(ctx, conversationID) } + +func (c *conversationDatabase) FindConversationUserVersion(ctx context.Context, userID string, version uint, limit int) (*relationtb.VersionLog, error) { + return c.conversationDB.FindConversationUserVersion(ctx, userID, version, limit) +} + +func (c *conversationDatabase) FindMaxConversationUserVersionCache(ctx context.Context, userID string) (*relationtb.VersionLog, error) { + return c.cache.FindMaxConversationUserVersion(ctx, userID) +} diff --git a/pkg/common/storage/database/conversation.go b/pkg/common/storage/database/conversation.go index 46aa02d98..85f3dd668 100644 --- a/pkg/common/storage/database/conversation.go +++ b/pkg/common/storage/database/conversation.go @@ -22,7 +22,6 @@ import ( type Conversation interface { Create(ctx context.Context, conversations []*model.Conversation) (err error) - Delete(ctx context.Context, groupIDs []string) (err error) UpdateByMap(ctx context.Context, userIDs []string, conversationID string, args map[string]any) (rows int64, err error) Update(ctx context.Context, conversation *model.Conversation) (err error) Find(ctx context.Context, ownerUserID string, conversationIDs []string) (conversations []*model.Conversation, err error) @@ -39,4 +38,5 @@ type Conversation interface { GetConversationsByConversationID(ctx context.Context, conversationIDs []string) ([]*model.Conversation, error) GetConversationIDsNeedDestruct(ctx context.Context) ([]*model.Conversation, error) GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error) + FindConversationUserVersion(ctx context.Context, userID string, version uint, limit int) (*model.VersionLog, error) } diff --git a/pkg/common/storage/database/mgo/conversation.go b/pkg/common/storage/database/mgo/conversation.go index b462d3958..3d505f1d3 100644 --- a/pkg/common/storage/database/mgo/conversation.go +++ b/pkg/common/storage/database/mgo/conversation.go @@ -41,40 +41,71 @@ func NewConversationMongo(db *mongo.Database) (*ConversationMgo, error) { if err != nil { return nil, errs.Wrap(err) } - return &ConversationMgo{coll: coll}, nil + version, err := NewVersionLog(db.Collection(database.ConversationVersionName)) + if err != nil { + return nil, err + } + return &ConversationMgo{version: version, coll: coll}, nil } type ConversationMgo struct { - coll *mongo.Collection + version database.VersionLog + coll *mongo.Collection } func (c *ConversationMgo) Create(ctx context.Context, conversations []*model.Conversation) (err error) { - return mongoutil.InsertMany(ctx, c.coll, conversations) -} - -func (c *ConversationMgo) Delete(ctx context.Context, groupIDs []string) (err error) { - return mongoutil.DeleteMany(ctx, c.coll, bson.M{"group_id": bson.M{"$in": groupIDs}}) + return mongoutil.IncrVersion(func() error { + return mongoutil.InsertMany(ctx, c.coll, conversations) + }, func() error { + userConversation := make(map[string][]string) + for _, conversation := range conversations { + userConversation[conversation.OwnerUserID] = append(userConversation[conversation.OwnerUserID], conversation.ConversationID) + } + for userID, conversationIDs := range userConversation { + if err := c.version.IncrVersion(ctx, userID, conversationIDs, model.VersionStateInsert); err != nil { + return err + } + } + return nil + }) } -func (c *ConversationMgo) UpdateByMap(ctx context.Context, userIDs []string, conversationID string, args map[string]any) (rows int64, err error) { - if len(args) == 0 { +func (c *ConversationMgo) UpdateByMap(ctx context.Context, userIDs []string, conversationID string, args map[string]any) (int64, error) { + if len(args) == 0 || len(userIDs) == 0 { return 0, nil } filter := bson.M{ "conversation_id": conversationID, + "owner_user_id": bson.M{"$in": userIDs}, } - if len(userIDs) > 0 { - filter["owner_user_id"] = bson.M{"$in": userIDs} - } - res, err := mongoutil.UpdateMany(ctx, c.coll, filter, bson.M{"$set": args}) + var rows int64 + err := mongoutil.IncrVersion(func() error { + res, err := mongoutil.UpdateMany(ctx, c.coll, filter, bson.M{"$set": args}) + if err != nil { + return err + } + rows = res.ModifiedCount + return nil + }, func() error { + for _, userID := range userIDs { + if err := c.version.IncrVersion(ctx, userID, []string{conversationID}, model.VersionStateUpdate); err != nil { + return err + } + } + return nil + }) if err != nil { return 0, err } - return res.ModifiedCount, nil + return rows, nil } func (c *ConversationMgo) Update(ctx context.Context, conversation *model.Conversation) (err error) { - return mongoutil.UpdateOne(ctx, c.coll, bson.M{"owner_user_id": conversation.OwnerUserID, "conversation_id": conversation.ConversationID}, bson.M{"$set": conversation}, true) + return mongoutil.IncrVersion(func() error { + return mongoutil.UpdateOne(ctx, c.coll, bson.M{"owner_user_id": conversation.OwnerUserID, "conversation_id": conversation.ConversationID}, bson.M{"$set": conversation}, true) + }, func() error { + return c.version.IncrVersion(ctx, conversation.OwnerUserID, []string{conversation.ConversationID}, model.VersionStateUpdate) + }) } func (c *ConversationMgo) Find(ctx context.Context, ownerUserID string, conversationIDs []string) (conversations []*model.Conversation, err error) { @@ -178,3 +209,7 @@ func (c *ConversationMgo) GetConversationNotReceiveMessageUserIDs(ctx context.Co options.Find().SetProjection(bson.M{"_id": 0, "owner_user_id": 1}), ) } + +func (c *ConversationMgo) FindConversationUserVersion(ctx context.Context, userID string, version uint, limit int) (*model.VersionLog, error) { + return c.version.FindChangeLog(ctx, userID, version, limit) +} diff --git a/pkg/common/storage/database/name.go b/pkg/common/storage/database/name.go index 986f22a1a..3d9eb6a97 100644 --- a/pkg/common/storage/database/name.go +++ b/pkg/common/storage/database/name.go @@ -1,17 +1,18 @@ package database const ( - BlackName = "black" - ConversationName = "conversation" - FriendName = "friend" - FriendVersionName = "friend_version" - FriendRequestName = "friend_request" - GroupName = "group" - GroupMemberName = "group_member" - GroupMemberVersionName = "group_member_version" - GroupJoinVersionName = "group_join_version" - GroupRequestName = "group_request" - LogName = "log" - ObjectName = "s3" - UserName = "user" + BlackName = "black" + ConversationName = "conversation" + FriendName = "friend" + FriendVersionName = "friend_version" + FriendRequestName = "friend_request" + GroupName = "group" + GroupMemberName = "group_member" + GroupMemberVersionName = "group_member_version" + GroupJoinVersionName = "group_join_version" + ConversationVersionName = "conversation_version" + GroupRequestName = "group_request" + LogName = "log" + ObjectName = "s3" + UserName = "user" )