From 8e3890ed8824e5e61994adc50b8253457db7c0af Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Fri, 21 Jun 2024 18:24:30 +0800 Subject: [PATCH] seq --- pkg/common/storage/cache/redis/seq.go | 64 +++++++++++-------- pkg/common/storage/database/mgo/seq_user.go | 71 +++++++++++++++++++++ pkg/common/storage/database/name.go | 1 + pkg/common/storage/database/seq_user.go | 12 ++++ pkg/common/storage/model/seq_user.go | 9 +++ 5 files changed, 130 insertions(+), 27 deletions(-) create mode 100644 pkg/common/storage/database/mgo/seq_user.go create mode 100644 pkg/common/storage/database/seq_user.go create mode 100644 pkg/common/storage/model/seq_user.go diff --git a/pkg/common/storage/cache/redis/seq.go b/pkg/common/storage/cache/redis/seq.go index 76dd921a5..8624cc78f 100644 --- a/pkg/common/storage/cache/redis/seq.go +++ b/pkg/common/storage/cache/redis/seq.go @@ -75,21 +75,43 @@ func (c *seqCache) getSeqs(ctx context.Context, items []string, getkey func(s st return m, nil } -func (c *seqCache) SetMaxSeq(ctx context.Context, conversationID string, maxSeq int64) error { - return c.setSeq(ctx, conversationID, maxSeq, c.getMaxSeqKey) -} - -func (c *seqCache) GetMaxSeqs(ctx context.Context, conversationIDs []string) (m map[string]int64, err error) { - return c.getSeqs(ctx, conversationIDs, c.getMaxSeqKey) -} - -func (c *seqCache) GetMaxSeq(ctx context.Context, conversationID string) (int64, error) { - return c.getSeq(ctx, conversationID, c.getMaxSeqKey) -} - -func (c *seqCache) SetMinSeq(ctx context.Context, conversationID string, minSeq int64) error { - return c.setSeq(ctx, conversationID, minSeq, c.getMinSeqKey) -} +//func (c *seqCache) SetMaxSeq(ctx context.Context, conversationID string, maxSeq int64) error { +// return c.setSeq(ctx, conversationID, maxSeq, c.getMaxSeqKey) +//} +// +//func (c *seqCache) GetMaxSeqs(ctx context.Context, conversationIDs []string) (m map[string]int64, err error) { +// return c.getSeqs(ctx, conversationIDs, c.getMaxSeqKey) +//} +// +//func (c *seqCache) GetMaxSeq(ctx context.Context, conversationID string) (int64, error) { +// return c.getSeq(ctx, conversationID, c.getMaxSeqKey) +//} +// +//func (c *seqCache) SetMinSeq(ctx context.Context, conversationID string, minSeq int64) error { +// return c.setSeq(ctx, conversationID, minSeq, c.getMinSeqKey) +//} +// +//func (c *seqCache) setSeqs(ctx context.Context, seqs map[string]int64, getkey func(key string) string) error { +// for conversationID, seq := range seqs { +// if err := c.rdb.Set(ctx, getkey(conversationID), seq, 0).Err(); err != nil { +// return errs.Wrap(err) +// } +// } +// return nil +//} +// +// +//func (c *seqCache) GetMinSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) { +// return c.getSeqs(ctx, conversationIDs, c.getMinSeqKey) +//} +// +//func (c *seqCache) GetMinSeq(ctx context.Context, conversationID string) (int64, error) { +// return c.getSeq(ctx, conversationID, c.getMinSeqKey) +//} +// +//func (c *seqCache) SetMinSeqs(ctx context.Context, seqs map[string]int64) error { +// return c.setSeqs(ctx, seqs, c.getMinSeqKey) +//} func (c *seqCache) setSeqs(ctx context.Context, seqs map[string]int64, getkey func(key string) string) error { for conversationID, seq := range seqs { @@ -100,18 +122,6 @@ func (c *seqCache) setSeqs(ctx context.Context, seqs map[string]int64, getkey fu return nil } -func (c *seqCache) SetMinSeqs(ctx context.Context, seqs map[string]int64) error { - return c.setSeqs(ctx, seqs, c.getMinSeqKey) -} - -func (c *seqCache) GetMinSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) { - return c.getSeqs(ctx, conversationIDs, c.getMinSeqKey) -} - -func (c *seqCache) GetMinSeq(ctx context.Context, conversationID string) (int64, error) { - return c.getSeq(ctx, conversationID, c.getMinSeqKey) -} - func (c *seqCache) GetConversationUserMinSeq(ctx context.Context, conversationID string, userID string) (int64, error) { val, err := c.rdb.Get(ctx, c.getConversationUserMinSeqKey(conversationID, userID)).Int64() if err != nil { diff --git a/pkg/common/storage/database/mgo/seq_user.go b/pkg/common/storage/database/mgo/seq_user.go new file mode 100644 index 000000000..4fa299a7a --- /dev/null +++ b/pkg/common/storage/database/mgo/seq_user.go @@ -0,0 +1,71 @@ +package mgo + +import ( + "context" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" + "github.com/openimsdk/tools/db/mongoutil" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" +) + +func NewSeqUserMongo(db *mongo.Database) (database.SeqUser, error) { + coll := db.Collection(database.SeqConversationName) + _, err := coll.Indexes().CreateOne(context.Background(), mongo.IndexModel{ + Keys: bson.D{ + {Key: "user_id", Value: 1}, + {Key: "conversation_id", Value: 1}, + }, + }) + if err != nil { + return nil, err + } + return &seqUserMongo{coll: coll}, nil +} + +type seqUserMongo struct { + coll *mongo.Collection +} + +func (s *seqUserMongo) setSeq(ctx context.Context, userID string, conversationID string, seq int64, field string) error { + filter := map[string]any{ + "user_id": userID, + "conversation_id": conversationID, + } + update := map[string]any{ + "$set": map[string]any{"field": int64(0)}, + } + opt := options.Update().SetUpsert(true) + return mongoutil.UpdateOne(ctx, s.coll, filter, update, false, opt) +} + +func (s *seqUserMongo) GetMaxSeq(ctx context.Context, userID string, conversationID string) (int64, error) { + + //TODO implement me + panic("implement me") +} + +func (s *seqUserMongo) SetMaxSeq(ctx context.Context, userID string, conversationID string, seq int64) error { + //TODO implement me + panic("implement me") +} + +func (s *seqUserMongo) GetMinSeq(ctx context.Context, userID string, conversationID string) (int64, error) { + //TODO implement me + panic("implement me") +} + +func (s *seqUserMongo) SetMinSeq(ctx context.Context, userID string, conversationID string, seq int64) error { + //TODO implement me + panic("implement me") +} + +func (s *seqUserMongo) GetReadSeq(ctx context.Context, userID string, conversationID string) (int64, error) { + //TODO implement me + panic("implement me") +} + +func (s *seqUserMongo) SetReadSeq(ctx context.Context, userID string, conversationID string, seq int64) error { + //TODO implement me + panic("implement me") +} diff --git a/pkg/common/storage/database/name.go b/pkg/common/storage/database/name.go index 68fa5af02..f47b7a6ee 100644 --- a/pkg/common/storage/database/name.go +++ b/pkg/common/storage/database/name.go @@ -15,4 +15,5 @@ const ( ObjectName = "s3" UserName = "user" SeqConversationName = "seq" + SeqUserName = "seq_user" ) diff --git a/pkg/common/storage/database/seq_user.go b/pkg/common/storage/database/seq_user.go new file mode 100644 index 000000000..3e9b29ec2 --- /dev/null +++ b/pkg/common/storage/database/seq_user.go @@ -0,0 +1,12 @@ +package database + +import "context" + +type SeqUser interface { + GetMaxSeq(ctx context.Context, userID string, conversationID string) (int64, error) + SetMaxSeq(ctx context.Context, userID string, conversationID string, seq int64) error + GetMinSeq(ctx context.Context, userID string, conversationID string) (int64, error) + SetMinSeq(ctx context.Context, userID string, conversationID string, seq int64) error + GetReadSeq(ctx context.Context, userID string, conversationID string) (int64, error) + SetReadSeq(ctx context.Context, userID string, conversationID string, seq int64) error +} diff --git a/pkg/common/storage/model/seq_user.go b/pkg/common/storage/model/seq_user.go new file mode 100644 index 000000000..845996bb8 --- /dev/null +++ b/pkg/common/storage/model/seq_user.go @@ -0,0 +1,9 @@ +package model + +type SeqUser struct { + UserID string `bson:"user_id"` + ConversationID string `bson:"conversation_id"` + MinSeq int64 `bson:"min_seq"` + MaxSeq int64 `bson:"max_seq"` + ReadSeq int64 `bson:"read_seq"` +}