diff --git a/internal/utils/convert.go b/internal/utils/convert.go index 690c6eedc..8ee23a4ae 100644 --- a/internal/utils/convert.go +++ b/internal/utils/convert.go @@ -41,7 +41,7 @@ func (db *DBFriend) convert() (*sdk.FriendInfo, error) { return pbFriend, nil } -func (pb *PBFriend) convert() (*imdb.Friend, error) { +func (pb *PBFriend) Convert() (*imdb.Friend, error) { dbFriend := &imdb.Friend{} utils2.CopyStructFields(dbFriend, pb) dbFriend.FriendUserID = pb.FriendUser.UserID @@ -57,14 +57,14 @@ type PBFriendRequest struct { *sdk.FriendRequest } -func (pb *PBFriendRequest) convert() (*imdb.FriendRequest, error) { +func (pb *PBFriendRequest) Convert() (*imdb.FriendRequest, error) { dbFriendRequest := &imdb.FriendRequest{} utils.CopyStructFields(dbFriendRequest, pb) dbFriendRequest.CreateTime = utils.UnixSecondToTime(int64(pb.CreateTime)) dbFriendRequest.HandleTime = utils.UnixSecondToTime(int64(pb.HandleTime)) return dbFriendRequest, nil } -func (db *DBFriendRequest) convert() (*sdk.FriendRequest, error) { +func (db *DBFriendRequest) Convert() (*sdk.FriendRequest, error) { pbFriendRequest := &sdk.FriendRequest{} utils.CopyStructFields(pbFriendRequest, db) user, err := getUsersInfo([]string{db.FromUserID}) @@ -94,13 +94,13 @@ type PBBlack struct { *sdk.BlackInfo } -func (pb *PBBlack) convert() (*imdb.Black, error) { +func (pb *PBBlack) Convert() (*imdb.Black, error) { dbBlack := &imdb.Black{} dbBlack.BlockUserID = pb.BlackUserInfo.UserID dbBlack.CreateTime = utils.UnixSecondToTime(int64(pb.CreateTime)) return dbBlack, nil } -func (db *DBBlack) convert() (*sdk.BlackInfo, error) { +func (db *DBBlack) Convert() (*sdk.BlackInfo, error) { pbBlack := &sdk.BlackInfo{} utils.CopyStructFields(pbBlack, db) pbBlack.CreateTime = uint32(db.CreateTime.Unix()) @@ -125,7 +125,7 @@ func (pb *PBGroup) Convert() (*imdb.Group, error) { utils.CopyStructFields(dst, pb) return dst, nil } -func (db *DBGroup) convert() (*sdk.GroupInfo, error) { +func (db *DBGroup) Convert() (*sdk.GroupInfo, error) { dst := &sdk.GroupInfo{} utils.CopyStructFields(dst, db) user, err := getGroupOwnerInfo(db.GroupID) @@ -155,14 +155,14 @@ type PBGroupMember struct { *sdk.GroupMemberFullInfo } -func (pb *PBGroupMember) convert() (*imdb.GroupMember, error) { +func (pb *PBGroupMember) Convert() (*imdb.GroupMember, error) { dst := &imdb.GroupMember{} utils.CopyStructFields(dst, pb) dst.JoinTime = utils.UnixSecondToTime(int64(pb.JoinTime)) dst.MuteEndTime = utils.UnixSecondToTime(int64(pb.MuteEndTime)) return dst, nil } -func (db *DBGroupMember) convert() (*sdk.GroupMemberFullInfo, error) { +func (db *DBGroupMember) Convert() (*sdk.GroupMemberFullInfo, error) { dst := &sdk.GroupMemberFullInfo{} utils.CopyStructFields(dst, db) @@ -191,14 +191,14 @@ type PBGroupRequest struct { *sdk.GroupRequest } -func (pb *PBGroupRequest) convert() (*imdb.GroupRequest, error) { +func (pb *PBGroupRequest) Convert() (*imdb.GroupRequest, error) { dst := &imdb.GroupRequest{} utils.CopyStructFields(dst, pb) dst.ReqTime = utils.UnixSecondToTime(int64(pb.ReqTime)) dst.HandledTime = utils.UnixSecondToTime(int64(pb.HandleTime)) return dst, nil } -func (db *DBGroupRequest) convert() (*sdk.GroupRequest, error) { +func (db *DBGroupRequest) Convert() (*sdk.GroupRequest, error) { dst := &sdk.GroupRequest{} utils.CopyStructFields(dst, db) dst.ReqTime = uint32(db.ReqTime.Unix()) @@ -214,7 +214,7 @@ type PBUser struct { *sdk.UserInfo } -func (pb *PBUser) convert() (*imdb.User, error) { +func (pb *PBUser) Convert() (*imdb.User, error) { dst := &imdb.User{} utils.CopyStructFields(dst, pb) dst.Birth = utils.UnixSecondToTime(pb.Birthday) @@ -222,7 +222,7 @@ func (pb *PBUser) convert() (*imdb.User, error) { return dst, nil } -func (db *DBUser) convert() (*sdk.UserInfo, error) { +func (db *DBUser) Convert() (*sdk.UserInfo, error) { dst := &sdk.UserInfo{} utils.CopyStructFields(dst, db) dst.CreateTime = uint32(db.CreateTime.Unix()) @@ -230,7 +230,7 @@ func (db *DBUser) convert() (*sdk.UserInfo, error) { return dst, nil } -func (db *DBUser) convertPublic() (*sdk.PublicUserInfo, error) { +func (db *DBUser) ConvertPublic() (*sdk.PublicUserInfo, error) { dst := &sdk.PublicUserInfo{} utils.CopyStructFields(dst, db) return dst, nil diff --git a/pkg/common/db/cache/group.go b/pkg/common/db/cache/group.go index 04f4837b4..425d7255a 100644 --- a/pkg/common/db/cache/group.go +++ b/pkg/common/db/cache/group.go @@ -22,11 +22,7 @@ type GroupCache struct { } func NewGroupCache(rdb redis.UniversalClient, db mysql.GroupModelInterface, opts rockscache.Options) *GroupCache { - rcClient := &rockscache.Client{ - Options: rockscache.Options{}, - } - redisClient := NewRedisClient(rdb) - return &GroupCache{rcClient: rcClient, expireTime: GroupExpireTime, db: db, redisClient: redisClient} + return &GroupCache{rcClient: rockscache.NewClient(rdb, opts), expireTime: GroupExpireTime, db: db, redisClient: NewRedisClient(rdb)} } func (g *GroupCache) getRedisClient() *RedisClient { diff --git a/pkg/common/db/model/group.go b/pkg/common/db/model/group.go index c1c2a1d69..60f1cd88f 100644 --- a/pkg/common/db/model/group.go +++ b/pkg/common/db/model/group.go @@ -32,6 +32,8 @@ func NewGroupModel(db mysql.GroupModelInterface, rdb redis.UniversalClient, mdb DisableCacheRead: false, StrongConsistency: true, }) + sg := mdb.Database().Collection() + sg.Find() groupModel.mongo = mongoDB.NewMongoClient(mdb) return &groupModel } @@ -60,3 +62,20 @@ func (g *GroupModel) Delete(ctx context.Context, groupIDs []string) error { func (g *GroupModel) Take(ctx context.Context, groupID string) (group *mysql.Group, err error) { return g.cache.GetGroupInfoFromCache(ctx, groupID) } + +func (g *GroupModel) Update(ctx context.Context, groups []*mysql.Group) error { + err := g.db.DB.Transaction(func(tx *gorm.DB) error { + if err := g.db.Update(ctx, groups, tx); err != nil { + return err + } + var groupIDs []string + for _, group := range groups { + groupIDs = append(groupIDs, group.GroupID) + } + if err := g.cache.DelGroupsInfoFromCache(ctx, groupIDs); err != nil { + return err + } + return nil + }) + return err +} diff --git a/pkg/common/db/mongoDB/super_group.go b/pkg/common/db/mongoDB/super_group.go index a28289b80..980e39490 100644 --- a/pkg/common/db/mongoDB/super_group.go +++ b/pkg/common/db/mongoDB/super_group.go @@ -1,5 +1,11 @@ package mongoDB +import ( + "Open_IM/pkg/utils" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" +) + type SuperGroup struct { GroupID string `bson:"group_id" json:"groupID"` MemberIDList []string `bson:"member_id_list" json:"memberIDList"` @@ -9,3 +15,167 @@ type UserToSuperGroup struct { UserID string `bson:"user_id" json:"userID"` GroupIDList []string `bson:"group_id_list" json:"groupIDList"` } + +func New + +func (d *db.DataBases) CreateSuperGroup(groupID string, initMemberIDList []string, memberNumCount int) error { + ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) + c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cSuperGroup) + session, err := d.mongoClient.StartSession() + if err != nil { + return utils.Wrap(err, "start session failed") + } + defer session.EndSession(ctx) + sCtx := mongo.NewSessionContext(ctx, session) + superGroup := SuperGroup{ + GroupID: groupID, + MemberIDList: initMemberIDList, + } + _, err = c.InsertOne(sCtx, superGroup) + if err != nil { + _ = session.AbortTransaction(ctx) + return utils.Wrap(err, "transaction failed") + } + var users []UserToSuperGroup + for _, v := range initMemberIDList { + users = append(users, UserToSuperGroup{ + UserID: v, + }) + } + upsert := true + opts := &options.UpdateOptions{ + Upsert: &upsert, + } + c = d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cUserToSuperGroup) + //_, err = c.UpdateMany(sCtx, bson.M{"user_id": bson.M{"$in": initMemberIDList}}, bson.M{"$addToSet": bson.M{"group_id_list": groupID}}, opts) + //if err != nil { + // session.AbortTransaction(ctx) + // return utils.Wrap(err, "transaction failed") + //} + for _, userID := range initMemberIDList { + _, err = c.UpdateOne(sCtx, bson.M{"user_id": userID}, bson.M{"$addToSet": bson.M{"group_id_list": groupID}}, opts) + if err != nil { + _ = session.AbortTransaction(ctx) + return utils.Wrap(err, "transaction failed") + } + + } + return err +} + +func (d *db.DataBases) GetSuperGroup(groupID string) (SuperGroup, error) { + ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) + c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cSuperGroup) + superGroup := SuperGroup{} + err := c.FindOne(ctx, bson.M{"group_id": groupID}).Decode(&superGroup) + return superGroup, err +} + +func (d *db.DataBases) AddUserToSuperGroup(groupID string, userIDList []string) error { + ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) + c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cSuperGroup) + session, err := d.mongoClient.StartSession() + if err != nil { + return utils.Wrap(err, "start session failed") + } + defer session.EndSession(ctx) + sCtx := mongo.NewSessionContext(ctx, session) + if err != nil { + return utils.Wrap(err, "start transaction failed") + } + _, err = c.UpdateOne(sCtx, bson.M{"group_id": groupID}, bson.M{"$addToSet": bson.M{"member_id_list": bson.M{"$each": userIDList}}}) + if err != nil { + _ = session.AbortTransaction(ctx) + return utils.Wrap(err, "transaction failed") + } + c = d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cUserToSuperGroup) + var users []UserToSuperGroup + for _, v := range userIDList { + users = append(users, UserToSuperGroup{ + UserID: v, + }) + } + upsert := true + opts := &options.UpdateOptions{ + Upsert: &upsert, + } + for _, userID := range userIDList { + _, err = c.UpdateOne(sCtx, bson.M{"user_id": userID}, bson.M{"$addToSet": bson.M{"group_id_list": groupID}}, opts) + if err != nil { + _ = session.AbortTransaction(ctx) + return utils.Wrap(err, "transaction failed") + } + } + _ = session.CommitTransaction(ctx) + return err +} + +func (d *db.DataBases) RemoverUserFromSuperGroup(groupID string, userIDList []string) error { + ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) + c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cSuperGroup) + session, err := d.mongoClient.StartSession() + if err != nil { + return utils.Wrap(err, "start session failed") + } + defer session.EndSession(ctx) + sCtx := mongo.NewSessionContext(ctx, session) + _, err = c.UpdateOne(ctx, bson.M{"group_id": groupID}, bson.M{"$pull": bson.M{"member_id_list": bson.M{"$in": userIDList}}}) + if err != nil { + _ = session.AbortTransaction(ctx) + return utils.Wrap(err, "transaction failed") + } + err = d.RemoveGroupFromUser(ctx, sCtx, groupID, userIDList) + if err != nil { + _ = session.AbortTransaction(ctx) + return utils.Wrap(err, "transaction failed") + } + _ = session.CommitTransaction(ctx) + return err +} + +func (d *db.DataBases) GetSuperGroupByUserID(userID string) (UserToSuperGroup, error) { + ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) + c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cUserToSuperGroup) + var user UserToSuperGroup + _ = c.FindOne(ctx, bson.M{"user_id": userID}).Decode(&user) + return user, nil +} + +func (d *db.DataBases) DeleteSuperGroup(groupID string) error { + ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) + c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cSuperGroup) + session, err := d.mongoClient.StartSession() + if err != nil { + return utils.Wrap(err, "start session failed") + } + defer session.EndSession(ctx) + sCtx := mongo.NewSessionContext(ctx, session) + superGroup := &SuperGroup{} + result := c.FindOneAndDelete(sCtx, bson.M{"group_id": groupID}) + err = result.Decode(superGroup) + if err != nil { + session.AbortTransaction(ctx) + return utils.Wrap(err, "transaction failed") + } + if err = d.RemoveGroupFromUser(ctx, sCtx, groupID, superGroup.MemberIDList); err != nil { + session.AbortTransaction(ctx) + return utils.Wrap(err, "transaction failed") + } + session.CommitTransaction(ctx) + return nil +} + +func (d *db.DataBases) RemoveGroupFromUser(ctx, sCtx context.Context, groupID string, userIDList []string) error { + var users []UserToSuperGroup + for _, v := range userIDList { + users = append(users, UserToSuperGroup{ + UserID: v, + }) + } + c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cUserToSuperGroup) + _, err := c.UpdateOne(sCtx, bson.M{"user_id": bson.M{"$in": userIDList}}, bson.M{"$pull": bson.M{"group_id_list": groupID}}) + if err != nil { + return utils.Wrap(err, "UpdateOne transaction failed") + } + return err +} \ No newline at end of file diff --git a/pkg/common/db/mysql/group_model_k.go b/pkg/common/db/mysql/group_model_k.go index a10bd2a41..987ace740 100644 --- a/pkg/common/db/mysql/group_model_k.go +++ b/pkg/common/db/mysql/group_model_k.go @@ -20,6 +20,7 @@ type GroupModelInterface interface { //mongo } + type Group struct { GroupID string `gorm:"column:group_id;primary_key;size:64" json:"groupID" binding:"required"` GroupName string `gorm:"column:name;size:255" json:"groupName"` @@ -68,11 +69,11 @@ func (*Group) UpdateByMap(ctx context.Context, groupID string, args map[string]i return utils.Wrap(GroupDB.Where("group_id = ?", groupID).Updates(args).Error, "") } -func (*Group) Update(ctx context.Context, groups []*Group) (err error) { +func (g *Group) Update(ctx context.Context, groups []*Group, tx ...*gorm.DB) (err error) { defer func() { trace_log.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groups", groups) }() - return utils.Wrap(GroupDB.Updates(&groups).Error, "") + return utils.Wrap(getDBConn(g.DB, tx...).Updates(&groups).Error, "") } func (*Group) Find(ctx context.Context, groupIDs []string) (groups []*Group, err error) {