|
|
|
package cache
|
|
|
|
|
|
|
|
import (
|
|
|
|
"Open_IM/pkg/common/constant"
|
|
|
|
"Open_IM/pkg/common/db/relation"
|
|
|
|
"Open_IM/pkg/common/db/unrelation"
|
|
|
|
"Open_IM/pkg/common/tracelog"
|
|
|
|
"Open_IM/pkg/utils"
|
|
|
|
"context"
|
|
|
|
"encoding/json"
|
|
|
|
"github.com/dtm-labs/rockscache"
|
|
|
|
"github.com/go-redis/redis/v8"
|
|
|
|
"math/big"
|
|
|
|
"sort"
|
|
|
|
"strconv"
|
|
|
|
"sync"
|
|
|
|
"time"
|
|
|
|
)
|
|
|
|
|
|
|
|
const (
|
|
|
|
groupExpireTime = time.Second * 60 * 60 * 12
|
|
|
|
groupInfoKey = "GROUP_INFO:"
|
|
|
|
groupMemberIDsKey = "GROUP_MEMBER_IDS:"
|
|
|
|
groupMembersHashKey = "GROUP_MEMBERS_HASH:"
|
|
|
|
groupMemberInfoKey = "GROUP_MEMBER_INFO:"
|
|
|
|
joinedSuperGroupsKey = "JOIN_SUPER_GROUPS:"
|
|
|
|
joinedGroupsKey = "JOIN_GROUPS_KEY:"
|
|
|
|
groupMemberNumKey = "GROUP_MEMBER_NUM_CACHE:"
|
|
|
|
)
|
|
|
|
|
|
|
|
type GroupCache struct {
|
|
|
|
group *relation.Group
|
|
|
|
groupMember *relation.GroupMember
|
|
|
|
groupRequest *relation.GroupRequest
|
|
|
|
mongoDB *unrelation.SuperGroupMgoDB
|
|
|
|
expireTime time.Duration
|
|
|
|
redisClient *RedisClient
|
|
|
|
rcClient *rockscache.Client
|
|
|
|
|
|
|
|
//local cache
|
|
|
|
cacheGroupMtx sync.RWMutex
|
|
|
|
cacheGroupMemberUserIDs map[string]*GroupMemberIDsHash
|
|
|
|
}
|
|
|
|
|
|
|
|
type GroupMemberIDsHash struct {
|
|
|
|
MemberListHash uint64
|
|
|
|
UserIDs []string
|
|
|
|
}
|
|
|
|
|
|
|
|
func NewGroupCache(rdb redis.UniversalClient, groupDB *relation.Group, groupMemberDB *relation.GroupMember, groupRequestDB *relation.GroupRequest, mongoClient *unrelation.SuperGroupMgoDB, opts rockscache.Options) *GroupCache {
|
|
|
|
return &GroupCache{rcClient: rockscache.NewClient(rdb, opts), expireTime: groupExpireTime,
|
|
|
|
group: groupDB, groupMember: groupMemberDB, groupRequest: groupRequestDB, redisClient: NewRedisClient(rdb),
|
|
|
|
mongoDB: mongoClient, cacheGroupMemberUserIDs: make(map[string]*GroupMemberIDsHash, 0),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (g *GroupCache) getRedisClient() *RedisClient {
|
|
|
|
return g.redisClient
|
|
|
|
}
|
|
|
|
|
|
|
|
func (g *GroupCache) getGroupInfoKey(groupID string) string {
|
|
|
|
return groupInfoKey + groupID
|
|
|
|
}
|
|
|
|
|
|
|
|
func (g *GroupCache) getJoinedSuperGroupsIDKey(userID string) string {
|
|
|
|
return joinedSuperGroupsKey + userID
|
|
|
|
}
|
|
|
|
|
|
|
|
func (g *GroupCache) getJoinedGroupsKey(userID string) string {
|
|
|
|
return joinedGroupsKey + userID
|
|
|
|
}
|
|
|
|
|
|
|
|
func (g *GroupCache) getGroupMembersHashKey(groupID string) string {
|
|
|
|
return groupMembersHashKey + groupID
|
|
|
|
}
|
|
|
|
|
|
|
|
func (g *GroupCache) getGroupMemberIDsKey(groupID string) string {
|
|
|
|
return groupMemberIDsKey + groupID
|
|
|
|
}
|
|
|
|
|
|
|
|
func (g *GroupCache) getGroupMemberInfoKey(groupID, userID string) string {
|
|
|
|
return groupMemberInfoKey + groupID + "-" + userID
|
|
|
|
}
|
|
|
|
|
|
|
|
func (g *GroupCache) getGroupMemberNumKey(groupID string) string {
|
|
|
|
return groupMemberNumKey + groupID
|
|
|
|
}
|
|
|
|
|
|
|
|
/// groupInfo
|
|
|
|
func (g *GroupCache) GetGroupsInfo(ctx context.Context, groupIDs []string) (groups []*relation.Group, err error) {
|
|
|
|
for _, groupID := range groupIDs {
|
|
|
|
group, err := g.GetGroupInfo(ctx, groupID)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
groups = append(groups, group)
|
|
|
|
}
|
|
|
|
return groups, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (g *GroupCache) GetGroupInfo(ctx context.Context, groupID string) (group *relation.Group, err error) {
|
|
|
|
getGroup := func() (string, error) {
|
|
|
|
groupInfo, err := g.group.Take(ctx, groupID)
|
|
|
|
if err != nil {
|
|
|
|
return "", utils.Wrap(err, "")
|
|
|
|
}
|
|
|
|
bytes, err := json.Marshal(groupInfo)
|
|
|
|
if err != nil {
|
|
|
|
return "", utils.Wrap(err, "")
|
|
|
|
}
|
|
|
|
return string(bytes), nil
|
|
|
|
}
|
|
|
|
group = &relation.Group{}
|
|
|
|
defer func() {
|
|
|
|
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID, "group", *group)
|
|
|
|
}()
|
|
|
|
groupStr, err := g.rcClient.Fetch(g.getGroupInfoKey(groupID), g.expireTime, getGroup)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
err = json.Unmarshal([]byte(groupStr), group)
|
|
|
|
return group, utils.Wrap(err, "")
|
|
|
|
}
|
|
|
|
|
|
|
|
func (g *GroupCache) DelGroupInfo(ctx context.Context, groupID string) (err error) {
|
|
|
|
defer func() {
|
|
|
|
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID)
|
|
|
|
}()
|
|
|
|
return g.rcClient.TagAsDeleted(g.getGroupInfoKey(groupID))
|
|
|
|
}
|
|
|
|
|
|
|
|
func (g *GroupCache) DelGroupsInfo(ctx context.Context, groupIDs []string) error {
|
|
|
|
for _, groupID := range groupIDs {
|
|
|
|
if err := g.DelGroupInfo(ctx, groupID); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// userJoinSuperGroup
|
|
|
|
func (g *GroupCache) BatchDelJoinedSuperGroupIDs(ctx context.Context, userIDs []string) (err error) {
|
|
|
|
for _, userID := range userIDs {
|
|
|
|
if err := g.DelJoinedSuperGroupIDs(ctx, userID); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (g *GroupCache) DelJoinedSuperGroupIDs(ctx context.Context, userID string) (err error) {
|
|
|
|
defer func() {
|
|
|
|
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "userID", userID)
|
|
|
|
}()
|
|
|
|
return g.rcClient.TagAsDeleted(g.getJoinedSuperGroupsIDKey(userID))
|
|
|
|
}
|
|
|
|
|
|
|
|
func (g *GroupCache) GetJoinedSuperGroupIDs(ctx context.Context, userID string) (joinedSuperGroupIDs []string, err error) {
|
|
|
|
getJoinedSuperGroupIDList := func() (string, error) {
|
|
|
|
userToSuperGroup, err := g.mongoDB.GetSuperGroupByUserID(ctx, userID)
|
|
|
|
if err != nil {
|
|
|
|
return "", err
|
|
|
|
}
|
|
|
|
bytes, err := json.Marshal(userToSuperGroup.GroupIDList)
|
|
|
|
if err != nil {
|
|
|
|
return "", utils.Wrap(err, "")
|
|
|
|
}
|
|
|
|
return string(bytes), nil
|
|
|
|
}
|
|
|
|
defer func() {
|
|
|
|
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "userID", userID, "joinedSuperGroupIDs", joinedSuperGroupIDs)
|
|
|
|
}()
|
|
|
|
joinedSuperGroupListStr, err := g.rcClient.Fetch(g.getJoinedSuperGroupsIDKey(userID), time.Second*30*60, getJoinedSuperGroupIDList)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
err = json.Unmarshal([]byte(joinedSuperGroupListStr), &joinedSuperGroupIDs)
|
|
|
|
return joinedSuperGroupIDs, utils.Wrap(err, "")
|
|
|
|
}
|
|
|
|
|
|
|
|
// groupMembersHash
|
|
|
|
func (g *GroupCache) GetGroupMembersHash(ctx context.Context, groupID string) (hashCodeUint64 uint64, err error) {
|
|
|
|
generateHash := func() (string, error) {
|
|
|
|
groupInfo, err := g.GetGroupInfo(ctx, groupID)
|
|
|
|
if err != nil {
|
|
|
|
return "", err
|
|
|
|
}
|
|
|
|
if groupInfo.Status == constant.GroupStatusDismissed {
|
|
|
|
return "0", nil
|
|
|
|
}
|
|
|
|
groupMemberIDList, err := g.GetGroupMemberIDs(ctx, groupID)
|
|
|
|
if err != nil {
|
|
|
|
return "", err
|
|
|
|
}
|
|
|
|
sort.Strings(groupMemberIDList)
|
|
|
|
var all string
|
|
|
|
for _, v := range groupMemberIDList {
|
|
|
|
all += v
|
|
|
|
}
|
|
|
|
bi := big.NewInt(0)
|
|
|
|
bi.SetString(utils.Md5(all)[0:8], 16)
|
|
|
|
return strconv.Itoa(int(bi.Uint64())), nil
|
|
|
|
}
|
|
|
|
defer func() {
|
|
|
|
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID, "hashCodeUint64", hashCodeUint64)
|
|
|
|
}()
|
|
|
|
hashCodeStr, err := g.rcClient.Fetch(g.getGroupMembersHashKey(groupID), time.Second*30*60, generateHash)
|
|
|
|
if err != nil {
|
|
|
|
return 0, utils.Wrap(err, "fetch failed")
|
|
|
|
}
|
|
|
|
hashCode, err := strconv.Atoi(hashCodeStr)
|
|
|
|
return uint64(hashCode), err
|
|
|
|
}
|
|
|
|
|
|
|
|
func (g *GroupCache) DelGroupMembersHash(ctx context.Context, groupID string) (err error) {
|
|
|
|
defer func() {
|
|
|
|
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID)
|
|
|
|
}()
|
|
|
|
return g.rcClient.TagAsDeleted(g.getGroupMembersHashKey(groupID))
|
|
|
|
}
|
|
|
|
|
|
|
|
// groupMemberIDs
|
|
|
|
// from redis
|
|
|
|
func (g *GroupCache) GetGroupMemberIDs(ctx context.Context, groupID string) (groupMemberIDs []string, err error) {
|
|
|
|
f := func() (string, error) {
|
|
|
|
groupInfo, err := g.GetGroupInfo(ctx, groupID)
|
|
|
|
if err != nil {
|
|
|
|
return "", err
|
|
|
|
}
|
|
|
|
var groupMemberIDList []string
|
|
|
|
if groupInfo.GroupType == constant.SuperGroup {
|
|
|
|
superGroup, err := g.mongoDB.GetSuperGroup(ctx, groupID)
|
|
|
|
if err != nil {
|
|
|
|
return "", err
|
|
|
|
}
|
|
|
|
groupMemberIDList = superGroup.MemberIDList
|
|
|
|
} else {
|
|
|
|
groupMemberIDList, err = relation.GetGroupMemberIDListByGroupID(groupID)
|
|
|
|
if err != nil {
|
|
|
|
return "", err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
bytes, err := json.Marshal(groupMemberIDList)
|
|
|
|
if err != nil {
|
|
|
|
return "", utils.Wrap(err, "")
|
|
|
|
}
|
|
|
|
return string(bytes), nil
|
|
|
|
}
|
|
|
|
defer func() {
|
|
|
|
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID, "groupMemberIDList", groupMemberIDs)
|
|
|
|
}()
|
|
|
|
groupIDListStr, err := g.rcClient.Fetch(g.getGroupMemberIDsKey(groupID), time.Second*30*60, f)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
err = json.Unmarshal([]byte(groupIDListStr), &groupMemberIDs)
|
|
|
|
return groupMemberIDs, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (g *GroupCache) DelGroupMemberIDs(ctx context.Context, groupID string) (err error) {
|
|
|
|
defer func() {
|
|
|
|
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID)
|
|
|
|
}()
|
|
|
|
return g.rcClient.TagAsDeleted(g.getGroupMemberIDsKey(groupID))
|
|
|
|
}
|
|
|
|
|
|
|
|
// from local map
|
|
|
|
func (g *GroupCache) LocalGetGroupMemberIDs(ctx context.Context, groupID string) (groupMemberIDs []string, err error) {
|
|
|
|
remoteHash, err := g.GetGroupMembersHash(ctx, groupID)
|
|
|
|
if err != nil {
|
|
|
|
g.cacheGroupMtx.Lock()
|
|
|
|
defer g.cacheGroupMtx.Unlock()
|
|
|
|
delete(g.cacheGroupMemberUserIDs, groupID)
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
g.cacheGroupMtx.Lock()
|
|
|
|
defer g.cacheGroupMtx.Unlock()
|
|
|
|
if remoteHash == 0 {
|
|
|
|
delete(g.cacheGroupMemberUserIDs, groupID)
|
|
|
|
return []string{}, nil
|
|
|
|
}
|
|
|
|
localCache, ok := g.cacheGroupMemberUserIDs[groupID]
|
|
|
|
if ok && localCache.MemberListHash == remoteHash {
|
|
|
|
return localCache.UserIDs, nil
|
|
|
|
}
|
|
|
|
groupMemberIDsRemote, err := g.GetGroupMemberIDs(ctx, groupID)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
g.cacheGroupMemberUserIDs[groupID] = &GroupMemberIDsHash{
|
|
|
|
MemberListHash: remoteHash,
|
|
|
|
UserIDs: groupMemberIDsRemote,
|
|
|
|
}
|
|
|
|
return groupMemberIDsRemote, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// JoinedGroups
|
|
|
|
func (g *GroupCache) GetJoinedGroupIDs(ctx context.Context, userID string) (joinedGroupIDs []string, err error) {
|
|
|
|
getJoinedGroupIDList := func() (string, error) {
|
|
|
|
joinedGroupList, err := relation.GetJoinedGroupIDListByUserID(userID)
|
|
|
|
if err != nil {
|
|
|
|
return "", err
|
|
|
|
}
|
|
|
|
bytes, err := json.Marshal(joinedGroupList)
|
|
|
|
if err != nil {
|
|
|
|
return "", utils.Wrap(err, "")
|
|
|
|
}
|
|
|
|
return string(bytes), nil
|
|
|
|
}
|
|
|
|
defer func() {
|
|
|
|
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "userID", userID, "joinedGroupIDs", joinedGroupIDs)
|
|
|
|
}()
|
|
|
|
joinedGroupIDListStr, err := g.rcClient.Fetch(g.getJoinedGroupsKey(userID), time.Second*30*60, getJoinedGroupIDList)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
err = json.Unmarshal([]byte(joinedGroupIDListStr), &joinedGroupIDs)
|
|
|
|
return joinedGroupIDs, utils.Wrap(err, "")
|
|
|
|
}
|
|
|
|
|
|
|
|
func (g *GroupCache) DelJoinedGroupIDs(ctx context.Context, userID string) (err error) {
|
|
|
|
defer func() {
|
|
|
|
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "userID", userID)
|
|
|
|
}()
|
|
|
|
return g.rcClient.TagAsDeleted(g.getJoinedGroupsKey(userID))
|
|
|
|
}
|
|
|
|
|
|
|
|
// GetGroupMemberInfo
|
|
|
|
func (g *GroupCache) GetGroupMemberInfo(ctx context.Context, groupID, userID string) (groupMember *relation.GroupMember, err error) {
|
|
|
|
getGroupMemberInfo := func() (string, error) {
|
|
|
|
groupMemberInfo, err := relation.GetGroupMemberInfoByGroupIDAndUserID(groupID, userID)
|
|
|
|
if err != nil {
|
|
|
|
return "", err
|
|
|
|
}
|
|
|
|
bytes, err := json.Marshal(groupMemberInfo)
|
|
|
|
if err != nil {
|
|
|
|
return "", utils.Wrap(err, "")
|
|
|
|
}
|
|
|
|
return string(bytes), nil
|
|
|
|
}
|
|
|
|
defer func() {
|
|
|
|
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID, "userID", userID, "groupMember", *groupMember)
|
|
|
|
}()
|
|
|
|
groupMemberInfoStr, err := g.rcClient.Fetch(g.getGroupMemberInfoKey(groupID, userID), time.Second*30*60, getGroupMemberInfo)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
groupMember = &relation.GroupMember{}
|
|
|
|
err = json.Unmarshal([]byte(groupMemberInfoStr), groupMember)
|
|
|
|
return groupMember, utils.Wrap(err, "")
|
|
|
|
}
|
|
|
|
|
|
|
|
func (g *GroupCache) DelGroupMemberInfo(ctx context.Context, groupID, userID string) (err error) {
|
|
|
|
defer func() {
|
|
|
|
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID, "userID", userID)
|
|
|
|
}()
|
|
|
|
return g.rcClient.TagAsDeleted(g.getGroupMemberInfoKey(groupID, userID))
|
|
|
|
}
|
|
|
|
|
|
|
|
// groupMemberNum
|
|
|
|
func (g *GroupCache) GetGroupMemberNum(ctx context.Context, groupID string) (num int, err error) {
|
|
|
|
getGroupMemberNum := func() (string, error) {
|
|
|
|
num, err := relation.GetGroupMemberNumByGroupID(groupID)
|
|
|
|
if err != nil {
|
|
|
|
return "", err
|
|
|
|
}
|
|
|
|
return strconv.Itoa(int(num)), nil
|
|
|
|
}
|
|
|
|
defer func() {
|
|
|
|
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID, "num", num)
|
|
|
|
}()
|
|
|
|
groupMember, err := g.rcClient.Fetch(g.getGroupMemberNumKey(groupID), time.Second*30*60, getGroupMemberNum)
|
|
|
|
if err != nil {
|
|
|
|
return 0, err
|
|
|
|
}
|
|
|
|
return strconv.Atoi(groupMember)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (g *GroupCache) DelGroupMemberNum(ctx context.Context, groupID string) (err error) {
|
|
|
|
defer func() {
|
|
|
|
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID)
|
|
|
|
}()
|
|
|
|
return g.rcClient.TagAsDeleted(g.getGroupMemberNumKey(groupID))
|
|
|
|
}
|