From caebdf32caf3e9eb051cf49e8cf19cde3f1aecfe Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Tue, 11 Jun 2024 18:38:36 +0800 Subject: [PATCH] sync --- internal/rpc/group/notification.go | 22 +++++++ pkg/common/cmd/group.go | 3 +- pkg/common/storage/database/mgo/black.go | 2 +- .../storage/database/mgo/conversation.go | 3 +- pkg/common/storage/database/mgo/friend.go | 4 +- .../storage/database/mgo/friend_request.go | 2 +- pkg/common/storage/database/mgo/group.go | 2 +- .../storage/database/mgo/group_member.go | 6 +- .../storage/database/mgo/group_request.go | 2 +- pkg/common/storage/database/mgo/log.go | 2 +- pkg/common/storage/database/mgo/object.go | 2 +- pkg/common/storage/database/mgo/user.go | 2 +- .../storage/database/mgo/version_log.go | 62 +++++++++++-------- .../storage/database/mgo/version_test.go | 39 ++++++++++++ pkg/common/storage/database/name.go | 17 +++++ pkg/common/storage/model/version_log.go | 2 +- pkg/common/storage/versionctx/rpc.go | 14 +++++ pkg/common/storage/versionctx/version.go | 48 ++++++++++++++ 18 files changed, 193 insertions(+), 41 deletions(-) create mode 100644 pkg/common/storage/database/mgo/version_test.go create mode 100644 pkg/common/storage/database/name.go create mode 100644 pkg/common/storage/versionctx/rpc.go create mode 100644 pkg/common/storage/versionctx/version.go diff --git a/internal/rpc/group/notification.go b/internal/rpc/group/notification.go index cfa62c85d..13e593c71 100644 --- a/internal/rpc/group/notification.go +++ b/internal/rpc/group/notification.go @@ -17,7 +17,9 @@ package group import ( "context" "fmt" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/versionctx" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient/notification" "github.com/openimsdk/open-im-server/v3/pkg/authverify" @@ -287,6 +289,15 @@ func (g *GroupNotificationSender) fillOpUser(ctx context.Context, opUser **sdkws return nil } +func (g *GroupNotificationSender) setVersion(ctx context.Context, version *uint64, collName string, id string) { + for _, coll := range versionctx.GetVersionLog(ctx).Get() { + if coll.Name == collName && coll.Doc.DID == id { + *version = uint64(coll.Doc.Version) + return + } + } +} + func (g *GroupNotificationSender) GroupCreatedNotification(ctx context.Context, tips *sdkws.GroupCreatedTips) { var err error defer func() { @@ -297,6 +308,7 @@ func (g *GroupNotificationSender) GroupCreatedNotification(ctx context.Context, if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil { return } + g.setVersion(ctx, &tips.GroupMemberVersion, database.GroupMemberVersionName, tips.Group.GroupID) g.Notification(ctx, mcontext.GetOpUserID(ctx), tips.Group.GroupID, constant.GroupCreatedNotification, tips) } @@ -380,6 +392,7 @@ func (g *GroupNotificationSender) MemberQuitNotification(ctx context.Context, me return } tips := &sdkws.MemberQuitTips{Group: group, QuitUser: member} + g.setVersion(ctx, &tips.GroupMemberVersion, database.GroupMemberVersionName, member.GroupID) g.Notification(ctx, mcontext.GetOpUserID(ctx), member.GroupID, constant.MemberQuitNotification, tips) } @@ -467,6 +480,7 @@ func (g *GroupNotificationSender) GroupOwnerTransferredNotification(ctx context. if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil { return } + g.setVersion(ctx, &tips.GroupMemberVersion, database.GroupMemberVersionName, req.GroupID) g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.GroupOwnerTransferredNotification, tips) } @@ -480,6 +494,7 @@ func (g *GroupNotificationSender) MemberKickedNotification(ctx context.Context, if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil { return } + g.setVersion(ctx, &tips.GroupMemberVersion, database.GroupMemberVersionName, tips.Group.GroupID) g.Notification(ctx, mcontext.GetOpUserID(ctx), tips.Group.GroupID, constant.MemberKickedNotification, tips) } @@ -503,6 +518,7 @@ func (g *GroupNotificationSender) MemberInvitedNotification(ctx context.Context, } tips := &sdkws.MemberInvitedTips{Group: group, InvitedUserList: users} err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID) + g.setVersion(ctx, &tips.GroupMemberVersion, database.GroupMemberVersionName, tips.Group.GroupID) g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.MemberInvitedNotification, tips) } @@ -524,6 +540,7 @@ func (g *GroupNotificationSender) MemberEnterNotification(ctx context.Context, g return } tips := &sdkws.MemberEnterTips{Group: group, EntrantUser: user} + g.setVersion(ctx, &tips.GroupMemberVersion, database.GroupMemberVersionName, tips.Group.GroupID) g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.MemberEnterNotification, tips) } @@ -564,6 +581,7 @@ func (g *GroupNotificationSender) GroupMemberMutedNotification(ctx context.Conte if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil { return } + g.setVersion(ctx, &tips.GroupMemberVersion, database.GroupMemberVersionName, tips.Group.GroupID) g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.GroupMemberMutedNotification, tips) } @@ -588,6 +606,7 @@ func (g *GroupNotificationSender) GroupMemberCancelMutedNotification(ctx context if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil { return } + g.setVersion(ctx, &tips.GroupMemberVersion, database.GroupMemberVersionName, tips.Group.GroupID) g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.GroupMemberCancelMutedNotification, tips) } @@ -666,6 +685,7 @@ func (g *GroupNotificationSender) GroupMemberInfoSetNotification(ctx context.Con if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil { return } + g.setVersion(ctx, &tips.GroupMemberVersion, database.GroupMemberVersionName, tips.Group.GroupID) g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.GroupMemberInfoSetNotification, tips) } @@ -689,6 +709,7 @@ func (g *GroupNotificationSender) GroupMemberSetToAdminNotification(ctx context. if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil { return } + g.setVersion(ctx, &tips.GroupMemberVersion, database.GroupMemberVersionName, tips.Group.GroupID) g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.GroupMemberSetToAdminNotification, tips) } @@ -713,5 +734,6 @@ func (g *GroupNotificationSender) GroupMemberSetToOrdinaryUserNotification(ctx c if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil { return } + g.setVersion(ctx, &tips.GroupMemberVersion, database.GroupMemberVersionName, tips.Group.GroupID) g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.GroupMemberSetToOrdinaryUserNotification, tips) } diff --git a/pkg/common/cmd/group.go b/pkg/common/cmd/group.go index f158b8c62..20124be95 100644 --- a/pkg/common/cmd/group.go +++ b/pkg/common/cmd/group.go @@ -19,6 +19,7 @@ import ( "github.com/openimsdk/open-im-server/v3/internal/rpc/group" "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/startrpc" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/versionctx" "github.com/openimsdk/tools/system/program" "github.com/spf13/cobra" ) @@ -58,5 +59,5 @@ func (a *GroupRpcCmd) Exec() error { func (a *GroupRpcCmd) runE() error { return startrpc.Start(a.ctx, &a.groupConfig.Discovery, &a.groupConfig.RpcConfig.Prometheus, a.groupConfig.RpcConfig.RPC.ListenIP, a.groupConfig.RpcConfig.RPC.RegisterIP, a.groupConfig.RpcConfig.RPC.Ports, - a.Index(), a.groupConfig.Share.RpcRegisterName.Group, &a.groupConfig.Share, a.groupConfig, group.Start) + a.Index(), a.groupConfig.Share.RpcRegisterName.Group, &a.groupConfig.Share, a.groupConfig, group.Start, versionctx.EnableVersionCtx()) } diff --git a/pkg/common/storage/database/mgo/black.go b/pkg/common/storage/database/mgo/black.go index cf74cfab1..4a7a35e6f 100644 --- a/pkg/common/storage/database/mgo/black.go +++ b/pkg/common/storage/database/mgo/black.go @@ -27,7 +27,7 @@ import ( ) func NewBlackMongo(db *mongo.Database) (database.Black, error) { - coll := db.Collection("black") + coll := db.Collection(database.BlackName) _, err := coll.Indexes().CreateOne(context.Background(), mongo.IndexModel{ Keys: bson.D{ {Key: "owner_user_id", Value: 1}, diff --git a/pkg/common/storage/database/mgo/conversation.go b/pkg/common/storage/database/mgo/conversation.go index 9c35f841b..b462d3958 100644 --- a/pkg/common/storage/database/mgo/conversation.go +++ b/pkg/common/storage/database/mgo/conversation.go @@ -16,6 +16,7 @@ package mgo import ( "context" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "time" @@ -29,7 +30,7 @@ import ( ) func NewConversationMongo(db *mongo.Database) (*ConversationMgo, error) { - coll := db.Collection("conversation") + coll := db.Collection(database.ConversationName) _, err := coll.Indexes().CreateOne(context.Background(), mongo.IndexModel{ Keys: bson.D{ {Key: "owner_user_id", Value: 1}, diff --git a/pkg/common/storage/database/mgo/friend.go b/pkg/common/storage/database/mgo/friend.go index 699d9cff6..18d80d47d 100644 --- a/pkg/common/storage/database/mgo/friend.go +++ b/pkg/common/storage/database/mgo/friend.go @@ -34,7 +34,7 @@ type FriendMgo struct { // NewFriendMongo creates a new instance of FriendMgo with the provided MongoDB database. func NewFriendMongo(db *mongo.Database) (database.Friend, error) { - coll := db.Collection("friend") + coll := db.Collection(database.FriendName) _, err := coll.Indexes().CreateOne(context.Background(), mongo.IndexModel{ Keys: bson.D{ {Key: "owner_user_id", Value: 1}, @@ -45,7 +45,7 @@ func NewFriendMongo(db *mongo.Database) (database.Friend, error) { if err != nil { return nil, err } - owner, err := NewVersionLog(db.Collection("friend_version")) + owner, err := NewVersionLog(db.Collection(database.FriendVersionName)) if err != nil { return nil, err } diff --git a/pkg/common/storage/database/mgo/friend_request.go b/pkg/common/storage/database/mgo/friend_request.go index 0d60b213d..4eed2f4a2 100644 --- a/pkg/common/storage/database/mgo/friend_request.go +++ b/pkg/common/storage/database/mgo/friend_request.go @@ -27,7 +27,7 @@ import ( ) func NewFriendRequestMongo(db *mongo.Database) (database.FriendRequest, error) { - coll := db.Collection("friend_request") + coll := db.Collection(database.FriendRequestName) _, err := coll.Indexes().CreateOne(context.Background(), mongo.IndexModel{ Keys: bson.D{ {Key: "from_user_id", Value: 1}, diff --git a/pkg/common/storage/database/mgo/group.go b/pkg/common/storage/database/mgo/group.go index 630bc0291..3be7883af 100644 --- a/pkg/common/storage/database/mgo/group.go +++ b/pkg/common/storage/database/mgo/group.go @@ -30,7 +30,7 @@ import ( ) func NewGroupMongo(db *mongo.Database) (database.Group, error) { - coll := db.Collection("group") + coll := db.Collection(database.GroupName) _, err := coll.Indexes().CreateOne(context.Background(), mongo.IndexModel{ Keys: bson.D{ {Key: "group_id", Value: 1}, diff --git a/pkg/common/storage/database/mgo/group_member.go b/pkg/common/storage/database/mgo/group_member.go index ece1d7941..a9ac262ab 100644 --- a/pkg/common/storage/database/mgo/group_member.go +++ b/pkg/common/storage/database/mgo/group_member.go @@ -29,7 +29,7 @@ import ( ) func NewGroupMember(db *mongo.Database) (database.GroupMember, error) { - coll := db.Collection("group_member") + coll := db.Collection(database.GroupMemberName) _, err := coll.Indexes().CreateOne(context.Background(), mongo.IndexModel{ Keys: bson.D{ {Key: "group_id", Value: 1}, @@ -40,11 +40,11 @@ func NewGroupMember(db *mongo.Database) (database.GroupMember, error) { if err != nil { return nil, errs.Wrap(err) } - member, err := NewVersionLog(db.Collection("group_member_version")) + member, err := NewVersionLog(db.Collection(database.GroupMemberVersionName)) if err != nil { return nil, err } - join, err := NewVersionLog(db.Collection("group_join_version")) + join, err := NewVersionLog(db.Collection(database.GroupJoinVersionName)) if err != nil { return nil, err } diff --git a/pkg/common/storage/database/mgo/group_request.go b/pkg/common/storage/database/mgo/group_request.go index 4ae778527..b1942b708 100644 --- a/pkg/common/storage/database/mgo/group_request.go +++ b/pkg/common/storage/database/mgo/group_request.go @@ -28,7 +28,7 @@ import ( ) func NewGroupRequestMgo(db *mongo.Database) (database.GroupRequest, error) { - coll := db.Collection("group_request") + coll := db.Collection(database.GroupRequestName) _, err := coll.Indexes().CreateOne(context.Background(), mongo.IndexModel{ Keys: bson.D{ {Key: "group_id", Value: 1}, diff --git a/pkg/common/storage/database/mgo/log.go b/pkg/common/storage/database/mgo/log.go index 51715bd77..6ff4c6039 100644 --- a/pkg/common/storage/database/mgo/log.go +++ b/pkg/common/storage/database/mgo/log.go @@ -28,7 +28,7 @@ import ( ) func NewLogMongo(db *mongo.Database) (database.Log, error) { - coll := db.Collection("log") + coll := db.Collection(database.LogName) _, err := coll.Indexes().CreateMany(context.Background(), []mongo.IndexModel{ { Keys: bson.D{ diff --git a/pkg/common/storage/database/mgo/object.go b/pkg/common/storage/database/mgo/object.go index 8ed7b3a56..df4d10ec4 100644 --- a/pkg/common/storage/database/mgo/object.go +++ b/pkg/common/storage/database/mgo/object.go @@ -27,7 +27,7 @@ import ( ) func NewS3Mongo(db *mongo.Database) (database.ObjectInfo, error) { - coll := db.Collection("s3") + coll := db.Collection(database.ObjectName) _, err := coll.Indexes().CreateOne(context.Background(), mongo.IndexModel{ Keys: bson.D{ {Key: "name", Value: 1}, diff --git a/pkg/common/storage/database/mgo/user.go b/pkg/common/storage/database/mgo/user.go index 793b8cdc8..8978e64eb 100644 --- a/pkg/common/storage/database/mgo/user.go +++ b/pkg/common/storage/database/mgo/user.go @@ -31,7 +31,7 @@ import ( ) func NewUserMongo(db *mongo.Database) (database.User, error) { - coll := db.Collection("user") + coll := db.Collection(database.UserName) _, err := coll.Indexes().CreateOne(context.Background(), mongo.IndexModel{ Keys: bson.D{ {Key: "user_id", Value: 1}, diff --git a/pkg/common/storage/database/mgo/version_log.go b/pkg/common/storage/database/mgo/version_log.go index 8ab11007d..b53057fe4 100644 --- a/pkg/common/storage/database/mgo/version_log.go +++ b/pkg/common/storage/database/mgo/version_log.go @@ -5,9 +5,9 @@ import ( "errors" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/versionctx" "github.com/openimsdk/tools/db/mongoutil" "github.com/openimsdk/tools/errs" - "github.com/openimsdk/tools/utils/datautil" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo" @@ -37,34 +37,41 @@ func (l *VersionLogMgo) initIndex(ctx context.Context) error { } func (l *VersionLogMgo) IncrVersion(ctx context.Context, dId string, eIds []string, state int32) error { - if len(eIds) == 0 { - return errs.ErrArgs.WrapMsg("elem id is empty", "dId", dId) + _, err := l.IncrVersionResult(ctx, dId, eIds, state) + return err +} + +func (l *VersionLogMgo) IncrVersionResult(ctx context.Context, dId string, eIds []string, state int32) (*model.VersionLog, error) { + vl, err := l.incrVersionResult(ctx, dId, eIds, state) + if err != nil { + return nil, err } - if datautil.Duplicate(eIds) { - return errs.ErrArgs.WrapMsg("elem id is duplicate", "dId", dId, "eIds", eIds) + versionctx.GetVersionLog(ctx).Append(versionctx.Collection{ + Name: l.coll.Name(), + Doc: vl, + }) + return vl, nil +} + +func (l *VersionLogMgo) incrVersionResult(ctx context.Context, dId string, eIds []string, state int32) (*model.VersionLog, error) { + if len(eIds) == 0 { + return nil, errs.ErrArgs.WrapMsg("elem id is empty", "dId", dId) } now := time.Now() - res, err := l.writeLogBatch(ctx, dId, eIds, state, now) - if err != nil { - return err - } - if res.MatchedCount > 0 { - return nil + if res, err := l.writeLogBatch2(ctx, dId, eIds, state, now); err == nil { + return res, nil + } else if !errors.Is(err, mongo.ErrNoDocuments) { + return nil, err } - if _, err := l.initDoc(ctx, dId, eIds, state, now); err == nil { - return nil + if res, err := l.initDoc(ctx, dId, eIds, state, now); err == nil { + return res, nil } else if !mongo.IsDuplicateKeyError(err) { - return err - } - if res, err := l.writeLogBatch(ctx, dId, eIds, state, now); err != nil { - return err - } else if res.MatchedCount == 0 { - return errs.ErrInternalServer.WrapMsg("mongodb return value that should not occur", "coll", l.coll.Name(), "dId", dId, "eIds", eIds) + return nil, err } - return nil + return l.writeLogBatch2(ctx, dId, eIds, state, now) } -func (l *VersionLogMgo) initDoc(ctx context.Context, dId string, eIds []string, state int32, now time.Time) (*model.VersionLogTable, error) { +func (l *VersionLogMgo) initDoc(ctx context.Context, dId string, eIds []string, state int32, now time.Time) (*model.VersionLog, error) { wl := model.VersionLogTable{ ID: primitive.NewObjectID(), DID: dId, @@ -81,11 +88,13 @@ func (l *VersionLogMgo) initDoc(ctx context.Context, dId string, eIds []string, LastUpdate: now, }) } - _, err := l.coll.InsertOne(ctx, &wl) - return &wl, err + if _, err := l.coll.InsertOne(ctx, &wl); err != nil { + return nil, err + } + return wl.VersionLog(), nil } -func (l *VersionLogMgo) writeLogBatch(ctx context.Context, dId string, eIds []string, state int32, now time.Time) (*mongo.UpdateResult, error) { +func (l *VersionLogMgo) writeLogBatch2(ctx context.Context, dId string, eIds []string, state int32, now time.Time) (*model.VersionLog, error) { if eIds == nil { eIds = []string{} } @@ -142,7 +151,8 @@ func (l *VersionLogMgo) writeLogBatch(ctx context.Context, dId string, eIds []st "$unset": "delete_e_ids", }, } - return mongoutil.UpdateMany(ctx, l.coll, filter, pipeline) + opt := options.FindOneAndUpdate().SetUpsert(false).SetReturnDocument(options.After).SetProjection(bson.M{"logs": 0}) + return mongoutil.FindOneAndUpdate[*model.VersionLog](ctx, l.coll, filter, pipeline, opt) } func (l *VersionLogMgo) findDoc(ctx context.Context, dId string) (*model.VersionLog, error) { @@ -160,7 +170,7 @@ func (l *VersionLogMgo) FindChangeLog(ctx context.Context, dId string, version u return nil, err } if res, err := l.initDoc(ctx, dId, nil, 0, time.Now()); err == nil { - return res.VersionLog(), nil + return res, nil } else if mongo.IsDuplicateKeyError(err) { return l.findChangeLog(ctx, dId, version, limit) } else { diff --git a/pkg/common/storage/database/mgo/version_test.go b/pkg/common/storage/database/mgo/version_test.go new file mode 100644 index 000000000..236c61a2c --- /dev/null +++ b/pkg/common/storage/database/mgo/version_test.go @@ -0,0 +1,39 @@ +package mgo + +import ( + "context" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" + "testing" + "time" +) + +func Result[V any](val V, err error) V { + if err != nil { + panic(err) + } + return val +} + +func Check(err error) { + if err != nil { + panic(err) + } +} + +func TestName(t *testing.T) { + cli := Result(mongo.Connect(context.Background(), options.Client().ApplyURI("mongodb://openIM:openIM123@172.16.8.48:37017/openim_v3?maxPoolSize=100").SetConnectTimeout(5*time.Second))) + coll := cli.Database("openim_v3").Collection("version_test") + tmp, err := NewVersionLog(coll) + if err != nil { + panic(err) + } + vl := tmp.(*VersionLogMgo) + res, err := vl.writeLogBatch2(context.Background(), "100", []string{"1000", "1001", "1003"}, model.VersionStateInsert, time.Now()) + if err != nil { + t.Log(err) + return + } + t.Logf("%+v", res) +} diff --git a/pkg/common/storage/database/name.go b/pkg/common/storage/database/name.go new file mode 100644 index 000000000..986f22a1a --- /dev/null +++ b/pkg/common/storage/database/name.go @@ -0,0 +1,17 @@ +package database + +const ( + BlackName = "black" + ConversationName = "conversation" + FriendName = "friend" + FriendVersionName = "friend_version" + FriendRequestName = "friend_request" + GroupName = "group" + GroupMemberName = "group_member" + GroupMemberVersionName = "group_member_version" + GroupJoinVersionName = "group_join_version" + GroupRequestName = "group_request" + LogName = "log" + ObjectName = "s3" + UserName = "user" +) diff --git a/pkg/common/storage/model/version_log.go b/pkg/common/storage/model/version_log.go index e1b5fe7c5..11a40ef24 100644 --- a/pkg/common/storage/model/version_log.go +++ b/pkg/common/storage/model/version_log.go @@ -38,7 +38,7 @@ func (v *VersionLogTable) VersionLog() *VersionLog { Version: v.Version, Deleted: v.Deleted, LastUpdate: v.LastUpdate, - LogLen: 0, + LogLen: len(v.Logs), } } diff --git a/pkg/common/storage/versionctx/rpc.go b/pkg/common/storage/versionctx/rpc.go new file mode 100644 index 000000000..67b95aebd --- /dev/null +++ b/pkg/common/storage/versionctx/rpc.go @@ -0,0 +1,14 @@ +package versionctx + +import ( + "context" + "google.golang.org/grpc" +) + +func EnableVersionCtx() grpc.ServerOption { + return grpc.ChainUnaryInterceptor(enableVersionCtxInterceptor) +} + +func enableVersionCtxInterceptor(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) { + return handler(WithVersionLog(ctx), req) +} diff --git a/pkg/common/storage/versionctx/version.go b/pkg/common/storage/versionctx/version.go new file mode 100644 index 000000000..5db885640 --- /dev/null +++ b/pkg/common/storage/versionctx/version.go @@ -0,0 +1,48 @@ +package versionctx + +import ( + "context" + tablerelation "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" + "sync" +) + +type Collection struct { + Name string + Doc *tablerelation.VersionLog +} + +type versionKey struct{} + +func WithVersionLog(ctx context.Context) context.Context { + return context.WithValue(ctx, versionKey{}, &VersionLog{}) +} + +func GetVersionLog(ctx context.Context) *VersionLog { + if v, ok := ctx.Value(versionKey{}).(*VersionLog); ok { + return v + } + return nil +} + +type VersionLog struct { + lock sync.Mutex + data []Collection +} + +func (v *VersionLog) Append(data ...Collection) { + if v == nil || len(data) == 0 { + return + } + v.lock.Lock() + defer v.lock.Unlock() + v.data = append(v.data, data...) +} + +func (v *VersionLog) Get() []Collection { + if v == nil { + return nil + } + v.lock.Lock() + defer v.lock.Unlock() + return v.data +}