diff --git a/pkg/common/storage/cache/redis/seq.go b/pkg/common/storage/cache/redis/seq.go index c15eb6647..2a450ca4d 100644 --- a/pkg/common/storage/cache/redis/seq.go +++ b/pkg/common/storage/cache/redis/seq.go @@ -22,6 +22,7 @@ import ( "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/utils/stringutil" "github.com/redis/go-redis/v9" + "sync" ) func NewSeqCache(rdb redis.UniversalClient) cache.SeqCache { @@ -62,27 +63,38 @@ func (c *seqCache) getSeq(ctx context.Context, conversationID string, getkey fun func (c *seqCache) getSeqs(ctx context.Context, items []string, getkey func(s string) string) (m map[string]int64, err error) { m = make(map[string]int64, len(items)) + var lock sync.Mutex + keys := make([]string, len(items)) for i, v := range items { keys[i] = getkey(v) } - res, err := c.rdb.MGet(ctx, keys...).Result() - if err != nil && !errors.Is(err, redis.Nil) { - return nil, errs.Wrap(err) - } - - // len(res) == len(items) - for i := range res { - strRes, ok := res[i].(string) - if !ok { - continue + manager := NewRedisShardManager(c.rdb) + if err = manager.ProcessKeysBySlot(ctx, keys, func(ctx context.Context, _ int64, keys []string) error { + res, err := c.rdb.MGet(ctx, keys...).Result() + if err != nil && !errors.Is(err, redis.Nil) { + return errs.Wrap(err) } - val := stringutil.StringToInt64(strRes) - if val != 0 { - m[items[i]] = val + + // len(res) <= len(items) + for i := range res { + strRes, ok := res[i].(string) + if !ok { + continue + } + val := stringutil.StringToInt64(strRes) + if val != 0 { + lock.Lock() + m[items[i]] = val + lock.Unlock() + } } + return nil + }); err != nil { + return nil, err } + return m, nil }