sync: hash code

pull/857/head
withchao 2 years ago
parent b97f168753
commit 6a50db86d1

@ -16,6 +16,9 @@ package group
import (
"context"
"crypto/md5"
"encoding/binary"
"encoding/json"
"fmt"
"math/big"
"math/rand"
@ -73,20 +76,32 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e
userRpcClient := rpcclient.NewUserRpcClient(client)
msgRpcClient := rpcclient.NewMessageRpcClient(client)
conversationRpcClient := rpcclient.NewConversationRpcClient(client)
database := controller.InitGroupDatabase(db, rdb, mongo.GetDatabase())
pbGroup.RegisterGroupServer(server, &groupServer{
GroupDatabase: database,
User: userRpcClient,
Notification: notification.NewGroupNotificationSender(database, &msgRpcClient, &userRpcClient, func(ctx context.Context, userIDs []string) ([]notification.CommonUser, error) {
users, err := userRpcClient.GetUsersInfo(ctx, userIDs)
if err != nil {
return nil, err
}
return utils.Slice(users, func(e *sdkws.UserInfo) notification.CommonUser { return e }), nil
}),
conversationRpcClient: conversationRpcClient,
msgRpcClient: msgRpcClient,
var gs groupServer
database := controller.InitGroupDatabase(db, rdb, mongo.GetDatabase(), gs.groupMemberHashCode)
gs.GroupDatabase = database
gs.User = userRpcClient
gs.Notification = notification.NewGroupNotificationSender(database, &msgRpcClient, &userRpcClient, func(ctx context.Context, userIDs []string) ([]notification.CommonUser, error) {
users, err := userRpcClient.GetUsersInfo(ctx, userIDs)
if err != nil {
return nil, err
}
return utils.Slice(users, func(e *sdkws.UserInfo) notification.CommonUser { return e }), nil
})
gs.conversationRpcClient = conversationRpcClient
gs.msgRpcClient = msgRpcClient
//pbGroup.RegisterGroupServer(server, &groupServer{
// GroupDatabase: database,
// User: userRpcClient,
// Notification: notification.NewGroupNotificationSender(database, &msgRpcClient, &userRpcClient, func(ctx context.Context, userIDs []string) ([]notification.CommonUser, error) {
// users, err := userRpcClient.GetUsersInfo(ctx, userIDs)
// if err != nil {
// return nil, err
// }
// return utils.Slice(users, func(e *sdkws.UserInfo) notification.CommonUser { return e }), nil
// }),
// conversationRpcClient: conversationRpcClient,
// msgRpcClient: msgRpcClient,
//})
return nil
}
@ -1491,3 +1506,37 @@ func (s *groupServer) GetGroupUsersReqApplicationList(ctx context.Context, req *
resp.Total = total
return resp, nil
}
func (s *groupServer) groupMemberHashCode(ctx context.Context, groupID string) (uint64, error) {
userIDs, err := s.GroupDatabase.FindGroupMemberUserID(ctx, groupID)
if err != nil {
return 0, err
}
var members []*sdkws.GroupMemberFullInfo
if len(userIDs) > 0 {
resp, err := s.GetGroupMembersInfo(ctx, &pbGroup.GetGroupMembersInfoReq{GroupID: groupID, UserIDs: userIDs})
if err != nil {
return 0, err
}
members = resp.Members
utils.Sort(userIDs, true)
}
memberMap := utils.SliceToMap(members, func(e *sdkws.GroupMemberFullInfo) string {
return e.GroupID
})
res := make([]*sdkws.GroupMemberFullInfo, 0, len(members))
for _, userID := range userIDs {
member, ok := memberMap[userID]
if !ok {
continue
}
member.AppMangerLevel = 0
res = append(res, member)
}
data, err := json.Marshal(res)
if err != nil {
return 0, err
}
sum := md5.Sum(data)
return binary.BigEndian.Uint64(sum[:]), nil
}

@ -16,11 +16,7 @@ package cache
import (
"context"
"crypto/md5"
"encoding/binary"
"encoding/json"
"github.com/OpenIMSDK/tools/log"
"strconv"
"time"
"github.com/dtm-labs/rockscache"
@ -99,6 +95,7 @@ type GroupCacheRedis struct {
mongoDB unrelationTb.SuperGroupModelInterface
expireTime time.Duration
rcClient *rockscache.Client
hashCode func(ctx context.Context, groupID string) (uint64, error)
}
func NewGroupCacheRedis(
@ -107,13 +104,16 @@ func NewGroupCacheRedis(
groupMemberDB relationTb.GroupMemberModelInterface,
groupRequestDB relationTb.GroupRequestModelInterface,
mongoClient unrelationTb.SuperGroupModelInterface,
hashCode func(ctx context.Context, groupID string) (uint64, error),
opts rockscache.Options,
) GroupCache {
rcClient := rockscache.NewClient(rdb, opts)
return &GroupCacheRedis{
rcClient: rcClient, expireTime: groupExpireTime,
groupDB: groupDB, groupMemberDB: groupMemberDB, groupRequestDB: groupRequestDB,
mongoDB: mongoClient, metaCache: NewMetaCacheRedis(rcClient),
mongoDB: mongoClient,
hashCode: hashCode,
metaCache: NewMetaCacheRedis(rcClient),
}
}
@ -293,57 +293,61 @@ func (g *GroupCacheRedis) DelSuperGroupMemberIDs(groupIDs ...string) GroupCache
// groupMembersHash.
func (g *GroupCacheRedis) GetGroupMembersHash(ctx context.Context, groupID string) (hashCode uint64, err error) {
return getCache(ctx, g.rcClient, g.getGroupMembersHashKey(groupID), g.expireTime,
func(ctx context.Context) (uint64, error) {
userIDs, err := g.GetGroupMemberIDs(ctx, groupID)
if err != nil {
return 0, err
}
log.ZInfo(ctx, "GetGroupMembersHash", "groupID", groupID, "userIDs", userIDs)
var members []*relationTb.GroupMemberModel
if len(userIDs) > 0 {
members, err = g.GetGroupMembersInfo(ctx, groupID, userIDs)
if err != nil {
return 0, err
}
utils.Sort(userIDs, true)
}
memberMap := make(map[string]*relationTb.GroupMemberModel)
for i, member := range members {
memberMap[member.UserID] = members[i]
}
data := make([]string, 0, len(members)*11)
for _, userID := range userIDs {
member, ok := memberMap[userID]
if !ok {
continue
}
data = append(data,
member.GroupID,
member.UserID,
member.Nickname,
member.FaceURL,
strconv.Itoa(int(member.RoleLevel)),
strconv.FormatInt(member.JoinTime.UnixMilli(), 10),
strconv.Itoa(int(member.JoinSource)),
member.InviterUserID,
member.OperatorUserID,
strconv.FormatInt(member.MuteEndTime.UnixMilli(), 10),
member.Ex,
)
}
log.ZInfo(ctx, "hash data info", "userIDs.len", len(userIDs), "hash.data.len", len(data))
log.ZInfo(ctx, "json hash data", "groupID", groupID, "data", data)
val, err := json.Marshal(data)
if err != nil {
return 0, err
}
sum := md5.Sum(val)
code := binary.BigEndian.Uint64(sum[:])
log.ZInfo(ctx, "GetGroupMembersHash", "groupID", groupID, "hashCode", code, "num", len(members))
return code, nil
},
)
return getCache(ctx, g.rcClient, g.getGroupMembersHashKey(groupID), g.expireTime, func(ctx context.Context) (uint64, error) {
return g.hashCode(ctx, groupID)
})
//return getCache(ctx, g.rcClient, g.getGroupMembersHashKey(groupID), g.expireTime,
// func(ctx context.Context) (uint64, error) {
// userIDs, err := g.GetGroupMemberIDs(ctx, groupID)
// if err != nil {
// return 0, err
// }
// log.ZInfo(ctx, "GetGroupMembersHash", "groupID", groupID, "userIDs", userIDs)
// var members []*relationTb.GroupMemberModel
// if len(userIDs) > 0 {
// members, err = g.GetGroupMembersInfo(ctx, groupID, userIDs)
// if err != nil {
// return 0, err
// }
// utils.Sort(userIDs, true)
// }
// memberMap := make(map[string]*relationTb.GroupMemberModel)
// for i, member := range members {
// memberMap[member.UserID] = members[i]
// }
// data := make([]string, 0, len(members)*11)
// for _, userID := range userIDs {
// member, ok := memberMap[userID]
// if !ok {
// continue
// }
// data = append(data,
// member.GroupID,
// member.UserID,
// member.Nickname,
// member.FaceURL,
// strconv.Itoa(int(member.RoleLevel)),
// strconv.FormatInt(member.JoinTime.UnixMilli(), 10),
// strconv.Itoa(int(member.JoinSource)),
// member.InviterUserID,
// member.OperatorUserID,
// strconv.FormatInt(member.MuteEndTime.UnixMilli(), 10),
// member.Ex,
// )
// }
// log.ZInfo(ctx, "hash data info", "userIDs.len", len(userIDs), "hash.data.len", len(data))
// log.ZInfo(ctx, "json hash data", "groupID", groupID, "data", data)
// val, err := json.Marshal(data)
// if err != nil {
// return 0, err
// }
// sum := md5.Sum(val)
// code := binary.BigEndian.Uint64(sum[:])
// log.ZInfo(ctx, "GetGroupMembersHash", "groupID", groupID, "hashCode", code, "num", len(members))
// return code, nil
// },
//)
}
func (g *GroupCacheRedis) GetGroupMemberHashMap(

@ -153,7 +153,7 @@ func NewGroupDatabase(
return database
}
func InitGroupDatabase(db *gorm.DB, rdb redis.UniversalClient, database *mongo.Database) GroupDatabase {
func InitGroupDatabase(db *gorm.DB, rdb redis.UniversalClient, database *mongo.Database, hashCode func(ctx context.Context, groupID string) (uint64, error)) GroupDatabase {
rcOptions := rockscache.NewDefaultOptions()
rcOptions.StrongConsistency = true
rcOptions.RandomExpireAdjustment = 0.2
@ -170,6 +170,7 @@ func InitGroupDatabase(db *gorm.DB, rdb redis.UniversalClient, database *mongo.D
relation.NewGroupMemberDB(db),
relation.NewGroupRequest(db),
unrelation.NewSuperGroupMongoDriver(database),
hashCode,
rcOptions,
),
)
@ -315,7 +316,7 @@ func (g *groupDatabase) FindGroupMember(
userIDs []string,
roleLevels []int32,
) (totalGroupMembers []*relationTb.GroupMemberModel, err error) {
if roleLevels == nil {
if len(roleLevels) == 0 {
for _, groupID := range groupIDs {
groupMembers, err := g.cache.GetGroupMembersInfo(ctx, groupID, userIDs)
if err != nil {

Loading…
Cancel
Save