From ae06f16c5a32183d3b2034b4122bccb9d7d1306d Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Thu, 18 Jul 2024 16:58:46 +0800 Subject: [PATCH] sort version --- go.mod | 2 +- go.sum | 4 +- internal/rpc/friend/notification.go | 29 ++++++++++++++ internal/rpc/friend/sync.go | 39 +++++++++++++------ internal/rpc/group/notification.go | 17 +++++++- internal/rpc/group/sync.go | 35 ++++++++++------- .../storage/database/mgo/group_member.go | 13 +++++-- .../storage/database/mgo/version_log.go | 20 +++++++++- .../storage/database/mgo/version_test.go | 14 +++---- pkg/common/storage/model/version_log.go | 5 +++ 10 files changed, 137 insertions(+), 41 deletions(-) diff --git a/go.mod b/go.mod index a799f2c08..0637492eb 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ require ( github.com/gorilla/websocket v1.5.1 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/mitchellh/mapstructure v1.5.0 - github.com/openimsdk/protocol v0.0.69-alpha.30 + github.com/openimsdk/protocol v0.0.69-alpha.38 github.com/openimsdk/tools v0.0.49-alpha.51 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 diff --git a/go.sum b/go.sum index d9f948874..92a783c06 100644 --- a/go.sum +++ b/go.sum @@ -319,8 +319,8 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y= github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y= github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= -github.com/openimsdk/protocol v0.0.69-alpha.30 h1:OXzCIpDpIY/GI6h1SDYWN51OS9Xv/BcHaOwq8whPKqI= -github.com/openimsdk/protocol v0.0.69-alpha.30/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= +github.com/openimsdk/protocol v0.0.69-alpha.38 h1:kVZCHIXg/el8YJFoIBWhZu1sbbTUqmzgF4l0W3sUH24= +github.com/openimsdk/protocol v0.0.69-alpha.38/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= github.com/openimsdk/tools v0.0.49-alpha.51 h1:JTPEetVSNOczw1n+XjiPozaH2SBPQAc+9VlPE41wEeY= github.com/openimsdk/tools v0.0.49-alpha.51/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4= github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4= diff --git a/internal/rpc/friend/notification.go b/internal/rpc/friend/notification.go index ddee025bb..5fb34577f 100644 --- a/internal/rpc/friend/notification.go +++ b/internal/rpc/friend/notification.go @@ -16,6 +16,8 @@ package friend import ( "context" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/versionctx" relationtb "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" @@ -191,10 +193,37 @@ func (f *FriendNotificationSender) FriendDeletedNotification(ctx context.Context f.Notification(ctx, req.OwnerUserID, req.FriendUserID, constant.FriendDeletedNotification, &tips) } +func (f *FriendNotificationSender) setVersion(ctx context.Context, version *uint64, versionID *string, collName string, id string) { + versions := versionctx.GetVersionLog(ctx).Get() + for _, coll := range versions { + if coll.Name == collName && coll.Doc.DID == id { + *version = uint64(coll.Doc.Version) + *versionID = coll.Doc.ID.Hex() + return + } + } +} + +func (f *FriendNotificationSender) setSortVersion(ctx context.Context, version *uint64, versionID *string, collName string, id string, sortVersion *uint64) { + versions := versionctx.GetVersionLog(ctx).Get() + for _, coll := range versions { + if coll.Name == collName && coll.Doc.DID == id { + *version = uint64(coll.Doc.Version) + *versionID = coll.Doc.ID.Hex() + for _, elem := range coll.Doc.Logs { + if elem.EID == relationtb.VersionSortChangeID { + *sortVersion = uint64(elem.Version) + } + } + } + } +} + func (f *FriendNotificationSender) FriendRemarkSetNotification(ctx context.Context, fromUserID, toUserID string) { tips := sdkws.FriendInfoChangedTips{FromToUserID: &sdkws.FromToUserID{}} tips.FromToUserID.FromUserID = fromUserID tips.FromToUserID.ToUserID = toUserID + f.setSortVersion(ctx, &tips.FriendVersion, &tips.FriendVersionID, database.FriendVersionName, toUserID, &tips.FriendSortVersion) f.Notification(ctx, fromUserID, toUserID, constant.FriendRemarkSetNotification, &tips) } diff --git a/internal/rpc/friend/sync.go b/internal/rpc/friend/sync.go index 684894609..eee9f2afd 100644 --- a/internal/rpc/friend/sync.go +++ b/internal/rpc/friend/sync.go @@ -4,6 +4,7 @@ import ( "context" "github.com/openimsdk/open-im-server/v3/pkg/util/hashutil" "github.com/openimsdk/protocol/sdkws" + "slices" "github.com/openimsdk/open-im-server/v3/internal/rpc/incrversion" "github.com/openimsdk/open-im-server/v3/pkg/authverify" @@ -52,12 +53,27 @@ func (s *friendServer) GetIncrementalFriends(ctx context.Context, req *relation. if err := authverify.CheckAccessV3(ctx, req.UserID, s.config.Share.IMAdminUserID); err != nil { return nil, err } + var sortVersion uint64 opt := incrversion.Option[*sdkws.FriendInfo, relation.GetIncrementalFriendsResp]{ - Ctx: ctx, - VersionKey: req.UserID, - VersionID: req.VersionID, - VersionNumber: req.Version, - Version: s.db.FindFriendIncrVersion, + Ctx: ctx, + VersionKey: req.UserID, + VersionID: req.VersionID, + VersionNumber: req.Version, + Version: func(ctx context.Context, ownerUserID string, version uint, limit int) (*model.VersionLog, error) { + vl, err := s.db.FindFriendIncrVersion(ctx, ownerUserID, version, limit) + if err != nil { + return nil, err + } + vl.Logs = slices.DeleteFunc(vl.Logs, func(elem model.VersionLogElem) bool { + if elem.EID == model.VersionSortChangeID { + vl.LogLen-- + sortVersion = uint64(elem.Version) + return true + } + return false + }) + return vl, nil + }, CacheMaxVersion: s.db.FindMaxFriendVersionCache, Find: func(ctx context.Context, ids []string) ([]*sdkws.FriendInfo, error) { return s.getFriend(ctx, req.UserID, ids) @@ -65,12 +81,13 @@ func (s *friendServer) GetIncrementalFriends(ctx context.Context, req *relation. ID: func(elem *sdkws.FriendInfo) string { return elem.FriendUser.UserID }, Resp: func(version *model.VersionLog, deleteIds []string, insertList, updateList []*sdkws.FriendInfo, full bool) *relation.GetIncrementalFriendsResp { return &relation.GetIncrementalFriendsResp{ - VersionID: version.ID.Hex(), - Version: uint64(version.Version), - Full: full, - Delete: deleteIds, - Insert: insertList, - Update: updateList, + VersionID: version.ID.Hex(), + Version: uint64(version.Version), + Full: full, + Delete: deleteIds, + Insert: insertList, + Update: updateList, + SortVersion: sortVersion, } }, } diff --git a/internal/rpc/group/notification.go b/internal/rpc/group/notification.go index a8824962d..a7398795f 100644 --- a/internal/rpc/group/notification.go +++ b/internal/rpc/group/notification.go @@ -306,6 +306,21 @@ func (g *GroupNotificationSender) setVersion(ctx context.Context, version *uint6 } } +func (g *GroupNotificationSender) setSortVersion(ctx context.Context, version *uint64, versionID *string, collName string, id string, sortVersion *uint64) { + versions := versionctx.GetVersionLog(ctx).Get() + for _, coll := range versions { + if coll.Name == collName && coll.Doc.DID == id { + *version = uint64(coll.Doc.Version) + *versionID = coll.Doc.ID.Hex() + for _, elem := range coll.Doc.Logs { + if elem.EID == model.VersionSortChangeID { + *sortVersion = uint64(elem.Version) + } + } + } + } +} + func (g *GroupNotificationSender) GroupCreatedNotification(ctx context.Context, tips *sdkws.GroupCreatedTips) { var err error defer func() { @@ -707,7 +722,7 @@ func (g *GroupNotificationSender) GroupMemberInfoSetNotification(ctx context.Con if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil { return } - g.setVersion(ctx, &tips.GroupMemberVersion, &tips.GroupMemberVersionID, database.GroupMemberVersionName, tips.Group.GroupID) + g.setSortVersion(ctx, &tips.GroupMemberVersion, &tips.GroupMemberVersionID, database.GroupMemberVersionName, tips.Group.GroupID, &tips.GroupSortVersion) g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.GroupMemberInfoSetNotification, tips) } diff --git a/internal/rpc/group/sync.go b/internal/rpc/group/sync.go index 75d060c0e..12598510b 100644 --- a/internal/rpc/group/sync.go +++ b/internal/rpc/group/sync.go @@ -10,7 +10,6 @@ import ( "github.com/openimsdk/protocol/constant" pbgroup "github.com/openimsdk/protocol/group" "github.com/openimsdk/protocol/sdkws" - "slices" ) func (s *groupServer) GetFullGroupMemberUserIDs(ctx context.Context, req *pbgroup.GetFullGroupMemberUserIDsReq) (*pbgroup.GetFullGroupMemberUserIDsResp, error) { @@ -63,7 +62,10 @@ func (s *groupServer) GetIncrementalGroupMember(ctx context.Context, req *pbgrou if group.Status == constant.GroupStatusDismissed { return nil, servererrs.ErrDismissedAlready.Wrap() } - var hasGroupUpdate bool + var ( + hasGroupUpdate bool + sortVersion uint64 + ) opt := incrversion.Option[*sdkws.GroupMemberFullInfo, pbgroup.GetIncrementalGroupMemberResp]{ Ctx: ctx, VersionKey: req.GroupID, @@ -74,14 +76,20 @@ func (s *groupServer) GetIncrementalGroupMember(ctx context.Context, req *pbgrou if err != nil { return nil, err } - vl.Logs = slices.DeleteFunc(vl.Logs, func(elem model.VersionLogElem) bool { - if elem.EID == "" { + logs := make([]model.VersionLogElem, 0, len(vl.Logs)) + for i, log := range vl.Logs { + switch log.EID { + case model.VersionGroupChangeID: vl.LogLen-- hasGroupUpdate = true - return true + case model.VersionSortChangeID: + vl.LogLen-- + sortVersion = uint64(log.Version) + default: + logs = append(logs, vl.Logs[i]) } - return false - }) + } + vl.Logs = logs if vl.LogLen > 0 { hasGroupUpdate = true } @@ -94,12 +102,13 @@ func (s *groupServer) GetIncrementalGroupMember(ctx context.Context, req *pbgrou ID: func(elem *sdkws.GroupMemberFullInfo) string { return elem.UserID }, Resp: func(version *model.VersionLog, delIDs []string, insertList, updateList []*sdkws.GroupMemberFullInfo, full bool) *pbgroup.GetIncrementalGroupMemberResp { return &pbgroup.GetIncrementalGroupMemberResp{ - VersionID: version.ID.Hex(), - Version: uint64(version.Version), - Full: full, - Delete: delIDs, - Insert: insertList, - Update: updateList, + VersionID: version.ID.Hex(), + Version: uint64(version.Version), + Full: full, + Delete: delIDs, + Insert: insertList, + Update: updateList, + SortVersion: sortVersion, } }, } diff --git a/pkg/common/storage/database/mgo/group_member.go b/pkg/common/storage/database/mgo/group_member.go index 8de21e83a..42b3dd72b 100644 --- a/pkg/common/storage/database/mgo/group_member.go +++ b/pkg/common/storage/database/mgo/group_member.go @@ -118,7 +118,7 @@ func (g *GroupMemberMgo) UpdateRoleLevel(ctx context.Context, groupID string, us return mongoutil.UpdateOne(ctx, g.coll, bson.M{"group_id": groupID, "user_id": userID}, bson.M{"$set": bson.M{"role_level": roleLevel}}, true) }, func() error { - return g.member.IncrVersion(ctx, groupID, []string{userID}, model.VersionStateUpdate) + return g.member.IncrVersion(ctx, groupID, []string{model.VersionSortChangeID, userID}, model.VersionStateUpdate) }) } func (g *GroupMemberMgo) UpdateUserRoleLevels(ctx context.Context, groupID string, firstUserID string, firstUserRoleLevel int32, secondUserID string, secondUserRoleLevel int32) error { @@ -131,10 +131,9 @@ func (g *GroupMemberMgo) UpdateUserRoleLevels(ctx context.Context, groupID strin bson.M{"$set": bson.M{"role_level": secondUserRoleLevel}}, true); err != nil { return err } - return nil }, func() error { - return g.member.IncrVersion(ctx, groupID, []string{firstUserID, secondUserID}, model.VersionStateUpdate) + return g.member.IncrVersion(ctx, groupID, []string{model.VersionSortChangeID, firstUserID, secondUserID}, model.VersionStateUpdate) }) } @@ -145,7 +144,13 @@ func (g *GroupMemberMgo) Update(ctx context.Context, groupID string, userID stri return mongoutil.IncrVersion(func() error { return mongoutil.UpdateOne(ctx, g.coll, bson.M{"group_id": groupID, "user_id": userID}, bson.M{"$set": data}, true) }, func() error { - return g.member.IncrVersion(ctx, groupID, []string{userID}, model.VersionStateUpdate) + var userIDs []string + if g.IsUpdateRoleLevel(data) { + userIDs = []string{model.VersionSortChangeID, userID} + } else { + userIDs = []string{userID} + } + return g.member.IncrVersion(ctx, groupID, userIDs, model.VersionStateUpdate) }) } diff --git a/pkg/common/storage/database/mgo/version_log.go b/pkg/common/storage/database/mgo/version_log.go index 8836742f0..80b5658c6 100644 --- a/pkg/common/storage/database/mgo/version_log.go +++ b/pkg/common/storage/database/mgo/version_log.go @@ -152,8 +152,24 @@ func (l *VersionLogMgo) writeLogBatch2(ctx context.Context, dId string, eIds []s "$unset": "delete_e_ids", }, } - opt := options.FindOneAndUpdate().SetUpsert(false).SetReturnDocument(options.After).SetProjection(bson.M{"logs": 0}) - return mongoutil.FindOneAndUpdate[*model.VersionLog](ctx, l.coll, filter, pipeline, opt) + projection := bson.M{ + "logs": 0, + } + opt := options.FindOneAndUpdate().SetUpsert(false).SetReturnDocument(options.After).SetProjection(projection) + res, err := mongoutil.FindOneAndUpdate[*model.VersionLog](ctx, l.coll, filter, pipeline, opt) + if err != nil { + return nil, err + } + res.Logs = make([]model.VersionLogElem, 0, len(eIds)) + for _, id := range eIds { + res.Logs = append(res.Logs, model.VersionLogElem{ + EID: id, + State: state, + Version: res.Version, + LastUpdate: res.LastUpdate, + }) + } + return res, nil } func (l *VersionLogMgo) findDoc(ctx context.Context, dId string) (*model.VersionLog, error) { diff --git a/pkg/common/storage/database/mgo/version_test.go b/pkg/common/storage/database/mgo/version_test.go index 236c61a2c..4576e45bc 100644 --- a/pkg/common/storage/database/mgo/version_test.go +++ b/pkg/common/storage/database/mgo/version_test.go @@ -9,12 +9,12 @@ import ( "time" ) -func Result[V any](val V, err error) V { - if err != nil { - panic(err) - } - return val -} +//func Result[V any](val V, err error) V { +// if err != nil { +// panic(err) +// } +// return val +//} func Check(err error) { if err != nil { @@ -30,7 +30,7 @@ func TestName(t *testing.T) { panic(err) } vl := tmp.(*VersionLogMgo) - res, err := vl.writeLogBatch2(context.Background(), "100", []string{"1000", "1001", "1003"}, model.VersionStateInsert, time.Now()) + res, err := vl.incrVersionResult(context.Background(), "100", []string{"1000", "1001", "1003"}, model.VersionStateInsert) if err != nil { t.Log(err) return diff --git a/pkg/common/storage/model/version_log.go b/pkg/common/storage/model/version_log.go index 11a40ef24..2091a3a29 100644 --- a/pkg/common/storage/model/version_log.go +++ b/pkg/common/storage/model/version_log.go @@ -14,6 +14,11 @@ const ( VersionStateUpdate ) +const ( + VersionGroupChangeID = "" + VersionSortChangeID = "$" +) + type VersionLogElem struct { EID string `bson:"e_id"` State int32 `bson:"state"`