Optimize get conversation seq (#2387)

* feat:optimize GetConversationsHasReadAndMaxSeq

* fix:get max seqs

* fix:get max seqs

* fix:get max seqs

* fix:get max seqs
pull/2396/head
icey-yu 5 days ago committed by GitHub
parent 88c0d5f5ad
commit 95df4194ca
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -16,11 +16,13 @@ package redis
import (
"context"
"errors"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/utils/stringutil"
"github.com/redis/go-redis/v9"
"sync"
)
func NewSeqCache(rdb redis.UniversalClient) cache.SeqCache {
@ -61,15 +63,40 @@ func (c *seqCache) getSeq(ctx context.Context, conversationID string, getkey fun
func (c *seqCache) getSeqs(ctx context.Context, items []string, getkey func(s string) string) (m map[string]int64, err error) {
m = make(map[string]int64, len(items))
var (
reverseMap = make(map[string]string, len(items))
keys = make([]string, len(items))
lock sync.Mutex
)
for i, v := range items {
res, err := c.rdb.Get(ctx, getkey(v)).Result()
if err != nil && err != redis.Nil {
return nil, errs.Wrap(err)
keys[i] = getkey(v)
reverseMap[getkey(v)] = v
}
manager := NewRedisShardManager(c.rdb)
if err = manager.ProcessKeysBySlot(ctx, keys, func(ctx context.Context, _ int64, keys []string) error {
res, err := c.rdb.MGet(ctx, keys...).Result()
if err != nil && !errors.Is(err, redis.Nil) {
return errs.Wrap(err)
}
val := stringutil.StringToInt64(res)
if val != 0 {
m[items[i]] = val
// len(res) <= len(items)
for i := range res {
strRes, ok := res[i].(string)
if !ok {
continue
}
val := stringutil.StringToInt64(strRes)
if val != 0 {
lock.Lock()
m[reverseMap[keys[i]]] = val
lock.Unlock()
}
}
return nil
}); err != nil {
return nil, err
}
return m, nil

@ -16,15 +16,19 @@ package rpccache
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
"github.com/openimsdk/open-im-server/v3/pkg/localcache"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
pbconversation "github.com/openimsdk/protocol/conversation"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"github.com/redis/go-redis/v9"
"golang.org/x/sync/errgroup"
)
const (
conversationWorkerCount = 20
)
func NewConversationLocalCache(client rpcclient.ConversationRpcClient, localCache *config.LocalCache, cli redis.UniversalClient) *ConversationLocalCache {
@ -90,15 +94,33 @@ func (c *ConversationLocalCache) GetSingleConversationRecvMsgOpt(ctx context.Con
}
func (c *ConversationLocalCache) GetConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ([]*pbconversation.Conversation, error) {
conversations := make([]*pbconversation.Conversation, 0, len(conversationIDs))
var (
conversations = make([]*pbconversation.Conversation, 0, len(conversationIDs))
conversationsChan = make(chan *pbconversation.Conversation, len(conversationIDs))
)
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(conversationWorkerCount)
for _, conversationID := range conversationIDs {
conversation, err := c.GetConversation(ctx, ownerUserID, conversationID)
if err != nil {
if errs.ErrRecordNotFound.Is(err) {
continue
conversationID := conversationID
g.Go(func() error {
conversation, err := c.GetConversation(ctx, ownerUserID, conversationID)
if err != nil {
if errs.ErrRecordNotFound.Is(err) {
return nil
}
return err
}
return nil, err
}
conversationsChan <- conversation
return nil
})
}
if err := g.Wait(); err != nil {
return nil, err
}
close(conversationsChan)
for conversation := range conversationsChan {
conversations = append(conversations, conversation)
}
return conversations, nil

Loading…
Cancel
Save