From e7b4a2431984f7cb54d08b3a60737b73b83f0815 Mon Sep 17 00:00:00 2001 From: icey-yu <1186114839@qq.com> Date: Wed, 3 Jul 2024 10:06:33 +0800 Subject: [PATCH] fix:get max seqs --- pkg/common/storage/cache/redis/seq.go | 4 +++- pkg/rpccache/conversation.go | 20 +++++++++++++------- 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/pkg/common/storage/cache/redis/seq.go b/pkg/common/storage/cache/redis/seq.go index 2a450ca4d..606361ea8 100644 --- a/pkg/common/storage/cache/redis/seq.go +++ b/pkg/common/storage/cache/redis/seq.go @@ -63,11 +63,13 @@ func (c *seqCache) getSeq(ctx context.Context, conversationID string, getkey fun func (c *seqCache) getSeqs(ctx context.Context, items []string, getkey func(s string) string) (m map[string]int64, err error) { m = make(map[string]int64, len(items)) + reverseMap := make(map[string]string, len(items)) var lock sync.Mutex keys := make([]string, len(items)) for i, v := range items { keys[i] = getkey(v) + reverseMap[getkey(v)] = v } manager := NewRedisShardManager(c.rdb) @@ -86,7 +88,7 @@ func (c *seqCache) getSeqs(ctx context.Context, items []string, getkey func(s st val := stringutil.StringToInt64(strRes) if val != 0 { lock.Lock() - m[items[i]] = val + m[reverseMap[keys[i]]] = val lock.Unlock() } } diff --git a/pkg/rpccache/conversation.go b/pkg/rpccache/conversation.go index 1d40a649c..cd985cd88 100644 --- a/pkg/rpccache/conversation.go +++ b/pkg/rpccache/conversation.go @@ -29,8 +29,8 @@ import ( ) const ( - notificationWorkerCount = 2 - notificationBufferSize = 200 + notificationWorkerCount = 20 + notificationBufferSize = 20000 ) func NewConversationLocalCache(client rpcclient.ConversationRpcClient, localCache *config.LocalCache, cli redis.UniversalClient) *ConversationLocalCache { @@ -99,14 +99,17 @@ func (c *ConversationLocalCache) GetSingleConversationRecvMsgOpt(ctx context.Con } func (c *ConversationLocalCache) GetConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ([]*pbconversation.Conversation, error) { - conversations := make([]*pbconversation.Conversation, 0, len(conversationIDs)) + var ( + conversations = make([]*pbconversation.Conversation, 0, len(conversationIDs)) + errChan = make(chan error, 1) + conversationsChan = make(chan *pbconversation.Conversation, len(conversationIDs)) + wg sync.WaitGroup + ) - errChan := make(chan error, len(conversationIDs)) - conversationsChan := make(chan *pbconversation.Conversation, len(conversationIDs)) - var wg sync.WaitGroup wg.Add(len(conversationIDs)) for _, conversationID := range conversationIDs { + conversationID := conversationID err := c.queue.Push(func() { defer wg.Done() conversation, err := c.GetConversation(ctx, ownerUserID, conversationID) @@ -114,7 +117,10 @@ func (c *ConversationLocalCache) GetConversations(ctx context.Context, ownerUser if errs.ErrRecordNotFound.Is(err) { return } - errChan <- err + select { + case errChan <- err: + default: + } return } conversationsChan <- conversation