From cd5e5dbeb26e7b18ac2435dcd9fd197d634703d3 Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Wed, 4 Dec 2024 11:08:54 +0800 Subject: [PATCH] feat: seq user hook set conversation --- internal/msgtransfer/init.go | 2 +- internal/rpc/msg/server.go | 9 ++++- pkg/common/storage/database/mgo/seq_user.go | 37 +++++++++++++++++---- tools/seq/internal/main.go | 2 +- 4 files changed, 41 insertions(+), 9 deletions(-) diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index 92053931c..08d9d93a0 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -92,7 +92,7 @@ func Start(ctx context.Context, index int, config *Config) error { return err } seqConversationCache := redis.NewSeqConversationCacheRedis(rdb, seqConversation) - seqUser, err := mgo.NewSeqUserMongo(mgocli.GetDB()) + seqUser, err := mgo.NewSeqUserMongo(mgocli.GetDB(), nil) if err != nil { return err } diff --git a/internal/rpc/msg/server.go b/internal/rpc/msg/server.go index 6d5922ce3..7eee2905f 100644 --- a/internal/rpc/msg/server.go +++ b/internal/rpc/msg/server.go @@ -99,7 +99,14 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg return err } seqConversationCache := redis.NewSeqConversationCacheRedis(rdb, seqConversation) - seqUser, err := mgo.NewSeqUserMongo(mgocli.GetDB()) + seqUser, err := mgo.NewSeqUserMongo(mgocli.GetDB(), &mgo.SeqUserHook{ + SetUserMaxSeq: func(ctx context.Context, conversationID string, userID string, seq int64) error { + return conversationClient.SetConversationMaxSeq(ctx, []string{userID}, conversationID, seq) + }, + SetUserMinSeq: func(ctx context.Context, conversationID string, userID string, seq int64) error { + return conversationClient.SetConversationMinSeq(ctx, []string{userID}, conversationID, seq) + }, + }) if err != nil { return err } diff --git a/pkg/common/storage/database/mgo/seq_user.go b/pkg/common/storage/database/mgo/seq_user.go index 244de3000..3a5a5cccb 100644 --- a/pkg/common/storage/database/mgo/seq_user.go +++ b/pkg/common/storage/database/mgo/seq_user.go @@ -11,7 +11,14 @@ import ( "go.mongodb.org/mongo-driver/mongo/options" ) -func NewSeqUserMongo(db *mongo.Database) (database.SeqUser, error) { +type seqUserFunc func(ctx context.Context, conversationID string, userID string, seq int64) error + +type SeqUserHook struct { + SetUserMaxSeq func(ctx context.Context, conversationID string, userID string, seq int64) error + SetUserMinSeq func(ctx context.Context, conversationID string, userID string, seq int64) error +} + +func NewSeqUserMongo(db *mongo.Database, hook *SeqUserHook) (database.SeqUser, error) { coll := db.Collection(database.SeqUserName) _, err := coll.Indexes().CreateOne(context.Background(), mongo.IndexModel{ Keys: bson.D{ @@ -22,11 +29,15 @@ func NewSeqUserMongo(db *mongo.Database) (database.SeqUser, error) { if err != nil { return nil, err } - return &seqUserMongo{coll: coll}, nil + if hook == nil { + hook = &SeqUserHook{} + } + return &seqUserMongo{coll: coll, hook: hook}, nil } type seqUserMongo struct { coll *mongo.Collection + hook *SeqUserHook } func (s *seqUserMongo) setSeq(ctx context.Context, conversationID string, userID string, seq int64, field string) error { @@ -52,12 +63,12 @@ func (s *seqUserMongo) setSeq(ctx context.Context, conversationID string, userID return mongoutil.UpdateOne(ctx, s.coll, filter, update, false, opt) } -func (s *seqUserMongo) getSeq(ctx context.Context, conversationID string, userID string, failed string) (int64, error) { +func (s *seqUserMongo) getSeq(ctx context.Context, conversationID string, userID string, field string) (int64, error) { filter := map[string]any{ "user_id": userID, "conversation_id": conversationID, } - opt := options.FindOne().SetProjection(bson.M{"_id": 0, failed: 1}) + opt := options.FindOne().SetProjection(bson.M{"_id": 0, field: 1}) seq, err := mongoutil.FindOne[int64](ctx, s.coll, filter, opt) if err == nil { return seq, nil @@ -72,8 +83,21 @@ func (s *seqUserMongo) GetUserMaxSeq(ctx context.Context, conversationID string, return s.getSeq(ctx, conversationID, userID, "max_seq") } +func (s *seqUserMongo) withHook(ctx context.Context, conversationID string, userID string, seq int64, field string, hookFn seqUserFunc) error { + if err := s.setSeq(ctx, conversationID, userID, seq, field); err != nil { + return err + } + if hookFn != nil { + if err := hookFn(ctx, conversationID, userID, seq); err != nil { + return err + } + } + return nil +} + func (s *seqUserMongo) SetUserMaxSeq(ctx context.Context, conversationID string, userID string, seq int64) error { - return s.setSeq(ctx, conversationID, userID, seq, "max_seq") + //return s.setSeq(ctx, conversationID, userID, seq, "max_seq") + return s.withHook(ctx, conversationID, userID, seq, "max_seq", s.hook.SetUserMaxSeq) } func (s *seqUserMongo) GetUserMinSeq(ctx context.Context, conversationID string, userID string) (int64, error) { @@ -81,7 +105,8 @@ func (s *seqUserMongo) GetUserMinSeq(ctx context.Context, conversationID string, } func (s *seqUserMongo) SetUserMinSeq(ctx context.Context, conversationID string, userID string, seq int64) error { - return s.setSeq(ctx, conversationID, userID, seq, "min_seq") + //return s.setSeq(ctx, conversationID, userID, seq, "min_seq") + return s.withHook(ctx, conversationID, userID, seq, "min_seq", s.hook.SetUserMinSeq) } func (s *seqUserMongo) GetUserReadSeq(ctx context.Context, conversationID string, userID string) (int64, error) { diff --git a/tools/seq/internal/main.go b/tools/seq/internal/main.go index 2bec5a8f1..201fca387 100644 --- a/tools/seq/internal/main.go +++ b/tools/seq/internal/main.go @@ -87,7 +87,7 @@ func Main(conf string, del time.Duration) error { if err != nil { return err } - uSeq, err := mgo.NewSeqUserMongo(mgocli.GetDB()) + uSeq, err := mgo.NewSeqUserMongo(mgocli.GetDB(), nil) if err != nil { return err }