pull/1427/head
withchao 2 years ago
parent 9e8791a4f4
commit 55fade28db

@ -16,12 +16,10 @@ package group
import (
"context"
"crypto/md5"
"encoding/binary"
"encoding/json"
"fmt"
"github.com/OpenIMSDK/tools/tx"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/newmgo"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient/grouphash"
"math/big"
"math/rand"
"strconv"
@ -91,7 +89,7 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e
msgRpcClient := rpcclient.NewMessageRpcClient(client)
conversationRpcClient := rpcclient.NewConversationRpcClient(client)
var gs groupServer
database := controller.NewGroupDatabase(rdb, groupDB, groupMemberDB, groupRequestDB, tx.NewMongo(mongo.GetClient()), gs.groupMemberHashCode)
database := controller.NewGroupDatabase(rdb, groupDB, groupMemberDB, groupRequestDB, tx.NewMongo(mongo.GetClient()), grouphash.NewGroupHashFromGroupServer(&gs))
gs.db = database
gs.User = userRpcClient
gs.Notification = notification.NewGroupNotificationSender(database, &msgRpcClient, &userRpcClient, func(ctx context.Context, userIDs []string) ([]notification.CommonUser, error) {
@ -427,14 +425,6 @@ func (s *groupServer) InviteUserToGroup(ctx context.Context, req *pbgroup.Invite
}
func (s *groupServer) GetGroupAllMember(ctx context.Context, req *pbgroup.GetGroupAllMemberReq) (*pbgroup.GetGroupAllMemberResp, error) {
resp := &pbgroup.GetGroupAllMemberResp{}
group, err := s.db.TakeGroup(ctx, req.GroupID)
if err != nil {
return nil, err
}
if group.Status == constant.GroupStatusDismissed {
return nil, errs.ErrData.Wrap("group dismissed")
}
members, err := s.db.FindGroupMemberAll(ctx, req.GroupID)
if err != nil {
return nil, err
@ -442,6 +432,7 @@ func (s *groupServer) GetGroupAllMember(ctx context.Context, req *pbgroup.GetGro
if err := s.PopulateGroupMember(ctx, members...); err != nil {
return nil, err
}
resp := &pbgroup.GetGroupAllMemberResp{}
resp.Members = utils.Slice(members, func(e *relationtb.GroupMemberModel) *sdkws.GroupMemberFullInfo {
return convert.Db2PbGroupMember(e)
})
@ -1601,36 +1592,36 @@ func (s *groupServer) GetGroupUsersReqApplicationList(ctx context.Context, req *
return resp, nil
}
func (s *groupServer) groupMemberHashCode(ctx context.Context, groupID string) (uint64, error) {
userIDs, err := s.db.FindGroupMemberUserID(ctx, groupID)
if err != nil {
return 0, err
}
var members []*sdkws.GroupMemberFullInfo
if len(userIDs) > 0 {
resp, err := s.GetGroupMembersInfo(ctx, &pbgroup.GetGroupMembersInfoReq{GroupID: groupID, UserIDs: userIDs})
if err != nil {
return 0, err
}
members = resp.Members
utils.Sort(userIDs, true)
}
memberMap := utils.SliceToMap(members, func(e *sdkws.GroupMemberFullInfo) string {
return e.UserID
})
res := make([]*sdkws.GroupMemberFullInfo, 0, len(members))
for _, userID := range userIDs {
member, ok := memberMap[userID]
if !ok {
continue
}
member.AppMangerLevel = 0
res = append(res, member)
}
data, err := json.Marshal(res)
if err != nil {
return 0, err
}
sum := md5.Sum(data)
return binary.BigEndian.Uint64(sum[:]), nil
}
//func (s *groupServer) groupMemberHashCode(ctx context.Context, groupID string) (uint64, error) {
// userIDs, err := s.db.FindGroupMemberUserID(ctx, groupID)
// if err != nil {
// return 0, err
// }
// var members []*sdkws.GroupMemberFullInfo
// if len(userIDs) > 0 {
// resp, err := s.GetGroupMembersInfo(ctx, &pbgroup.GetGroupMembersInfoReq{GroupID: groupID, UserIDs: userIDs})
// if err != nil {
// return 0, err
// }
// members = resp.Members
// utils.Sort(userIDs, true)
// }
// memberMap := utils.SliceToMap(members, func(e *sdkws.GroupMemberFullInfo) string {
// return e.UserID
// })
// res := make([]*sdkws.GroupMemberFullInfo, 0, len(members))
// for _, userID := range userIDs {
// member, ok := memberMap[userID]
// if !ok {
// continue
// }
// member.AppMangerLevel = 0
// res = append(res, member)
// }
// data, err := json.Marshal(res)
// if err != nil {
// return 0, err
// }
// sum := md5.Sum(data)
// return binary.BigEndian.Uint64(sum[:]), nil
//}

@ -17,6 +17,7 @@ package tools
import (
"context"
"fmt"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/newmgo"
"math"
"github.com/redis/go-redis/v9"
@ -77,24 +78,35 @@ func InitMsgTool() (*MsgTool, error) {
return nil, err
}
discov, err := kdisc.NewDiscoveryRegister(config.Config.Envs.Discovery)
/*
discov, err := zookeeper.NewClient(config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema,
zookeeper.WithFreq(time.Hour), zookeeper.WithRoundRobin(), zookeeper.WithUserNameAndPassword(config.Config.Zookeeper.Username,
config.Config.Zookeeper.Password), zookeeper.WithTimeout(10), zookeeper.WithLogger(log.NewZkLogger()))*/
if err != nil {
return nil, err
}
discov.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()))
userDB := relation.NewUserGorm(db)
userDB, err := newmgo.NewUserMongo(mongo.GetDatabase())
if err != nil {
return nil, err
}
msgDatabase := controller.InitCommonMsgDatabase(rdb, mongo.GetDatabase())
userMongoDB := unrelation.NewUserMongoDriver(mongo.GetDatabase())
userDatabase := controller.NewUserDatabase(
userDB,
cache.NewUserCacheRedis(rdb, relation.NewUserGorm(db), cache.GetDefaultOpt()),
cache.NewUserCacheRedis(rdb, userDB, cache.GetDefaultOpt()),
tx.NewMongo(mongo.GetClient()),
userMongoDB,
)
groupDatabase := controller.InitGroupDatabase(db, rdb, mongo.GetDatabase(), nil)
groupDB, err := newmgo.NewGroupMongo(mongo.GetDatabase())
if err != nil {
return nil, err
}
groupMemberDB, err := newmgo.NewGroupMember(mongo.GetDatabase())
if err != nil {
return nil, err
}
groupRequestDB, err := newmgo.NewGroupRequestMgo(mongo.GetDatabase())
if err != nil {
return nil, err
}
groupDatabase := controller.NewGroupDatabase(rdb, groupDB, groupMemberDB, groupRequestDB, tx.NewMongo(mongo.GetClient()), nil)
conversationDatabase := controller.NewConversationDatabase(
relation.NewConversationGorm(db),
cache.NewConversationRedis(rdb, cache.GetDefaultOpt(), relation.NewConversationGorm(db)),

@ -44,6 +44,10 @@ const (
groupRoleLevelMemberIDsKey = "GROUP_ROLE_LEVEL_MEMBER_IDS:"
)
type GroupHash interface {
GetGroupHash(ctx context.Context, groupID string) (uint64, error)
}
type GroupCache interface {
metaCache
NewCache() GroupCache
@ -87,7 +91,7 @@ type GroupCacheRedis struct {
groupRequestDB relationtb.GroupRequestModelInterface
expireTime time.Duration
rcClient *rockscache.Client
hashCode func(ctx context.Context, groupID string) (uint64, error)
groupHash GroupHash
}
func NewGroupCacheRedis(
@ -95,7 +99,7 @@ func NewGroupCacheRedis(
groupDB relationtb.GroupModelInterface,
groupMemberDB relationtb.GroupMemberModelInterface,
groupRequestDB relationtb.GroupRequestModelInterface,
hashCode func(ctx context.Context, groupID string) (uint64, error),
hashCode GroupHash,
opts rockscache.Options,
) GroupCache {
rcClient := rockscache.NewClient(rdb, opts)
@ -103,7 +107,7 @@ func NewGroupCacheRedis(
return &GroupCacheRedis{
rcClient: rcClient, expireTime: groupExpireTime,
groupDB: groupDB, groupMemberDB: groupMemberDB, groupRequestDB: groupRequestDB,
hashCode: hashCode,
groupHash: hashCode,
metaCache: NewMetaCacheRedis(rcClient),
}
}
@ -169,7 +173,6 @@ func (g *GroupCacheRedis) GetGroupMemberIndex(groupMember *relationtb.GroupMembe
return 0, errIndex
}
// groupInfo.
func (g *GroupCacheRedis) GetGroupsInfo(ctx context.Context, groupIDs []string) (groups []*relationtb.GroupModel, err error) {
return batchGetCache2(ctx, g.rcClient, g.expireTime, groupIDs, func(groupID string) string {
return g.getGroupInfoKey(groupID)
@ -220,14 +223,19 @@ func (g *GroupCacheRedis) DelGroupAllRoleLevel(groupID string) GroupCache {
return g.DelGroupRoleLevel(groupID, []int32{constant.GroupOwner, constant.GroupAdmin, constant.GroupOrdinaryUsers})
}
// groupMembersHash.
func (g *GroupCacheRedis) GetGroupMembersHash(ctx context.Context, groupID string) (hashCode uint64, err error) {
if g.groupHash == nil {
return 0, errs.ErrInternalServer.Wrap("group hash is nil")
}
return getCache(ctx, g.rcClient, g.getGroupMembersHashKey(groupID), g.expireTime, func(ctx context.Context) (uint64, error) {
return g.hashCode(ctx, groupID)
return g.groupHash.GetGroupHash(ctx, groupID)
})
}
func (g *GroupCacheRedis) GetGroupMemberHashMap(ctx context.Context, groupIDs []string) (map[string]*relationtb.GroupSimpleUserID, error) {
if g.groupHash == nil {
return nil, errs.ErrInternalServer.Wrap("group hash is nil")
}
res := make(map[string]*relationtb.GroupSimpleUserID)
for _, groupID := range groupIDs {
hash, err := g.GetGroupMembersHash(ctx, groupID)

@ -73,7 +73,7 @@ type GroupDatabase interface {
DeleteGroupMemberHash(ctx context.Context, groupIDs []string) error
}
func NewGroupDatabase(rdb redis.UniversalClient, groupDB relationtb.GroupModelInterface, groupMemberDB relationtb.GroupMemberModelInterface, groupRequestDB relationtb.GroupRequestModelInterface, ctxTx tx.CtxTx, hashCode func(ctx context.Context, groupID string) (uint64, error)) GroupDatabase {
func NewGroupDatabase(rdb redis.UniversalClient, groupDB relationtb.GroupModelInterface, groupMemberDB relationtb.GroupMemberModelInterface, groupRequestDB relationtb.GroupRequestModelInterface, ctxTx tx.CtxTx, groupHash cache.GroupHash) GroupDatabase {
rcOptions := rockscache.NewDefaultOptions()
rcOptions.StrongConsistency = true
rcOptions.RandomExpireAdjustment = 0.2
@ -82,7 +82,7 @@ func NewGroupDatabase(rdb redis.UniversalClient, groupDB relationtb.GroupModelIn
groupMemberDB: groupMemberDB,
groupRequestDB: groupRequestDB,
ctxTx: ctxTx,
cache: cache.NewGroupCacheRedis(rdb, groupDB, groupMemberDB, groupRequestDB, hashCode, rcOptions),
cache: cache.NewGroupCacheRedis(rdb, groupDB, groupMemberDB, groupRequestDB, groupHash, rcOptions),
}
}

@ -14,184 +14,185 @@
package relation
import (
"context"
"gorm.io/gorm"
"github.com/OpenIMSDK/protocol/constant"
"github.com/OpenIMSDK/tools/ormutil"
"github.com/OpenIMSDK/tools/utils"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation"
)
var _ relation.GroupMemberModelInterface = (*GroupMemberGorm)(nil)
type GroupMemberGorm struct {
*MetaDB
}
func NewGroupMemberDB(db *gorm.DB) relation.GroupMemberModelInterface {
return &GroupMemberGorm{NewMetaDB(db, &relation.GroupMemberModel{})}
}
func (g *GroupMemberGorm) NewTx(tx any) relation.GroupMemberModelInterface {
return &GroupMemberGorm{NewMetaDB(tx.(*gorm.DB), &relation.GroupMemberModel{})}
}
func (g *GroupMemberGorm) Create(ctx context.Context, groupMemberList []*relation.GroupMemberModel) (err error) {
return utils.Wrap(g.db(ctx).Create(&groupMemberList).Error, "")
}
func (g *GroupMemberGorm) Delete(ctx context.Context, groupID string, userIDs []string) (err error) {
return utils.Wrap(
g.db(ctx).Where("group_id = ? and user_id in (?)", groupID, userIDs).Delete(&relation.GroupMemberModel{}).Error,
"",
)
}
func (g *GroupMemberGorm) DeleteGroup(ctx context.Context, groupIDs []string) (err error) {
return utils.Wrap(g.db(ctx).Where("group_id in (?)", groupIDs).Delete(&relation.GroupMemberModel{}).Error, "")
}
func (g *GroupMemberGorm) Update(ctx context.Context, groupID string, userID string, data map[string]any) (err error) {
return utils.Wrap(g.db(ctx).Where("group_id = ? and user_id = ?", groupID, userID).Updates(data).Error, "")
}
func (g *GroupMemberGorm) UpdateRoleLevel(
ctx context.Context,
groupID string,
userID string,
roleLevel int32,
) (rowsAffected int64, err error) {
db := g.db(ctx).Where("group_id = ? and user_id = ?", groupID, userID).Updates(map[string]any{
"role_level": roleLevel,
})
return db.RowsAffected, utils.Wrap(db.Error, "")
}
func (g *GroupMemberGorm) Find(
ctx context.Context,
groupIDs []string,
userIDs []string,
roleLevels []int32,
) (groupMembers []*relation.GroupMemberModel, err error) {
db := g.db(ctx)
if len(groupIDs) > 0 {
db = db.Where("group_id in (?)", groupIDs)
}
if len(userIDs) > 0 {
db = db.Where("user_id in (?)", userIDs)
}
if len(roleLevels) > 0 {
db = db.Where("role_level in (?)", roleLevels)
}
return groupMembers, utils.Wrap(db.Find(&groupMembers).Error, "")
}
func (g *GroupMemberGorm) Take(
ctx context.Context,
groupID string,
userID string,
) (groupMember *relation.GroupMemberModel, err error) {
groupMember = &relation.GroupMemberModel{}
return groupMember, utils.Wrap(
g.db(ctx).Where("group_id = ? and user_id = ?", groupID, userID).Take(groupMember).Error,
"",
)
}
func (g *GroupMemberGorm) TakeOwner(
ctx context.Context,
groupID string,
) (groupMember *relation.GroupMemberModel, err error) {
groupMember = &relation.GroupMemberModel{}
return groupMember, utils.Wrap(
g.db(ctx).Where("group_id = ? and role_level = ?", groupID, constant.GroupOwner).Take(groupMember).Error,
"",
)
}
func (g *GroupMemberGorm) SearchMember(
ctx context.Context,
keyword string,
groupIDs []string,
userIDs []string,
roleLevels []int32,
pageNumber, showNumber int32,
) (total uint32, groupList []*relation.GroupMemberModel, err error) {
db := g.db(ctx)
ormutil.GormIn(&db, "group_id", groupIDs)
ormutil.GormIn(&db, "user_id", userIDs)
ormutil.GormIn(&db, "role_level", roleLevels)
return ormutil.GormSearch[relation.GroupMemberModel](db, []string{"nickname"}, keyword, pageNumber, showNumber)
}
func (g *GroupMemberGorm) MapGroupMemberNum(
ctx context.Context,
groupIDs []string,
) (count map[string]uint32, err error) {
return ormutil.MapCount(g.db(ctx).Where("group_id in (?)", groupIDs), "group_id")
}
func (g *GroupMemberGorm) FindJoinUserID(
ctx context.Context,
groupIDs []string,
) (groupUsers map[string][]string, err error) {
var groupMembers []*relation.GroupMemberModel
if err := g.db(ctx).Select("group_id, user_id").Where("group_id in (?)", groupIDs).Find(&groupMembers).Error; err != nil {
return nil, utils.Wrap(err, "")
}
groupUsers = make(map[string][]string)
for _, item := range groupMembers {
v, ok := groupUsers[item.GroupID]
if !ok {
groupUsers[item.GroupID] = []string{item.UserID}
} else {
groupUsers[item.GroupID] = append(v, item.UserID)
}
}
return groupUsers, nil
}
func (g *GroupMemberGorm) FindMemberUserID(ctx context.Context, groupID string) (userIDs []string, err error) {
return userIDs, utils.Wrap(g.db(ctx).Where("group_id = ?", groupID).Pluck("user_id", &userIDs).Error, "")
}
func (g *GroupMemberGorm) FindUserJoinedGroupID(ctx context.Context, userID string) (groupIDs []string, err error) {
return groupIDs, utils.Wrap(g.db(ctx).Where("user_id = ?", userID).Pluck("group_id", &groupIDs).Error, "")
}
func (g *GroupMemberGorm) TakeGroupMemberNum(ctx context.Context, groupID string) (count int64, err error) {
return count, utils.Wrap(g.db(ctx).Where("group_id = ?", groupID).Count(&count).Error, "")
}
func (g *GroupMemberGorm) FindUsersJoinedGroupID(ctx context.Context, userIDs []string) (map[string][]string, error) {
var groupMembers []*relation.GroupMemberModel
err := g.db(ctx).Select("group_id, user_id").Where("user_id IN (?)", userIDs).Find(&groupMembers).Error
if err != nil {
return nil, err
}
result := make(map[string][]string)
for _, groupMember := range groupMembers {
v, ok := result[groupMember.UserID]
if !ok {
result[groupMember.UserID] = []string{groupMember.GroupID}
} else {
result[groupMember.UserID] = append(v, groupMember.GroupID)
}
}
return result, nil
}
func (g *GroupMemberGorm) FindUserManagedGroupID(ctx context.Context, userID string) (groupIDs []string, err error) {
return groupIDs, utils.Wrap(
g.db(ctx).
Model(&relation.GroupMemberModel{}).
Where("user_id = ? and (role_level = ? or role_level = ?)", userID, constant.GroupOwner, constant.GroupAdmin).
Pluck("group_id", &groupIDs).
Error,
"",
)
}
//
//import (
// "context"
//
// "gorm.io/gorm"
//
// "github.com/OpenIMSDK/protocol/constant"
// "github.com/OpenIMSDK/tools/ormutil"
// "github.com/OpenIMSDK/tools/utils"
//
// "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation"
//)
//
//var _ relation.GroupMemberModelInterface = (*GroupMemberGorm)(nil)
//
//type GroupMemberGorm struct {
// *MetaDB
//}
//
//func NewGroupMemberDB(db *gorm.DB) relation.GroupMemberModelInterface {
// return &GroupMemberGorm{NewMetaDB(db, &relation.GroupMemberModel{})}
//}
//
//func (g *GroupMemberGorm) NewTx(tx any) relation.GroupMemberModelInterface {
// return &GroupMemberGorm{NewMetaDB(tx.(*gorm.DB), &relation.GroupMemberModel{})}
//}
//
//func (g *GroupMemberGorm) Create(ctx context.Context, groupMemberList []*relation.GroupMemberModel) (err error) {
// return utils.Wrap(g.db(ctx).Create(&groupMemberList).Error, "")
//}
//
//func (g *GroupMemberGorm) Delete(ctx context.Context, groupID string, userIDs []string) (err error) {
// return utils.Wrap(
// g.db(ctx).Where("group_id = ? and user_id in (?)", groupID, userIDs).Delete(&relation.GroupMemberModel{}).Error,
// "",
// )
//}
//
//func (g *GroupMemberGorm) DeleteGroup(ctx context.Context, groupIDs []string) (err error) {
// return utils.Wrap(g.db(ctx).Where("group_id in (?)", groupIDs).Delete(&relation.GroupMemberModel{}).Error, "")
//}
//
//func (g *GroupMemberGorm) Update(ctx context.Context, groupID string, userID string, data map[string]any) (err error) {
// return utils.Wrap(g.db(ctx).Where("group_id = ? and user_id = ?", groupID, userID).Updates(data).Error, "")
//}
//
//func (g *GroupMemberGorm) UpdateRoleLevel(
// ctx context.Context,
// groupID string,
// userID string,
// roleLevel int32,
//) (rowsAffected int64, err error) {
// db := g.db(ctx).Where("group_id = ? and user_id = ?", groupID, userID).Updates(map[string]any{
// "role_level": roleLevel,
// })
// return db.RowsAffected, utils.Wrap(db.Error, "")
//}
//
//func (g *GroupMemberGorm) Find(
// ctx context.Context,
// groupIDs []string,
// userIDs []string,
// roleLevels []int32,
//) (groupMembers []*relation.GroupMemberModel, err error) {
// db := g.db(ctx)
// if len(groupIDs) > 0 {
// db = db.Where("group_id in (?)", groupIDs)
// }
// if len(userIDs) > 0 {
// db = db.Where("user_id in (?)", userIDs)
// }
// if len(roleLevels) > 0 {
// db = db.Where("role_level in (?)", roleLevels)
// }
// return groupMembers, utils.Wrap(db.Find(&groupMembers).Error, "")
//}
//
//func (g *GroupMemberGorm) Take(
// ctx context.Context,
// groupID string,
// userID string,
//) (groupMember *relation.GroupMemberModel, err error) {
// groupMember = &relation.GroupMemberModel{}
// return groupMember, utils.Wrap(
// g.db(ctx).Where("group_id = ? and user_id = ?", groupID, userID).Take(groupMember).Error,
// "",
// )
//}
//
//func (g *GroupMemberGorm) TakeOwner(
// ctx context.Context,
// groupID string,
//) (groupMember *relation.GroupMemberModel, err error) {
// groupMember = &relation.GroupMemberModel{}
// return groupMember, utils.Wrap(
// g.db(ctx).Where("group_id = ? and role_level = ?", groupID, constant.GroupOwner).Take(groupMember).Error,
// "",
// )
//}
//
//func (g *GroupMemberGorm) SearchMember(
// ctx context.Context,
// keyword string,
// groupIDs []string,
// userIDs []string,
// roleLevels []int32,
// pageNumber, showNumber int32,
//) (total uint32, groupList []*relation.GroupMemberModel, err error) {
// db := g.db(ctx)
// ormutil.GormIn(&db, "group_id", groupIDs)
// ormutil.GormIn(&db, "user_id", userIDs)
// ormutil.GormIn(&db, "role_level", roleLevels)
// return ormutil.GormSearch[relation.GroupMemberModel](db, []string{"nickname"}, keyword, pageNumber, showNumber)
//}
//
//func (g *GroupMemberGorm) MapGroupMemberNum(
// ctx context.Context,
// groupIDs []string,
//) (count map[string]uint32, err error) {
// return ormutil.MapCount(g.db(ctx).Where("group_id in (?)", groupIDs), "group_id")
//}
//
//func (g *GroupMemberGorm) FindJoinUserID(
// ctx context.Context,
// groupIDs []string,
//) (groupUsers map[string][]string, err error) {
// var groupMembers []*relation.GroupMemberModel
// if err := g.db(ctx).Select("group_id, user_id").Where("group_id in (?)", groupIDs).Find(&groupMembers).Error; err != nil {
// return nil, utils.Wrap(err, "")
// }
// groupUsers = make(map[string][]string)
// for _, item := range groupMembers {
// v, ok := groupUsers[item.GroupID]
// if !ok {
// groupUsers[item.GroupID] = []string{item.UserID}
// } else {
// groupUsers[item.GroupID] = append(v, item.UserID)
// }
// }
// return groupUsers, nil
//}
//
//func (g *GroupMemberGorm) FindMemberUserID(ctx context.Context, groupID string) (userIDs []string, err error) {
// return userIDs, utils.Wrap(g.db(ctx).Where("group_id = ?", groupID).Pluck("user_id", &userIDs).Error, "")
//}
//
//func (g *GroupMemberGorm) FindUserJoinedGroupID(ctx context.Context, userID string) (groupIDs []string, err error) {
// return groupIDs, utils.Wrap(g.db(ctx).Where("user_id = ?", userID).Pluck("group_id", &groupIDs).Error, "")
//}
//
//func (g *GroupMemberGorm) TakeGroupMemberNum(ctx context.Context, groupID string) (count int64, err error) {
// return count, utils.Wrap(g.db(ctx).Where("group_id = ?", groupID).Count(&count).Error, "")
//}
//
//func (g *GroupMemberGorm) FindUsersJoinedGroupID(ctx context.Context, userIDs []string) (map[string][]string, error) {
// var groupMembers []*relation.GroupMemberModel
// err := g.db(ctx).Select("group_id, user_id").Where("user_id IN (?)", userIDs).Find(&groupMembers).Error
// if err != nil {
// return nil, err
// }
// result := make(map[string][]string)
// for _, groupMember := range groupMembers {
// v, ok := result[groupMember.UserID]
// if !ok {
// result[groupMember.UserID] = []string{groupMember.GroupID}
// } else {
// result[groupMember.UserID] = append(v, groupMember.GroupID)
// }
// }
// return result, nil
//}
//
//func (g *GroupMemberGorm) FindUserManagedGroupID(ctx context.Context, userID string) (groupIDs []string, err error) {
// return groupIDs, utils.Wrap(
// g.db(ctx).
// Model(&relation.GroupMemberModel{}).
// Where("user_id = ? and (role_level = ? or role_level = ?)", userID, constant.GroupOwner, constant.GroupAdmin).
// Pluck("group_id", &groupIDs).
// Error,
// "",
// )
//}

@ -14,124 +14,125 @@
package relation
import (
"context"
"time"
"github.com/OpenIMSDK/tools/errs"
"gorm.io/gorm"
"github.com/OpenIMSDK/tools/utils"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation"
)
type UserGorm struct {
*MetaDB
}
func NewUserGorm(db *gorm.DB) relation.UserModelInterface {
//return &UserGorm{NewMetaDB(db, &relation.UserModel{})}
return nil
}
// 插入多条.
func (u *UserGorm) Create(ctx context.Context, users []*relation.UserModel) (err error) {
return utils.Wrap(u.db(ctx).Create(&users).Error, "")
}
// 更新用户信息 零值.
func (u *UserGorm) UpdateByMap(ctx context.Context, userID string, args map[string]any) (err error) {
return utils.Wrap(u.db(ctx).Model(&relation.UserModel{}).Where("user_id = ?", userID).Updates(args).Error, "")
}
// 更新多个用户信息 非零值.
func (u *UserGorm) Update(ctx context.Context, user *relation.UserModel) (err error) {
return utils.Wrap(u.db(ctx).Model(user).Updates(user).Error, "")
}
// 获取指定用户信息 不存在,也不返回错误.
func (u *UserGorm) Find(ctx context.Context, userIDs []string) (users []*relation.UserModel, err error) {
err = utils.Wrap(u.db(ctx).Where("user_id in (?)", userIDs).Find(&users).Error, "")
return users, err
}
// 获取某个用户信息 不存在,则返回错误.
func (u *UserGorm) Take(ctx context.Context, userID string) (user *relation.UserModel, err error) {
user = &relation.UserModel{}
err = utils.Wrap(u.db(ctx).Where("user_id = ?", userID).Take(&user).Error, "")
return user, err
}
// 获取用户信息 不存在,不返回错误.
func (u *UserGorm) Page(
ctx context.Context,
pageNumber, showNumber int32,
) (users []*relation.UserModel, count int64, err error) {
err = utils.Wrap(u.db(ctx).Count(&count).Error, "")
if err != nil {
return
}
err = utils.Wrap(
u.db(ctx).
Limit(int(showNumber)).
Offset(int((pageNumber-1)*showNumber)).
Find(&users).
Order("create_time DESC").
Error,
"",
)
return
}
// 获取所有用户ID.
func (u *UserGorm) GetAllUserID(ctx context.Context, pageNumber, showNumber int32) (userIDs []string, err error) {
if pageNumber == 0 || showNumber == 0 {
return userIDs, errs.Wrap(u.db(ctx).Pluck("user_id", &userIDs).Error)
} else {
return userIDs, errs.Wrap(u.db(ctx).Limit(int(showNumber)).Offset(int((pageNumber-1)*showNumber)).Pluck("user_id", &userIDs).Error)
}
}
func (u *UserGorm) GetUserGlobalRecvMsgOpt(ctx context.Context, userID string) (opt int, err error) {
err = u.db(ctx).Model(&relation.UserModel{}).Where("user_id = ?", userID).Pluck("global_recv_msg_opt", &opt).Error
return opt, err
}
func (u *UserGorm) CountTotal(ctx context.Context, before *time.Time) (count int64, err error) {
db := u.db(ctx).Model(&relation.UserModel{})
if before != nil {
db = db.Where("create_time < ?", before)
}
if err := db.Count(&count).Error; err != nil {
return 0, err
}
return count, nil
}
func (u *UserGorm) CountRangeEverydayTotal(
ctx context.Context,
start time.Time,
end time.Time,
) (map[string]int64, error) {
var res []struct {
Date time.Time `gorm:"column:date"`
Count int64 `gorm:"column:count"`
}
err := u.db(ctx).
Model(&relation.UserModel{}).
Select("DATE(create_time) AS date, count(1) AS count").
Where("create_time >= ? and create_time < ?", start, end).
Group("date").
Find(&res).
Error
if err != nil {
return nil, errs.Wrap(err)
}
v := make(map[string]int64)
for _, r := range res {
v[r.Date.Format("2006-01-02")] = r.Count
}
return v, nil
}
//
//import (
// "context"
// "time"
//
// "github.com/OpenIMSDK/tools/errs"
//
// "gorm.io/gorm"
//
// "github.com/OpenIMSDK/tools/utils"
//
// "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation"
//)
//
//type UserGorm struct {
// *MetaDB
//}
//
//func NewUserGorm(db *gorm.DB) relation.UserModelInterface {
// //return &UserGorm{NewMetaDB(db, &relation.UserModel{})}
// return nil
//}
//
//// 插入多条.
//func (u *UserGorm) Create(ctx context.Context, users []*relation.UserModel) (err error) {
// return utils.Wrap(u.db(ctx).Create(&users).Error, "")
//}
//
//// 更新用户信息 零值.
//func (u *UserGorm) UpdateByMap(ctx context.Context, userID string, args map[string]any) (err error) {
// return utils.Wrap(u.db(ctx).Model(&relation.UserModel{}).Where("user_id = ?", userID).Updates(args).Error, "")
//}
//
//// 更新多个用户信息 非零值.
//func (u *UserGorm) Update(ctx context.Context, user *relation.UserModel) (err error) {
// return utils.Wrap(u.db(ctx).Model(user).Updates(user).Error, "")
//}
//
//// 获取指定用户信息 不存在,也不返回错误.
//func (u *UserGorm) Find(ctx context.Context, userIDs []string) (users []*relation.UserModel, err error) {
// err = utils.Wrap(u.db(ctx).Where("user_id in (?)", userIDs).Find(&users).Error, "")
// return users, err
//}
//
//// 获取某个用户信息 不存在,则返回错误.
//func (u *UserGorm) Take(ctx context.Context, userID string) (user *relation.UserModel, err error) {
// user = &relation.UserModel{}
// err = utils.Wrap(u.db(ctx).Where("user_id = ?", userID).Take(&user).Error, "")
// return user, err
//}
//
//// 获取用户信息 不存在,不返回错误.
//func (u *UserGorm) Page(
// ctx context.Context,
// pageNumber, showNumber int32,
//) (users []*relation.UserModel, count int64, err error) {
// err = utils.Wrap(u.db(ctx).Count(&count).Error, "")
// if err != nil {
// return
// }
// err = utils.Wrap(
// u.db(ctx).
// Limit(int(showNumber)).
// Offset(int((pageNumber-1)*showNumber)).
// Find(&users).
// Order("create_time DESC").
// Error,
// "",
// )
// return
//}
//
//// 获取所有用户ID.
//func (u *UserGorm) GetAllUserID(ctx context.Context, pageNumber, showNumber int32) (userIDs []string, err error) {
// if pageNumber == 0 || showNumber == 0 {
// return userIDs, errs.Wrap(u.db(ctx).Pluck("user_id", &userIDs).Error)
// } else {
// return userIDs, errs.Wrap(u.db(ctx).Limit(int(showNumber)).Offset(int((pageNumber-1)*showNumber)).Pluck("user_id", &userIDs).Error)
// }
//}
//
//func (u *UserGorm) GetUserGlobalRecvMsgOpt(ctx context.Context, userID string) (opt int, err error) {
// err = u.db(ctx).Model(&relation.UserModel{}).Where("user_id = ?", userID).Pluck("global_recv_msg_opt", &opt).Error
// return opt, err
//}
//
//func (u *UserGorm) CountTotal(ctx context.Context, before *time.Time) (count int64, err error) {
// db := u.db(ctx).Model(&relation.UserModel{})
// if before != nil {
// db = db.Where("create_time < ?", before)
// }
// if err := db.Count(&count).Error; err != nil {
// return 0, err
// }
// return count, nil
//}
//
//func (u *UserGorm) CountRangeEverydayTotal(
// ctx context.Context,
// start time.Time,
// end time.Time,
//) (map[string]int64, error) {
// var res []struct {
// Date time.Time `gorm:"column:date"`
// Count int64 `gorm:"column:count"`
// }
// err := u.db(ctx).
// Model(&relation.UserModel{}).
// Select("DATE(create_time) AS date, count(1) AS count").
// Where("create_time >= ? and create_time < ?", start, end).
// Group("date").
// Find(&res).
// Error
// if err != nil {
// return nil, errs.Wrap(err)
// }
// v := make(map[string]int64)
// for _, r := range res {
// v[r.Date.Format("2006-01-02")] = r.Count
// }
// return v, nil
//}

@ -99,16 +99,6 @@ func (m *Mongo) CreateMsgIndex() error {
return m.createMongoIndex(unrelation.Msg, true, "doc_id")
}
func (m *Mongo) CreateSuperGroupIndex() error {
if err := m.createMongoIndex(unrelation.CSuperGroup, true, "group_id"); err != nil {
return err
}
if err := m.createMongoIndex(unrelation.CUserToSuperGroup, true, "user_id"); err != nil {
return err
}
return nil
}
func (m *Mongo) createMongoIndex(collection string, isUnique bool, keys ...string) error {
db := m.db.Database(config.Config.Mongo.Database).Collection(collection)
opts := options.CreateIndexes().SetMaxTime(10 * time.Second)

@ -14,149 +14,150 @@
package unrelation
import (
"context"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"github.com/OpenIMSDK/tools/utils"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/table/unrelation"
)
func NewSuperGroupMongoDriver(database *mongo.Database) unrelation.SuperGroupModelInterface {
return &SuperGroupMongoDriver{
superGroupCollection: database.Collection(unrelation.CSuperGroup),
userToSuperGroupCollection: database.Collection(unrelation.CUserToSuperGroup),
}
}
type SuperGroupMongoDriver struct {
superGroupCollection *mongo.Collection
userToSuperGroupCollection *mongo.Collection
}
func (s *SuperGroupMongoDriver) CreateSuperGroup(ctx context.Context, groupID string, initMemberIDs []string) error {
_, err := s.superGroupCollection.InsertOne(ctx, &unrelation.SuperGroupModel{
GroupID: groupID,
MemberIDs: initMemberIDs,
})
if err != nil {
return err
}
for _, userID := range initMemberIDs {
_, err = s.userToSuperGroupCollection.UpdateOne(
ctx,
bson.M{"user_id": userID},
bson.M{"$addToSet": bson.M{"group_id_list": groupID}},
&options.UpdateOptions{
Upsert: utils.ToPtr(true),
},
)
if err != nil {
return err
}
}
return nil
}
func (s *SuperGroupMongoDriver) TakeSuperGroup(
ctx context.Context,
groupID string,
) (group *unrelation.SuperGroupModel, err error) {
if err := s.superGroupCollection.FindOne(ctx, bson.M{"group_id": groupID}).Decode(&group); err != nil {
return nil, utils.Wrap(err, "")
}
return group, nil
}
func (s *SuperGroupMongoDriver) FindSuperGroup(
ctx context.Context,
groupIDs []string,
) (groups []*unrelation.SuperGroupModel, err error) {
cursor, err := s.superGroupCollection.Find(ctx, bson.M{"group_id": bson.M{
"$in": groupIDs,
}})
if err != nil {
return nil, err
}
defer cursor.Close(ctx)
if err := cursor.All(ctx, &groups); err != nil {
return nil, utils.Wrap(err, "")
}
return groups, nil
}
func (s *SuperGroupMongoDriver) AddUserToSuperGroup(ctx context.Context, groupID string, userIDs []string) error {
_, err := s.superGroupCollection.UpdateOne(
ctx,
bson.M{"group_id": groupID},
bson.M{"$addToSet": bson.M{"member_id_list": bson.M{"$each": userIDs}}},
)
if err != nil {
return err
}
upsert := true
opts := &options.UpdateOptions{
Upsert: &upsert,
}
for _, userID := range userIDs {
_, err = s.userToSuperGroupCollection.UpdateOne(
ctx,
bson.M{"user_id": userID},
bson.M{"$addToSet": bson.M{"group_id_list": groupID}},
opts,
)
if err != nil {
return utils.Wrap(err, "transaction failed")
}
}
return nil
}
func (s *SuperGroupMongoDriver) RemoverUserFromSuperGroup(ctx context.Context, groupID string, userIDs []string) error {
_, err := s.superGroupCollection.UpdateOne(
ctx,
bson.M{"group_id": groupID},
bson.M{"$pull": bson.M{"member_id_list": bson.M{"$in": userIDs}}},
)
if err != nil {
return err
}
err = s.RemoveGroupFromUser(ctx, groupID, userIDs)
if err != nil {
return err
}
return nil
}
func (s *SuperGroupMongoDriver) GetSuperGroupByUserID(
ctx context.Context,
userID string,
) (*unrelation.UserToSuperGroupModel, error) {
var user unrelation.UserToSuperGroupModel
err := s.userToSuperGroupCollection.FindOne(ctx, bson.M{"user_id": userID}).Decode(&user)
return &user, utils.Wrap(err, "")
}
func (s *SuperGroupMongoDriver) DeleteSuperGroup(ctx context.Context, groupID string) error {
group, err := s.TakeSuperGroup(ctx, groupID)
if err != nil {
return err
}
if _, err := s.superGroupCollection.DeleteOne(ctx, bson.M{"group_id": groupID}); err != nil {
return utils.Wrap(err, "")
}
return s.RemoveGroupFromUser(ctx, groupID, group.MemberIDs)
}
func (s *SuperGroupMongoDriver) RemoveGroupFromUser(ctx context.Context, groupID string, userIDs []string) error {
_, err := s.userToSuperGroupCollection.UpdateOne(
ctx,
bson.M{"user_id": bson.M{"$in": userIDs}},
bson.M{"$pull": bson.M{"group_id_list": groupID}},
)
return utils.Wrap(err, "")
}
//
//import (
// "context"
//
// "go.mongodb.org/mongo-driver/bson"
// "go.mongodb.org/mongo-driver/mongo"
// "go.mongodb.org/mongo-driver/mongo/options"
//
// "github.com/OpenIMSDK/tools/utils"
//
// "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/unrelation"
//)
//
//func NewSuperGroupMongoDriver(database *mongo.Database) unrelation.SuperGroupModelInterface {
// return &SuperGroupMongoDriver{
// superGroupCollection: database.Collection(unrelation.CSuperGroup),
// userToSuperGroupCollection: database.Collection(unrelation.CUserToSuperGroup),
// }
//}
//
//type SuperGroupMongoDriver struct {
// superGroupCollection *mongo.Collection
// userToSuperGroupCollection *mongo.Collection
//}
//
//func (s *SuperGroupMongoDriver) CreateSuperGroup(ctx context.Context, groupID string, initMemberIDs []string) error {
// _, err := s.superGroupCollection.InsertOne(ctx, &unrelation.SuperGroupModel{
// GroupID: groupID,
// MemberIDs: initMemberIDs,
// })
// if err != nil {
// return err
// }
// for _, userID := range initMemberIDs {
// _, err = s.userToSuperGroupCollection.UpdateOne(
// ctx,
// bson.M{"user_id": userID},
// bson.M{"$addToSet": bson.M{"group_id_list": groupID}},
// &options.UpdateOptions{
// Upsert: utils.ToPtr(true),
// },
// )
// if err != nil {
// return err
// }
// }
// return nil
//}
//
//func (s *SuperGroupMongoDriver) TakeSuperGroup(
// ctx context.Context,
// groupID string,
//) (group *unrelation.SuperGroupModel, err error) {
// if err := s.superGroupCollection.FindOne(ctx, bson.M{"group_id": groupID}).Decode(&group); err != nil {
// return nil, utils.Wrap(err, "")
// }
// return group, nil
//}
//
//func (s *SuperGroupMongoDriver) FindSuperGroup(
// ctx context.Context,
// groupIDs []string,
//) (groups []*unrelation.SuperGroupModel, err error) {
// cursor, err := s.superGroupCollection.Find(ctx, bson.M{"group_id": bson.M{
// "$in": groupIDs,
// }})
// if err != nil {
// return nil, err
// }
// defer cursor.Close(ctx)
// if err := cursor.All(ctx, &groups); err != nil {
// return nil, utils.Wrap(err, "")
// }
// return groups, nil
//}
//
//func (s *SuperGroupMongoDriver) AddUserToSuperGroup(ctx context.Context, groupID string, userIDs []string) error {
// _, err := s.superGroupCollection.UpdateOne(
// ctx,
// bson.M{"group_id": groupID},
// bson.M{"$addToSet": bson.M{"member_id_list": bson.M{"$each": userIDs}}},
// )
// if err != nil {
// return err
// }
// upsert := true
// opts := &options.UpdateOptions{
// Upsert: &upsert,
// }
// for _, userID := range userIDs {
// _, err = s.userToSuperGroupCollection.UpdateOne(
// ctx,
// bson.M{"user_id": userID},
// bson.M{"$addToSet": bson.M{"group_id_list": groupID}},
// opts,
// )
// if err != nil {
// return utils.Wrap(err, "transaction failed")
// }
// }
// return nil
//}
//
//func (s *SuperGroupMongoDriver) RemoverUserFromSuperGroup(ctx context.Context, groupID string, userIDs []string) error {
// _, err := s.superGroupCollection.UpdateOne(
// ctx,
// bson.M{"group_id": groupID},
// bson.M{"$pull": bson.M{"member_id_list": bson.M{"$in": userIDs}}},
// )
// if err != nil {
// return err
// }
// err = s.RemoveGroupFromUser(ctx, groupID, userIDs)
// if err != nil {
// return err
// }
// return nil
//}
//
//func (s *SuperGroupMongoDriver) GetSuperGroupByUserID(
// ctx context.Context,
// userID string,
//) (*unrelation.UserToSuperGroupModel, error) {
// var user unrelation.UserToSuperGroupModel
// err := s.userToSuperGroupCollection.FindOne(ctx, bson.M{"user_id": userID}).Decode(&user)
// return &user, utils.Wrap(err, "")
//}
//
//func (s *SuperGroupMongoDriver) DeleteSuperGroup(ctx context.Context, groupID string) error {
// group, err := s.TakeSuperGroup(ctx, groupID)
// if err != nil {
// return err
// }
// if _, err := s.superGroupCollection.DeleteOne(ctx, bson.M{"group_id": groupID}); err != nil {
// return utils.Wrap(err, "")
// }
// return s.RemoveGroupFromUser(ctx, groupID, group.MemberIDs)
//}
//
//func (s *SuperGroupMongoDriver) RemoveGroupFromUser(ctx context.Context, groupID string, userIDs []string) error {
// _, err := s.userToSuperGroupCollection.UpdateOne(
// ctx,
// bson.M{"user_id": bson.M{"$in": userIDs}},
// bson.M{"$pull": bson.M{"group_id_list": groupID}},
// )
// return utils.Wrap(err, "")
//}

@ -0,0 +1,87 @@
package grouphash
import (
"context"
"crypto/md5"
"encoding/binary"
"encoding/json"
"github.com/OpenIMSDK/protocol/group"
"github.com/OpenIMSDK/protocol/sdkws"
"github.com/OpenIMSDK/tools/utils"
)
func NewGroupHashFromGroupClient(x group.GroupClient) *GroupHash {
return &GroupHash{
getGroupAllUserIDs: func(ctx context.Context, groupID string) ([]string, error) {
resp, err := x.GetGroupMemberUserIDs(ctx, &group.GetGroupMemberUserIDsReq{GroupID: groupID})
if err != nil {
return nil, err
}
return resp.UserIDs, nil
},
getGroupMemberInfo: func(ctx context.Context, groupID string, userIDs []string) ([]*sdkws.GroupMemberFullInfo, error) {
resp, err := x.GetGroupMembersInfo(ctx, &group.GetGroupMembersInfoReq{GroupID: groupID, UserIDs: userIDs})
if err != nil {
return nil, err
}
return resp.Members, nil
},
}
}
func NewGroupHashFromGroupServer(x group.GroupServer) *GroupHash {
return &GroupHash{
getGroupAllUserIDs: func(ctx context.Context, groupID string) ([]string, error) {
resp, err := x.GetGroupMemberUserIDs(ctx, &group.GetGroupMemberUserIDsReq{GroupID: groupID})
if err != nil {
return nil, err
}
return resp.UserIDs, nil
},
getGroupMemberInfo: func(ctx context.Context, groupID string, userIDs []string) ([]*sdkws.GroupMemberFullInfo, error) {
resp, err := x.GetGroupMembersInfo(ctx, &group.GetGroupMembersInfoReq{GroupID: groupID, UserIDs: userIDs})
if err != nil {
return nil, err
}
return resp.Members, nil
},
}
}
type GroupHash struct {
getGroupAllUserIDs func(ctx context.Context, groupID string) ([]string, error)
getGroupMemberInfo func(ctx context.Context, groupID string, userIDs []string) ([]*sdkws.GroupMemberFullInfo, error)
}
func (gh *GroupHash) GetGroupHash(ctx context.Context, groupID string) (uint64, error) {
userIDs, err := gh.getGroupAllUserIDs(ctx, groupID)
if err != nil {
return 0, err
}
var members []*sdkws.GroupMemberFullInfo
if len(userIDs) > 0 {
members, err = gh.getGroupMemberInfo(ctx, groupID, userIDs)
if err != nil {
return 0, err
}
utils.Sort(userIDs, true)
}
memberMap := utils.SliceToMap(members, func(e *sdkws.GroupMemberFullInfo) string {
return e.UserID
})
res := make([]*sdkws.GroupMemberFullInfo, 0, len(members))
for _, userID := range userIDs {
member, ok := memberMap[userID]
if !ok {
continue
}
member.AppMangerLevel = 0
res = append(res, member)
}
data, err := json.Marshal(res)
if err != nil {
return 0, err
}
sum := md5.Sum(data)
return binary.BigEndian.Uint64(sum[:]), nil
}
Loading…
Cancel
Save