From d55d416f6867fd62645895a771ceeb4222e8e3bc Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Mon, 17 Jun 2024 17:41:28 +0800 Subject: [PATCH] seq --- pkg/common/storage/cache/redis/seq.md | 36 ++++++++++++ pkg/common/storage/cache/redis/seq1.go | 9 +++ pkg/common/storage/cache/redis/seq_test.go | 64 +++++++++++++++++++--- pkg/common/storage/controller/msg.go | 1 - 4 files changed, 102 insertions(+), 8 deletions(-) create mode 100644 pkg/common/storage/cache/redis/seq.md diff --git a/pkg/common/storage/cache/redis/seq.md b/pkg/common/storage/cache/redis/seq.md new file mode 100644 index 000000000..9aa66a326 --- /dev/null +++ b/pkg/common/storage/cache/redis/seq.md @@ -0,0 +1,36 @@ + +### mongo +```go +type Seq struct { + ConversationID string `bson:"conversation_id"` + MaxSeq int64 `bson:"max_seq"` + MinSeq int64 `bson:"min_seq"` +} +``` + +```go +type Seq interface { + Malloc(ctx context.Context, conversationID string, size int64) (int64, error) + GetMaxSeq(ctx context.Context, conversationID string) (int64, error) + GetMinSeq(ctx context.Context, conversationID string) (int64, error) + SetMinSeq(ctx context.Context, conversationID string, seq int64) error +} +``` + +1. Malloc 申请seq数量,返回的已有seq的最大值,消息用的第一个seq是返回值+1 +2. GetMaxSeq 获取申请的seq的最大值,在发消息的seq小于这个值 +3. GetMinSeq 获取最小的seq,用于拉取历史消息 +4. SetMinSeq 设置最小的seq,用于拉取历史消息 + +### redis +```go +type RedisSeq struct { + Curr int64 // 当前的最大seq + Last int64 // mongodb中申请的最大seq + Lock *int64 // 锁,用于在mongodb中申请seq +} +``` + +1. Malloc 申请seq数量,返回的已有seq的最大值,消息用的第一个seq是返回值+1,如果redis中申请数量够用,直接返回,并自增对应数量。如果redis中申请数量不够用,加锁,从mongodb中申请seq。 +2. GetMaxSeq 获取已发消息的最大seq就是Curr的值。如果redis中缓存不存在就通过mongodb获取最大seq。存储在redis中。其中Curr和Last都是这个seq值。 +3. GetMinSeq, SetMinSeq用之前rockscache的方案。 \ No newline at end of file diff --git a/pkg/common/storage/cache/redis/seq1.go b/pkg/common/storage/cache/redis/seq1.go index 6374f53cc..316dea487 100644 --- a/pkg/common/storage/cache/redis/seq1.go +++ b/pkg/common/storage/cache/redis/seq1.go @@ -47,6 +47,12 @@ func NewTestSeq() *SeqMalloc { } } +type RedisSeq struct { + Curr int64 + Last int64 + Lock *int64 +} + type SeqMalloc struct { rdb redis.UniversalClient mgo database.Seq @@ -118,6 +124,7 @@ end local curr_seq = tonumber(redis.call("HGET", key, "CURR")) local last_seq = tonumber(redis.call("HGET", key, "LAST")) if size == 0 then + redis.call("EXPIRE", key, dataSecond) table.insert(result, 0) table.insert(result, curr_seq) table.insert(result, last_seq) @@ -136,6 +143,7 @@ if max_seq > last_seq then return result end redis.call("HSET", key, "CURR", max_seq) +redis.call("EXPIRE", key, dataSecond) table.insert(result, 0) table.insert(result, curr_seq) table.insert(result, last_seq) @@ -219,6 +227,7 @@ func (s *SeqMalloc) Malloc(ctx context.Context, conversationID string, size int6 s.setSeqRetry(ctx, key, states[1], seq+size, seq+mallocSize) return seq, nil case 2: // locked + fmt.Println("locked----->", "conversationID", conversationID, "size", size) if err := s.wait(ctx); err != nil { return 0, err } diff --git a/pkg/common/storage/cache/redis/seq_test.go b/pkg/common/storage/cache/redis/seq_test.go index 1c11bad0b..6fff6019a 100644 --- a/pkg/common/storage/cache/redis/seq_test.go +++ b/pkg/common/storage/cache/redis/seq_test.go @@ -2,22 +2,72 @@ package redis import ( "context" + "strconv" + "sync" + "sync/atomic" "testing" "time" ) func TestSeq(t *testing.T) { ts := NewTestSeq() - for i := 1; i < 1000000; i++ { - var size int64 = 100 - first, err := ts.Malloc(context.Background(), "1", size) - if err != nil { - t.Logf("[%d] %s", i, err) + var ( + wg sync.WaitGroup + speed atomic.Int64 + ) + + const count = 256 + wg.Add(count) + for i := 0; i < count; i++ { + index := i + 1 + go func() { + defer wg.Done() + var size int64 = 1 + cID := strconv.Itoa(index * 100) + for i := 1; ; i++ { + first, err := ts.mgo.Malloc(context.Background(), cID, size) // mongo + //first, err := ts.Malloc(context.Background(), cID, size) // redis + if err != nil { + t.Logf("[%d-%d] %s %s", index, i, cID, err) + return + } + speed.Add(size) + _ = first + //t.Logf("[%d] %d -> %d", i, first+1, first+size) + } + }() + } + + done := make(chan struct{}) + + go func() { + wg.Wait() + close(done) + }() + + ticker := time.NewTicker(time.Second) + + for { + select { + case <-done: + ticker.Stop() return + case <-ticker.C: + value := speed.Swap(0) + t.Logf("speed: %d/s", value) } - t.Logf("[%d] %d -> %d", i, first+1, first+size) - time.Sleep(time.Second / 4) } + + //for i := 1; i < 1000000; i++ { + // var size int64 = 100 + // first, err := ts.Malloc(context.Background(), "1", size) + // if err != nil { + // t.Logf("[%d] %s", i, err) + // return + // } + // t.Logf("[%d] %d -> %d", i, first+1, first+size) + // time.Sleep(time.Second / 4) + //} } func TestDel(t *testing.T) { diff --git a/pkg/common/storage/controller/msg.go b/pkg/common/storage/controller/msg.go index f0efe5efd..c5c5462aa 100644 --- a/pkg/common/storage/controller/msg.go +++ b/pkg/common/storage/controller/msg.go @@ -349,7 +349,6 @@ func (db *commonMsgDatabase) DeleteMessagesFromCache(ctx context.Context, conver } func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNew bool, err error) { - // TODO set SEQ currentMaxSeq, err := db.seq.GetMaxSeq(ctx, conversationID) if err != nil && errs.Unwrap(err) != redis.Nil {