From ef71d0cf6247b386964dfdd2a6a8b4206cb759af Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Fri, 14 Jun 2024 10:54:24 +0800 Subject: [PATCH] seq --- pkg/common/storage/cache/cachekey/seq1.go | 19 ++ pkg/common/storage/cache/redis/seq1.go.txt | 246 +++++++++++++++++++++ pkg/common/storage/controller/msg.go | 42 ++-- pkg/common/storage/database/mgo/seq.go | 75 +++++++ pkg/common/storage/database/name.go | 1 + pkg/common/storage/database/seq.go | 10 + pkg/common/storage/model/seq.go | 7 + pkg/msgprocessor/conversation.go | 4 + 8 files changed, 384 insertions(+), 20 deletions(-) create mode 100644 pkg/common/storage/cache/cachekey/seq1.go create mode 100644 pkg/common/storage/cache/redis/seq1.go.txt create mode 100644 pkg/common/storage/database/mgo/seq.go create mode 100644 pkg/common/storage/database/seq.go create mode 100644 pkg/common/storage/model/seq.go diff --git a/pkg/common/storage/cache/cachekey/seq1.go b/pkg/common/storage/cache/cachekey/seq1.go new file mode 100644 index 000000000..df3086d32 --- /dev/null +++ b/pkg/common/storage/cache/cachekey/seq1.go @@ -0,0 +1,19 @@ +package cachekey + +const ( + MallocSeq = "MALLOC_SEQ:" + MallocSeqLock = "MALLOC_SEQ_LOCK:" + MallocMinSeqLock = "MALLOC_MIN_SEQ:" +) + +func GetMallocSeqKey(conversationID string) string { + return MallocSeq + conversationID +} + +func GetMallocSeqLockKey(conversationID string) string { + return MallocSeqLock + conversationID +} + +func GetMallocMinSeqKey(conversationID string) string { + return MallocMinSeqLock + conversationID +} diff --git a/pkg/common/storage/cache/redis/seq1.go.txt b/pkg/common/storage/cache/redis/seq1.go.txt new file mode 100644 index 000000000..9ccb83e66 --- /dev/null +++ b/pkg/common/storage/cache/redis/seq1.go.txt @@ -0,0 +1,246 @@ +package redis + +import ( + "context" + "errors" + "fmt" + "github.com/dtm-labs/rockscache" + "github.com/google/uuid" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" + "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" + "github.com/openimsdk/tools/errs" + "github.com/redis/go-redis/v9" + "strconv" + "time" +) + +var errLock = errors.New("lock failed") + +type MallocSeq interface { + Malloc(ctx context.Context, conversationID string, size int64) ([]int64, error) + GetMaxSeq(ctx context.Context, conversationID string) (int64, error) + GetMinSeq(ctx context.Context, conversationID string) (int64, error) + SetMinSeq(ctx context.Context, conversationID string, seq int64) error +} + +func NewSeqCache1(rdb redis.UniversalClient, mgo database.Seq) MallocSeq { + opt := rockscache.NewDefaultOptions() + opt.EmptyExpire = time.Second * 3 + opt.Delay = time.Second / 2 + return &seqCache1{ + rdb: rdb, + mgo: mgo, + rocks: rockscache.NewClient(rdb, opt), + lockExpire: time.Minute * 1, + seqExpire: time.Hour * 24 * 7, + minSeqExpire: time.Hour * 1, + groupMinNum: 1000, + userMinNum: 100, + } +} + +type seqCache1 struct { + rdb redis.UniversalClient + rocks *rockscache.Client + mgo database.Seq + lockExpire time.Duration + seqExpire time.Duration + minSeqExpire time.Duration + groupMinNum int64 + userMinNum int64 +} + +/* +1 +2 +3 +4 +5 +6 +7 +8 +9 +10 +*/ + +func (s *seqCache1) GetMaxSeq(ctx context.Context, conversationID string) (int64, error) { + for i := 0; i < 10; i++ { + res, err := s.rdb.LIndex(ctx, cachekey.GetMallocSeqKey(conversationID), 0).Int64() + if err == nil { + return res, nil + } else if !errors.Is(err, redis.Nil) { + return 0, errs.Wrap(err) + } + + if err := s.mallocSeq(ctx, conversationID, 0, nil); err != nil { + return 0, err + } + } + return 0, errs.New("get max seq timeout") +} + +func (s *seqCache1) unlock(ctx context.Context, key string, owner string) error { + script := ` +local value = redis.call("GET", KEYS[1]) +if value == false then + return 0 +end +if value == ARGV[1] then + redis.call("DEL", KEYS[1]) + return 1 +end +return 2 +` + state, err := s.rdb.Eval(ctx, script, []string{key}, owner).Int() + if err != nil { + return errs.Wrap(err) + } + switch state { + case 0: + return errs.Wrap(redis.Nil) + case 1: + return nil + case 2: + return errs.New("not the lock holder") + default: + return errs.New(fmt.Sprintf("unknown state: %d", state)) + } +} + +func (s *seqCache1) initMallocSeq(ctx context.Context, conversationID string, size int64) ([]int64, error) { + owner := uuid.New().String() + ok, err := s.rdb.SetNX(ctx, cachekey.GetMallocSeqLockKey(conversationID), owner, s.lockExpire).Result() + if err != nil { + return nil, err + } + + seq, err := s.mgo.Malloc(ctx, conversationID, size) + if err != nil { + return nil, err + } + seqs := make([]int64, 0, size) + for i := seq - size + 1; i <= seq; i++ { + seqs = append(seqs, i) + } + return seqs, nil +} + +func (s *seqCache1) GetMinSeq(ctx context.Context, conversationID string) (int64, error) { + return getCache[int64](ctx, s.rocks, cachekey.GetMallocMinSeqKey(conversationID), s.minSeqExpire, func(ctx context.Context) (int64, error) { + return s.mgo.GetMinSeq(ctx, conversationID) + }) +} + +func (s *seqCache1) SetMinSeq(ctx context.Context, conversationID string, seq int64) error { + if err := s.mgo.SetMinSeq(ctx, conversationID, seq); err != nil { + return err + } + return s.deleteMinSeqCache(ctx, conversationID) +} + +func (s *seqCache1) Malloc(ctx context.Context, conversationID string, size int64) ([]int64, error) { + if size <= 0 { + return nil, errs.Wrap(errors.New("size must be greater than 0")) + } + seqKey := cachekey.GetMallocSeqKey(conversationID) + lockKey := cachekey.GetMallocSeqLockKey(conversationID) + for i := 0; i < 10; i++ { + seqs, err := s.lpop(ctx, seqKey, lockKey, size) + if err != nil { + return nil, err + } + if len(seqs) < int(size) { + if err := s.mallocSeq(ctx, conversationID, size, &seqs); err != nil { + return nil, err + } + } + if len(seqs) >= int(size) { + return seqs, nil + } + } + return nil, errs.ErrInternalServer.WrapMsg("malloc seq failed") +} + +func (s *seqCache1) push(ctx context.Context, seqKey string, seqs []int64) error { + script := ` +redis.call("DEL", KEYS[1]) +for i = 2, #ARGV do + redis.call("RPUSH", KEYS[1], ARGV[i]) +end +redis.call("EXPIRE", KEYS[1], ARGV[1]) +return 1 +` + argv := make([]any, 0, 1+len(seqs)) + argv = append(argv, s.seqExpire.Seconds()) + for _, seq := range seqs { + argv = append(argv, seq) + } + err := s.rdb.Eval(ctx, script, []string{seqKey}, argv...).Err() + return errs.Wrap(err) +} + +func (s *seqCache1) lpop(ctx context.Context, seqKey, lockKey string, size int64) ([]int64, error) { + script := ` +local result = redis.call("LRANGE", KEYS[1], 0, ARGV[1]-1) +if #result == 0 then + return result +end +redis.call("LTRIM", KEYS[1], #result, -1) +if redis.call("LLEN", KEYS[1]) == 0 then + redis.call("DEL", KEYS[2]) +end +return result +` + res, err := s.rdb.Eval(ctx, script, []string{seqKey, lockKey}, size).Int64Slice() + if err != nil { + return nil, errs.Wrap(err) + } + return res, nil +} + +func (s *seqCache1) getMongoStepSize(conversationID string, size int64) int64 { + var num int64 + if msgprocessor.IsGroupConversationID(conversationID) { + num = s.groupMinNum + } else { + num = s.userMinNum + } + if size > num { + num += size + } + return num +} + +func (s *seqCache1) mallocSeq(ctx context.Context, conversationID string, size int64, seqs *[]int64) error { + var delMinSeqKey bool + _, err := getCache[string](ctx, s.rocks, cachekey.GetMallocSeqLockKey(conversationID), s.lockExpire, func(ctx context.Context) (string, error) { + res, err := s.mgo.Malloc(ctx, conversationID, s.getMongoStepSize(conversationID, size)) + if err != nil { + return "", err + } + delMinSeqKey = res[0] == 1 + if seqs != nil && size > 0 { + if len(*seqs) > 0 && (*seqs)[len(*seqs)-1]+1 == res[0] { + n := size - int64(len(*seqs)) + *seqs = append(*seqs, res[:n]...) + res = res[n:] + } else { + *seqs = res[:size] + res = res[size:] + } + } + if err := s.push(ctx, cachekey.GetMallocSeqKey(conversationID), res); err != nil { + return "", err + } + return strconv.Itoa(int(time.Now().UnixMicro())), nil + }) + if delMinSeqKey { + s.deleteMinSeqCache(ctx, conversationID) + } + return err +} + +func (s *seqCache1) deleteMinSeqCache(ctx context.Context, conversationID string) error { + return s.rocks.TagAsDeleted2(ctx, cachekey.GetMallocMinSeqKey(conversationID)) +} diff --git a/pkg/common/storage/controller/msg.go b/pkg/common/storage/controller/msg.go index 8eb9e8e6f..f0efe5efd 100644 --- a/pkg/common/storage/controller/msg.go +++ b/pkg/common/storage/controller/msg.go @@ -69,26 +69,26 @@ type CommonMsgDatabase interface { DeleteUserMsgsBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) error // DeleteMsgsPhysicalBySeqs physically deletes messages by emptying them based on sequence numbers. DeleteMsgsPhysicalBySeqs(ctx context.Context, conversationID string, seqs []int64) error - SetMaxSeq(ctx context.Context, conversationID string, maxSeq int64) error + //SetMaxSeq(ctx context.Context, conversationID string, maxSeq int64) error GetMaxSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) GetMaxSeq(ctx context.Context, conversationID string) (int64, error) - SetMinSeq(ctx context.Context, conversationID string, minSeq int64) error + //SetMinSeq(ctx context.Context, conversationID string, minSeq int64) error SetMinSeqs(ctx context.Context, seqs map[string]int64) error - GetMinSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) - GetMinSeq(ctx context.Context, conversationID string) (int64, error) - GetConversationUserMinSeq(ctx context.Context, conversationID string, userID string) (int64, error) - GetConversationUserMinSeqs(ctx context.Context, conversationID string, userIDs []string) (map[string]int64, error) - SetConversationUserMinSeq(ctx context.Context, conversationID string, userID string, minSeq int64) error - SetConversationUserMinSeqs(ctx context.Context, conversationID string, seqs map[string]int64) (err error) + //GetMinSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) + //GetMinSeq(ctx context.Context, conversationID string) (int64, error) + //GetConversationUserMinSeq(ctx context.Context, conversationID string, userID string) (int64, error) + //GetConversationUserMinSeqs(ctx context.Context, conversationID string, userIDs []string) (map[string]int64, error) + //SetConversationUserMinSeq(ctx context.Context, conversationID string, userID string, minSeq int64) error + //SetConversationUserMinSeqs(ctx context.Context, conversationID string, seqs map[string]int64) (err error) SetUserConversationsMinSeqs(ctx context.Context, userID string, seqs map[string]int64) (err error) SetHasReadSeq(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error GetHasReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error) GetHasReadSeq(ctx context.Context, userID string, conversationID string) (int64, error) UserSetHasReadSeqs(ctx context.Context, userID string, hasReadSeqs map[string]int64) error - GetMongoMaxAndMinSeq(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo int64, err error) - GetConversationMinMaxSeqInMongoAndCache(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error) + //GetMongoMaxAndMinSeq(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo int64, err error) + //GetConversationMinMaxSeqInMongoAndCache(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error) SetSendMsgStatus(ctx context.Context, id string, status int32) error GetSendMsgStatus(ctx context.Context, id string) (int32, error) SearchMessage(ctx context.Context, req *pbmsg.SearchMessageReq) (total int32, msgData []*sdkws.MsgData, err error) @@ -349,6 +349,8 @@ func (db *commonMsgDatabase) DeleteMessagesFromCache(ctx context.Context, conver } func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNew bool, err error) { + + // TODO set SEQ currentMaxSeq, err := db.seq.GetMaxSeq(ctx, conversationID) if err != nil && errs.Unwrap(err) != redis.Nil { log.ZError(ctx, "storage.seq.GetMaxSeq", err) @@ -817,9 +819,9 @@ func (db *commonMsgDatabase) CleanUpUserConversationsMsgs(ctx context.Context, u } } -func (db *commonMsgDatabase) SetMaxSeq(ctx context.Context, conversationID string, maxSeq int64) error { - return db.seq.SetMaxSeq(ctx, conversationID, maxSeq) -} +//func (db *commonMsgDatabase) SetMaxSeq(ctx context.Context, conversationID string, maxSeq int64) error { +// return db.seq.SetMaxSeq(ctx, conversationID, maxSeq) +//} func (db *commonMsgDatabase) GetMaxSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) { return db.seq.GetMaxSeqs(ctx, conversationIDs) @@ -837,13 +839,13 @@ func (db *commonMsgDatabase) SetMinSeqs(ctx context.Context, seqs map[string]int return db.seq.SetMinSeqs(ctx, seqs) } -func (db *commonMsgDatabase) GetMinSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) { - return db.seq.GetMinSeqs(ctx, conversationIDs) -} - -func (db *commonMsgDatabase) GetMinSeq(ctx context.Context, conversationID string) (int64, error) { - return db.seq.GetMinSeq(ctx, conversationID) -} +//func (db *commonMsgDatabase) GetMinSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) { +// return db.seq.GetMinSeqs(ctx, conversationIDs) +//} +// +//func (db *commonMsgDatabase) GetMinSeq(ctx context.Context, conversationID string) (int64, error) { +// return db.seq.GetMinSeq(ctx, conversationID) +//} func (db *commonMsgDatabase) GetConversationUserMinSeq(ctx context.Context, conversationID string, userID string) (int64, error) { return db.seq.GetConversationUserMinSeq(ctx, conversationID, userID) diff --git a/pkg/common/storage/database/mgo/seq.go b/pkg/common/storage/database/mgo/seq.go new file mode 100644 index 000000000..8592bf63c --- /dev/null +++ b/pkg/common/storage/database/mgo/seq.go @@ -0,0 +1,75 @@ +package mgo + +import ( + "context" + "errors" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" + "github.com/openimsdk/tools/db/mongoutil" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" +) + +func NewSeqMongo(db *mongo.Database) (*SeqMongo, error) { + coll := db.Collection("seq") + return &SeqMongo{coll: coll}, nil +} + +type SeqMongo struct { + coll *mongo.Collection +} + +func (s *SeqMongo) MallocSeq(ctx context.Context, conversationID string, size int64) (int64, error) { + if size <= 0 { + return 0, errors.New("size must be greater than 0") + } + filter := map[string]any{"conversation_id": conversationID} + update := map[string]any{ + "$inc": map[string]any{"max_seq": size}, + "$set": map[string]any{"min_seq": 1}, + } + opt := options.FindOneAndUpdate().SetUpsert(true).SetReturnDocument(options.After).SetProjection(map[string]any{"_id": 0, "max_seq": 1}) + return mongoutil.FindOneAndUpdate[int64](ctx, s.coll, filter, update, opt) +} + +func (s *SeqMongo) Malloc(ctx context.Context, conversationID string, size int64) ([]int64, error) { + seq, err := s.MallocSeq(ctx, conversationID, size) + if err != nil { + return nil, err + } + seqs := make([]int64, 0, size) + for i := seq - size + 1; i <= seq; i++ { + seqs = append(seqs, i) + } + return seqs, nil +} + +func (s *SeqMongo) GetMaxSeq(ctx context.Context, conversationID string) (int64, error) { + seq, err := mongoutil.FindOne[int64](ctx, s.coll, bson.M{"conversation_id": conversationID}, options.FindOne().SetProjection(map[string]any{"_id": 0, "max_seq": 1})) + if err == nil { + return seq, nil + } else if IsNotFound(err) { + return 0, nil + } else { + return 0, err + } +} + +func (s *SeqMongo) GetMinSeq(ctx context.Context, conversationID string) (int64, error) { + seq, err := mongoutil.FindOne[int64](ctx, s.coll, bson.M{"conversation_id": conversationID}, options.FindOne().SetProjection(map[string]any{"_id": 0, "min_seq": 1})) + if err == nil { + return seq, nil + } else if IsNotFound(err) { + return 0, nil + } else { + return 0, err + } +} + +func (s *SeqMongo) SetMinSeq(ctx context.Context, conversationID string, seq int64) error { + return mongoutil.UpdateOne(ctx, s.coll, bson.M{"conversation_id": conversationID}, bson.M{"$set": bson.M{"min_seq": seq}}, false) +} + +func (s *SeqMongo) GetConversation(ctx context.Context, conversationID string) (*model.Seq, error) { + return mongoutil.FindOne[*model.Seq](ctx, s.coll, bson.M{"conversation_id": conversationID}) +} diff --git a/pkg/common/storage/database/name.go b/pkg/common/storage/database/name.go index 986f22a1a..f496d4b41 100644 --- a/pkg/common/storage/database/name.go +++ b/pkg/common/storage/database/name.go @@ -14,4 +14,5 @@ const ( LogName = "log" ObjectName = "s3" UserName = "user" + SeqName = "seq" ) diff --git a/pkg/common/storage/database/seq.go b/pkg/common/storage/database/seq.go new file mode 100644 index 000000000..869b607a5 --- /dev/null +++ b/pkg/common/storage/database/seq.go @@ -0,0 +1,10 @@ +package database + +import "context" + +type Seq interface { + Malloc(ctx context.Context, conversationID string, size int64) ([]int64, error) + GetMaxSeq(ctx context.Context, conversationID string) (int64, error) + GetMinSeq(ctx context.Context, conversationID string) (int64, error) + SetMinSeq(ctx context.Context, conversationID string, seq int64) error +} diff --git a/pkg/common/storage/model/seq.go b/pkg/common/storage/model/seq.go new file mode 100644 index 000000000..3db009167 --- /dev/null +++ b/pkg/common/storage/model/seq.go @@ -0,0 +1,7 @@ +package model + +type Seq struct { + ConversationID string `bson:"conversation_id"` + MaxSeq int64 `bson:"max_seq"` + MinSeq int64 `bson:"min_seq"` +} diff --git a/pkg/msgprocessor/conversation.go b/pkg/msgprocessor/conversation.go index b369269cc..f8140cc7d 100644 --- a/pkg/msgprocessor/conversation.go +++ b/pkg/msgprocessor/conversation.go @@ -24,6 +24,10 @@ import ( "google.golang.org/protobuf/proto" ) +func IsGroupConversationID(conversationID string) bool { + return strings.HasPrefix(conversationID, "g_") || strings.HasPrefix(conversationID, "sg_") +} + func GetNotificationConversationIDByMsg(msg *sdkws.MsgData) string { switch msg.SessionType { case constant.SingleChatType: