diff --git a/internal/rpc/friend/sync.go b/internal/rpc/friend/sync.go index ee8effa70..ba689c7ba 100644 --- a/internal/rpc/friend/sync.go +++ b/internal/rpc/friend/sync.go @@ -62,26 +62,28 @@ func (s *friendServer) GetIncrementalFriends(ctx context.Context, req *relation. if err := authverify.CheckAccessV3(ctx, req.UserID, s.config.Share.IMAdminUserID); err != nil { return nil, err } - opt := incrversion.Option[*relation.FriendInfo, relation.GetIncrementalFriendsResp]{ - VersionID: req.VersionID, - Version: func() (*model.VersionLog, error) { - return s.friendDatabase.FindFriendIncrVersion(ctx, req.UserID, uint(req.Version), incrversion.Limit(s.config.RpcConfig.FriendSyncCount, req.Version)) - }, - AllID: func() ([]string, error) { - return s.friendDatabase.FindSortFriendUserIDs(ctx, req.UserID) - }, - Find: func(ids []string) ([]*relation.FriendInfo, error) { + + opt := incrversion.Option[*pbfriend.FriendInfo, pbfriend.GetIncrementalFriendsResp]{ + Ctx: ctx, + VersionKey: req.UserID, + VersionID: req.VersionID, + VersionNumber: req.Version, + SyncLimit: s.config.RpcConfig.FriendSyncCount, + Version: s.friendDatabase.FindFriendIncrVersion, + CacheMaxVersion: s.friendDatabase.FindMaxFriendVersionCache, + SortID: s.friendDatabase.FindSortFriendUserIDs, + Find: func(ctx context.Context, ids []string) ([]*pbfriend.FriendInfo, error) { + friends, err := s.friendDatabase.FindFriendsWithError(ctx, req.UserID, ids) if err != nil { return nil, err } return friendsDB2PB(friends), nil }, - ID: func(elem *relation.FriendInfo) string { - return elem.FriendUserID - }, - Resp: func(version *model.VersionLog, delIDs []string, list []*relation.FriendInfo, full bool) *relation.GetIncrementalFriendsResp { - return &relation.GetIncrementalFriendsResp{ + + ID: func(elem *pbfriend.FriendInfo) string { return elem.FriendUserID }, + Resp: func(version *model.VersionLog, delIDs []string, list []*pbfriend.FriendInfo, full bool) *pbfriend.GetIncrementalFriendsResp { + return &pbfriend.GetIncrementalFriendsResp{ VersionID: version.ID.Hex(), Version: uint64(version.Version), Full: full, @@ -93,44 +95,3 @@ func (s *friendServer) GetIncrementalFriends(ctx context.Context, req *relation. } return opt.Build() } - -//func (s *friendServer) GetIncrementalFriends(ctx context.Context, req *relation.GetIncrementalFriendsReq) (*relation.GetIncrementalFriendsResp, error) { -// if err := authverify.CheckAccessV3(ctx, req.UserID, s.config.Share.IMAdminUserID); err != nil { -// return nil, err -// } -// var limit int -// if req.Version > 0 { -// limit = s.config.RpcConfig.FriendSyncCount -// } -// incrVer, err := s.friendDatabase.FindFriendIncrVersion(ctx, req.UserID, uint(req.Version), limit) -// if err != nil { -// return nil, err -// } -// var ( -// deleteUserIDs []string -// changeUserIDs []string -// ) -// if incrVer.Full() { -// changeUserIDs, err = s.friendDatabase.FindSortFriendUserIDs(ctx, req.UserID) -// if err != nil { -// return nil, err -// } -// } else { -// deleteUserIDs, changeUserIDs = incrVer.DeleteAndChangeIDs() -// } -// var friends []*model.Friend -// if len(changeUserIDs) > 0 { -// friends, err = s.friendDatabase.FindFriendsWithError(ctx, req.UserID, changeUserIDs) -// if err != nil { -// return nil, err -// } -// } -// return &relation.GetIncrementalFriendsResp{ -// Version: uint64(incrVer.Version), -// VersionID: incrVer.ID.Hex(), -// Full: incrVer.Full(), -// SyncCount: uint32(s.config.RpcConfig.FriendSyncCount), -// DeleteUserIds: deleteUserIDs, -// Changes: friendsDB2PB(friends), -// }, nil -//} diff --git a/internal/rpc/group/sync.go b/internal/rpc/group/sync.go index 80cb5a9ea..445ba6874 100644 --- a/internal/rpc/group/sync.go +++ b/internal/rpc/group/sync.go @@ -16,19 +16,18 @@ func (s *groupServer) SearchGroupMember(ctx context.Context, req *pbgroup.Search func (s *groupServer) GetIncrementalGroupMember(ctx context.Context, req *pbgroup.GetIncrementalGroupMemberReq) (*pbgroup.GetIncrementalGroupMemberResp, error) { opt := incrversion.Option[*sdkws.GroupMemberFullInfo, pbgroup.GetIncrementalGroupMemberResp]{ - VersionID: req.VersionID, - Version: func() (*model.VersionLog, error) { - return s.db.FindMemberIncrVersion(ctx, req.GroupID, uint(req.Version), incrversion.Limit(s.config.RpcConfig.GroupSyncCount, req.Version)) - }, - AllID: func() ([]string, error) { - return s.db.FindSortGroupMemberUserIDs(ctx, req.GroupID) - }, - Find: func(ids []string) ([]*sdkws.GroupMemberFullInfo, error) { + Ctx: ctx, + VersionKey: req.GroupID, + VersionID: req.VersionID, + VersionNumber: req.Version, + SyncLimit: s.config.RpcConfig.GroupSyncCount, + Version: s.db.FindMemberIncrVersion, + CacheMaxVersion: s.db.FindMaxGroupMemberVersionCache, + SortID: s.db.FindSortGroupMemberUserIDs, + Find: func(ctx context.Context, ids []string) ([]*sdkws.GroupMemberFullInfo, error) { return s.getGroupMembersInfo(ctx, req.GroupID, ids) }, - ID: func(elem *sdkws.GroupMemberFullInfo) string { - return elem.UserID - }, + ID: func(elem *sdkws.GroupMemberFullInfo) string { return elem.UserID }, Resp: func(version *model.VersionLog, delIDs []string, list []*sdkws.GroupMemberFullInfo, full bool) *pbgroup.GetIncrementalGroupMemberResp { return &pbgroup.GetIncrementalGroupMemberResp{ VersionID: version.ID.Hex(), @@ -48,19 +47,16 @@ func (s *groupServer) GetIncrementalJoinGroup(ctx context.Context, req *pbgroup. return nil, err } opt := incrversion.Option[*sdkws.GroupInfo, pbgroup.GetIncrementalJoinGroupResp]{ - VersionID: req.VersionID, - Version: func() (*model.VersionLog, error) { - return s.db.FindJoinIncrVersion(ctx, req.UserID, uint(req.Version), incrversion.Limit(s.config.RpcConfig.GroupSyncCount, req.Version)) - }, - AllID: func() ([]string, error) { - return s.db.FindSortJoinGroupIDs(ctx, req.UserID) - }, - Find: func(ids []string) ([]*sdkws.GroupInfo, error) { - return s.getGroupsInfo(ctx, ids) - }, - ID: func(elem *sdkws.GroupInfo) string { - return elem.GroupID - }, + Ctx: ctx, + VersionKey: req.UserID, + VersionID: req.VersionID, + VersionNumber: req.Version, + SyncLimit: s.config.RpcConfig.GroupSyncCount, + Version: s.db.FindJoinIncrVersion, + CacheMaxVersion: s.db.FindMaxJoinGroupVersionCache, + SortID: s.db.FindSortJoinGroupIDs, + Find: s.getGroupsInfo, + ID: func(elem *sdkws.GroupInfo) string { return elem.GroupID }, Resp: func(version *model.VersionLog, delIDs []string, list []*sdkws.GroupInfo, full bool) *pbgroup.GetIncrementalJoinGroupResp { return &pbgroup.GetIncrementalJoinGroupResp{ VersionID: version.ID.Hex(), diff --git a/internal/rpc/incrversion/option.go b/internal/rpc/incrversion/option.go index 3e19056bf..3146985aa 100644 --- a/internal/rpc/incrversion/option.go +++ b/internal/rpc/incrversion/option.go @@ -1,38 +1,139 @@ package incrversion import ( + "context" + "fmt" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" + "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/utils/datautil" + "go.mongodb.org/mongo-driver/bson/primitive" ) -func Limit(maxSync int, version uint64) int { - if version == 0 { - return 0 +//func Limit(maxSync int, version uint64) int { +// if version == 0 { +// return 0 +// } +// return maxSync +//} + +const ( + tagQuery = iota + 1 + tagFull + tageEqual +) + +type Option[A, B any] struct { + Ctx context.Context + VersionKey string + VersionID string + VersionNumber uint64 + SyncLimit int + CacheMaxVersion func(ctx context.Context, dId string) (*model.VersionLog, error) + Version func(ctx context.Context, dId string, version uint, limit int) (*model.VersionLog, error) + SortID func(ctx context.Context, dId string) ([]string, error) + Find func(ctx context.Context, ids []string) ([]A, error) + ID func(elem A) string + Resp func(version *model.VersionLog, delIDs []string, list []A, full bool) *B +} + +func (o *Option[A, B]) newError(msg string) error { + return errs.ErrInternalServer.WrapMsg(msg) +} + +func (o *Option[A, B]) check() error { + if o.Ctx == nil { + return o.newError("opt ctx is nil") } - return maxSync + if o.VersionKey == "" { + return o.newError("versionKey is empty") + } + if o.SyncLimit <= 0 { + return o.newError("invalid synchronization quantity") + } + if o.Version == nil { + return o.newError("func version is nil") + } + if o.SortID == nil { + return o.newError("func allID is nil") + } + if o.Find == nil { + return o.newError("func find is nil") + } + if o.ID == nil { + return o.newError("func id is nil") + } + if o.Resp == nil { + return o.newError("func resp is nil") + } + return nil } -type Option[A, B any] struct { - VersionID string - Version func() (*model.VersionLog, error) - AllID func() ([]string, error) - Find func(ids []string) ([]A, error) - ID func(elem A) string - Resp func(version *model.VersionLog, delIDs []string, list []A, full bool) *B +func (o *Option[A, B]) validVersion() bool { + objID, err := primitive.ObjectIDFromHex(o.VersionID) + return err == nil && (!objID.IsZero()) && o.VersionNumber > 0 +} + +func (o *Option[A, B]) equalID(objID primitive.ObjectID) bool { + return o.VersionID == objID.Hex() +} + +func (o *Option[A, B]) getVersion(tag *int) (*model.VersionLog, error) { + if o.CacheMaxVersion == nil { + if o.validVersion() { + *tag = tagQuery + return o.Version(o.Ctx, o.VersionKey, uint(o.VersionNumber), o.SyncLimit) + } + *tag = tagFull + return o.Version(o.Ctx, o.VersionKey, 0, 0) + } else { + cache, err := o.CacheMaxVersion(o.Ctx, o.VersionKey) + if err != nil { + return nil, err + } + if !o.validVersion() { + *tag = tagFull + return cache, nil + } + if !o.equalID(cache.ID) { + *tag = tagFull + return cache, nil + } + if o.VersionNumber == uint64(cache.Version) { + *tag = tageEqual + return cache, nil + } + *tag = tagQuery + return o.Version(o.Ctx, o.VersionKey, uint(o.VersionNumber), o.SyncLimit) + } } func (o *Option[A, B]) Build() (*B, error) { - version, err := o.Version() + if err := o.check(); err != nil { + return nil, err + } + var tag int + version, err := o.getVersion(&tag) if err != nil { return nil, err } + var full bool + switch tag { + case tagQuery: + full = version.ID.Hex() != o.VersionID || uint64(version.Version) < o.VersionNumber || len(version.Logs) != version.LogLen + case tagFull: + full = true + case tageEqual: + full = false + default: + panic(fmt.Errorf("undefined tag %d", tag)) + } var ( deleteIDs []string changeIDs []string ) - full := o.VersionID != version.ID.Hex() || version.Full() + //full := o.VersionID != version.ID.Hex() || version.Full() if full { - changeIDs, err = o.AllID() + changeIDs, err = o.SortID(o.Ctx, o.VersionKey) if err != nil { return nil, err } @@ -41,7 +142,7 @@ func (o *Option[A, B]) Build() (*B, error) { } var list []A if len(changeIDs) > 0 { - list, err = o.Find(changeIDs) + list, err = o.Find(o.Ctx, changeIDs) if err != nil { return nil, err } diff --git a/pkg/common/storage/cache/cachekey/friend.go b/pkg/common/storage/cache/cachekey/friend.go index 6a217bdef..8a053ca32 100644 --- a/pkg/common/storage/cache/cachekey/friend.go +++ b/pkg/common/storage/cache/cachekey/friend.go @@ -14,14 +14,13 @@ package cachekey -import "strconv" - const ( - FriendIDsKey = "FRIEND_IDS:" - TwoWayFriendsIDsKey = "COMMON_FRIENDS_IDS:" - FriendKey = "FRIEND_INFO:" - IsFriendKey = "IS_FRIEND:" // local cache key - FriendSyncSortUserIDsKey = "FRIEND_SYNC_SORT_USER_IDS:" + FriendIDsKey = "FRIEND_IDS:" + TwoWayFriendsIDsKey = "COMMON_FRIENDS_IDS:" + FriendKey = "FRIEND_INFO:" + IsFriendKey = "IS_FRIEND:" // local cache key + //FriendSyncSortUserIDsKey = "FRIEND_SYNC_SORT_USER_IDS:" + FriendMaxVersionKey = "FRIEND_MAX_VERSION:" ) func GetFriendIDsKey(ownerUserID string) string { @@ -36,10 +35,14 @@ func GetFriendKey(ownerUserID, friendUserID string) string { return FriendKey + ownerUserID + "-" + friendUserID } +func GetFriendMaxVersionKey(ownerUserID string) string { + return FriendMaxVersionKey + ownerUserID +} + func GetIsFriendKey(possibleFriendUserID, userID string) string { return IsFriendKey + possibleFriendUserID + "-" + userID } -func GetFriendSyncSortUserIDsKey(ownerUserID string, count int) string { - return FriendSyncSortUserIDsKey + strconv.Itoa(count) + ":" + ownerUserID -} +//func GetFriendSyncSortUserIDsKey(ownerUserID string, count int) string { +// return FriendSyncSortUserIDsKey + strconv.Itoa(count) + ":" + ownerUserID +//} diff --git a/pkg/common/storage/cache/cachekey/group.go b/pkg/common/storage/cache/cachekey/group.go index 681121ecb..2ef42c0ff 100644 --- a/pkg/common/storage/cache/cachekey/group.go +++ b/pkg/common/storage/cache/cachekey/group.go @@ -28,6 +28,8 @@ const ( JoinedGroupsKey = "JOIN_GROUPS_KEY:" GroupMemberNumKey = "GROUP_MEMBER_NUM_CACHE:" GroupRoleLevelMemberIDsKey = "GROUP_ROLE_LEVEL_MEMBER_IDS:" + GroupMemberMaxVersionKey = "GROUP_MEMBER_MAX_VERSION:" + GroupJoinMaxVersionKey = "GROUP_JOIN_MAX_VERSION:" ) func GetGroupInfoKey(groupID string) string { @@ -57,3 +59,11 @@ func GetGroupMemberNumKey(groupID string) string { func GetGroupRoleLevelMemberIDsKey(groupID string, roleLevel int32) string { return GroupRoleLevelMemberIDsKey + groupID + "-" + strconv.Itoa(int(roleLevel)) } + +func GetGroupMemberMaxVersionKey(groupID string) string { + return GroupMemberMaxVersionKey + groupID +} + +func GetJoinGroupMaxVersionKey(userID string) string { + return GroupJoinMaxVersionKey + userID +} diff --git a/pkg/common/storage/cache/friend.go b/pkg/common/storage/cache/friend.go index 3fee297ac..064e3baae 100644 --- a/pkg/common/storage/cache/friend.go +++ b/pkg/common/storage/cache/friend.go @@ -35,9 +35,13 @@ type FriendCache interface { DelOwner(friendUserID string, ownerUserIDs []string) FriendCache - DelSortFriendUserIDs(ownerUserIDs ...string) FriendCache + DelMaxFriendVersion(ownerUserIDs ...string) FriendCache + + //DelSortFriendUserIDs(ownerUserIDs ...string) FriendCache FindSortFriendUserIDs(ctx context.Context, ownerUserID string) ([]string, error) - FindFriendIncrVersion(ctx context.Context, ownerUserID string, version uint, limit int) (*relationtb.VersionLog, error) + //FindFriendIncrVersion(ctx context.Context, ownerUserID string, version uint, limit int) (*relationtb.VersionLog, error) + + FindMaxFriendVersion(ctx context.Context, ownerUserID string) (*relationtb.VersionLog, error) } diff --git a/pkg/common/storage/cache/group.go b/pkg/common/storage/cache/group.go index ff840a4a5..f02379a7d 100644 --- a/pkg/common/storage/cache/group.go +++ b/pkg/common/storage/cache/group.go @@ -62,4 +62,9 @@ type GroupCache interface { FindSortGroupMemberUserIDs(ctx context.Context, groupID string) ([]string, error) FindSortJoinGroupIDs(ctx context.Context, userID string) ([]string, error) + + DelMaxGroupMemberVersion(groupIDs ...string) GroupCache + DelMaxJoinGroupVersion(userIDs ...string) GroupCache + FindMaxGroupMemberVersion(ctx context.Context, groupID string) (*model.VersionLog, error) + FindMaxJoinGroupVersion(ctx context.Context, userID string) (*model.VersionLog, error) } diff --git a/pkg/common/storage/cache/redis/friend.go b/pkg/common/storage/cache/redis/friend.go index f4edbca9a..62fad91cd 100644 --- a/pkg/common/storage/cache/redis/friend.go +++ b/pkg/common/storage/cache/redis/friend.go @@ -70,8 +70,12 @@ func (f *FriendCacheRedis) getFriendIDsKey(ownerUserID string) string { return cachekey.GetFriendIDsKey(ownerUserID) } -func (f *FriendCacheRedis) getFriendSyncSortUserIDsKey(ownerUserID string) string { - return cachekey.GetFriendSyncSortUserIDsKey(ownerUserID, f.syncCount) +//func (f *FriendCacheRedis) getFriendSyncSortUserIDsKey(ownerUserID string) string { +// return cachekey.GetFriendSyncSortUserIDsKey(ownerUserID, f.syncCount) +//} + +func (f *FriendCacheRedis) getFriendMaxVersionKey(ownerUserID string) string { + return cachekey.GetFriendMaxVersionKey(ownerUserID) } // getTwoWayFriendsIDsKey returns the key for storing two-way friend IDs in the cache. @@ -103,15 +107,15 @@ func (f *FriendCacheRedis) DelFriendIDs(ownerUserIDs ...string) cache.FriendCach return newFriendCache } -func (f *FriendCacheRedis) DelSortFriendUserIDs(ownerUserIDs ...string) cache.FriendCache { - newGroupCache := f.CloneFriendCache() - keys := make([]string, 0, len(ownerUserIDs)) - for _, userID := range ownerUserIDs { - keys = append(keys, f.getFriendSyncSortUserIDsKey(userID)) - } - newGroupCache.AddKeys(keys...) - return newGroupCache -} +//func (f *FriendCacheRedis) DelSortFriendUserIDs(ownerUserIDs ...string) cache.FriendCache { +// newGroupCache := f.CloneFriendCache() +// keys := make([]string, 0, len(ownerUserIDs)) +// for _, userID := range ownerUserIDs { +// keys = append(keys, f.getFriendSyncSortUserIDsKey(userID)) +// } +// newGroupCache.AddKeys(keys...) +// return newGroupCache +//} // GetTwoWayFriendIDs retrieves two-way friend IDs from the cache. func (f *FriendCacheRedis) GetTwoWayFriendIDs(ctx context.Context, ownerUserID string) (twoWayFriendIDs []string, err error) { @@ -179,12 +183,29 @@ func (f *FriendCacheRedis) DelOwner(friendUserID string, ownerUserIDs []string) return newFriendCache } +func (f *FriendCacheRedis) DelMaxFriendVersion(ownerUserIDs ...string) cache.FriendCache { + newFriendCache := f.CloneFriendCache() + for _, ownerUserID := range ownerUserIDs { + key := f.getFriendMaxVersionKey(ownerUserID) + newFriendCache.AddKeys(key) // Assuming AddKeys marks the keys for deletion + } + + return newFriendCache +} + func (f *FriendCacheRedis) FindSortFriendUserIDs(ctx context.Context, ownerUserID string) ([]string, error) { - return getCache(ctx, f.rcClient, f.getFriendSyncSortUserIDsKey(ownerUserID), f.expireTime, func(ctx context.Context) ([]string, error) { - return f.friendDB.FindOwnerFriendUserIds(ctx, ownerUserID, f.syncCount) - }) + userIDs, err := f.GetFriendIDs(ctx, ownerUserID) + if err != nil { + return nil, err + } + if len(userIDs) > f.syncCount { + userIDs = userIDs[:f.syncCount] + } + return userIDs, nil } -func (f *FriendCacheRedis) FindFriendIncrVersion(ctx context.Context, ownerUserID string, version uint, limit int) (*model.VersionLog, error) { - return f.friendDB.FindIncrVersion(ctx, ownerUserID, version, limit) +func (f *FriendCacheRedis) FindMaxFriendVersion(ctx context.Context, ownerUserID string) (*model.VersionLog, error) { + return getCache(ctx, f.rcClient, f.getFriendMaxVersionKey(ownerUserID), f.expireTime, func(ctx context.Context) (*model.VersionLog, error) { + return f.friendDB.FindIncrVersion(ctx, ownerUserID, 0, 0) + }) } diff --git a/pkg/common/storage/cache/redis/group.go b/pkg/common/storage/cache/redis/group.go index 1dcace5ed..67e246058 100644 --- a/pkg/common/storage/cache/redis/group.go +++ b/pkg/common/storage/cache/redis/group.go @@ -114,6 +114,14 @@ func (g *GroupCacheRedis) getGroupRoleLevelMemberIDsKey(groupID string, roleLeve return cachekey.GetGroupRoleLevelMemberIDsKey(groupID, roleLevel) } +func (g *GroupCacheRedis) getGroupMemberMaxVersionKey(groupID string) string { + return cachekey.GetGroupMemberMaxVersionKey(groupID) +} + +func (g *GroupCacheRedis) getJoinGroupMaxVersionKey(userID string) string { + return cachekey.GetJoinGroupMaxVersionKey(userID) +} + func (g *GroupCacheRedis) GetGroupIndex(group *model.Group, keys []string) (int, error) { key := g.getGroupInfoKey(group.GroupID) for i, _key := range keys { @@ -249,9 +257,17 @@ func (g *GroupCacheRedis) DelGroupMemberIDs(groupID string) cache.GroupCache { return cache } +func (g *GroupCacheRedis) findUserJoinedGroupID(ctx context.Context, userID string) ([]string, error) { + groupIDs, err := g.groupMemberDB.FindUserJoinedGroupID(ctx, userID) + if err != nil { + return nil, err + } + return g.groupDB.FindJoinSortGroupID(ctx, groupIDs) +} + func (g *GroupCacheRedis) GetJoinedGroupIDs(ctx context.Context, userID string) (joinedGroupIDs []string, err error) { return getCache(ctx, g.rcClient, g.getJoinedGroupsKey(userID), g.expireTime, func(ctx context.Context) ([]string, error) { - return g.groupMemberDB.FindUserJoinedGroupID(ctx, userID) + return g.findUserJoinedGroupID(ctx, userID) }) } @@ -431,3 +447,35 @@ func (g *GroupCacheRedis) FindSortJoinGroupIDs(ctx context.Context, userID strin } return groupIDs, nil } + +func (g *GroupCacheRedis) DelMaxGroupMemberVersion(groupIDs ...string) cache.GroupCache { + keys := make([]string, 0, len(groupIDs)) + for _, groupID := range groupIDs { + keys = append(keys, g.getGroupMemberMaxVersionKey(groupID)) + } + cache := g.CloneGroupCache() + cache.AddKeys(keys...) + return cache +} + +func (g *GroupCacheRedis) DelMaxJoinGroupVersion(userIDs ...string) cache.GroupCache { + keys := make([]string, 0, len(userIDs)) + for _, userID := range userIDs { + keys = append(keys, g.getJoinGroupMaxVersionKey(userID)) + } + cache := g.CloneGroupCache() + cache.AddKeys(keys...) + return cache +} + +func (g *GroupCacheRedis) FindMaxGroupMemberVersion(ctx context.Context, groupID string) (*model.VersionLog, error) { + return getCache(ctx, g.rcClient, g.getGroupMemberMaxVersionKey(groupID), g.expireTime, func(ctx context.Context) (*model.VersionLog, error) { + return g.groupMemberDB.FindJoinIncrVersion(ctx, groupID, 0, 0) + }) +} + +func (g *GroupCacheRedis) FindMaxJoinGroupVersion(ctx context.Context, userID string) (*model.VersionLog, error) { + return getCache(ctx, g.rcClient, g.getJoinGroupMaxVersionKey(userID), g.expireTime, func(ctx context.Context) (*model.VersionLog, error) { + return g.groupMemberDB.FindJoinIncrVersion(ctx, userID, 0, 0) + }) +} diff --git a/pkg/common/storage/controller/friend.go b/pkg/common/storage/controller/friend.go index 1af967b9b..8f72703c0 100644 --- a/pkg/common/storage/controller/friend.go +++ b/pkg/common/storage/controller/friend.go @@ -82,6 +82,8 @@ type FriendDatabase interface { FindFriendIncrVersion(ctx context.Context, ownerUserID string, version uint, limit int) (*model.VersionLog, error) + FindMaxFriendVersionCache(ctx context.Context, ownerUserID string) (*model.VersionLog, error) + FindFriendUserID(ctx context.Context, friendUserID string) ([]string, error) UpdateFriendUserInfo(ctx context.Context, friendUserID string, ownerUserID []string, nickname string, faceURL string) error @@ -185,7 +187,7 @@ func (f *friendDatabase) BecomeFriends(ctx context.Context, ownerUserID string, return err } newFriendIDs = append(newFriendIDs, ownerUserID) - cache = cache.DelFriendIDs(newFriendIDs...).DelSortFriendUserIDs(ownerUserID) + cache = cache.DelFriendIDs(newFriendIDs...).DelMaxFriendVersion(newFriendIDs...) return cache.ChainExecDel(ctx) }) @@ -288,7 +290,7 @@ func (f *friendDatabase) AgreeFriendRequest(ctx context.Context, friendRequest * return err } } - return f.cache.DelFriendIDs(friendRequest.ToUserID, friendRequest.FromUserID).DelSortFriendUserIDs(friendRequest.ToUserID, friendRequest.FromUserID).ChainExecDel(ctx) + return f.cache.DelFriendIDs(friendRequest.ToUserID, friendRequest.FromUserID).DelMaxFriendVersion(friendRequest.ToUserID, friendRequest.FromUserID).ChainExecDel(ctx) }) } @@ -297,7 +299,8 @@ func (f *friendDatabase) Delete(ctx context.Context, ownerUserID string, friendU if err := f.friend.Delete(ctx, ownerUserID, friendUserIDs); err != nil { return err } - return f.cache.DelFriendIDs(append(friendUserIDs, ownerUserID)...).DelSortFriendUserIDs(ownerUserID).ChainExecDel(ctx) + userIds := append(friendUserIDs, ownerUserID) + return f.cache.DelFriendIDs(userIds...).DelMaxFriendVersion(userIds...).ChainExecDel(ctx) } // UpdateRemark updates the remark for a friend. Zero value for remark is also supported. @@ -305,7 +308,7 @@ func (f *friendDatabase) UpdateRemark(ctx context.Context, ownerUserID, friendUs if err := f.friend.UpdateRemark(ctx, ownerUserID, friendUserID, remark); err != nil { return err } - return f.cache.DelFriend(ownerUserID, friendUserID).ChainExecDel(ctx) + return f.cache.DelFriend(ownerUserID, friendUserID).DelMaxFriendVersion(ownerUserID).ChainExecDel(ctx) } // PageOwnerFriends retrieves the list of friends for the ownerUserID. It does not return an error if the result is empty. @@ -351,10 +354,12 @@ func (f *friendDatabase) UpdateFriends(ctx context.Context, ownerUserID string, if len(val) == 0 { return nil } - if err := f.friend.UpdateFriends(ctx, ownerUserID, friendUserIDs, val); err != nil { - return err - } - return f.cache.DelFriends(ownerUserID, friendUserIDs).DelSortFriendUserIDs(ownerUserID).ChainExecDel(ctx) + return f.tx.Transaction(ctx, func(ctx context.Context) error { + if err := f.friend.UpdateFriends(ctx, ownerUserID, friendUserIDs, val); err != nil { + return err + } + return f.cache.DelFriends(ownerUserID, friendUserIDs).DelMaxFriendVersion(ownerUserID).ChainExecDel(ctx) + }) } func (f *friendDatabase) FindSortFriendUserIDs(ctx context.Context, ownerUserID string) ([]string, error) { @@ -362,7 +367,11 @@ func (f *friendDatabase) FindSortFriendUserIDs(ctx context.Context, ownerUserID } func (f *friendDatabase) FindFriendIncrVersion(ctx context.Context, ownerUserID string, version uint, limit int) (*model.VersionLog, error) { - return f.cache.FindFriendIncrVersion(ctx, ownerUserID, version, limit) + return f.friend.FindIncrVersion(ctx, ownerUserID, version, limit) +} + +func (f *friendDatabase) FindMaxFriendVersionCache(ctx context.Context, ownerUserID string) (*model.VersionLog, error) { + return f.cache.FindMaxFriendVersion(ctx, ownerUserID) } func (f *friendDatabase) FindFriendUserID(ctx context.Context, friendUserID string) ([]string, error) { @@ -370,10 +379,12 @@ func (f *friendDatabase) FindFriendUserID(ctx context.Context, friendUserID stri } func (f *friendDatabase) UpdateFriendUserInfo(ctx context.Context, friendUserID string, ownerUserIDs []string, nickname string, faceURL string) error { - if err := f.friend.UpdateFriendUserInfo(ctx, friendUserID, nickname, faceURL); err != nil { - return err - } - return f.cache.DelOwner(friendUserID, ownerUserIDs).ChainExecDel(ctx) + return f.tx.Transaction(ctx, func(ctx context.Context) error { + if err := f.friend.UpdateFriendUserInfo(ctx, friendUserID, nickname, faceURL); err != nil { + return err + } + return f.cache.DelOwner(friendUserID, ownerUserIDs).DelMaxFriendVersion(ownerUserIDs...).ChainExecDel(ctx) + }) } func (f *friendDatabase) SearchFriend(ctx context.Context, ownerUserID, keyword string, pagination pagination.Pagination) (int64, []*model.Friend, error) { diff --git a/pkg/common/storage/controller/group.go b/pkg/common/storage/controller/group.go index 640ded6bd..f9ee65955 100644 --- a/pkg/common/storage/controller/group.go +++ b/pkg/common/storage/controller/group.go @@ -113,6 +113,11 @@ type GroupDatabase interface { FindSortGroupMemberUserIDs(ctx context.Context, groupID string) ([]string, error) FindSortJoinGroupIDs(ctx context.Context, userID string) ([]string, error) + + FindMaxGroupMemberVersionCache(ctx context.Context, groupID string) (*model.VersionLog, error) + FindMaxJoinGroupVersionCache(ctx context.Context, userID string) (*model.VersionLog, error) + + SearchJoinGroup(ctx context.Context, userID string, keyword string, pagination pagination.Pagination) (int64, []*model.Group, error) } func NewGroupDatabase( @@ -182,7 +187,8 @@ func (g *groupDatabase) CreateGroup(ctx context.Context, groups []*model.Group, DelGroupMembersHash(group.GroupID). DelGroupsMemberNum(group.GroupID). DelGroupMemberIDs(group.GroupID). - DelGroupAllRoleLevel(group.GroupID) + DelGroupAllRoleLevel(group.GroupID). + DelMaxGroupMemberVersion(group.GroupID) } } if len(groupMembers) > 0 { @@ -195,7 +201,9 @@ func (g *groupDatabase) CreateGroup(ctx context.Context, groups []*model.Group, DelGroupMemberIDs(groupMember.GroupID). DelJoinedGroupID(groupMember.UserID). DelGroupMembersInfo(groupMember.GroupID, groupMember.UserID). - DelGroupAllRoleLevel(groupMember.GroupID) + DelGroupAllRoleLevel(groupMember.GroupID). + DelMaxJoinGroupVersion(groupMember.UserID). + DelMaxGroupMemberVersion(groupMember.GroupID) } } return c.ChainExecDel(ctx) @@ -239,8 +247,9 @@ func (g *groupDatabase) UpdateGroup(ctx context.Context, groupID string, data ma if err := g.groupMemberDB.JoinGroupIncrVersion(ctx, userID, []string{groupID}, false); err != nil { return err } + } - return g.cache.DelGroupsInfo(groupID).ChainExecDel(ctx) + return g.cache.CloneGroupCache().DelGroupsInfo(groupID).DelMaxJoinGroupVersion(userIDs...).ChainExecDel(ctx) }) } @@ -263,8 +272,10 @@ func (g *groupDatabase) DismissGroup(ctx context.Context, groupID string, delete DelGroupsMemberNum(groupID). DelGroupMembersHash(groupID). DelGroupAllRoleLevel(groupID). - DelGroupMembersInfo(groupID, userIDs...) + DelGroupMembersInfo(groupID, userIDs...). + DelMaxGroupMemberVersion(groupID) } + c = c.DelMaxJoinGroupVersion(userIDs...) if len(userIDs) > 0 { if err := g.groupMemberDB.JoinGroupIncrVersion(ctx, groupID, userIDs, true); err != nil { return err @@ -340,7 +351,9 @@ func (g *groupDatabase) HandlerGroupRequest(ctx context.Context, groupID string, DelGroupMemberIDs(groupID). DelGroupsMemberNum(groupID). DelJoinedGroupID(member.UserID). - DelGroupRoleLevel(groupID, []int32{member.RoleLevel}) + DelGroupRoleLevel(groupID, []int32{member.RoleLevel}). + DelMaxJoinGroupVersion(userID). + DelMaxGroupMemberVersion(groupID) if err := c.ChainExecDel(ctx); err != nil { return err } @@ -350,17 +363,20 @@ func (g *groupDatabase) HandlerGroupRequest(ctx context.Context, groupID string, } func (g *groupDatabase) DeleteGroupMember(ctx context.Context, groupID string, userIDs []string) error { - if err := g.groupMemberDB.Delete(ctx, groupID, userIDs); err != nil { - return err - } - c := g.cache.CloneGroupCache() - return c.DelGroupMembersHash(groupID). - DelGroupMemberIDs(groupID). - DelGroupsMemberNum(groupID). - DelJoinedGroupID(userIDs...). - DelGroupMembersInfo(groupID, userIDs...). - DelGroupAllRoleLevel(groupID). - ChainExecDel(ctx) + return g.ctxTx.Transaction(ctx, func(ctx context.Context) error { + if err := g.groupMemberDB.Delete(ctx, groupID, userIDs); err != nil { + return err + } + c := g.cache.CloneGroupCache() + return c.DelGroupMembersHash(groupID). + DelGroupMemberIDs(groupID). + DelGroupsMemberNum(groupID). + DelJoinedGroupID(userIDs...). + DelGroupMembersInfo(groupID, userIDs...). + DelGroupAllRoleLevel(groupID). + DelMaxGroupMemberVersion(groupID). + ChainExecDel(ctx) + }) } func (g *groupDatabase) MapGroupMemberUserID(ctx context.Context, groupIDs []string) (map[string]*common.GroupSimpleUserID, error) { @@ -390,20 +406,25 @@ func (g *groupDatabase) TransferGroupOwner(ctx context.Context, groupID string, c := g.cache.CloneGroupCache() return c.DelGroupMembersInfo(groupID, oldOwnerUserID, newOwnerUserID). DelGroupAllRoleLevel(groupID). - DelGroupMembersHash(groupID).ChainExecDel(ctx) + DelGroupMembersHash(groupID). + DelJoinedGroupID(oldOwnerUserID, newOwnerUserID). + ChainExecDel(ctx) }) } func (g *groupDatabase) UpdateGroupMember(ctx context.Context, groupID string, userID string, data map[string]any) error { - if err := g.groupMemberDB.Update(ctx, groupID, userID, data); err != nil { - return err - } - c := g.cache.CloneGroupCache() - c = c.DelGroupMembersInfo(groupID, userID) - if g.groupMemberDB.IsUpdateRoleLevel(data) { - c = c.DelGroupAllRoleLevel(groupID) - } - return c.ChainExecDel(ctx) + return g.ctxTx.Transaction(ctx, func(ctx context.Context) error { + if err := g.groupMemberDB.Update(ctx, groupID, userID, data); err != nil { + return err + } + c := g.cache.CloneGroupCache() + c = c.DelGroupMembersInfo(groupID, userID) + if g.groupMemberDB.IsUpdateRoleLevel(data) { + c = c.DelGroupAllRoleLevel(groupID) + } + c = c.DelMaxGroupMemberVersion(groupID).DelMaxJoinGroupVersion(userID) + return c.ChainExecDel(ctx) + }) } func (g *groupDatabase) UpdateGroupMembers(ctx context.Context, data []*common.BatchUpdateGroupMember) error { @@ -483,3 +504,19 @@ func (g *groupDatabase) FindSortGroupMemberUserIDs(ctx context.Context, groupID func (g *groupDatabase) FindSortJoinGroupIDs(ctx context.Context, userID string) ([]string, error) { return g.cache.FindSortJoinGroupIDs(ctx, userID) } + +func (g *groupDatabase) FindMaxGroupMemberVersionCache(ctx context.Context, groupID string) (*model.VersionLog, error) { + return g.cache.FindMaxGroupMemberVersion(ctx, groupID) +} + +func (g *groupDatabase) FindMaxJoinGroupVersionCache(ctx context.Context, userID string) (*model.VersionLog, error) { + return g.cache.FindMaxJoinGroupVersion(ctx, userID) +} + +func (g *groupDatabase) SearchJoinGroup(ctx context.Context, userID string, keyword string, pagination pagination.Pagination) (int64, []*model.Group, error) { + groupIDs, err := g.cache.GetJoinedGroupIDs(ctx, userID) + if err != nil { + return 0, nil, err + } + return g.groupDB.SearchJoin(ctx, groupIDs, keyword, pagination) +} diff --git a/pkg/common/storage/database/group.go b/pkg/common/storage/database/group.go index 712db09d2..7ef22f6c9 100644 --- a/pkg/common/storage/database/group.go +++ b/pkg/common/storage/database/group.go @@ -32,4 +32,8 @@ type Group interface { CountTotal(ctx context.Context, before *time.Time) (count int64, err error) // Get Group total quantity every day CountRangeEverydayTotal(ctx context.Context, start time.Time, end time.Time) (map[string]int64, error) + + FindJoinSortGroupID(ctx context.Context, groupIDs []string) ([]string, error) + + SearchJoin(ctx context.Context, groupIDs []string, keyword string, pagination pagination.Pagination) (int64, []*model.Group, error) } diff --git a/pkg/common/storage/database/mgo/friend.go b/pkg/common/storage/database/mgo/friend.go index 83daf9ecd..e0e30642c 100644 --- a/pkg/common/storage/database/mgo/friend.go +++ b/pkg/common/storage/database/mgo/friend.go @@ -52,6 +52,10 @@ func NewFriendMongo(db *mongo.Database) (database.Friend, error) { return &FriendMgo{coll: coll, owner: owner}, nil } +func (f *FriendMgo) friendSort() any { + return bson.D{{"is_pinned", -1}, {"friend_nickname", 1}, {"create_time", 1}} +} + // Create inserts multiple friend records. func (f *FriendMgo) Create(ctx context.Context, friends []*model.Friend) error { return mongoutil.IncrVersion(func() error { @@ -145,13 +149,13 @@ func (f *FriendMgo) FindReversalFriends(ctx context.Context, friendUserID string // FindOwnerFriends retrieves a paginated list of friends for a given owner. func (f *FriendMgo) FindOwnerFriends(ctx context.Context, ownerUserID string, pagination pagination.Pagination) (int64, []*model.Friend, error) { filter := bson.M{"owner_user_id": ownerUserID} - opt := options.Find().SetSort(bson.D{{"friend_nickname", 1}, {"create_time", 1}}) + opt := options.Find().SetSort(f.friendSort()) return mongoutil.FindPage[*model.Friend](ctx, f.coll, filter, pagination, opt) } func (f *FriendMgo) FindOwnerFriendUserIds(ctx context.Context, ownerUserID string, limit int) ([]string, error) { filter := bson.M{"owner_user_id": ownerUserID} - opt := options.Find().SetProjection(bson.M{"_id": 0, "friend_user_id": 1}).SetSort(bson.D{{"friend_nickname", 1}, {"create_time", 1}}).SetLimit(int64(limit)) + opt := options.Find().SetProjection(bson.M{"_id": 0, "friend_user_id": 1}).SetSort(f.friendSort()).SetLimit(int64(limit)) return mongoutil.Find[string](ctx, f.coll, filter, opt) } @@ -197,7 +201,7 @@ func (f *FriendMgo) FindFriendUserID(ctx context.Context, friendUserID string) ( filter := bson.M{ "friend_user_id": friendUserID, } - return mongoutil.Find[string](ctx, f.coll, filter, options.Find().SetProjection(bson.M{"_id": 0, "owner_user_id": 1})) + return mongoutil.Find[string](ctx, f.coll, filter, options.Find().SetProjection(bson.M{"_id": 0, "owner_user_id": 1}).SetSort(f.friendSort())) } func (f *FriendMgo) UpdateFriendUserInfo(ctx context.Context, friendUserID string, nickname string, faceURL string) error { @@ -209,14 +213,16 @@ func (f *FriendMgo) UpdateFriendUserInfo(ctx context.Context, friendUserID strin } func (f *FriendMgo) SearchFriend(ctx context.Context, ownerUserID, keyword string, pagination pagination.Pagination) (int64, []*model.Friend, error) { - //where := bson.M{ - // "owner_user_id": ownerUserID, - // "$or": []bson.M{ - // {"remark": bson.M{"$regex": keyword, "$options": "i"}}, - // {"friend_user_id": bson.M{"$regex": keyword, "$options": "i"}}, - // {"nickname": bson.M{"$regex": keyword, "$options": "i"}}, - // }, - //} - //return f.aggregatePagination(ctx, where, pagination) - panic("todo") + filter := bson.M{ + "owner_user_id": ownerUserID, + } + if keyword != "" { + filter["$or"] = []bson.M{ + {"remark": bson.M{"$regex": keyword, "$options": "i"}}, + {"nickname": bson.M{"$regex": keyword, "$options": "i"}}, + {"friend_user_id": bson.M{"$regex": keyword, "$options": "i"}}, + } + } + opt := options.Find().SetSort(f.friendSort()) + return mongoutil.FindPage[*model.Friend](ctx, f.coll, filter, pagination, opt) } diff --git a/pkg/common/storage/database/mgo/group.go b/pkg/common/storage/database/mgo/group.go index 48d24560b..630bc0291 100644 --- a/pkg/common/storage/database/mgo/group.go +++ b/pkg/common/storage/database/mgo/group.go @@ -47,6 +47,10 @@ type GroupMgo struct { coll *mongo.Collection } +func (g *GroupMgo) sortGroup() any { + return bson.D{{"group_name", 1}, {"create_time", 1}} +} + func (g *GroupMgo) Create(ctx context.Context, groups []*model.Group) (err error) { return mongoutil.InsertMany(ctx, g.coll, groups) } @@ -126,3 +130,32 @@ func (g *GroupMgo) CountRangeEverydayTotal(ctx context.Context, start time.Time, } return res, nil } + +func (g *GroupMgo) FindJoinSortGroupID(ctx context.Context, groupIDs []string) ([]string, error) { + if len(groupIDs) < 2 { + return groupIDs, nil + } + filter := bson.M{ + "group_id": bson.M{"$in": groupIDs}, + "status": bson.M{"$ne": constant.GroupStatusDismissed}, + } + opt := options.Find().SetSort(g.sortGroup()).SetProjection(bson.M{"_id": 0, "group_id": 1}) + return mongoutil.Find[string](ctx, g.coll, filter, opt) +} + +func (g *GroupMgo) SearchJoin(ctx context.Context, groupIDs []string, keyword string, pagination pagination.Pagination) (int64, []*model.Group, error) { + if len(groupIDs) == 0 { + return 0, nil, nil + } + filter := bson.M{ + "group_id": bson.M{"$in": groupIDs}, + "status": bson.M{"$ne": constant.GroupStatusDismissed}, + } + if keyword != "" { + filter["group_name"] = bson.M{"$regex": keyword} + } + // Define the sorting options + opts := options.Find().SetSort(g.sortGroup()) + // Perform the search with pagination and sorting + return mongoutil.FindPage[*model.Group](ctx, g.coll, filter, pagination, opts) +} diff --git a/pkg/common/storage/database/mgo/group_member.go b/pkg/common/storage/database/mgo/group_member.go index 372babc9f..cb64c87a4 100644 --- a/pkg/common/storage/database/mgo/group_member.go +++ b/pkg/common/storage/database/mgo/group_member.go @@ -57,7 +57,7 @@ type GroupMemberMgo struct { join database.VersionLog } -func (g *GroupMemberMgo) sortBson() any { +func (g *GroupMemberMgo) memberSort() any { return bson.D{{"role_level", -1}, {"create_time", -1}} } @@ -128,7 +128,7 @@ func (g *GroupMemberMgo) Update(ctx context.Context, groupID string, userID stri } func (g *GroupMemberMgo) FindMemberUserID(ctx context.Context, groupID string) (userIDs []string, err error) { - return mongoutil.Find[string](ctx, g.coll, bson.M{"group_id": groupID}, options.Find().SetProjection(bson.M{"_id": 0, "user_id": 1}).SetSort(g.sortBson())) + return mongoutil.Find[string](ctx, g.coll, bson.M{"group_id": groupID}, options.Find().SetProjection(bson.M{"_id": 0, "user_id": 1}).SetSort(g.memberSort())) } func (g *GroupMemberMgo) Take(ctx context.Context, groupID string, userID string) (groupMember *model.GroupMember, err error) { @@ -143,13 +143,13 @@ func (g *GroupMemberMgo) FindRoleLevelUserIDs(ctx context.Context, groupID strin return mongoutil.Find[string](ctx, g.coll, bson.M{"group_id": groupID, "role_level": roleLevel}, options.Find().SetProjection(bson.M{"_id": 0, "user_id": 1})) } -func (g *GroupMemberMgo) SearchMember(ctx context.Context, keyword string, groupID string, pagination pagination.Pagination) (total int64, groupList []*model.GroupMember, err error) { +func (g *GroupMemberMgo) SearchMember(ctx context.Context, keyword string, groupID string, pagination pagination.Pagination) (int64, []*model.GroupMember, error) { filter := bson.M{"group_id": groupID, "nickname": bson.M{"$regex": keyword}} - return mongoutil.FindPage[*model.GroupMember](ctx, g.coll, filter, pagination, options.Find().SetSort(g.sortBson())) + return mongoutil.FindPage[*model.GroupMember](ctx, g.coll, filter, pagination, options.Find().SetSort(g.memberSort())) } func (g *GroupMemberMgo) FindUserJoinedGroupID(ctx context.Context, userID string) (groupIDs []string, err error) { - return mongoutil.Find[string](ctx, g.coll, bson.M{"user_id": userID}, options.Find().SetProjection(bson.M{"_id": 0, "group_id": 1}).SetSort(g.sortBson())) + return mongoutil.Find[string](ctx, g.coll, bson.M{"user_id": userID}, options.Find().SetProjection(bson.M{"_id": 0, "group_id": 1}).SetSort(g.memberSort())) } func (g *GroupMemberMgo) TakeGroupMemberNum(ctx context.Context, groupID string) (count int64, err error) { diff --git a/pkg/common/storage/model/version_log.go b/pkg/common/storage/model/version_log.go index 044bd42da..a09f493a8 100644 --- a/pkg/common/storage/model/version_log.go +++ b/pkg/common/storage/model/version_log.go @@ -30,7 +30,6 @@ func (v *VersionLogTable) VersionLog() *VersionLog { Deleted: v.Deleted, LastUpdate: v.LastUpdate, LogLen: 0, - queryDoc: true, } } @@ -42,15 +41,10 @@ type VersionLog struct { Deleted uint `bson:"deleted"` LastUpdate time.Time `bson:"last_update"` LogLen int `bson:"log_len"` - queryDoc bool `bson:"-"` } -func (w *VersionLog) Full() bool { - return w.queryDoc || w.Version == 0 || len(w.Logs) != w.LogLen -} - -func (w *VersionLog) DeleteAndChangeIDs() (delIds []string, changeIds []string) { - for _, l := range w.Logs { +func (v *VersionLog) DeleteAndChangeIDs() (delIds []string, changeIds []string) { + for _, l := range v.Logs { if l.Deleted { delIds = append(delIds, l.EID) } else {