pull/3705/merge
dsx137 3 weeks ago committed by GitHub
commit 78a5fee810
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -24,6 +24,8 @@ import (
"time"
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
"github.com/openimsdk/tools/utils/stringutil"
"google.golang.org/grpc"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/common"
@ -53,7 +55,6 @@ import (
"github.com/openimsdk/tools/mw/specialerror"
"github.com/openimsdk/tools/utils/datautil"
"github.com/openimsdk/tools/utils/encrypt"
"google.golang.org/grpc"
)
type groupServer struct {
@ -147,8 +148,8 @@ func (s *groupServer) NotificationUserInfoUpdate(ctx context.Context, req *pbgro
return nil, err
}
}
for _, groupID := range groupIDs {
s.notification.GroupMemberInfoSetNotification(ctx, groupID, req.UserID)
if err = s.notification.GroupMemberInfoSetNotificationBulk(ctx, groupIDs, req.NewUserInfo); err != nil {
log.ZError(ctx, stringutil.GetFuncName(1)+" failed", err)
}
if err = s.db.DeleteGroupMemberHash(ctx, groupIDs); err != nil {
return nil, err

@ -18,6 +18,7 @@ import (
"context"
"errors"
"fmt"
"slices"
"time"
"github.com/google/uuid"
@ -803,6 +804,182 @@ func (g *NotificationSender) GroupCancelMutedNotification(ctx context.Context, g
g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.GroupCancelMutedNotification, tips)
}
func (g *NotificationSender) getPublicUserInfos(ctx context.Context, userIDs []string) ([]*sdkws.PublicUserInfo, error) {
users, err := g.getUsersInfo(ctx, userIDs)
if err != nil {
return nil, err
}
userMap := make(map[string]common_user.CommonUser)
for _, user := range users {
userMap[user.GetUserID()] = user
}
for _, userID := range userIDs {
if _, ok := userMap[userID]; !ok {
return nil, servererrs.ErrUserIDNotFound.WrapMsg(fmt.Sprintf("user %s not found", userID))
}
}
return datautil.Slice(users, func(e common_user.CommonUser) *sdkws.PublicUserInfo {
return &sdkws.PublicUserInfo{
UserID: e.GetUserID(),
Nickname: e.GetNickname(),
FaceURL: e.GetFaceURL(),
Ex: e.GetEx(),
}
}), nil
}
func (g *NotificationSender) getGroupMembersForUser(ctx context.Context, groupIDs []string, userID string) ([]*sdkws.GroupMemberFullInfo, error) {
members, err := g.db.FindGroupMemberUser(ctx, groupIDs, userID)
if err != nil {
return nil, err
}
if err := g.PopulateGroupMember(ctx, members...); err != nil {
return nil, err
}
log.ZDebug(ctx, "getGroupMembers", "members", members)
return datautil.Slice(members, func(e *model.GroupMember) *sdkws.GroupMemberFullInfo { return g.groupMemberDB2PB(e, 0) }), nil
}
func (g *NotificationSender) getGroupMemberMapForUser(ctx context.Context, groupIDs []string, userID string) (map[string]*sdkws.GroupMemberFullInfo, error) {
members, err := g.getGroupMembersForUser(ctx, groupIDs, userID)
if err != nil {
return nil, err
}
m := make(map[string]*sdkws.GroupMemberFullInfo)
for i, member := range members {
m[member.GroupID] = members[i]
}
return m, nil
}
func (g *NotificationSender) migrateFallbackToGroupMemberFullInfo(groupMemberFullInfo *sdkws.GroupMemberFullInfo, publicUserInfo *sdkws.PublicUserInfo) *sdkws.GroupMemberFullInfo {
if groupMemberFullInfo == nil {
groupMemberFullInfo = &sdkws.GroupMemberFullInfo{}
}
if publicUserInfo == nil {
publicUserInfo = &sdkws.PublicUserInfo{}
}
firstNotEmpty := func(s ...string) string {
for _, s := range s {
if s != "" {
return s
}
}
return ""
}
return &sdkws.GroupMemberFullInfo{
GroupID: groupMemberFullInfo.GroupID,
UserID: firstNotEmpty(groupMemberFullInfo.UserID, publicUserInfo.UserID),
RoleLevel: groupMemberFullInfo.RoleLevel,
JoinTime: groupMemberFullInfo.JoinTime,
Nickname: firstNotEmpty(groupMemberFullInfo.Nickname, publicUserInfo.Nickname),
FaceURL: firstNotEmpty(groupMemberFullInfo.FaceURL, publicUserInfo.FaceURL),
AppMangerLevel: groupMemberFullInfo.AppMangerLevel,
JoinSource: groupMemberFullInfo.JoinSource,
OperatorUserID: firstNotEmpty(groupMemberFullInfo.OperatorUserID, publicUserInfo.UserID),
InviterUserID: groupMemberFullInfo.InviterUserID,
Ex: groupMemberFullInfo.Ex,
MuteEndTime: groupMemberFullInfo.MuteEndTime,
}
}
func (g *NotificationSender) getGroupInfos(ctx context.Context, groupIDs []string) ([]*sdkws.GroupInfo, error) {
var err error
defer func() {
if err != nil {
log.ZError(ctx, stringutil.GetFuncName(1)+" failed", err)
}
}()
groups, err := g.db.FindGroup(ctx, groupIDs)
if err != nil {
return nil, err
}
numMap, err := g.db.FindGroupMemberNums(ctx, groupIDs)
if err != nil {
return nil, err
}
owners, err := g.db.FindGroupsOwner(ctx, groupIDs)
if err != nil {
return nil, err
}
ownerMap := make(map[string]string)
for _, owner := range owners {
ownerMap[owner.GroupID] = owner.UserID
}
return datautil.Slice(groups, func(group *model.Group) *sdkws.GroupInfo {
return convert.Db2PbGroupInfo(group, ownerMap[group.GroupID], numMap[group.GroupID])
}), nil
}
func (g *NotificationSender) GroupMemberInfoSetNotificationBulk(ctx context.Context, groupIDs []string, changedUserInfo *sdkws.UserInfo) error {
opUserID := mcontext.GetOpUserID(ctx)
opIsAdmin := slices.Contains(g.config.Share.IMAdminUserID, opUserID)
groupInfos, err := g.getGroupInfos(ctx, groupIDs)
if err != nil {
return err
}
groupMemberMap, err := g.getGroupMemberMapForUser(ctx, groupIDs, changedUserInfo.UserID)
if err != nil {
return err
}
opMemberMap := groupMemberMap
if opUserID != changedUserInfo.UserID {
opMemberMap, err = g.getGroupMemberMapForUser(ctx, groupIDs, opUserID)
if err != nil {
return err
}
}
opPublicUser := &sdkws.PublicUserInfo{
UserID: changedUserInfo.GetUserID(),
Nickname: changedUserInfo.GetNickname(),
FaceURL: changedUserInfo.GetFaceURL(),
Ex: changedUserInfo.GetEx(),
}
if opUserID != changedUserInfo.UserID {
opPublicUser, err = g.getUser(ctx, opUserID)
if err != nil {
return err
}
}
for _, groupInfo := range groupInfos {
if groupMemberMap[groupInfo.GroupID] == nil {
continue
}
opUserGroupMemberFullInfo := datautil.If(opIsAdmin, &sdkws.GroupMemberFullInfo{
GroupID: groupInfo.GroupID,
UserID: opUserID,
RoleLevel: constant.GroupAdmin,
AppMangerLevel: constant.AppAdmin,
}, opMemberMap[groupInfo.GroupID])
if opUserGroupMemberFullInfo == nil {
opUserGroupMemberFullInfo = &sdkws.GroupMemberFullInfo{}
}
opUserGroupMemberFullInfo.GroupID = groupInfo.GroupID
tips := &sdkws.GroupMemberInfoSetTips{
Group: groupInfo,
OpUser: g.migrateFallbackToGroupMemberFullInfo(opUserGroupMemberFullInfo, opPublicUser),
ChangedUser: groupMemberMap[groupInfo.GroupID],
}
g.setSortVersion(ctx, &tips.GroupMemberVersion, &tips.GroupMemberVersionID, database.GroupMemberVersionName, tips.Group.GroupID, &tips.GroupSortVersion)
g.Notification(ctx, opUserID, groupInfo.GroupID, constant.GroupMemberInfoSetNotification, tips)
}
return nil
}
func (g *NotificationSender) GroupMemberInfoSetNotification(ctx context.Context, groupID, groupMemberUserID string) {
var err error
defer func() {
@ -810,13 +987,11 @@ func (g *NotificationSender) GroupMemberInfoSetNotification(ctx context.Context,
log.ZError(ctx, stringutil.GetFuncName(1)+" failed", err)
}
}()
var group *sdkws.GroupInfo
group, err = g.getGroupInfo(ctx, groupID)
group, err := g.getGroupInfo(ctx, groupID)
if err != nil {
return
}
var user map[string]*sdkws.GroupMemberFullInfo
user, err = g.getGroupMemberMap(ctx, groupID, []string{groupMemberUserID})
user, err := g.getGroupMemberMap(ctx, groupID, []string{groupMemberUserID})
if err != nil {
return
}
@ -835,8 +1010,7 @@ func (g *NotificationSender) GroupMemberSetToAdminNotification(ctx context.Conte
log.ZError(ctx, stringutil.GetFuncName(1)+" failed", err)
}
}()
var group *sdkws.GroupInfo
group, err = g.getGroupInfo(ctx, groupID)
group, err := g.getGroupInfo(ctx, groupID)
if err != nil {
return
}
@ -859,13 +1033,11 @@ func (g *NotificationSender) GroupMemberSetToOrdinaryUserNotification(ctx contex
log.ZError(ctx, stringutil.GetFuncName(1)+" failed", err)
}
}()
var group *sdkws.GroupInfo
group, err = g.getGroupInfo(ctx, groupID)
group, err := g.getGroupInfo(ctx, groupID)
if err != nil {
return
}
var user map[string]*sdkws.GroupMemberFullInfo
user, err = g.getGroupMemberMap(ctx, groupID, []string{mcontext.GetOpUserID(ctx), groupMemberUserID})
user, err := g.getGroupMemberMap(ctx, groupID, []string{mcontext.GetOpUserID(ctx), groupMemberUserID})
if err != nil {
return
}

@ -57,6 +57,7 @@ type GroupCache interface {
GetGroupRoleLevelMemberInfo(ctx context.Context, groupID string, roleLevel int32) ([]*model.GroupMember, error)
GetGroupRolesLevelMemberInfo(ctx context.Context, groupID string, roleLevels []int32) ([]*model.GroupMember, error)
GetGroupMemberNum(ctx context.Context, groupID string) (memberNum int64, err error)
GetGroupMemberNums(ctx context.Context, groupIDs []string) (memberNumMap map[string]int64, err error)
DelGroupsMemberNum(groupID ...string) GroupCache
//FindSortGroupMemberUserIDs(ctx context.Context, groupID string) ([]string, error)

@ -16,10 +16,13 @@ package redis
import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/dtm-labs/rockscache"
"github.com/openimsdk/tools/utils/datautil"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
@ -286,6 +289,57 @@ func (g *GroupCacheRedis) GetGroupMemberNum(ctx context.Context, groupID string)
})
}
type groupMemberNumCache struct {
GroupID string `json:"group_id"`
MemberNum int64 `json:"member_num"`
}
type groupMemberNumBatchCache struct {
GroupID string
MemberNum int64
}
func (r *groupMemberNumBatchCache) BatchCache(groupID string) {
r.GroupID = groupID
}
func (r *groupMemberNumBatchCache) UnmarshalJSON(bytes []byte) error {
return json.Unmarshal(bytes, &r.MemberNum)
}
func (r *groupMemberNumBatchCache) MarshalJSON() ([]byte, error) {
return json.Marshal(r.MemberNum)
}
func (g *GroupCacheRedis) GetGroupMemberNums(ctx context.Context, groupIDs []string) (map[string]int64, error) {
items, err := batchGetCache2(
ctx,
g.rcClient,
g.expireTime,
groupIDs,
func(groupID string) string { return g.getGroupMemberNumKey(groupID) },
func(v *groupMemberNumBatchCache) string { return v.GroupID },
func(ctx context.Context, ids []string) ([]*groupMemberNumBatchCache, error) {
res := make([]*groupMemberNumBatchCache, 0, len(ids))
for _, groupID := range ids {
num, err := g.groupMemberDB.TakeGroupMemberNum(ctx, groupID)
if err != nil {
return nil, err
}
res = append(res, &groupMemberNumBatchCache{
GroupID: groupID,
MemberNum: num,
})
}
return res, nil
},
)
if err != nil {
return nil, err
}
return datautil.SliceToMapAny(items, func(item *groupMemberNumBatchCache) (string, int64) {
return item.GroupID, item.MemberNum
}), nil
}
func (g *GroupCacheRedis) DelGroupsMemberNum(groupID ...string) cache.GroupCache {
keys := make([]string, 0, len(groupID))
for _, groupID := range groupID {
@ -308,18 +362,111 @@ func (g *GroupCacheRedis) GetGroupOwner(ctx context.Context, groupID string) (*m
return members[0], nil
}
type groupRoleLevelMemberIDsBatchCache struct {
GroupID string
UserIDs []string
}
func (r *groupRoleLevelMemberIDsBatchCache) BatchCache(groupID string) {
r.GroupID = groupID
}
func (r *groupRoleLevelMemberIDsBatchCache) UnmarshalJSON(bytes []byte) (err error) {
return json.Unmarshal(bytes, &r.UserIDs)
}
func (r *groupRoleLevelMemberIDsBatchCache) MarshalJSON() ([]byte, error) {
return json.Marshal(r.UserIDs)
}
func (g *GroupCacheRedis) batchGetGroupRoleLevelMemberIDs(ctx context.Context, groupIDs []string, roleLevel int32) (map[string][]string, error) {
items, err := batchGetCache2(
ctx,
g.rcClient,
g.expireTime,
groupIDs,
func(groupID string) string {
return g.getGroupRoleLevelMemberIDsKey(groupID, roleLevel)
},
func(v *groupRoleLevelMemberIDsBatchCache) string {
return v.GroupID
},
func(ctx context.Context, ids []string) ([]*groupRoleLevelMemberIDsBatchCache, error) {
res := make([]*groupRoleLevelMemberIDsBatchCache, 0, len(ids))
for _, groupID := range ids {
userIDs, err := g.groupMemberDB.FindRoleLevelUserIDs(ctx, groupID, roleLevel)
if err != nil {
return nil, err
}
res = append(res, &groupRoleLevelMemberIDsBatchCache{
GroupID: groupID,
UserIDs: userIDs,
})
}
return res, nil
},
)
if err != nil {
return nil, err
}
return datautil.SliceToMapAny(items, func(item *groupRoleLevelMemberIDsBatchCache) (string, []string) {
return item.GroupID, item.UserIDs
}), nil
}
type groupUserIDPair struct {
GroupID string
UserID string
}
func (g *GroupCacheRedis) batchGetGroupMembersByPairs(ctx context.Context, ids []groupUserIDPair) ([]*model.GroupMember, error) {
return batchGetCache2(ctx, g.rcClient, g.expireTime, ids, func(id groupUserIDPair) string {
return g.getGroupMemberInfoKey(id.GroupID, id.UserID)
}, func(member *model.GroupMember) groupUserIDPair {
return groupUserIDPair{GroupID: member.GroupID, UserID: member.UserID}
}, func(ctx context.Context, ids []groupUserIDPair) ([]*model.GroupMember, error) {
groupIDsByUser := make(map[string][]string)
for _, id := range ids {
groupIDsByUser[id.UserID] = append(groupIDsByUser[id.UserID], id.GroupID)
}
members := make([]*model.GroupMember, 0, len(ids))
for userID, groupIDs := range groupIDsByUser {
items, err := g.groupMemberDB.FindInGroup(ctx, userID, groupIDs)
if err != nil {
return nil, err
}
members = append(members, items...)
}
return members, nil
})
}
func (g *GroupCacheRedis) GetGroupsOwner(ctx context.Context, groupIDs []string) ([]*model.GroupMember, error) {
members := make([]*model.GroupMember, 0, len(groupIDs))
ownerIDs, err := g.batchGetGroupRoleLevelMemberIDs(ctx, groupIDs, constant.GroupOwner)
if err != nil {
return nil, err
}
pairs := make([]groupUserIDPair, 0, len(groupIDs))
for _, groupID := range groupIDs {
items, err := g.GetGroupRoleLevelMemberInfo(ctx, groupID, constant.GroupOwner)
if err != nil {
return nil, err
ids := ownerIDs[groupID]
if len(ids) == 0 {
continue
}
if len(items) > 0 {
members = append(members, items[0])
pairs = append(pairs, groupUserIDPair{GroupID: groupID, UserID: ids[0]})
}
members, err := g.batchGetGroupMembersByPairs(ctx, pairs)
if err != nil {
return nil, err
}
memberMap := datautil.SliceToMapAny(members, func(member *model.GroupMember) (groupUserIDPair, *model.GroupMember) {
return groupUserIDPair{GroupID: member.GroupID, UserID: member.UserID}, member
})
result := make([]*model.GroupMember, 0, len(pairs))
for _, pair := range pairs {
if member, ok := memberMap[pair]; ok {
result = append(result, member)
}
}
return members, nil
return result, nil
}
func (g *GroupCacheRedis) GetGroupRoleLevelMemberIDs(ctx context.Context, groupID string, roleLevel int32) ([]string, error) {

@ -65,6 +65,8 @@ type GroupDatabase interface {
FindGroupMemberUserID(ctx context.Context, groupID string) ([]string, error)
// FindGroupMemberNum retrieves the number of members in a group.
FindGroupMemberNum(ctx context.Context, groupID string) (uint32, error)
// FindGroupMemberNums retrieves the number of members for multiple groups.
FindGroupMemberNums(ctx context.Context, groupIDs []string) (map[string]uint32, error)
// FindUserManagedGroupID retrieves group IDs managed by a user.
FindUserManagedGroupID(ctx context.Context, userID string) (groupIDs []string, err error)
// PageGroupRequest paginates through group requests for specified groups.
@ -233,6 +235,18 @@ func (g *groupDatabase) FindGroupMemberNum(ctx context.Context, groupID string)
return uint32(num), nil
}
func (g *groupDatabase) FindGroupMemberNums(ctx context.Context, groupIDs []string) (map[string]uint32, error) {
nums, err := g.cache.GetGroupMemberNums(ctx, groupIDs)
if err != nil {
return nil, err
}
result := make(map[string]uint32, len(nums))
for groupID, num := range nums {
result[groupID] = uint32(num)
}
return result, nil
}
func (g *groupDatabase) TakeGroup(ctx context.Context, groupID string) (*model.Group, error) {
return g.cache.GetGroupInfo(ctx, groupID)
}

Loading…
Cancel
Save