From aff8322bc5ce2929b44666bea03e738bebad7b07 Mon Sep 17 00:00:00 2001 From: Gordon <46924906+FGadvancer@users.noreply.github.com> Date: Tue, 18 Jun 2024 21:04:03 +0800 Subject: [PATCH 01/13] fix: sort by id avoid unstable sort friends. --- pkg/common/storage/database/mgo/friend.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/common/storage/database/mgo/friend.go b/pkg/common/storage/database/mgo/friend.go index 18d80d47d..b74df1c19 100644 --- a/pkg/common/storage/database/mgo/friend.go +++ b/pkg/common/storage/database/mgo/friend.go @@ -53,7 +53,7 @@ func NewFriendMongo(db *mongo.Database) (database.Friend, error) { } func (f *FriendMgo) friendSort() any { - return bson.D{{"is_pinned", -1}, {"create_time", 1}} + return bson.D{{"is_pinned", -1}, {"create_time", 1}, {"_id", 1}} } // Create inserts multiple friend records. From 08cb4a948d63b3f88dcd5512989e4958d56e5310 Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Wed, 19 Jun 2024 16:28:28 +0800 Subject: [PATCH 02/13] group --- go.mod | 6 +- go.sum | 4 +- internal/rpc/group/notification.go | 5 ++ internal/rpc/group/sync.go | 54 ++++++++++++-- pkg/common/storage/controller/group.go | 56 +++++++-------- pkg/common/storage/database/group_member.go | 1 + pkg/common/storage/database/mgo/friend.go | 70 +++++++++++++------ .../storage/database/mgo/group_member.go | 6 +- pkg/common/storage/model/friend.go | 18 ++--- 9 files changed, 148 insertions(+), 72 deletions(-) diff --git a/go.mod b/go.mod index 4ab39f751..d00f0b739 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible // indirect github.com/mitchellh/mapstructure v1.5.0 - github.com/openimsdk/protocol v0.0.69-alpha.12 + github.com/openimsdk/protocol v0.0.69-alpha.15 github.com/openimsdk/tools v0.0.49-alpha.25 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 @@ -176,3 +176,7 @@ require ( golang.org/x/crypto v0.21.0 // indirect gopkg.in/ini.v1 v1.67.0 // indirect ) + +//replace ( +// github.com/openimsdk/protocol => /Users/chao/Desktop/project/protocol +//) diff --git a/go.sum b/go.sum index a26e59d9e..4c05500bd 100644 --- a/go.sum +++ b/go.sum @@ -270,8 +270,8 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y= github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= github.com/openimsdk/gomake v0.0.13 h1:xLDe/moqgWpRoptHzI4packAWzs4C16b+sVY+txNJp0= github.com/openimsdk/gomake v0.0.13/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= -github.com/openimsdk/protocol v0.0.69-alpha.12 h1:3ZdwmD1y9vcduIC8o2EZS8Ds/fByqcuEFo+NkcBzgRo= -github.com/openimsdk/protocol v0.0.69-alpha.12/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= +github.com/openimsdk/protocol v0.0.69-alpha.15 h1:iiVROC2nXTc9qPOSfE7NZckQAVCZiDxBm9yY36ULdyw= +github.com/openimsdk/protocol v0.0.69-alpha.15/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= github.com/openimsdk/tools v0.0.49-alpha.25 h1:OpRPwDZ2xWX7Zj5kyfZhryu/NfZTrsRVr2GFwu1HQHI= github.com/openimsdk/tools v0.0.49-alpha.25/go.mod h1:rwsFI1G/nBHNfiNapbven41akRDPBbH4df0Cgy6xueU= github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4= diff --git a/internal/rpc/group/notification.go b/internal/rpc/group/notification.go index 4fae1ed2b..5d385d8c1 100644 --- a/internal/rpc/group/notification.go +++ b/internal/rpc/group/notification.go @@ -324,6 +324,7 @@ func (g *GroupNotificationSender) GroupInfoSetNotification(ctx context.Context, if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil { return } + g.setVersion(ctx, &tips.GroupMemberVersion, &tips.GroupMemberVersionID, database.GroupMemberVersionName, tips.Group.GroupID) g.Notification(ctx, mcontext.GetOpUserID(ctx), tips.Group.GroupID, constant.GroupInfoSetNotification, tips, rpcclient.WithRpcGetUserName()) } @@ -337,6 +338,7 @@ func (g *GroupNotificationSender) GroupInfoSetNameNotification(ctx context.Conte if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil { return } + g.setVersion(ctx, &tips.GroupMemberVersion, &tips.GroupMemberVersionID, database.GroupMemberVersionName, tips.Group.GroupID) g.Notification(ctx, mcontext.GetOpUserID(ctx), tips.Group.GroupID, constant.GroupInfoSetNameNotification, tips) } @@ -350,6 +352,7 @@ func (g *GroupNotificationSender) GroupInfoSetAnnouncementNotification(ctx conte if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil { return } + g.setVersion(ctx, &tips.GroupMemberVersion, &tips.GroupMemberVersionID, database.GroupMemberVersionName, tips.Group.GroupID) g.Notification(ctx, mcontext.GetOpUserID(ctx), tips.Group.GroupID, constant.GroupInfoSetAnnouncementNotification, tips, rpcclient.WithRpcGetUserName()) } @@ -636,6 +639,7 @@ func (g *GroupNotificationSender) GroupMutedNotification(ctx context.Context, gr if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil { return } + g.setVersion(ctx, &tips.GroupMemberVersion, &tips.GroupMemberVersionID, database.GroupMemberVersionName, groupID) g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.GroupMutedNotification, tips) } @@ -663,6 +667,7 @@ func (g *GroupNotificationSender) GroupCancelMutedNotification(ctx context.Conte if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil { return } + g.setVersion(ctx, &tips.GroupMemberVersion, &tips.GroupMemberVersionID, database.GroupMemberVersionName, groupID) g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.GroupCancelMutedNotification, tips) } diff --git a/internal/rpc/group/sync.go b/internal/rpc/group/sync.go index bd6fc6399..10d945a7e 100644 --- a/internal/rpc/group/sync.go +++ b/internal/rpc/group/sync.go @@ -4,10 +4,13 @@ import ( "context" "github.com/openimsdk/open-im-server/v3/internal/rpc/incrversion" "github.com/openimsdk/open-im-server/v3/pkg/authverify" + "github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "github.com/openimsdk/open-im-server/v3/pkg/util/hashutil" + "github.com/openimsdk/protocol/constant" pbgroup "github.com/openimsdk/protocol/group" "github.com/openimsdk/protocol/sdkws" + "slices" ) func (s *groupServer) GetFullGroupMemberUserIDs(ctx context.Context, req *pbgroup.GetFullGroupMemberUserIDsReq) (*pbgroup.GetFullGroupMemberUserIDsResp, error) { @@ -53,12 +56,36 @@ func (s *groupServer) GetFullJoinGroupIDs(ctx context.Context, req *pbgroup.GetF } func (s *groupServer) GetIncrementalGroupMember(ctx context.Context, req *pbgroup.GetIncrementalGroupMemberReq) (*pbgroup.GetIncrementalGroupMemberResp, error) { + group, err := s.db.TakeGroup(ctx, req.GroupID) + if err != nil { + return nil, err + } + if group.Status == constant.GroupStatusDismissed { + return nil, servererrs.ErrDismissedAlready.Wrap() + } opt := incrversion.Option[*sdkws.GroupMemberFullInfo, pbgroup.GetIncrementalGroupMemberResp]{ - Ctx: ctx, - VersionKey: req.GroupID, - VersionID: req.VersionID, - VersionNumber: req.Version, - Version: s.db.FindMemberIncrVersion, + Ctx: ctx, + VersionKey: req.GroupID, + VersionID: req.VersionID, + VersionNumber: req.Version, + Version: func(ctx context.Context, groupID string, version uint, limit int) (*model.VersionLog, error) { + vl, err := s.db.FindMemberIncrVersion(ctx, groupID, version, limit) + if err != nil { + return nil, err + } + var ok bool + vl.Logs = slices.DeleteFunc(vl.Logs, func(elem model.VersionLogElem) bool { + if elem.EID == "" { + ok = true + return true + } + return false + }) + if !ok { + group = nil + } + return vl, nil + }, CacheMaxVersion: s.db.FindMaxGroupMemberVersionCache, Find: func(ctx context.Context, ids []string) ([]*sdkws.GroupMemberFullInfo, error) { return s.getGroupMembersInfo(ctx, req.GroupID, ids) @@ -75,7 +102,22 @@ func (s *groupServer) GetIncrementalGroupMember(ctx context.Context, req *pbgrou } }, } - return opt.Build() + resp, err := opt.Build() + if err != nil { + return nil, err + } + if group != nil { + count, err := s.db.FindGroupMemberNum(ctx, group.GroupID) + if err != nil { + return nil, err + } + owner, err := s.db.TakeGroupOwner(ctx, group.GroupID) + if err != nil { + return nil, err + } + resp.Group = s.groupDB2PB(group, owner.UserID, count) + } + return resp, nil } func (s *groupServer) GetIncrementalJoinGroup(ctx context.Context, req *pbgroup.GetIncrementalJoinGroupReq) (*pbgroup.GetIncrementalJoinGroupResp, error) { diff --git a/pkg/common/storage/controller/group.go b/pkg/common/storage/controller/group.go index cb64bc73a..71bf6ead2 100644 --- a/pkg/common/storage/controller/group.go +++ b/pkg/common/storage/controller/group.go @@ -243,17 +243,10 @@ func (g *groupDatabase) UpdateGroup(ctx context.Context, groupID string, data ma if err := g.groupDB.UpdateMap(ctx, groupID, data); err != nil { return err } - userIDs, err := g.cache.GetGroupMemberIDs(ctx, groupID) - if err != nil { + if err := g.groupMemberDB.MemberGroupIncrVersion(ctx, groupID, []string{""}, model.VersionStateUpdate); err != nil { return err } - for _, userID := range userIDs { - if err := g.groupMemberDB.JoinGroupIncrVersion(ctx, userID, []string{groupID}, model.VersionStateUpdate); err != nil { - return err - } - - } - return g.cache.CloneGroupCache().DelGroupsInfo(groupID).DelMaxJoinGroupVersion(userIDs...).ChainExecDel(ctx) + return g.cache.CloneGroupCache().DelGroupsInfo(groupID).DelMaxGroupMemberVersion(groupID).ChainExecDel(ctx) }) } @@ -263,11 +256,11 @@ func (g *groupDatabase) DismissGroup(ctx context.Context, groupID string, delete if err := g.groupDB.UpdateStatus(ctx, groupID, constant.GroupStatusDismissed); err != nil { return err } - userIDs, err := g.cache.GetGroupMemberIDs(ctx, groupID) - if err != nil { - return err - } if deleteMember { + userIDs, err := g.cache.GetGroupMemberIDs(ctx, groupID) + if err != nil { + return err + } if err := g.groupMemberDB.Delete(ctx, groupID, nil); err != nil { return err } @@ -277,13 +270,18 @@ func (g *groupDatabase) DismissGroup(ctx context.Context, groupID string, delete DelGroupMembersHash(groupID). DelGroupAllRoleLevel(groupID). DelGroupMembersInfo(groupID, userIDs...). - DelMaxGroupMemberVersion(groupID) - } - c = c.DelMaxJoinGroupVersion(userIDs...) - if len(userIDs) > 0 { - if err := g.groupMemberDB.JoinGroupIncrVersion(ctx, groupID, userIDs, model.VersionStateDelete); err != nil { + DelMaxGroupMemberVersion(groupID). + DelMaxJoinGroupVersion(userIDs...) + for _, userID := range userIDs { + if err := g.groupMemberDB.JoinGroupIncrVersion(ctx, userID, []string{groupID}, model.VersionStateDelete); err != nil { + return err + } + } + } else { + if err := g.groupMemberDB.MemberGroupIncrVersion(ctx, groupID, []string{""}, model.VersionStateUpdate); err != nil { return err } + c = c.DelMaxGroupMemberVersion(groupID) } return c.DelGroupsInfo(groupID).ChainExecDel(ctx) }) @@ -411,12 +409,16 @@ func (g *groupDatabase) TransferGroupOwner(ctx context.Context, groupID string, return c.DelGroupMembersInfo(groupID, oldOwnerUserID, newOwnerUserID). DelGroupAllRoleLevel(groupID). DelGroupMembersHash(groupID). - DelJoinedGroupID(oldOwnerUserID, newOwnerUserID). + DelMaxGroupMemberVersion(groupID). + DelGroupMemberIDs(groupID). ChainExecDel(ctx) }) } func (g *groupDatabase) UpdateGroupMember(ctx context.Context, groupID string, userID string, data map[string]any) error { + if len(data) == 0 { + return nil + } return g.ctxTx.Transaction(ctx, func(ctx context.Context) error { if err := g.groupMemberDB.Update(ctx, groupID, userID, data); err != nil { return err @@ -424,9 +426,9 @@ func (g *groupDatabase) UpdateGroupMember(ctx context.Context, groupID string, u c := g.cache.CloneGroupCache() c = c.DelGroupMembersInfo(groupID, userID) if g.groupMemberDB.IsUpdateRoleLevel(data) { - c = c.DelGroupAllRoleLevel(groupID) + c = c.DelGroupAllRoleLevel(groupID).DelGroupMemberIDs(groupID) } - c = c.DelMaxGroupMemberVersion(groupID).DelMaxJoinGroupVersion(userID) + c = c.DelMaxGroupMemberVersion(groupID) return c.ChainExecDel(ctx) }) } @@ -439,9 +441,9 @@ func (g *groupDatabase) UpdateGroupMembers(ctx context.Context, data []*common.B return err } if g.groupMemberDB.IsUpdateRoleLevel(item.Map) { - c = c.DelGroupAllRoleLevel(item.GroupID) + c = c.DelGroupAllRoleLevel(item.GroupID).DelGroupMemberIDs(item.GroupID) } - c = c.DelGroupMembersInfo(item.GroupID, item.UserID).DelGroupMembersHash(item.GroupID) + c = c.DelGroupMembersInfo(item.GroupID, item.UserID).DelMaxGroupMemberVersion(item.GroupID).DelGroupMembersHash(item.GroupID) } return c.ChainExecDel(ctx) }) @@ -501,14 +503,6 @@ func (g *groupDatabase) FindJoinIncrVersion(ctx context.Context, userID string, return g.groupMemberDB.FindJoinIncrVersion(ctx, userID, version, limit) } -//func (g *groupDatabase) FindSortGroupMemberUserIDs(ctx context.Context, groupID string) ([]string, error) { -// return g.cache.FindSortGroupMemberUserIDs(ctx, groupID) -//} -// -//func (g *groupDatabase) FindSortJoinGroupIDs(ctx context.Context, userID string) ([]string, error) { -// return g.cache.FindSortJoinGroupIDs(ctx, userID) -//} - func (g *groupDatabase) FindMaxGroupMemberVersionCache(ctx context.Context, groupID string) (*model.VersionLog, error) { return g.cache.FindMaxGroupMemberVersion(ctx, groupID) } diff --git a/pkg/common/storage/database/group_member.go b/pkg/common/storage/database/group_member.go index 0051d694f..c49319649 100644 --- a/pkg/common/storage/database/group_member.go +++ b/pkg/common/storage/database/group_member.go @@ -35,6 +35,7 @@ type GroupMember interface { FindUserManagedGroupID(ctx context.Context, userID string) (groupIDs []string, err error) IsUpdateRoleLevel(data map[string]any) bool JoinGroupIncrVersion(ctx context.Context, userID string, groupIDs []string, state int32) error + MemberGroupIncrVersion(ctx context.Context, groupID string, userIDs []string, state int32) error FindMemberIncrVersion(ctx context.Context, groupID string, version uint, limit int) (*model.VersionLog, error) FindJoinIncrVersion(ctx context.Context, userID string, version uint, limit int) (*model.VersionLog, error) } diff --git a/pkg/common/storage/database/mgo/friend.go b/pkg/common/storage/database/mgo/friend.go index b74df1c19..4c3c4b97b 100644 --- a/pkg/common/storage/database/mgo/friend.go +++ b/pkg/common/storage/database/mgo/friend.go @@ -18,6 +18,8 @@ import ( "context" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" + "go.mongodb.org/mongo-driver/bson/primitive" + "time" "github.com/openimsdk/tools/db/mongoutil" "github.com/openimsdk/tools/db/pagination" @@ -53,11 +55,19 @@ func NewFriendMongo(db *mongo.Database) (database.Friend, error) { } func (f *FriendMgo) friendSort() any { - return bson.D{{"is_pinned", -1}, {"create_time", 1}, {"_id", 1}} + return bson.D{{"is_pinned", -1}, {"_id", 1}} } // Create inserts multiple friend records. func (f *FriendMgo) Create(ctx context.Context, friends []*model.Friend) error { + for i, friend := range friends { + if friend.ID.IsZero() { + friends[i].ID = primitive.NewObjectID() + } + if friend.CreateTime.IsZero() { + friends[i].CreateTime = time.Now() + } + } return mongoutil.IncrVersion(func() error { return mongoutil.InsertMany(ctx, f.coll, friends) }, func() error { @@ -108,13 +118,43 @@ func (f *FriendMgo) UpdateRemark(ctx context.Context, ownerUserID, friendUserID, return f.UpdateByMap(ctx, ownerUserID, friendUserID, map[string]any{"remark": remark}) } +func (f *FriendMgo) fillTime(friends ...*model.Friend) { + for i, friend := range friends { + if friend.CreateTime.IsZero() { + friends[i].CreateTime = friend.ID.Timestamp() + } + } +} + +func (f *FriendMgo) findOne(ctx context.Context, filter any) (*model.Friend, error) { + friend, err := mongoutil.FindOne[*model.Friend](ctx, f.coll, filter) + if err != nil { + return nil, err + } + f.fillTime(friend) + return friend, nil +} + +func (f *FriendMgo) find(ctx context.Context, filter any) ([]*model.Friend, error) { + friends, err := mongoutil.Find[*model.Friend](ctx, f.coll, filter) + if err != nil { + return nil, err + } + f.fillTime(friends...) + return friends, nil +} + +func (f *FriendMgo) findPage(ctx context.Context, filter any, pagination pagination.Pagination, opts ...*options.FindOptions) (int64, []*model.Friend, error) { + return mongoutil.FindPage[*model.Friend](ctx, f.coll, filter, pagination, opts...) +} + // Take retrieves a single friend document. Returns an error if not found. func (f *FriendMgo) Take(ctx context.Context, ownerUserID, friendUserID string) (*model.Friend, error) { filter := bson.M{ "owner_user_id": ownerUserID, "friend_user_id": friendUserID, } - return mongoutil.FindOne[*model.Friend](ctx, f.coll, filter) + return f.findOne(ctx, filter) } // FindUserState finds the friendship status between two users. @@ -125,7 +165,7 @@ func (f *FriendMgo) FindUserState(ctx context.Context, userID1, userID2 string) {"owner_user_id": userID2, "friend_user_id": userID1}, }, } - return mongoutil.Find[*model.Friend](ctx, f.coll, filter) + return f.find(ctx, filter) } // FindFriends retrieves a list of friends for a given owner. Missing friends do not cause an error. @@ -134,7 +174,7 @@ func (f *FriendMgo) FindFriends(ctx context.Context, ownerUserID string, friendU "owner_user_id": ownerUserID, "friend_user_id": bson.M{"$in": friendUserIDs}, } - return mongoutil.Find[*model.Friend](ctx, f.coll, filter) + return f.find(ctx, filter) } // FindReversalFriends finds users who have added the specified user as a friend. @@ -143,14 +183,14 @@ func (f *FriendMgo) FindReversalFriends(ctx context.Context, friendUserID string "owner_user_id": bson.M{"$in": ownerUserIDs}, "friend_user_id": friendUserID, } - return mongoutil.Find[*model.Friend](ctx, f.coll, filter) + return f.find(ctx, filter) } // FindOwnerFriends retrieves a paginated list of friends for a given owner. func (f *FriendMgo) FindOwnerFriends(ctx context.Context, ownerUserID string, pagination pagination.Pagination) (int64, []*model.Friend, error) { filter := bson.M{"owner_user_id": ownerUserID} opt := options.Find().SetSort(f.friendSort()) - return mongoutil.FindPage[*model.Friend](ctx, f.coll, filter, pagination, opt) + return f.findPage(ctx, filter, pagination, opt) } func (f *FriendMgo) FindOwnerFriendUserIds(ctx context.Context, ownerUserID string, limit int) ([]string, error) { @@ -162,7 +202,8 @@ func (f *FriendMgo) FindOwnerFriendUserIds(ctx context.Context, ownerUserID stri // FindInWhoseFriends finds users who have added the specified user as a friend, with pagination. func (f *FriendMgo) FindInWhoseFriends(ctx context.Context, friendUserID string, pagination pagination.Pagination) (int64, []*model.Friend, error) { filter := bson.M{"friend_user_id": friendUserID} - return mongoutil.FindPage[*model.Friend](ctx, f.coll, filter, pagination) + opt := options.Find().SetSort(f.friendSort()) + return f.findPage(ctx, filter, pagination, opt) } // FindFriendUserIDs retrieves a list of friend user IDs for a given owner. @@ -203,18 +244,3 @@ func (f *FriendMgo) FindFriendUserID(ctx context.Context, friendUserID string) ( } return mongoutil.Find[string](ctx, f.coll, filter, options.Find().SetProjection(bson.M{"_id": 0, "owner_user_id": 1}).SetSort(f.friendSort())) } - -//func (f *FriendMgo) SearchFriend(ctx context.Context, ownerUserID, keyword string, pagination pagination.Pagination) (int64, []*model.Friend, error) { -// filter := bson.M{ -// "owner_user_id": ownerUserID, -// } -// if keyword != "" { -// filter["$or"] = []bson.M{ -// {"remark": bson.M{"$regex": keyword, "$options": "i"}}, -// {"nickname": bson.M{"$regex": keyword, "$options": "i"}}, -// {"friend_user_id": bson.M{"$regex": keyword, "$options": "i"}}, -// } -// } -// opt := options.Find().SetSort(f.friendSort()) -// return mongoutil.FindPage[*model.Friend](ctx, f.coll, filter, pagination, opt) -//} diff --git a/pkg/common/storage/database/mgo/group_member.go b/pkg/common/storage/database/mgo/group_member.go index 95c6e51a6..99a5a3ed2 100644 --- a/pkg/common/storage/database/mgo/group_member.go +++ b/pkg/common/storage/database/mgo/group_member.go @@ -117,8 +117,6 @@ func (g *GroupMemberMgo) UpdateRoleLevel(ctx context.Context, groupID string, us return g.Update(ctx, groupID, userID, bson.M{"role_level": roleLevel}) }, func() error { return g.member.IncrVersion(ctx, groupID, []string{userID}, model.VersionStateUpdate) - }, func() error { - return g.join.IncrVersion(ctx, userID, []string{groupID}, model.VersionStateUpdate) }) } @@ -184,6 +182,10 @@ func (g *GroupMemberMgo) JoinGroupIncrVersion(ctx context.Context, userID string return g.join.IncrVersion(ctx, userID, groupIDs, state) } +func (g *GroupMemberMgo) MemberGroupIncrVersion(ctx context.Context, groupID string, userIDs []string, state int32) error { + return g.member.IncrVersion(ctx, groupID, userIDs, state) +} + func (g *GroupMemberMgo) FindMemberIncrVersion(ctx context.Context, groupID string, version uint, limit int) (*model.VersionLog, error) { return g.member.FindChangeLog(ctx, groupID, version, limit) } diff --git a/pkg/common/storage/model/friend.go b/pkg/common/storage/model/friend.go index 60a40d9c2..abcca2f2b 100644 --- a/pkg/common/storage/model/friend.go +++ b/pkg/common/storage/model/friend.go @@ -15,17 +15,19 @@ package model import ( + "go.mongodb.org/mongo-driver/bson/primitive" "time" ) // Friend represents the data structure for a friend relationship in MongoDB. type Friend struct { - OwnerUserID string `bson:"owner_user_id"` - FriendUserID string `bson:"friend_user_id"` - Remark string `bson:"remark"` - CreateTime time.Time `bson:"create_time"` - AddSource int32 `bson:"add_source"` - OperatorUserID string `bson:"operator_user_id"` - Ex string `bson:"ex"` - IsPinned bool `bson:"is_pinned"` + ID primitive.ObjectID `bson:"_id"` + OwnerUserID string `bson:"owner_user_id"` + FriendUserID string `bson:"friend_user_id"` + Remark string `bson:"remark"` + CreateTime time.Time `bson:"create_time"` + AddSource int32 `bson:"add_source"` + OperatorUserID string `bson:"operator_user_id"` + Ex string `bson:"ex"` + IsPinned bool `bson:"is_pinned"` } From f613c6dfc9933e817e2b96a8e8831eb23543a75c Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Wed, 19 Jun 2024 16:52:44 +0800 Subject: [PATCH 03/13] group --- internal/rpc/group/sync.go | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/internal/rpc/group/sync.go b/internal/rpc/group/sync.go index 10d945a7e..3793d0c7e 100644 --- a/internal/rpc/group/sync.go +++ b/internal/rpc/group/sync.go @@ -63,6 +63,7 @@ func (s *groupServer) GetIncrementalGroupMember(ctx context.Context, req *pbgrou if group.Status == constant.GroupStatusDismissed { return nil, servererrs.ErrDismissedAlready.Wrap() } + var hasGroupUpdate bool opt := incrversion.Option[*sdkws.GroupMemberFullInfo, pbgroup.GetIncrementalGroupMemberResp]{ Ctx: ctx, VersionKey: req.GroupID, @@ -73,17 +74,14 @@ func (s *groupServer) GetIncrementalGroupMember(ctx context.Context, req *pbgrou if err != nil { return nil, err } - var ok bool vl.Logs = slices.DeleteFunc(vl.Logs, func(elem model.VersionLogElem) bool { if elem.EID == "" { - ok = true + vl.LogLen-- + hasGroupUpdate = true return true } return false }) - if !ok { - group = nil - } return vl, nil }, CacheMaxVersion: s.db.FindMaxGroupMemberVersionCache, @@ -106,7 +104,7 @@ func (s *groupServer) GetIncrementalGroupMember(ctx context.Context, req *pbgrou if err != nil { return nil, err } - if group != nil { + if resp.Full || hasGroupUpdate { count, err := s.db.FindGroupMemberNum(ctx, group.GroupID) if err != nil { return nil, err From 1ee33f36f71dba904cfe8dd75d02a84fcd347743 Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Wed, 19 Jun 2024 17:38:30 +0800 Subject: [PATCH 04/13] group --- go.mod | 2 +- go.sum | 4 ++-- internal/rpc/group/notification.go | 9 +++++++-- 3 files changed, 10 insertions(+), 5 deletions(-) diff --git a/go.mod b/go.mod index d00f0b739..34987958f 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible // indirect github.com/mitchellh/mapstructure v1.5.0 - github.com/openimsdk/protocol v0.0.69-alpha.15 + github.com/openimsdk/protocol v0.0.69-alpha.16 github.com/openimsdk/tools v0.0.49-alpha.25 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 diff --git a/go.sum b/go.sum index 4c05500bd..3ab5684a9 100644 --- a/go.sum +++ b/go.sum @@ -270,8 +270,8 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y= github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= github.com/openimsdk/gomake v0.0.13 h1:xLDe/moqgWpRoptHzI4packAWzs4C16b+sVY+txNJp0= github.com/openimsdk/gomake v0.0.13/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= -github.com/openimsdk/protocol v0.0.69-alpha.15 h1:iiVROC2nXTc9qPOSfE7NZckQAVCZiDxBm9yY36ULdyw= -github.com/openimsdk/protocol v0.0.69-alpha.15/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= +github.com/openimsdk/protocol v0.0.69-alpha.16 h1:ciSqm2rjBdpScpkQm3wPjAFv0YbIRp8MITRkDZWVv6c= +github.com/openimsdk/protocol v0.0.69-alpha.16/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= github.com/openimsdk/tools v0.0.49-alpha.25 h1:OpRPwDZ2xWX7Zj5kyfZhryu/NfZTrsRVr2GFwu1HQHI= github.com/openimsdk/tools v0.0.49-alpha.25/go.mod h1:rwsFI1G/nBHNfiNapbven41akRDPBbH4df0Cgy6xueU= github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4= diff --git a/internal/rpc/group/notification.go b/internal/rpc/group/notification.go index 5d385d8c1..ee4f00ad3 100644 --- a/internal/rpc/group/notification.go +++ b/internal/rpc/group/notification.go @@ -477,11 +477,16 @@ func (g *GroupNotificationSender) GroupOwnerTransferredNotification(ctx context. } opUserID := mcontext.GetOpUserID(ctx) var member map[string]*sdkws.GroupMemberFullInfo - member, err = g.getGroupMemberMap(ctx, req.GroupID, []string{opUserID, req.NewOwnerUserID}) + member, err = g.getGroupMemberMap(ctx, req.GroupID, []string{opUserID, req.NewOwnerUserID, req.OldOwnerUserID}) if err != nil { return } - tips := &sdkws.GroupOwnerTransferredTips{Group: group, OpUser: member[opUserID], NewGroupOwner: member[req.NewOwnerUserID]} + tips := &sdkws.GroupOwnerTransferredTips{ + Group: group, + OpUser: member[opUserID], + NewGroupOwner: member[req.NewOwnerUserID], + OldGroupOwnerInfo: member[req.OldOwnerUserID], + } if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil { return } From 924888dbd125de298cb2c4ea74d4307d21f57123 Mon Sep 17 00:00:00 2001 From: Gordon <46924906+FGadvancer@users.noreply.github.com> Date: Wed, 19 Jun 2024 20:17:28 +0800 Subject: [PATCH 05/13] fix: sort by id avoid unstable sort friends. --- pkg/common/storage/database/mgo/version_log.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/common/storage/database/mgo/version_log.go b/pkg/common/storage/database/mgo/version_log.go index 1d70f96f5..8836742f0 100644 --- a/pkg/common/storage/database/mgo/version_log.go +++ b/pkg/common/storage/database/mgo/version_log.go @@ -8,6 +8,7 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/storage/versionctx" "github.com/openimsdk/tools/db/mongoutil" "github.com/openimsdk/tools/errs" + "github.com/openimsdk/tools/log" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo" @@ -169,7 +170,9 @@ func (l *VersionLogMgo) FindChangeLog(ctx context.Context, dId string, version u } else if !errors.Is(err, mongo.ErrNoDocuments) { return nil, err } + log.ZDebug(ctx, "init doc", "dId", dId) if res, err := l.initDoc(ctx, dId, nil, 0, time.Now()); err == nil { + log.ZDebug(ctx, "init doc success", "dId", dId) return res, nil } else if mongo.IsDuplicateKeyError(err) { return l.findChangeLog(ctx, dId, version, limit) From 5006af5b54ba47db5ada3b06b1298836dc5ee064 Mon Sep 17 00:00:00 2001 From: Gordon <46924906+FGadvancer@users.noreply.github.com> Date: Wed, 19 Jun 2024 20:28:09 +0800 Subject: [PATCH 06/13] fix: sort by id avoid unstable sort friends. --- pkg/common/storage/database/mgo/group_member.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/common/storage/database/mgo/group_member.go b/pkg/common/storage/database/mgo/group_member.go index 99a5a3ed2..30f8d63b9 100644 --- a/pkg/common/storage/database/mgo/group_member.go +++ b/pkg/common/storage/database/mgo/group_member.go @@ -18,6 +18,7 @@ import ( "context" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" + "github.com/openimsdk/tools/log" "github.com/openimsdk/protocol/constant" "github.com/openimsdk/tools/db/mongoutil" @@ -187,9 +188,11 @@ func (g *GroupMemberMgo) MemberGroupIncrVersion(ctx context.Context, groupID str } func (g *GroupMemberMgo) FindMemberIncrVersion(ctx context.Context, groupID string, version uint, limit int) (*model.VersionLog, error) { + log.ZDebug(ctx, "find member incr version", "groupID", groupID, "version", version) return g.member.FindChangeLog(ctx, groupID, version, limit) } func (g *GroupMemberMgo) FindJoinIncrVersion(ctx context.Context, userID string, version uint, limit int) (*model.VersionLog, error) { + log.ZDebug(ctx, "find join incr version", "userID", userID, "version", version) return g.join.FindChangeLog(ctx, userID, version, limit) } From 4f359b733556290ca137b4529bc74bd7ea11c59d Mon Sep 17 00:00:00 2001 From: Gordon <46924906+FGadvancer@users.noreply.github.com> Date: Wed, 19 Jun 2024 20:35:32 +0800 Subject: [PATCH 07/13] fix: sort by id avoid unstable sort friends. --- pkg/common/storage/cache/redis/group.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/common/storage/cache/redis/group.go b/pkg/common/storage/cache/redis/group.go index c4598567d..589678c50 100644 --- a/pkg/common/storage/cache/redis/group.go +++ b/pkg/common/storage/cache/redis/group.go @@ -446,7 +446,7 @@ func (g *GroupCacheRedis) DelMaxJoinGroupVersion(userIDs ...string) cache.GroupC func (g *GroupCacheRedis) FindMaxGroupMemberVersion(ctx context.Context, groupID string) (*model.VersionLog, error) { return getCache(ctx, g.rcClient, g.getGroupMemberMaxVersionKey(groupID), g.expireTime, func(ctx context.Context) (*model.VersionLog, error) { - return g.groupMemberDB.FindJoinIncrVersion(ctx, groupID, 0, 0) + return g.groupMemberDB.FindMemberIncrVersion(ctx, groupID, 0, 0) }) } From 03d4564596c746f81ab1e95e61a3cc6532aeace1 Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Thu, 20 Jun 2024 14:55:59 +0800 Subject: [PATCH 08/13] user version --- go.mod | 6 +- go.sum | 4 +- internal/rpc/friend/sync.go | 16 +++ internal/rpc/group/group.go | 6 +- internal/rpc/user/user.go | 122 +++++++++++++++------- pkg/common/storage/controller/friend.go | 9 +- pkg/common/storage/controller/group.go | 8 ++ pkg/common/storage/database/friend.go | 2 + pkg/common/storage/database/mgo/friend.go | 4 + 9 files changed, 129 insertions(+), 48 deletions(-) diff --git a/go.mod b/go.mod index 34987958f..2614e0f32 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible // indirect github.com/mitchellh/mapstructure v1.5.0 - github.com/openimsdk/protocol v0.0.69-alpha.16 + github.com/openimsdk/protocol v0.0.69-alpha.17 github.com/openimsdk/tools v0.0.49-alpha.25 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 @@ -177,6 +177,4 @@ require ( gopkg.in/ini.v1 v1.67.0 // indirect ) -//replace ( -// github.com/openimsdk/protocol => /Users/chao/Desktop/project/protocol -//) +//replace github.com/openimsdk/protocol => /Users/chao/Desktop/project/protocol diff --git a/go.sum b/go.sum index 3ab5684a9..fe4f0c390 100644 --- a/go.sum +++ b/go.sum @@ -270,8 +270,8 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y= github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= github.com/openimsdk/gomake v0.0.13 h1:xLDe/moqgWpRoptHzI4packAWzs4C16b+sVY+txNJp0= github.com/openimsdk/gomake v0.0.13/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= -github.com/openimsdk/protocol v0.0.69-alpha.16 h1:ciSqm2rjBdpScpkQm3wPjAFv0YbIRp8MITRkDZWVv6c= -github.com/openimsdk/protocol v0.0.69-alpha.16/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= +github.com/openimsdk/protocol v0.0.69-alpha.17 h1:pEag4ZdlovE+AyLsw1VYFU/3sk6ayvGdPzgufQfKf9M= +github.com/openimsdk/protocol v0.0.69-alpha.17/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= github.com/openimsdk/tools v0.0.49-alpha.25 h1:OpRPwDZ2xWX7Zj5kyfZhryu/NfZTrsRVr2GFwu1HQHI= github.com/openimsdk/tools v0.0.49-alpha.25/go.mod h1:rwsFI1G/nBHNfiNapbven41akRDPBbH4df0Cgy6xueU= github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4= diff --git a/internal/rpc/friend/sync.go b/internal/rpc/friend/sync.go index faaa987f2..684894609 100644 --- a/internal/rpc/friend/sync.go +++ b/internal/rpc/friend/sync.go @@ -11,6 +11,22 @@ import ( "github.com/openimsdk/protocol/relation" ) +func (s *friendServer) NotificationUserInfoUpdate(ctx context.Context, req *relation.NotificationUserInfoUpdateReq) (*relation.NotificationUserInfoUpdateResp, error) { + userIDs, err := s.db.FindFriendUserIDs(ctx, req.UserID) + if err != nil { + return nil, err + } + for _, userID := range userIDs { + if err := s.db.OwnerIncrVersion(ctx, userID, []string{req.UserID}, model.VersionStateUpdate); err != nil { + return nil, err + } + } + for _, userID := range userIDs { + s.notificationSender.FriendInfoUpdatedNotification(ctx, req.UserID, userID) + } + return &relation.NotificationUserInfoUpdateResp{}, nil +} + func (s *friendServer) GetFullFriendUserIDs(ctx context.Context, req *relation.GetFullFriendUserIDsReq) (*relation.GetFullFriendUserIDsResp, error) { vl, err := s.db.FindMaxFriendVersionCache(ctx, req.UserID) if err != nil { diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index 4d429b3d3..5b82b8ceb 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -132,13 +132,17 @@ func (s *groupServer) NotificationUserInfoUpdate(ctx context.Context, req *pbgro } groupIDs = append(groupIDs, member.GroupID) } + for _, groupID := range groupIDs { + if err := s.db.MemberGroupIncrVersion(ctx, groupID, []string{req.UserID}, model.VersionStateUpdate); err != nil { + return nil, err + } + } for _, groupID := range groupIDs { s.notification.GroupMemberInfoSetNotification(ctx, groupID, req.UserID) } if err = s.db.DeleteGroupMemberHash(ctx, groupIDs); err != nil { return nil, err } - return &pbgroup.NotificationUserInfoUpdateResp{}, nil } diff --git a/internal/rpc/user/user.go b/internal/rpc/user/user.go index 96f784fab..211b360b7 100644 --- a/internal/rpc/user/user.go +++ b/internal/rpc/user/user.go @@ -16,10 +16,7 @@ package user import ( "context" - "math/rand" - "strings" - "time" - + "errors" "github.com/openimsdk/open-im-server/v3/internal/rpc/friend" "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis" @@ -27,7 +24,13 @@ import ( tablerelation "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "github.com/openimsdk/open-im-server/v3/pkg/common/webhook" "github.com/openimsdk/open-im-server/v3/pkg/localcache" + "github.com/openimsdk/protocol/group" + friendpb "github.com/openimsdk/protocol/relation" "github.com/openimsdk/tools/db/redisutil" + "math/rand" + "strings" + "sync" + "time" "github.com/openimsdk/open-im-server/v3/pkg/authverify" "github.com/openimsdk/open-im-server/v3/pkg/common/convert" @@ -120,7 +123,8 @@ func (s *userServer) GetDesignateUsers(ctx context.Context, req *pbuser.GetDesig // deprecated: -// UpdateUserInfo +//UpdateUserInfo + func (s *userServer) UpdateUserInfo(ctx context.Context, req *pbuser.UpdateUserInfoReq) (resp *pbuser.UpdateUserInfoResp, err error) { resp = &pbuser.UpdateUserInfoResp{} err = authverify.CheckAccessV3(ctx, req.UserInfo.UserID, s.config.Share.IMAdminUserID) @@ -131,31 +135,33 @@ func (s *userServer) UpdateUserInfo(ctx context.Context, req *pbuser.UpdateUserI if err := s.webhookBeforeUpdateUserInfo(ctx, &s.config.WebhooksConfig.BeforeUpdateUserInfo, req); err != nil { return nil, err } - data := convert.UserPb2DBMap(req.UserInfo) - if err := s.db.UpdateByMap(ctx, req.UserInfo.UserID, data); err != nil { - return nil, err - } - s.friendNotificationSender.UserInfoUpdatedNotification(ctx, req.UserInfo.UserID) - friends, err := s.friendRpcClient.GetFriendIDs(ctx, req.UserInfo.UserID) + oldUser, err := s.db.GetUserByID(ctx, req.UserInfo.UserID) if err != nil { return nil, err } - if req.UserInfo.Nickname != "" || req.UserInfo.FaceURL != "" { - if err = s.groupRpcClient.NotificationUserInfoUpdate(ctx, req.UserInfo.UserID); err != nil { - return nil, err - } - } - for _, friendID := range friends { - s.friendNotificationSender.FriendInfoUpdatedNotification(ctx, req.UserInfo.UserID, friendID) + if err := s.db.UpdateByMap(ctx, req.UserInfo.UserID, data); err != nil { + return nil, err } + s.friendNotificationSender.UserInfoUpdatedNotification(ctx, req.UserInfo.UserID) + //friends, err := s.friendRpcClient.GetFriendIDs(ctx, req.UserInfo.UserID) + //if err != nil { + // return nil, err + //} + //if req.UserInfo.Nickname != "" || req.UserInfo.FaceURL != "" { + // if err = s.NotificationUserInfoUpdate(ctx, req.UserInfo.UserID,oldUser); err != nil { + // return nil, err + // } + //} + //for _, friendID := range friends { + // s.friendNotificationSender.FriendInfoUpdatedNotification(ctx, req.UserInfo.UserID, friendID) + //} s.webhookAfterUpdateUserInfo(ctx, &s.config.WebhooksConfig.AfterUpdateUserInfo, req) - if err = s.groupRpcClient.NotificationUserInfoUpdate(ctx, req.UserInfo.UserID); err != nil { + if err = s.NotificationUserInfoUpdate(ctx, req.UserInfo.UserID, oldUser); err != nil { return nil, err } return resp, nil } - func (s *userServer) UpdateUserInfoEx(ctx context.Context, req *pbuser.UpdateUserInfoExReq) (resp *pbuser.UpdateUserInfoExResp, err error) { resp = &pbuser.UpdateUserInfoExResp{} err = authverify.CheckAccessV3(ctx, req.UserInfo.UserID, s.config.Share.IMAdminUserID) @@ -165,30 +171,33 @@ func (s *userServer) UpdateUserInfoEx(ctx context.Context, req *pbuser.UpdateUse if err = s.webhookBeforeUpdateUserInfoEx(ctx, &s.config.WebhooksConfig.BeforeUpdateUserInfoEx, req); err != nil { return nil, err } + oldUser, err := s.db.GetUserByID(ctx, req.UserInfo.UserID) + if err != nil { + return nil, err + } data := convert.UserPb2DBMapEx(req.UserInfo) if err = s.db.UpdateByMap(ctx, req.UserInfo.UserID, data); err != nil { return nil, err } s.friendNotificationSender.UserInfoUpdatedNotification(ctx, req.UserInfo.UserID) - friends, err := s.friendRpcClient.GetFriendIDs(ctx, req.UserInfo.UserID) - if err != nil { - return nil, err - } - if req.UserInfo.Nickname != nil || req.UserInfo.FaceURL != nil { - if err := s.groupRpcClient.NotificationUserInfoUpdate(ctx, req.UserInfo.UserID); err != nil { - return nil, err - } - } - for _, friendID := range friends { - s.friendNotificationSender.FriendInfoUpdatedNotification(ctx, req.UserInfo.UserID, friendID) - } + //friends, err := s.friendRpcClient.GetFriendIDs(ctx, req.UserInfo.UserID) + //if err != nil { + // return nil, err + //} + //if req.UserInfo.Nickname != nil || req.UserInfo.FaceURL != nil { + // if err := s.NotificationUserInfoUpdate(ctx, req.UserInfo.UserID); err != nil { + // return nil, err + // } + //} + //for _, friendID := range friends { + // s.friendNotificationSender.FriendInfoUpdatedNotification(ctx, req.UserInfo.UserID, friendID) + //} s.webhookAfterUpdateUserInfoEx(ctx, &s.config.WebhooksConfig.AfterUpdateUserInfoEx, req) - if err := s.groupRpcClient.NotificationUserInfoUpdate(ctx, req.UserInfo.UserID); err != nil { + if err := s.NotificationUserInfoUpdate(ctx, req.UserInfo.UserID, oldUser); err != nil { return nil, err } return resp, nil } - func (s *userServer) SetGlobalRecvMessageOpt(ctx context.Context, req *pbuser.SetGlobalRecvMessageOptReq) (resp *pbuser.SetGlobalRecvMessageOptResp, err error) { resp = &pbuser.SetGlobalRecvMessageOptResp{} if _, err := s.db.FindWithError(ctx, []string{req.UserID}); err != nil { @@ -247,6 +256,7 @@ func (s *userServer) GetPaginationUsers(ctx context.Context, req *pbuser.GetPagi return &pbuser.GetPaginationUsersResp{Total: int32(total), Users: convert.UsersDB2Pb(users)}, err } + } func (s *userServer) UserRegister(ctx context.Context, req *pbuser.UserRegisterReq) (resp *pbuser.UserRegisterResp, err error) { @@ -343,8 +353,7 @@ func (s *userServer) SubscribeOrCancelUsersStatus(ctx context.Context, req *pbus // GetUserStatus Get the online status of the user. func (s *userServer) GetUserStatus(ctx context.Context, req *pbuser.GetUserStatusReq) (resp *pbuser.GetUserStatusResp, - err error, -) { + err error) { onlineStatusList, err := s.db.GetUserStatus(ctx, req.UserIDs) if err != nil { return nil, err @@ -354,8 +363,7 @@ func (s *userServer) GetUserStatus(ctx context.Context, req *pbuser.GetUserStatu // SetUserStatus Synchronize user's online status. func (s *userServer) SetUserStatus(ctx context.Context, req *pbuser.SetUserStatusReq) (resp *pbuser.SetUserStatusResp, - err error, -) { + err error) { err = s.db.SetUserStatus(ctx, req.UserID, req.Status, req.PlatformID) if err != nil { return nil, err @@ -379,8 +387,7 @@ func (s *userServer) SetUserStatus(ctx context.Context, req *pbuser.SetUserStatu // GetSubscribeUsersStatus Get the online status of subscribers. func (s *userServer) GetSubscribeUsersStatus(ctx context.Context, - req *pbuser.GetSubscribeUsersStatusReq, -) (*pbuser.GetSubscribeUsersStatusResp, error) { + req *pbuser.GetSubscribeUsersStatusReq) (*pbuser.GetSubscribeUsersStatusResp, error) { userList, err := s.db.GetAllSubscribeList(ctx, req.UserID) if err != nil { return nil, err @@ -469,6 +476,7 @@ func (s *userServer) ProcessUserCommandUpdate(ctx context.Context, req *pbuser.P } func (s *userServer) ProcessUserCommandGet(ctx context.Context, req *pbuser.ProcessUserCommandGetReq) (*pbuser.ProcessUserCommandGetResp, error) { + err := authverify.CheckAccessV3(ctx, req.UserID, s.config.Share.IMAdminUserID) if err != nil { return nil, err @@ -687,6 +695,40 @@ func (s *userServer) userModelToResp(users []*tablerelation.User, pagination pag return &pbuser.SearchNotificationAccountResp{Total: total, NotificationAccounts: notificationAccounts} } +func (s *userServer) NotificationUserInfoUpdate(ctx context.Context, userID string, oldUser *tablerelation.User) error { + user, err := s.db.GetUserByID(ctx, userID) + if err != nil { + return err + } + if user.Nickname == oldUser.Nickname && user.FaceURL == oldUser.FaceURL { + return nil + } + oldUserInfo := convert.UserDB2Pb(oldUser) + newUserInfo := convert.UserDB2Pb(user) + var wg sync.WaitGroup + var es [2]error + wg.Add(len(es)) + go func() { + defer wg.Done() + _, es[0] = s.groupRpcClient.Client.NotificationUserInfoUpdate(ctx, &group.NotificationUserInfoUpdateReq{ + UserID: userID, + OldUserInfo: oldUserInfo, + NewUserInfo: newUserInfo, + }) + }() + + go func() { + defer wg.Done() + _, es[1] = s.friendRpcClient.Client.NotificationUserInfoUpdate(ctx, &friendpb.NotificationUserInfoUpdateReq{ + UserID: userID, + OldUserInfo: oldUserInfo, + NewUserInfo: newUserInfo, + }) + }() + wg.Wait() + return errors.Join(es[:]...) +} + func (s *userServer) SortQuery(ctx context.Context, req *pbuser.SortQueryReq) (*pbuser.SortQueryResp, error) { users, err := s.db.SortQuery(ctx, req.UserIDName, req.Asc) if err != nil { diff --git a/pkg/common/storage/controller/friend.go b/pkg/common/storage/controller/friend.go index 42c230598..4d3a96f12 100644 --- a/pkg/common/storage/controller/friend.go +++ b/pkg/common/storage/controller/friend.go @@ -86,7 +86,7 @@ type FriendDatabase interface { FindFriendUserID(ctx context.Context, friendUserID string) ([]string, error) - //SearchFriend(ctx context.Context, ownerUserID, keyword string, pagination pagination.Pagination) (int64, []*model.Friend, error) + OwnerIncrVersion(ctx context.Context, ownerUserID string, friendUserIDs []string, state int32) error } type friendDatabase struct { @@ -379,3 +379,10 @@ func (f *friendDatabase) FindFriendUserID(ctx context.Context, friendUserID stri //func (f *friendDatabase) SearchFriend(ctx context.Context, ownerUserID, keyword string, pagination pagination.Pagination) (int64, []*model.Friend, error) { // return f.friend.SearchFriend(ctx, ownerUserID, keyword, pagination) //} + +func (f *friendDatabase) OwnerIncrVersion(ctx context.Context, ownerUserID string, friendUserIDs []string, state int32) error { + if err := f.friend.IncrVersion(ctx, ownerUserID, friendUserIDs, state); err != nil { + return err + } + return f.cache.DelMaxFriendVersion(ownerUserID).ChainExecDel(ctx) +} diff --git a/pkg/common/storage/controller/group.go b/pkg/common/storage/controller/group.go index 71bf6ead2..27b6f4366 100644 --- a/pkg/common/storage/controller/group.go +++ b/pkg/common/storage/controller/group.go @@ -109,6 +109,7 @@ type GroupDatabase interface { FindMemberIncrVersion(ctx context.Context, groupID string, version uint, limit int) (*model.VersionLog, error) FindJoinIncrVersion(ctx context.Context, userID string, version uint, limit int) (*model.VersionLog, error) + MemberGroupIncrVersion(ctx context.Context, groupID string, userIDs []string, state int32) error //FindSortGroupMemberUserIDs(ctx context.Context, groupID string) ([]string, error) //FindSortJoinGroupIDs(ctx context.Context, userID string) ([]string, error) @@ -518,3 +519,10 @@ func (g *groupDatabase) SearchJoinGroup(ctx context.Context, userID string, keyw } return g.groupDB.SearchJoin(ctx, groupIDs, keyword, pagination) } + +func (g *groupDatabase) MemberGroupIncrVersion(ctx context.Context, groupID string, userIDs []string, state int32) error { + if err := g.groupMemberDB.MemberGroupIncrVersion(ctx, groupID, userIDs, state); err != nil { + return err + } + return g.cache.DelMaxGroupMemberVersion(groupID).ChainExecDel(ctx) +} diff --git a/pkg/common/storage/database/friend.go b/pkg/common/storage/database/friend.go index 1c9cd1033..b596411fc 100644 --- a/pkg/common/storage/database/friend.go +++ b/pkg/common/storage/database/friend.go @@ -55,4 +55,6 @@ type Friend interface { //SearchFriend(ctx context.Context, ownerUserID, keyword string, pagination pagination.Pagination) (int64, []*model.Friend, error) FindOwnerFriendUserIds(ctx context.Context, ownerUserID string, limit int) ([]string, error) + + IncrVersion(ctx context.Context, ownerUserID string, friendUserIDs []string, state int32) error } diff --git a/pkg/common/storage/database/mgo/friend.go b/pkg/common/storage/database/mgo/friend.go index 4c3c4b97b..7f456fbda 100644 --- a/pkg/common/storage/database/mgo/friend.go +++ b/pkg/common/storage/database/mgo/friend.go @@ -244,3 +244,7 @@ func (f *FriendMgo) FindFriendUserID(ctx context.Context, friendUserID string) ( } return mongoutil.Find[string](ctx, f.coll, filter, options.Find().SetProjection(bson.M{"_id": 0, "owner_user_id": 1}).SetSort(f.friendSort())) } + +func (f *FriendMgo) IncrVersion(ctx context.Context, ownerUserID string, friendUserIDs []string, state int32) error { + return f.owner.IncrVersion(ctx, ownerUserID, friendUserIDs, state) +} From 233c2f586cf8ee4a70617256fc4830266f4b7ff0 Mon Sep 17 00:00:00 2001 From: Gordon <46924906+FGadvancer@users.noreply.github.com> Date: Fri, 21 Jun 2024 09:56:19 +0800 Subject: [PATCH 09/13] fix: sort by id avoid unstable sort friends. --- go.mod | 2 +- go.sum | 4 ++-- internal/push/push_handler.go | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/go.mod b/go.mod index 2614e0f32..ce21fb281 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible // indirect github.com/mitchellh/mapstructure v1.5.0 github.com/openimsdk/protocol v0.0.69-alpha.17 - github.com/openimsdk/tools v0.0.49-alpha.25 + github.com/openimsdk/tools v0.0.49-alpha.27 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 github.com/stretchr/testify v1.9.0 diff --git a/go.sum b/go.sum index fe4f0c390..829dedf0b 100644 --- a/go.sum +++ b/go.sum @@ -272,8 +272,8 @@ github.com/openimsdk/gomake v0.0.13 h1:xLDe/moqgWpRoptHzI4packAWzs4C16b+sVY+txNJ github.com/openimsdk/gomake v0.0.13/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= github.com/openimsdk/protocol v0.0.69-alpha.17 h1:pEag4ZdlovE+AyLsw1VYFU/3sk6ayvGdPzgufQfKf9M= github.com/openimsdk/protocol v0.0.69-alpha.17/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= -github.com/openimsdk/tools v0.0.49-alpha.25 h1:OpRPwDZ2xWX7Zj5kyfZhryu/NfZTrsRVr2GFwu1HQHI= -github.com/openimsdk/tools v0.0.49-alpha.25/go.mod h1:rwsFI1G/nBHNfiNapbven41akRDPBbH4df0Cgy6xueU= +github.com/openimsdk/tools v0.0.49-alpha.27 h1:y1hkksh5HVUF/hHTfmVz0U6fIltP+GCYHT0vU+9sC5Q= +github.com/openimsdk/tools v0.0.49-alpha.27/go.mod h1:rwsFI1G/nBHNfiNapbven41akRDPBbH4df0Cgy6xueU= github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4= github.com/pelletier/go-toml/v2 v2.1.0/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc= github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= diff --git a/internal/push/push_handler.go b/internal/push/push_handler.go index 03c299b7a..dfe0e7b55 100644 --- a/internal/push/push_handler.go +++ b/internal/push/push_handler.go @@ -180,7 +180,7 @@ func (c *ConsumerHandler) shouldPushOffline(_ context.Context, msg *sdkws.MsgDat } func (c *ConsumerHandler) Push2Group(ctx context.Context, groupID string, msg *sdkws.MsgData) (err error) { - log.ZDebug(ctx, "Get super group msg from msg_transfer and push msg", "msg", msg.String(), "groupID", groupID) + log.ZDebug(ctx, "Get group msg from msg_transfer and push msg", "msg", msg.String(), "groupID", groupID) var pushToUserIDs []string if err = c.webhookBeforeGroupOnlinePush(ctx, &c.config.WebhooksConfig.BeforeGroupOnlinePush, groupID, msg, &pushToUserIDs); err != nil { From f5721514750612e1095ac86e468e97e5a32b6ddd Mon Sep 17 00:00:00 2001 From: Gordon <46924906+FGadvancer@users.noreply.github.com> Date: Fri, 21 Jun 2024 14:32:00 +0800 Subject: [PATCH 10/13] test: test log add. --- internal/rpc/group/group.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index 5b82b8ceb..a7dc24a91 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -217,11 +217,12 @@ func (s *groupServer) CreateGroup(ctx context.Context, req *pbgroup.CreateGroupR return nil, err } userIDs := append(append(req.MemberUserIDs, req.AdminUserIDs...), req.OwnerUserID) + log.ZDebug(ctx, "userID test", "userIDs", userIDs) opUserID := mcontext.GetOpUserID(ctx) if !datautil.Contain(opUserID, userIDs...) { userIDs = append(userIDs, opUserID) } - + log.ZDebug(ctx, "userID test", "userIDs", userIDs) if datautil.Duplicate(userIDs) { return nil, errs.ErrArgs.WrapMsg("group member repeated") } From 6249445031b7fb5c7861af03f93ab60e1c181fff Mon Sep 17 00:00:00 2001 From: Gordon <46924906+FGadvancer@users.noreply.github.com> Date: Fri, 21 Jun 2024 14:48:12 +0800 Subject: [PATCH 11/13] test: debug log remove. --- internal/rpc/group/group.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index a7dc24a91..5b82b8ceb 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -217,12 +217,11 @@ func (s *groupServer) CreateGroup(ctx context.Context, req *pbgroup.CreateGroupR return nil, err } userIDs := append(append(req.MemberUserIDs, req.AdminUserIDs...), req.OwnerUserID) - log.ZDebug(ctx, "userID test", "userIDs", userIDs) opUserID := mcontext.GetOpUserID(ctx) if !datautil.Contain(opUserID, userIDs...) { userIDs = append(userIDs, opUserID) } - log.ZDebug(ctx, "userID test", "userIDs", userIDs) + if datautil.Duplicate(userIDs) { return nil, errs.ErrArgs.WrapMsg("group member repeated") } From 0539a3831004680c08d0bb53af2b7edd252ab58e Mon Sep 17 00:00:00 2001 From: Gordon <46924906+FGadvancer@users.noreply.github.com> Date: Fri, 21 Jun 2024 19:51:00 +0800 Subject: [PATCH 12/13] fix: transfer group owner incr version more than 1. --- pkg/common/storage/controller/group.go | 5 +---- pkg/common/storage/database/group_member.go | 1 + .../storage/database/mgo/group_member.go | 19 ++++++++++++++++++- 3 files changed, 20 insertions(+), 5 deletions(-) diff --git a/pkg/common/storage/controller/group.go b/pkg/common/storage/controller/group.go index 27b6f4366..f2581bd92 100644 --- a/pkg/common/storage/controller/group.go +++ b/pkg/common/storage/controller/group.go @@ -400,10 +400,7 @@ func (g *groupDatabase) MapGroupMemberNum(ctx context.Context, groupIDs []string func (g *groupDatabase) TransferGroupOwner(ctx context.Context, groupID string, oldOwnerUserID, newOwnerUserID string, roleLevel int32) error { return g.ctxTx.Transaction(ctx, func(ctx context.Context) error { - if err := g.groupMemberDB.UpdateRoleLevel(ctx, groupID, oldOwnerUserID, roleLevel); err != nil { - return err - } - if err := g.groupMemberDB.UpdateRoleLevel(ctx, groupID, newOwnerUserID, constant.GroupOwner); err != nil { + if err := g.groupMemberDB.UpdateUserRoleLevels(ctx, groupID, oldOwnerUserID, roleLevel, newOwnerUserID, constant.GroupOwner); err != nil { return err } c := g.cache.CloneGroupCache() diff --git a/pkg/common/storage/database/group_member.go b/pkg/common/storage/database/group_member.go index c49319649..c272b6ef6 100644 --- a/pkg/common/storage/database/group_member.go +++ b/pkg/common/storage/database/group_member.go @@ -25,6 +25,7 @@ type GroupMember interface { Delete(ctx context.Context, groupID string, userIDs []string) (err error) Update(ctx context.Context, groupID string, userID string, data map[string]any) (err error) UpdateRoleLevel(ctx context.Context, groupID string, userID string, roleLevel int32) error + UpdateUserRoleLevels(ctx context.Context, groupID string, firstUserID string, firstUserRoleLevel int32, secondUserID string, secondUserRoleLevel int32) error FindMemberUserID(ctx context.Context, groupID string) (userIDs []string, err error) Take(ctx context.Context, groupID string, userID string) (groupMember *model.GroupMember, err error) TakeOwner(ctx context.Context, groupID string) (groupMember *model.GroupMember, err error) diff --git a/pkg/common/storage/database/mgo/group_member.go b/pkg/common/storage/database/mgo/group_member.go index 30f8d63b9..3eb93a10e 100644 --- a/pkg/common/storage/database/mgo/group_member.go +++ b/pkg/common/storage/database/mgo/group_member.go @@ -115,11 +115,28 @@ func (g *GroupMemberMgo) Delete(ctx context.Context, groupID string, userIDs []s func (g *GroupMemberMgo) UpdateRoleLevel(ctx context.Context, groupID string, userID string, roleLevel int32) error { return mongoutil.IncrVersion(func() error { - return g.Update(ctx, groupID, userID, bson.M{"role_level": roleLevel}) + return mongoutil.UpdateOne(ctx, g.coll, bson.M{"group_id": groupID, "user_id": userID}, + bson.M{"$set": bson.M{"role_level": roleLevel}}, true) }, func() error { return g.member.IncrVersion(ctx, groupID, []string{userID}, model.VersionStateUpdate) }) } +func (g *GroupMemberMgo) UpdateUserRoleLevels(ctx context.Context, groupID string, firstUserID string, firstUserRoleLevel int32, secondUserID string, secondUserRoleLevel int32) error { + return mongoutil.IncrVersion(func() error { + if err := mongoutil.UpdateOne(ctx, g.coll, bson.M{"group_id": groupID, "user_id": firstUserID}, + bson.M{"$set": bson.M{"role_level": firstUserRoleLevel}}, true); err != nil { + return err + } + if err := mongoutil.UpdateOne(ctx, g.coll, bson.M{"group_id": groupID, "user_id": secondUserID}, + bson.M{"$set": bson.M{"role_level": secondUserRoleLevel}}, true); err != nil { + return err + } + + return nil + }, func() error { + return g.member.IncrVersion(ctx, groupID, []string{firstUserID, secondUserID}, model.VersionStateUpdate) + }) +} func (g *GroupMemberMgo) Update(ctx context.Context, groupID string, userID string, data map[string]any) (err error) { if len(data) == 0 { From b3510e285a4681f4babbfbeb0c838eed99991514 Mon Sep 17 00:00:00 2001 From: Monet Lee Date: Mon, 24 Jun 2024 10:13:46 +0800 Subject: [PATCH 13/13] fix: add condition to kick owner. --- internal/rpc/group/group.go | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index 5b82b8ceb..1e36889c7 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -17,17 +17,18 @@ package group import ( "context" "fmt" + "math/big" + "math/rand" + "strconv" + "strings" + "time" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/common" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "github.com/openimsdk/open-im-server/v3/pkg/common/webhook" "github.com/openimsdk/open-im-server/v3/pkg/localcache" - "math/big" - "math/rand" - "strconv" - "strings" - "time" "github.com/openimsdk/open-im-server/v3/pkg/authverify" "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct" @@ -531,6 +532,14 @@ func (s *groupServer) KickGroupMember(ctx context.Context, req *pbgroup.KickGrou if datautil.Contain(opUserID, req.KickedUserIDs...) { return nil, errs.ErrArgs.WrapMsg("opUserID in KickedUserIDs") } + owner, err := s.db.TakeGroupOwner(ctx, req.GroupID) + if err != nil { + return nil, err + } + if datautil.Contain(owner.UserID, req.KickedUserIDs...) { + return nil, errs.ErrArgs.WrapMsg("ownerUID can not Kick") + } + members, err := s.db.FindGroupMembers(ctx, req.GroupID, append(req.KickedUserIDs, opUserID)) if err != nil { return nil, err