From 3ae353bfd7e44c20077067cfc56d88458a4685b5 Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Wed, 25 Sep 2024 17:54:48 +0800 Subject: [PATCH] feat: jssdk GetConversations, GetActiveConversation --- .../storage/cache/redis/seq_conversation.go | 23 +++++++++++-------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/pkg/common/storage/cache/redis/seq_conversation.go b/pkg/common/storage/cache/redis/seq_conversation.go index ad02a3342..71705cef7 100644 --- a/pkg/common/storage/cache/redis/seq_conversation.go +++ b/pkg/common/storage/cache/redis/seq_conversation.go @@ -207,7 +207,7 @@ func (s *seqConversationCacheRedis) getSeqMallocKey(conversationID string) strin return cachekey.GetMallocSeqKey(conversationID) } -func (s *seqConversationCacheRedis) setSeq(ctx context.Context, key string, owner int64, currSeq int64, lastSeq int64) (int64, error) { +func (s *seqConversationCacheRedis) setSeq(ctx context.Context, key string, owner int64, currSeq int64, lastSeq int64, mill int64) (int64, error) { if lastSeq < currSeq { return 0, errs.New("lastSeq must be greater than currSeq") } @@ -220,8 +220,9 @@ local lockValue = ARGV[1] local dataSecond = ARGV[2] local curr_seq = tonumber(ARGV[3]) local last_seq = tonumber(ARGV[4]) +local mallocTime = ARGV[5] if redis.call("EXISTS", key) == 0 then - redis.call("HSET", key, "CURR", curr_seq, "LAST", last_seq) + redis.call("HSET", key, "CURR", curr_seq, "LAST", last_seq, "TIME", mallocTime) redis.call("EXPIRE", key, dataSecond) return 1 end @@ -229,11 +230,11 @@ if redis.call("HGET", key, "LOCK") ~= lockValue then return 2 end redis.call("HDEL", key, "LOCK") -redis.call("HSET", key, "CURR", curr_seq, "LAST", last_seq) +redis.call("HSET", key, "CURR", curr_seq, "LAST", last_seq, "TIME", mallocTime) redis.call("EXPIRE", key, dataSecond) return 0 ` - result, err := s.rdb.Eval(ctx, script, []string{key}, owner, int64(s.dataTime/time.Second), currSeq, lastSeq).Int64() + result, err := s.rdb.Eval(ctx, script, []string{key}, owner, int64(s.dataTime/time.Second), currSeq, lastSeq, mill).Int64() if err != nil { return 0, errs.Wrap(err) } @@ -259,6 +260,7 @@ if redis.call("EXISTS", key) == 0 then redis.call("EXPIRE", key, lockSecond) table.insert(result, 1) table.insert(result, lockValue) + table.insert(result, mallocTime) return result end if redis.call("HEXISTS", key, "LOCK") == 1 then @@ -321,9 +323,9 @@ func (s *seqConversationCacheRedis) wait(ctx context.Context) error { } } -func (s *seqConversationCacheRedis) setSeqRetry(ctx context.Context, key string, owner int64, currSeq int64, lastSeq int64) { +func (s *seqConversationCacheRedis) setSeqRetry(ctx context.Context, key string, owner int64, currSeq int64, lastSeq int64, mill int64) { for i := 0; i < 10; i++ { - state, err := s.setSeq(ctx, key, owner, currSeq, lastSeq) + state, err := s.setSeq(ctx, key, owner, currSeq, lastSeq, mill) if err != nil { log.ZError(ctx, "set seq cache failed", err, "key", key, "owner", owner, "currSeq", currSeq, "lastSeq", lastSeq, "count", i+1) if err := s.wait(ctx); err != nil { @@ -383,7 +385,7 @@ func (s *seqConversationCacheRedis) mallocTime(ctx context.Context, conversation if err != nil { return 0, 0, err } - s.setSeqRetry(ctx, key, states[1], seq+size, seq+mallocSize) + s.setSeqRetry(ctx, key, states[1], seq+size, seq+mallocSize, states[2]) return seq, 0, nil case 2: // locked if err := s.wait(ctx); err != nil { @@ -393,18 +395,19 @@ func (s *seqConversationCacheRedis) mallocTime(ctx context.Context, conversation case 3: // exceeded cache max value currSeq := states[1] lastSeq := states[2] + mill := states[4] mallocSize := s.getMallocSize(conversationID, size) seq, err := s.mgo.Malloc(ctx, conversationID, mallocSize) if err != nil { return 0, 0, err } if lastSeq == seq { - s.setSeqRetry(ctx, key, states[3], currSeq+size, seq+mallocSize) + s.setSeqRetry(ctx, key, states[3], currSeq+size, seq+mallocSize, mill) return currSeq, states[4], nil } else { log.ZWarn(ctx, "malloc seq not equal cache last seq", nil, "conversationID", conversationID, "currSeq", currSeq, "lastSeq", lastSeq, "mallocSeq", seq) - s.setSeqRetry(ctx, key, states[3], seq+size, seq+mallocSize) - return seq, states[4], nil + s.setSeqRetry(ctx, key, states[3], seq+size, seq+mallocSize, mill) + return seq, mill, nil } default: log.ZError(ctx, "malloc seq unknown state", nil, "state", states[0], "conversationID", conversationID, "size", size)