diff --git a/pkg/common/storage/cache/redis/seq.go b/pkg/common/storage/cache/redis/seq.go index 76dd921a5..09ad5b609 100644 --- a/pkg/common/storage/cache/redis/seq.go +++ b/pkg/common/storage/cache/redis/seq.go @@ -16,11 +16,13 @@ package redis import ( "context" + "errors" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/utils/stringutil" "github.com/redis/go-redis/v9" + "sync" ) func NewSeqCache(rdb redis.UniversalClient) cache.SeqCache { @@ -61,15 +63,40 @@ func (c *seqCache) getSeq(ctx context.Context, conversationID string, getkey fun func (c *seqCache) getSeqs(ctx context.Context, items []string, getkey func(s string) string) (m map[string]int64, err error) { m = make(map[string]int64, len(items)) + var ( + reverseMap = make(map[string]string, len(items)) + keys = make([]string, len(items)) + lock sync.Mutex + ) + for i, v := range items { - res, err := c.rdb.Get(ctx, getkey(v)).Result() - if err != nil && err != redis.Nil { - return nil, errs.Wrap(err) + keys[i] = getkey(v) + reverseMap[getkey(v)] = v + } + + manager := NewRedisShardManager(c.rdb) + if err = manager.ProcessKeysBySlot(ctx, keys, func(ctx context.Context, _ int64, keys []string) error { + res, err := c.rdb.MGet(ctx, keys...).Result() + if err != nil && !errors.Is(err, redis.Nil) { + return errs.Wrap(err) } - val := stringutil.StringToInt64(res) - if val != 0 { - m[items[i]] = val + + // len(res) <= len(items) + for i := range res { + strRes, ok := res[i].(string) + if !ok { + continue + } + val := stringutil.StringToInt64(strRes) + if val != 0 { + lock.Lock() + m[reverseMap[keys[i]]] = val + lock.Unlock() + } } + return nil + }); err != nil { + return nil, err } return m, nil diff --git a/pkg/rpccache/conversation.go b/pkg/rpccache/conversation.go index 4c00dd1f7..0109f1b1d 100644 --- a/pkg/rpccache/conversation.go +++ b/pkg/rpccache/conversation.go @@ -16,15 +16,19 @@ package rpccache import ( "context" - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" - "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" "github.com/openimsdk/open-im-server/v3/pkg/localcache" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" pbconversation "github.com/openimsdk/protocol/conversation" "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" "github.com/redis/go-redis/v9" + "golang.org/x/sync/errgroup" +) + +const ( + conversationWorkerCount = 20 ) func NewConversationLocalCache(client rpcclient.ConversationRpcClient, localCache *config.LocalCache, cli redis.UniversalClient) *ConversationLocalCache { @@ -90,15 +94,33 @@ func (c *ConversationLocalCache) GetSingleConversationRecvMsgOpt(ctx context.Con } func (c *ConversationLocalCache) GetConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ([]*pbconversation.Conversation, error) { - conversations := make([]*pbconversation.Conversation, 0, len(conversationIDs)) + var ( + conversations = make([]*pbconversation.Conversation, 0, len(conversationIDs)) + conversationsChan = make(chan *pbconversation.Conversation, len(conversationIDs)) + ) + + g, ctx := errgroup.WithContext(ctx) + g.SetLimit(conversationWorkerCount) + for _, conversationID := range conversationIDs { - conversation, err := c.GetConversation(ctx, ownerUserID, conversationID) - if err != nil { - if errs.ErrRecordNotFound.Is(err) { - continue + conversationID := conversationID + g.Go(func() error { + conversation, err := c.GetConversation(ctx, ownerUserID, conversationID) + if err != nil { + if errs.ErrRecordNotFound.Is(err) { + return nil + } + return err } - return nil, err - } + conversationsChan <- conversation + return nil + }) + } + if err := g.Wait(); err != nil { + return nil, err + } + close(conversationsChan) + for conversation := range conversationsChan { conversations = append(conversations, conversation) } return conversations, nil