You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
Open-IM-Server/pkg/common/db/cache/group.go

423 lines
14 KiB

2 years ago
package cache
import (
2 years ago
"Open_IM/pkg/common/constant"
2 years ago
"Open_IM/pkg/common/db/relation"
2 years ago
relationTb "Open_IM/pkg/common/db/table/relation"
2 years ago
"Open_IM/pkg/common/db/unrelation"
2 years ago
"Open_IM/pkg/common/tracelog"
2 years ago
"Open_IM/pkg/utils"
"context"
"encoding/json"
"github.com/dtm-labs/rockscache"
"github.com/go-redis/redis/v8"
2 years ago
"math/big"
"sort"
"strconv"
2 years ago
"time"
)
2 years ago
const (
groupExpireTime = time.Second * 60 * 60 * 12
groupInfoKey = "GROUP_INFO:"
groupMemberIDsKey = "GROUP_MEMBER_IDS:"
groupMembersHashKey = "GROUP_MEMBERS_HASH:"
groupMemberInfoKey = "GROUP_MEMBER_INFO:"
joinedSuperGroupsKey = "JOIN_SUPER_GROUPS:"
joinedGroupsKey = "JOIN_GROUPS_KEY:"
groupMemberNumKey = "GROUP_MEMBER_NUM_CACHE:"
)
2 years ago
2 years ago
type GroupCache interface {
GetGroupsInfo(ctx context.Context, groupIDs []string, fn func(ctx context.Context, groupIDs []string) (groups []*relationTb.GroupModel, err error)) (groups []*relationTb.GroupModel, err error)
DelGroupsInfo(ctx context.Context, groupID string) (err error)
GetGroupInfo(ctx context.Context, groupID string, fn func(ctx context.Context, groupID string) (group *relationTb.GroupModel, err error)) (group *relationTb.GroupModel, err error)
DelGroupInfo(ctx context.Context, groupID string) (err error)
2 years ago
BatchDelJoinedSuperGroupIDs(ctx context.Context, userIDs []string, fn func(ctx context.Context, userIDs []string) error) (err error)
GetJoinedSuperGroupIDs(ctx context.Context, userID string, fn func(ctx context.Context, userID string) (joinedSuperGroupIDs []string, err error)) (joinedSuperGroupIDs []string, err error)
2 years ago
DelJoinedSuperGroupIDs(ctx context.Context, userID string) (err error)
2 years ago
GetGroupMembersHash(ctx context.Context, groupID string, fn func(ctx context.Context, groupID string) (hashCodeUint64 uint64, err error)) (hashCodeUint64 uint64, err error)
DelGroupMembersHash(ctx context.Context, groupID string) (err error)
GetGroupMemberIDs(ctx context.Context, groupID string, fn func(ctx context.Context, groupID string) (groupMemberIDs []string, err error)) (groupMemberIDs []string, err error)
2 years ago
DelGroupMemberIDs(ctx context.Context, groupID string) error
2 years ago
GetJoinedGroupIDs(ctx context.Context, userID string, fn func(ctx context.Context, userID string) (joinedGroupIDs []string, err error)) (joinedGroupIDs []string, err error)
DelJoinedGroupIDs(ctx context.Context, userID string) (err error)
GetGroupMemberInfo(ctx context.Context, groupID, userID string, fn func(ctx context.Context, groupID, userID string) (groupMember *relationTb.GroupMemberModel, err error)) (groupMember *relationTb.GroupMemberModel, err error)
DelGroupMemberInfo(ctx context.Context, groupID, userID string) (err error)
GetGroupMemberNum(ctx context.Context, groupID string, fn func(ctx context.Context, groupID string) (num int, err error)) (num int, err error)
DelGroupMemberNum(ctx context.Context, groupID string) (err error)
2 years ago
}
type GroupCacheRedis struct {
2 years ago
group *relation.GroupGorm
groupMember *relation.GroupMemberGorm
groupRequest *relation.GroupRequestGorm
2 years ago
mongoDB *unrelation.SuperGroupMongoDriver
2 years ago
expireTime time.Duration
redisClient *RedisClient
rcClient *rockscache.Client
2 years ago
}
2 years ago
func NewGroupCacheRedis(rdb redis.UniversalClient, groupDB *relation.GroupGorm, groupMemberDB *relation.GroupMemberGorm, groupRequestDB *relation.GroupRequestGorm, mongoClient *unrelation.SuperGroupMongoDriver, opts rockscache.Options) *GroupCacheRedis {
return &GroupCacheRedis{rcClient: rockscache.NewClient(rdb, opts), expireTime: groupExpireTime,
2 years ago
group: groupDB, groupMember: groupMemberDB, groupRequest: groupRequestDB, redisClient: NewRedisClient(rdb),
2 years ago
mongoDB: mongoClient,
2 years ago
}
2 years ago
}
2 years ago
func (g *GroupCacheRedis) getRedisClient() *RedisClient {
2 years ago
return g.redisClient
2 years ago
}
2 years ago
func (g *GroupCacheRedis) getGroupInfoKey(groupID string) string {
2 years ago
return groupInfoKey + groupID
}
2 years ago
func (g *GroupCacheRedis) getJoinedSuperGroupsIDKey(userID string) string {
2 years ago
return joinedSuperGroupsKey + userID
}
2 years ago
func (g *GroupCacheRedis) getJoinedGroupsKey(userID string) string {
2 years ago
return joinedGroupsKey + userID
}
2 years ago
func (g *GroupCacheRedis) getGroupMembersHashKey(groupID string) string {
2 years ago
return groupMembersHashKey + groupID
}
2 years ago
func (g *GroupCacheRedis) getGroupMemberIDsKey(groupID string) string {
2 years ago
return groupMemberIDsKey + groupID
}
2 years ago
func (g *GroupCacheRedis) getGroupMemberInfoKey(groupID, userID string) string {
2 years ago
return groupMemberInfoKey + groupID + "-" + userID
}
2 years ago
func (g *GroupCacheRedis) getGroupMemberNumKey(groupID string) string {
2 years ago
return groupMemberNumKey + groupID
}
2 years ago
// / groupInfo
2 years ago
func (g *GroupCacheRedis) GetGroupsInfo(ctx context.Context, groupIDs []string) (groups []*relation.Group, err error) {
2 years ago
for _, groupID := range groupIDs {
2 years ago
group, err := g.GetGroupInfo(ctx, groupID)
2 years ago
if err != nil {
return nil, err
}
groups = append(groups, group)
}
return groups, nil
}
2 years ago
func (g *GroupCacheRedis) GetGroupInfo(ctx context.Context, groupID string) (group *relationTb.GroupModel, err error) {
2 years ago
getGroup := func() (string, error) {
2 years ago
groupInfo, err := g.group.Take(ctx, groupID)
2 years ago
if err != nil {
return "", utils.Wrap(err, "")
}
bytes, err := json.Marshal(groupInfo)
if err != nil {
return "", utils.Wrap(err, "")
}
return string(bytes), nil
}
2 years ago
group = &relationTb.GroupModel{}
2 years ago
defer func() {
2 years ago
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID, "group", *group)
2 years ago
}()
2 years ago
groupStr, err := g.rcClient.Fetch(g.getGroupInfoKey(groupID), g.expireTime, getGroup)
2 years ago
if err != nil {
2 years ago
return nil, err
2 years ago
}
err = json.Unmarshal([]byte(groupStr), group)
return group, utils.Wrap(err, "")
}
2 years ago
func (g *GroupCacheRedis) DelGroupInfo(ctx context.Context, groupID string) (err error) {
2 years ago
defer func() {
2 years ago
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID)
2 years ago
}()
2 years ago
return g.rcClient.TagAsDeleted(g.getGroupInfoKey(groupID))
2 years ago
}
2 years ago
func (g *GroupCacheRedis) DelGroupsInfo(ctx context.Context, groupIDs []string) error {
2 years ago
for _, groupID := range groupIDs {
2 years ago
if err := g.DelGroupInfo(ctx, groupID); err != nil {
2 years ago
return err
}
}
return nil
2 years ago
}
2 years ago
// userJoinSuperGroup
2 years ago
func (g *GroupCacheRedis) BatchDelJoinedSuperGroupIDs(ctx context.Context, userIDs []string) (err error) {
2 years ago
for _, userID := range userIDs {
2 years ago
if err := g.DelJoinedSuperGroupIDs(ctx, userID); err != nil {
2 years ago
return err
}
}
2 years ago
return nil
}
2 years ago
func (g *GroupCacheRedis) DelJoinedSuperGroupIDs(ctx context.Context, userID string) (err error) {
2 years ago
defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "userID", userID)
}()
return g.rcClient.TagAsDeleted(g.getJoinedSuperGroupsIDKey(userID))
}
2 years ago
func (g *GroupCacheRedis) GetJoinedSuperGroupIDs(ctx context.Context, userID string) (joinedSuperGroupIDs []string, err error) {
2 years ago
getJoinedSuperGroupIDList := func() (string, error) {
userToSuperGroup, err := g.mongoDB.GetSuperGroupByUserID(ctx, userID)
if err != nil {
return "", err
}
bytes, err := json.Marshal(userToSuperGroup.GroupIDList)
if err != nil {
return "", utils.Wrap(err, "")
}
return string(bytes), nil
}
defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "userID", userID, "joinedSuperGroupIDs", joinedSuperGroupIDs)
}()
joinedSuperGroupListStr, err := g.rcClient.Fetch(g.getJoinedSuperGroupsIDKey(userID), time.Second*30*60, getJoinedSuperGroupIDList)
if err != nil {
return nil, err
}
err = json.Unmarshal([]byte(joinedSuperGroupListStr), &joinedSuperGroupIDs)
return joinedSuperGroupIDs, utils.Wrap(err, "")
}
// groupMembersHash
2 years ago
func (g *GroupCacheRedis) GetGroupMembersHash(ctx context.Context, groupID string) (hashCodeUint64 uint64, err error) {
2 years ago
generateHash := func() (string, error) {
groupInfo, err := g.GetGroupInfo(ctx, groupID)
if err != nil {
return "", err
}
if groupInfo.Status == constant.GroupStatusDismissed {
return "0", nil
}
groupMemberIDList, err := g.GetGroupMemberIDs(ctx, groupID)
if err != nil {
return "", err
}
sort.Strings(groupMemberIDList)
var all string
for _, v := range groupMemberIDList {
all += v
}
bi := big.NewInt(0)
bi.SetString(utils.Md5(all)[0:8], 16)
return strconv.Itoa(int(bi.Uint64())), nil
}
defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID, "hashCodeUint64", hashCodeUint64)
}()
hashCodeStr, err := g.rcClient.Fetch(g.getGroupMembersHashKey(groupID), time.Second*30*60, generateHash)
if err != nil {
return 0, utils.Wrap(err, "fetch failed")
}
hashCode, err := strconv.Atoi(hashCodeStr)
return uint64(hashCode), err
}
2 years ago
func (g *GroupCacheRedis) DelGroupMembersHash(ctx context.Context, groupID string) (err error) {
2 years ago
defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID)
}()
return g.rcClient.TagAsDeleted(g.getGroupMembersHashKey(groupID))
}
// groupMemberIDs
// from redis
2 years ago
func (g *GroupCacheRedis) GetGroupMemberIDs(ctx context.Context, groupID string) (groupMemberIDs []string, err error) {
2 years ago
f := func() (string, error) {
groupInfo, err := g.GetGroupInfo(ctx, groupID)
if err != nil {
return "", err
}
var groupMemberIDList []string
if groupInfo.GroupType == constant.SuperGroup {
superGroup, err := g.mongoDB.GetSuperGroup(ctx, groupID)
if err != nil {
return "", err
}
groupMemberIDList = superGroup.MemberIDList
} else {
groupMemberIDList, err = relation.GetGroupMemberIDListByGroupID(groupID)
if err != nil {
return "", err
}
}
bytes, err := json.Marshal(groupMemberIDList)
if err != nil {
return "", utils.Wrap(err, "")
}
return string(bytes), nil
}
defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID, "groupMemberIDList", groupMemberIDs)
}()
groupIDListStr, err := g.rcClient.Fetch(g.getGroupMemberIDsKey(groupID), time.Second*30*60, f)
if err != nil {
return nil, err
}
err = json.Unmarshal([]byte(groupIDListStr), &groupMemberIDs)
return groupMemberIDs, nil
2 years ago
}
2 years ago
func (g *GroupCacheRedis) DelGroupMemberIDs(ctx context.Context, groupID string) (err error) {
2 years ago
defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID)
}()
return g.rcClient.TagAsDeleted(g.getGroupMemberIDsKey(groupID))
}
// JoinedGroups
2 years ago
func (g *GroupCacheRedis) GetJoinedGroupIDs(ctx context.Context, userID string) (joinedGroupIDs []string, err error) {
2 years ago
getJoinedGroupIDList := func() (string, error) {
joinedGroupList, err := relation.GetJoinedGroupIDListByUserID(userID)
if err != nil {
return "", err
}
bytes, err := json.Marshal(joinedGroupList)
if err != nil {
return "", utils.Wrap(err, "")
}
return string(bytes), nil
}
defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "userID", userID, "joinedGroupIDs", joinedGroupIDs)
}()
joinedGroupIDListStr, err := g.rcClient.Fetch(g.getJoinedGroupsKey(userID), time.Second*30*60, getJoinedGroupIDList)
if err != nil {
return nil, err
}
err = json.Unmarshal([]byte(joinedGroupIDListStr), &joinedGroupIDs)
return joinedGroupIDs, utils.Wrap(err, "")
}
2 years ago
func (g *GroupCacheRedis) DelJoinedGroupIDs(ctx context.Context, userID string) (err error) {
2 years ago
defer func() {
2 years ago
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "userID", userID)
2 years ago
}()
2 years ago
return g.rcClient.TagAsDeleted(g.getJoinedGroupsKey(userID))
}
// GetGroupMemberInfo
2 years ago
func (g *GroupCacheRedis) GetGroupMemberInfo(ctx context.Context, groupID, userID string) (groupMember *relation.GroupMember, err error) {
2 years ago
getGroupMemberInfo := func() (string, error) {
groupMemberInfo, err := relation.GetGroupMemberInfoByGroupIDAndUserID(groupID, userID)
if err != nil {
return "", err
}
bytes, err := json.Marshal(groupMemberInfo)
if err != nil {
return "", utils.Wrap(err, "")
}
return string(bytes), nil
}
defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID, "userID", userID, "groupMember", *groupMember)
}()
groupMemberInfoStr, err := g.rcClient.Fetch(g.getGroupMemberInfoKey(groupID, userID), time.Second*30*60, getGroupMemberInfo)
if err != nil {
return nil, err
}
groupMember = &relation.GroupMember{}
err = json.Unmarshal([]byte(groupMemberInfoStr), groupMember)
return groupMember, utils.Wrap(err, "")
}
2 years ago
func (g *GroupCacheRedis) GetGroupMembersInfo(ctx context.Context, groupID, userIDs []string) (groupMember *relationTb.GroupMemberModel, err error) {
return nil, err
}
2 years ago
func (g *GroupCacheRedis) GetGroupMembersInfo(ctx context.Context, count, offset int32, groupID string) (groupMembers []*relation.GroupMember, err error) {
2 years ago
defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "count", count, "offset", offset, "groupID", groupID, "groupMember", groupMembers)
}()
groupMemberIDList, err := g.GetGroupMemberIDs(ctx, groupID)
if err != nil {
return nil, err
}
if count < 0 || offset < 0 {
return nil, nil
}
var groupMemberList []*relation.GroupMember
var start, stop int32
start = offset
stop = offset + count
l := int32(len(groupMemberIDList))
if start > stop {
return nil, nil
}
if start >= l {
return nil, nil
}
if count != 0 {
if stop >= l {
stop = l
}
groupMemberIDList = groupMemberIDList[start:stop]
} else {
if l < 1000 {
stop = l
} else {
stop = 1000
}
groupMemberIDList = groupMemberIDList[start:stop]
}
for _, userID := range groupMemberIDList {
groupMember, err := g.GetGroupMemberInfo(ctx, groupID, userID)
if err != nil {
return
}
groupMembers = append(groupMembers, groupMember)
}
return groupMemberList, nil
}
2 years ago
func (g *GroupCacheRedis) DelGroupMemberInfo(ctx context.Context, groupID, userID string) (err error) {
2 years ago
defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID, "userID", userID)
}()
return g.rcClient.TagAsDeleted(g.getGroupMemberInfoKey(groupID, userID))
}
// groupMemberNum
2 years ago
func (g *GroupCacheRedis) GetGroupMemberNum(ctx context.Context, groupID string) (num int, err error) {
2 years ago
getGroupMemberNum := func() (string, error) {
num, err := relation.GetGroupMemberNumByGroupID(groupID)
if err != nil {
return "", err
}
return strconv.Itoa(int(num)), nil
}
defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID, "num", num)
}()
groupMember, err := g.rcClient.Fetch(g.getGroupMemberNumKey(groupID), time.Second*30*60, getGroupMemberNum)
if err != nil {
return 0, err
}
return strconv.Atoi(groupMember)
}
2 years ago
func (g *GroupCacheRedis) DelGroupMemberNum(ctx context.Context, groupID string) (err error) {
2 years ago
defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID)
}()
return g.rcClient.TagAsDeleted(g.getGroupMemberNumKey(groupID))
2 years ago
}