diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index 282d41011..91fa1ba84 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -16,6 +16,9 @@ package group import ( "context" + "crypto/md5" + "encoding/binary" + "encoding/json" "fmt" "math/big" "math/rand" @@ -73,20 +76,32 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e userRpcClient := rpcclient.NewUserRpcClient(client) msgRpcClient := rpcclient.NewMessageRpcClient(client) conversationRpcClient := rpcclient.NewConversationRpcClient(client) - database := controller.InitGroupDatabase(db, rdb, mongo.GetDatabase()) - pbGroup.RegisterGroupServer(server, &groupServer{ - GroupDatabase: database, - User: userRpcClient, - Notification: notification.NewGroupNotificationSender(database, &msgRpcClient, &userRpcClient, func(ctx context.Context, userIDs []string) ([]notification.CommonUser, error) { - users, err := userRpcClient.GetUsersInfo(ctx, userIDs) - if err != nil { - return nil, err - } - return utils.Slice(users, func(e *sdkws.UserInfo) notification.CommonUser { return e }), nil - }), - conversationRpcClient: conversationRpcClient, - msgRpcClient: msgRpcClient, + var gs groupServer + database := controller.InitGroupDatabase(db, rdb, mongo.GetDatabase(), gs.groupMemberHashCode) + gs.GroupDatabase = database + gs.User = userRpcClient + gs.Notification = notification.NewGroupNotificationSender(database, &msgRpcClient, &userRpcClient, func(ctx context.Context, userIDs []string) ([]notification.CommonUser, error) { + users, err := userRpcClient.GetUsersInfo(ctx, userIDs) + if err != nil { + return nil, err + } + return utils.Slice(users, func(e *sdkws.UserInfo) notification.CommonUser { return e }), nil }) + gs.conversationRpcClient = conversationRpcClient + gs.msgRpcClient = msgRpcClient + //pbGroup.RegisterGroupServer(server, &groupServer{ + // GroupDatabase: database, + // User: userRpcClient, + // Notification: notification.NewGroupNotificationSender(database, &msgRpcClient, &userRpcClient, func(ctx context.Context, userIDs []string) ([]notification.CommonUser, error) { + // users, err := userRpcClient.GetUsersInfo(ctx, userIDs) + // if err != nil { + // return nil, err + // } + // return utils.Slice(users, func(e *sdkws.UserInfo) notification.CommonUser { return e }), nil + // }), + // conversationRpcClient: conversationRpcClient, + // msgRpcClient: msgRpcClient, + //}) return nil } @@ -1491,3 +1506,37 @@ func (s *groupServer) GetGroupUsersReqApplicationList(ctx context.Context, req * resp.Total = total return resp, nil } + +func (s *groupServer) groupMemberHashCode(ctx context.Context, groupID string) (uint64, error) { + userIDs, err := s.GroupDatabase.FindGroupMemberUserID(ctx, groupID) + if err != nil { + return 0, err + } + var members []*sdkws.GroupMemberFullInfo + if len(userIDs) > 0 { + resp, err := s.GetGroupMembersInfo(ctx, &pbGroup.GetGroupMembersInfoReq{GroupID: groupID, UserIDs: userIDs}) + if err != nil { + return 0, err + } + members = resp.Members + utils.Sort(userIDs, true) + } + memberMap := utils.SliceToMap(members, func(e *sdkws.GroupMemberFullInfo) string { + return e.GroupID + }) + res := make([]*sdkws.GroupMemberFullInfo, 0, len(members)) + for _, userID := range userIDs { + member, ok := memberMap[userID] + if !ok { + continue + } + member.AppMangerLevel = 0 + res = append(res, member) + } + data, err := json.Marshal(res) + if err != nil { + return 0, err + } + sum := md5.Sum(data) + return binary.BigEndian.Uint64(sum[:]), nil +} diff --git a/pkg/common/db/cache/group.go b/pkg/common/db/cache/group.go index d63f92c40..1a2a7d698 100644 --- a/pkg/common/db/cache/group.go +++ b/pkg/common/db/cache/group.go @@ -16,11 +16,7 @@ package cache import ( "context" - "crypto/md5" - "encoding/binary" - "encoding/json" "github.com/OpenIMSDK/tools/log" - "strconv" "time" "github.com/dtm-labs/rockscache" @@ -99,6 +95,7 @@ type GroupCacheRedis struct { mongoDB unrelationTb.SuperGroupModelInterface expireTime time.Duration rcClient *rockscache.Client + hashCode func(ctx context.Context, groupID string) (uint64, error) } func NewGroupCacheRedis( @@ -107,13 +104,16 @@ func NewGroupCacheRedis( groupMemberDB relationTb.GroupMemberModelInterface, groupRequestDB relationTb.GroupRequestModelInterface, mongoClient unrelationTb.SuperGroupModelInterface, + hashCode func(ctx context.Context, groupID string) (uint64, error), opts rockscache.Options, ) GroupCache { rcClient := rockscache.NewClient(rdb, opts) return &GroupCacheRedis{ rcClient: rcClient, expireTime: groupExpireTime, groupDB: groupDB, groupMemberDB: groupMemberDB, groupRequestDB: groupRequestDB, - mongoDB: mongoClient, metaCache: NewMetaCacheRedis(rcClient), + mongoDB: mongoClient, + hashCode: hashCode, + metaCache: NewMetaCacheRedis(rcClient), } } @@ -293,57 +293,61 @@ func (g *GroupCacheRedis) DelSuperGroupMemberIDs(groupIDs ...string) GroupCache // groupMembersHash. func (g *GroupCacheRedis) GetGroupMembersHash(ctx context.Context, groupID string) (hashCode uint64, err error) { - return getCache(ctx, g.rcClient, g.getGroupMembersHashKey(groupID), g.expireTime, - func(ctx context.Context) (uint64, error) { - userIDs, err := g.GetGroupMemberIDs(ctx, groupID) - if err != nil { - return 0, err - } - log.ZInfo(ctx, "GetGroupMembersHash", "groupID", groupID, "userIDs", userIDs) - var members []*relationTb.GroupMemberModel - if len(userIDs) > 0 { - members, err = g.GetGroupMembersInfo(ctx, groupID, userIDs) - if err != nil { - return 0, err - } - utils.Sort(userIDs, true) - } - memberMap := make(map[string]*relationTb.GroupMemberModel) - for i, member := range members { - memberMap[member.UserID] = members[i] - } - data := make([]string, 0, len(members)*11) - for _, userID := range userIDs { - member, ok := memberMap[userID] - if !ok { - continue - } - data = append(data, - member.GroupID, - member.UserID, - member.Nickname, - member.FaceURL, - strconv.Itoa(int(member.RoleLevel)), - strconv.FormatInt(member.JoinTime.UnixMilli(), 10), - strconv.Itoa(int(member.JoinSource)), - member.InviterUserID, - member.OperatorUserID, - strconv.FormatInt(member.MuteEndTime.UnixMilli(), 10), - member.Ex, - ) - } - log.ZInfo(ctx, "hash data info", "userIDs.len", len(userIDs), "hash.data.len", len(data)) - log.ZInfo(ctx, "json hash data", "groupID", groupID, "data", data) - val, err := json.Marshal(data) - if err != nil { - return 0, err - } - sum := md5.Sum(val) - code := binary.BigEndian.Uint64(sum[:]) - log.ZInfo(ctx, "GetGroupMembersHash", "groupID", groupID, "hashCode", code, "num", len(members)) - return code, nil - }, - ) + return getCache(ctx, g.rcClient, g.getGroupMembersHashKey(groupID), g.expireTime, func(ctx context.Context) (uint64, error) { + return g.hashCode(ctx, groupID) + }) + + //return getCache(ctx, g.rcClient, g.getGroupMembersHashKey(groupID), g.expireTime, + // func(ctx context.Context) (uint64, error) { + // userIDs, err := g.GetGroupMemberIDs(ctx, groupID) + // if err != nil { + // return 0, err + // } + // log.ZInfo(ctx, "GetGroupMembersHash", "groupID", groupID, "userIDs", userIDs) + // var members []*relationTb.GroupMemberModel + // if len(userIDs) > 0 { + // members, err = g.GetGroupMembersInfo(ctx, groupID, userIDs) + // if err != nil { + // return 0, err + // } + // utils.Sort(userIDs, true) + // } + // memberMap := make(map[string]*relationTb.GroupMemberModel) + // for i, member := range members { + // memberMap[member.UserID] = members[i] + // } + // data := make([]string, 0, len(members)*11) + // for _, userID := range userIDs { + // member, ok := memberMap[userID] + // if !ok { + // continue + // } + // data = append(data, + // member.GroupID, + // member.UserID, + // member.Nickname, + // member.FaceURL, + // strconv.Itoa(int(member.RoleLevel)), + // strconv.FormatInt(member.JoinTime.UnixMilli(), 10), + // strconv.Itoa(int(member.JoinSource)), + // member.InviterUserID, + // member.OperatorUserID, + // strconv.FormatInt(member.MuteEndTime.UnixMilli(), 10), + // member.Ex, + // ) + // } + // log.ZInfo(ctx, "hash data info", "userIDs.len", len(userIDs), "hash.data.len", len(data)) + // log.ZInfo(ctx, "json hash data", "groupID", groupID, "data", data) + // val, err := json.Marshal(data) + // if err != nil { + // return 0, err + // } + // sum := md5.Sum(val) + // code := binary.BigEndian.Uint64(sum[:]) + // log.ZInfo(ctx, "GetGroupMembersHash", "groupID", groupID, "hashCode", code, "num", len(members)) + // return code, nil + // }, + //) } func (g *GroupCacheRedis) GetGroupMemberHashMap( diff --git a/pkg/common/db/controller/group.go b/pkg/common/db/controller/group.go index 677916bb8..360b68c57 100644 --- a/pkg/common/db/controller/group.go +++ b/pkg/common/db/controller/group.go @@ -153,7 +153,7 @@ func NewGroupDatabase( return database } -func InitGroupDatabase(db *gorm.DB, rdb redis.UniversalClient, database *mongo.Database) GroupDatabase { +func InitGroupDatabase(db *gorm.DB, rdb redis.UniversalClient, database *mongo.Database, hashCode func(ctx context.Context, groupID string) (uint64, error)) GroupDatabase { rcOptions := rockscache.NewDefaultOptions() rcOptions.StrongConsistency = true rcOptions.RandomExpireAdjustment = 0.2 @@ -170,6 +170,7 @@ func InitGroupDatabase(db *gorm.DB, rdb redis.UniversalClient, database *mongo.D relation.NewGroupMemberDB(db), relation.NewGroupRequest(db), unrelation.NewSuperGroupMongoDriver(database), + hashCode, rcOptions, ), ) @@ -315,7 +316,7 @@ func (g *groupDatabase) FindGroupMember( userIDs []string, roleLevels []int32, ) (totalGroupMembers []*relationTb.GroupMemberModel, err error) { - if roleLevels == nil { + if len(roleLevels) == 0 { for _, groupID := range groupIDs { groupMembers, err := g.cache.GetGroupMembersInfo(ctx, groupID, userIDs) if err != nil {