diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index 778316e5c..574ae5dc2 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -24,6 +24,8 @@ import ( "time" "github.com/openimsdk/open-im-server/v3/pkg/rpcli" + "github.com/openimsdk/tools/utils/stringutil" + "google.golang.org/grpc" "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/common" @@ -53,7 +55,6 @@ import ( "github.com/openimsdk/tools/mw/specialerror" "github.com/openimsdk/tools/utils/datautil" "github.com/openimsdk/tools/utils/encrypt" - "google.golang.org/grpc" ) type groupServer struct { @@ -147,8 +148,8 @@ func (s *groupServer) NotificationUserInfoUpdate(ctx context.Context, req *pbgro return nil, err } } - for _, groupID := range groupIDs { - s.notification.GroupMemberInfoSetNotification(ctx, groupID, req.UserID) + if err = s.notification.GroupMemberInfoSetNotificationBulk(ctx, groupIDs, req.NewUserInfo); err != nil { + log.ZError(ctx, stringutil.GetFuncName(1)+" failed", err) } if err = s.db.DeleteGroupMemberHash(ctx, groupIDs); err != nil { return nil, err diff --git a/internal/rpc/group/notification.go b/internal/rpc/group/notification.go index 95335b2fc..c8ec39355 100644 --- a/internal/rpc/group/notification.go +++ b/internal/rpc/group/notification.go @@ -803,6 +803,182 @@ func (g *NotificationSender) GroupCancelMutedNotification(ctx context.Context, g g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.GroupCancelMutedNotification, tips) } +func (g *NotificationSender) getPublicUserInfos(ctx context.Context, userIDs []string) ([]*sdkws.PublicUserInfo, error) { + users, err := g.getUsersInfo(ctx, userIDs) + if err != nil { + return nil, err + } + userMap := make(map[string]common_user.CommonUser) + for _, user := range users { + userMap[user.GetUserID()] = user + } + for _, userID := range userIDs { + if _, ok := userMap[userID]; !ok { + return nil, servererrs.ErrUserIDNotFound.WrapMsg(fmt.Sprintf("user %s not found", userID)) + } + } + + return datautil.Slice(users, func(e common_user.CommonUser) *sdkws.PublicUserInfo { + return &sdkws.PublicUserInfo{ + UserID: e.GetUserID(), + Nickname: e.GetNickname(), + FaceURL: e.GetFaceURL(), + Ex: e.GetEx(), + } + }), nil +} + +func (g *NotificationSender) getGroupMembersForUser(ctx context.Context, groupIDs []string, userID string) ([]*sdkws.GroupMemberFullInfo, error) { + members, err := g.db.FindGroupMemberUser(ctx, groupIDs, userID) + if err != nil { + return nil, err + } + if err := g.PopulateGroupMember(ctx, members...); err != nil { + return nil, err + } + log.ZDebug(ctx, "getGroupMembers", "members", members) + return datautil.Slice(members, func(e *model.GroupMember) *sdkws.GroupMemberFullInfo { return g.groupMemberDB2PB(e, 0) }), nil +} + +func (g *NotificationSender) getGroupMemberMapForUser(ctx context.Context, groupIDs []string, userID string) (map[string]*sdkws.GroupMemberFullInfo, error) { + members, err := g.getGroupMembersForUser(ctx, groupIDs, userID) + if err != nil { + return nil, err + } + m := make(map[string]*sdkws.GroupMemberFullInfo) + for i, member := range members { + m[member.GroupID] = members[i] + } + return m, nil +} + +func (g *NotificationSender) migrateFallbackToGroupMemberFullInfo(groupMemberFullInfo *sdkws.GroupMemberFullInfo, publicUserInfo *sdkws.PublicUserInfo) *sdkws.GroupMemberFullInfo { + if groupMemberFullInfo == nil { + groupMemberFullInfo = &sdkws.GroupMemberFullInfo{} + } + if publicUserInfo == nil { + publicUserInfo = &sdkws.PublicUserInfo{} + } + + firstNotEmpty := func(s ...string) string { + for _, s := range s { + if s != "" { + return s + } + } + return "" + } + + return &sdkws.GroupMemberFullInfo{ + GroupID: groupMemberFullInfo.GroupID, + UserID: firstNotEmpty(groupMemberFullInfo.UserID, publicUserInfo.UserID), + RoleLevel: groupMemberFullInfo.RoleLevel, + JoinTime: groupMemberFullInfo.JoinTime, + Nickname: firstNotEmpty(groupMemberFullInfo.Nickname, publicUserInfo.Nickname), + FaceURL: firstNotEmpty(groupMemberFullInfo.FaceURL, publicUserInfo.FaceURL), + AppMangerLevel: groupMemberFullInfo.AppMangerLevel, + JoinSource: groupMemberFullInfo.JoinSource, + OperatorUserID: firstNotEmpty(groupMemberFullInfo.OperatorUserID, publicUserInfo.UserID), + InviterUserID: groupMemberFullInfo.InviterUserID, + Ex: groupMemberFullInfo.Ex, + MuteEndTime: groupMemberFullInfo.MuteEndTime, + } +} + +func (g *NotificationSender) getGroupInfos(ctx context.Context, groupIDs []string) ([]*sdkws.GroupInfo, error) { + var err error + defer func() { + if err != nil { + log.ZError(ctx, stringutil.GetFuncName(1)+" failed", err) + } + }() + + groups, err := g.db.FindGroup(ctx, groupIDs) + if err != nil { + return nil, err + } + numMap, err := g.db.FindGroupMemberNums(ctx, groupIDs) + if err != nil { + return nil, err + } + owners, err := g.db.FindGroupsOwner(ctx, groupIDs) + if err != nil { + return nil, err + } + ownerMap := make(map[string]string) + for _, owner := range owners { + ownerMap[owner.GroupID] = owner.UserID + } + + return datautil.Slice(groups, func(group *model.Group) *sdkws.GroupInfo { + return convert.Db2PbGroupInfo(group, ownerMap[group.GroupID], numMap[group.GroupID]) + }), nil +} + +func (g *NotificationSender) GroupMemberInfoSetNotificationBulk(ctx context.Context, groupIDs []string, changedUserInfo *sdkws.UserInfo) error { + opUserID := mcontext.GetOpUserID(ctx) + opIsAdmin := authverify.CheckUserIsAdmin(ctx, opUserID) + + groupInfos, err := g.getGroupInfos(ctx, groupIDs) + if err != nil { + return err + } + + groupMemberMap, err := g.getGroupMemberMapForUser(ctx, groupIDs, changedUserInfo.UserID) + if err != nil { + return err + } + + opMemberMap := groupMemberMap + if opUserID != changedUserInfo.UserID { + opMemberMap, err = g.getGroupMemberMapForUser(ctx, groupIDs, opUserID) + if err != nil { + return err + } + } + + opPublicUser := &sdkws.PublicUserInfo{ + UserID: changedUserInfo.GetUserID(), + Nickname: changedUserInfo.GetNickname(), + FaceURL: changedUserInfo.GetFaceURL(), + Ex: changedUserInfo.GetEx(), + } + if opUserID != changedUserInfo.UserID { + opPublicUser, err = g.getUser(ctx, opUserID) + if err != nil { + return err + } + } + + for _, groupInfo := range groupInfos { + if groupMemberMap[groupInfo.GroupID] == nil { + continue + } + + opUserGroupMemberFullInfo := datautil.If(opIsAdmin, &sdkws.GroupMemberFullInfo{ + GroupID: groupInfo.GroupID, + UserID: opUserID, + RoleLevel: constant.GroupAdmin, + AppMangerLevel: constant.AppAdmin, + }, opMemberMap[groupInfo.GroupID]) + if opUserGroupMemberFullInfo == nil { + opUserGroupMemberFullInfo = &sdkws.GroupMemberFullInfo{} + } + opUserGroupMemberFullInfo.GroupID = groupInfo.GroupID + + tips := &sdkws.GroupMemberInfoSetTips{ + Group: groupInfo, + OpUser: g.migrateFallbackToGroupMemberFullInfo(opUserGroupMemberFullInfo, opPublicUser), + ChangedUser: groupMemberMap[groupInfo.GroupID], + } + + g.setSortVersion(ctx, &tips.GroupMemberVersion, &tips.GroupMemberVersionID, database.GroupMemberVersionName, tips.Group.GroupID, &tips.GroupSortVersion) + g.Notification(ctx, opUserID, groupInfo.GroupID, constant.GroupMemberInfoSetNotification, tips) + } + + return nil +} + func (g *NotificationSender) GroupMemberInfoSetNotification(ctx context.Context, groupID, groupMemberUserID string) { var err error defer func() { @@ -810,13 +986,11 @@ func (g *NotificationSender) GroupMemberInfoSetNotification(ctx context.Context, log.ZError(ctx, stringutil.GetFuncName(1)+" failed", err) } }() - var group *sdkws.GroupInfo - group, err = g.getGroupInfo(ctx, groupID) + group, err := g.getGroupInfo(ctx, groupID) if err != nil { return } - var user map[string]*sdkws.GroupMemberFullInfo - user, err = g.getGroupMemberMap(ctx, groupID, []string{groupMemberUserID}) + user, err := g.getGroupMemberMap(ctx, groupID, []string{groupMemberUserID}) if err != nil { return } @@ -835,8 +1009,7 @@ func (g *NotificationSender) GroupMemberSetToAdminNotification(ctx context.Conte log.ZError(ctx, stringutil.GetFuncName(1)+" failed", err) } }() - var group *sdkws.GroupInfo - group, err = g.getGroupInfo(ctx, groupID) + group, err := g.getGroupInfo(ctx, groupID) if err != nil { return } @@ -859,13 +1032,11 @@ func (g *NotificationSender) GroupMemberSetToOrdinaryUserNotification(ctx contex log.ZError(ctx, stringutil.GetFuncName(1)+" failed", err) } }() - var group *sdkws.GroupInfo - group, err = g.getGroupInfo(ctx, groupID) + group, err := g.getGroupInfo(ctx, groupID) if err != nil { return } - var user map[string]*sdkws.GroupMemberFullInfo - user, err = g.getGroupMemberMap(ctx, groupID, []string{mcontext.GetOpUserID(ctx), groupMemberUserID}) + user, err := g.getGroupMemberMap(ctx, groupID, []string{mcontext.GetOpUserID(ctx), groupMemberUserID}) if err != nil { return } diff --git a/pkg/common/storage/cache/group.go b/pkg/common/storage/cache/group.go index 05b75745a..4be494bf1 100644 --- a/pkg/common/storage/cache/group.go +++ b/pkg/common/storage/cache/group.go @@ -57,6 +57,7 @@ type GroupCache interface { GetGroupRoleLevelMemberInfo(ctx context.Context, groupID string, roleLevel int32) ([]*model.GroupMember, error) GetGroupRolesLevelMemberInfo(ctx context.Context, groupID string, roleLevels []int32) ([]*model.GroupMember, error) GetGroupMemberNum(ctx context.Context, groupID string) (memberNum int64, err error) + GetGroupMemberNums(ctx context.Context, groupIDs []string) (memberNumMap map[string]int64, err error) DelGroupsMemberNum(groupID ...string) GroupCache //FindSortGroupMemberUserIDs(ctx context.Context, groupID string) ([]string, error) diff --git a/pkg/common/storage/cache/redis/group.go b/pkg/common/storage/cache/redis/group.go index 736111df3..98e36f678 100644 --- a/pkg/common/storage/cache/redis/group.go +++ b/pkg/common/storage/cache/redis/group.go @@ -16,10 +16,13 @@ package redis import ( "context" + "encoding/json" "fmt" "time" "github.com/dtm-labs/rockscache" + "github.com/openimsdk/tools/utils/datautil" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" @@ -286,6 +289,57 @@ func (g *GroupCacheRedis) GetGroupMemberNum(ctx context.Context, groupID string) }) } +type groupMemberNumCache struct { + GroupID string `json:"group_id"` + MemberNum int64 `json:"member_num"` +} + +type groupMemberNumBatchCache struct { + GroupID string + MemberNum int64 +} + +func (r *groupMemberNumBatchCache) BatchCache(groupID string) { + r.GroupID = groupID +} +func (r *groupMemberNumBatchCache) UnmarshalJSON(bytes []byte) error { + return json.Unmarshal(bytes, &r.MemberNum) +} +func (r *groupMemberNumBatchCache) MarshalJSON() ([]byte, error) { + return json.Marshal(r.MemberNum) +} +func (g *GroupCacheRedis) GetGroupMemberNums(ctx context.Context, groupIDs []string) (map[string]int64, error) { + items, err := batchGetCache2( + ctx, + g.rcClient, + g.expireTime, + groupIDs, + func(groupID string) string { return g.getGroupMemberNumKey(groupID) }, + func(v *groupMemberNumBatchCache) string { return v.GroupID }, + func(ctx context.Context, ids []string) ([]*groupMemberNumBatchCache, error) { + res := make([]*groupMemberNumBatchCache, 0, len(ids)) + for _, groupID := range ids { + num, err := g.groupMemberDB.TakeGroupMemberNum(ctx, groupID) + if err != nil { + return nil, err + } + res = append(res, &groupMemberNumBatchCache{ + GroupID: groupID, + MemberNum: num, + }) + } + return res, nil + }, + ) + if err != nil { + return nil, err + } + + return datautil.SliceToMapAny(items, func(item *groupMemberNumBatchCache) (string, int64) { + return item.GroupID, item.MemberNum + }), nil +} + func (g *GroupCacheRedis) DelGroupsMemberNum(groupID ...string) cache.GroupCache { keys := make([]string, 0, len(groupID)) for _, groupID := range groupID { @@ -308,18 +362,111 @@ func (g *GroupCacheRedis) GetGroupOwner(ctx context.Context, groupID string) (*m return members[0], nil } +type groupRoleLevelMemberIDsBatchCache struct { + GroupID string + UserIDs []string +} + +func (r *groupRoleLevelMemberIDsBatchCache) BatchCache(groupID string) { + r.GroupID = groupID +} +func (r *groupRoleLevelMemberIDsBatchCache) UnmarshalJSON(bytes []byte) (err error) { + return json.Unmarshal(bytes, &r.UserIDs) +} +func (r *groupRoleLevelMemberIDsBatchCache) MarshalJSON() ([]byte, error) { + return json.Marshal(r.UserIDs) +} + +func (g *GroupCacheRedis) batchGetGroupRoleLevelMemberIDs(ctx context.Context, groupIDs []string, roleLevel int32) (map[string][]string, error) { + items, err := batchGetCache2( + ctx, + g.rcClient, + g.expireTime, + groupIDs, + func(groupID string) string { + return g.getGroupRoleLevelMemberIDsKey(groupID, roleLevel) + }, + func(v *groupRoleLevelMemberIDsBatchCache) string { + return v.GroupID + }, + func(ctx context.Context, ids []string) ([]*groupRoleLevelMemberIDsBatchCache, error) { + res := make([]*groupRoleLevelMemberIDsBatchCache, 0, len(ids)) + for _, groupID := range ids { + userIDs, err := g.groupMemberDB.FindRoleLevelUserIDs(ctx, groupID, roleLevel) + if err != nil { + return nil, err + } + res = append(res, &groupRoleLevelMemberIDsBatchCache{ + GroupID: groupID, + UserIDs: userIDs, + }) + } + return res, nil + }, + ) + if err != nil { + return nil, err + } + + return datautil.SliceToMapAny(items, func(item *groupRoleLevelMemberIDsBatchCache) (string, []string) { + return item.GroupID, item.UserIDs + }), nil +} + +type groupUserIDPair struct { + GroupID string + UserID string +} + +func (g *GroupCacheRedis) batchGetGroupMembersByPairs(ctx context.Context, ids []groupUserIDPair) ([]*model.GroupMember, error) { + return batchGetCache2(ctx, g.rcClient, g.expireTime, ids, func(id groupUserIDPair) string { + return g.getGroupMemberInfoKey(id.GroupID, id.UserID) + }, func(member *model.GroupMember) groupUserIDPair { + return groupUserIDPair{GroupID: member.GroupID, UserID: member.UserID} + }, func(ctx context.Context, ids []groupUserIDPair) ([]*model.GroupMember, error) { + groupIDsByUser := make(map[string][]string) + for _, id := range ids { + groupIDsByUser[id.UserID] = append(groupIDsByUser[id.UserID], id.GroupID) + } + members := make([]*model.GroupMember, 0, len(ids)) + for userID, groupIDs := range groupIDsByUser { + items, err := g.groupMemberDB.FindInGroup(ctx, userID, groupIDs) + if err != nil { + return nil, err + } + members = append(members, items...) + } + return members, nil + }) +} + func (g *GroupCacheRedis) GetGroupsOwner(ctx context.Context, groupIDs []string) ([]*model.GroupMember, error) { - members := make([]*model.GroupMember, 0, len(groupIDs)) + ownerIDs, err := g.batchGetGroupRoleLevelMemberIDs(ctx, groupIDs, constant.GroupOwner) + if err != nil { + return nil, err + } + pairs := make([]groupUserIDPair, 0, len(groupIDs)) for _, groupID := range groupIDs { - items, err := g.GetGroupRoleLevelMemberInfo(ctx, groupID, constant.GroupOwner) - if err != nil { - return nil, err + ids := ownerIDs[groupID] + if len(ids) == 0 { + continue } - if len(items) > 0 { - members = append(members, items[0]) + pairs = append(pairs, groupUserIDPair{GroupID: groupID, UserID: ids[0]}) + } + members, err := g.batchGetGroupMembersByPairs(ctx, pairs) + if err != nil { + return nil, err + } + memberMap := datautil.SliceToMapAny(members, func(member *model.GroupMember) (groupUserIDPair, *model.GroupMember) { + return groupUserIDPair{GroupID: member.GroupID, UserID: member.UserID}, member + }) + result := make([]*model.GroupMember, 0, len(pairs)) + for _, pair := range pairs { + if member, ok := memberMap[pair]; ok { + result = append(result, member) } } - return members, nil + return result, nil } func (g *GroupCacheRedis) GetGroupRoleLevelMemberIDs(ctx context.Context, groupID string, roleLevel int32) ([]string, error) { diff --git a/pkg/common/storage/controller/group.go b/pkg/common/storage/controller/group.go index 539f7dccc..1c7fada7e 100644 --- a/pkg/common/storage/controller/group.go +++ b/pkg/common/storage/controller/group.go @@ -65,6 +65,8 @@ type GroupDatabase interface { FindGroupMemberUserID(ctx context.Context, groupID string) ([]string, error) // FindGroupMemberNum retrieves the number of members in a group. FindGroupMemberNum(ctx context.Context, groupID string) (uint32, error) + // FindGroupMemberNums retrieves the number of members for multiple groups. + FindGroupMemberNums(ctx context.Context, groupIDs []string) (map[string]uint32, error) // FindUserManagedGroupID retrieves group IDs managed by a user. FindUserManagedGroupID(ctx context.Context, userID string) (groupIDs []string, err error) // PageGroupRequest paginates through group requests for specified groups. @@ -233,6 +235,18 @@ func (g *groupDatabase) FindGroupMemberNum(ctx context.Context, groupID string) return uint32(num), nil } +func (g *groupDatabase) FindGroupMemberNums(ctx context.Context, groupIDs []string) (map[string]uint32, error) { + nums, err := g.cache.GetGroupMemberNums(ctx, groupIDs) + if err != nil { + return nil, err + } + result := make(map[string]uint32, len(nums)) + for groupID, num := range nums { + result[groupID] = uint32(num) + } + return result, nil +} + func (g *groupDatabase) TakeGroup(ctx context.Context, groupID string) (*model.Group, error) { return g.cache.GetGroupInfo(ctx, groupID) }