fix:get max seqs

pull/2387/head
icey-yu 1 year ago
parent e595bb0230
commit e7b4a24319

@ -63,11 +63,13 @@ func (c *seqCache) getSeq(ctx context.Context, conversationID string, getkey fun
func (c *seqCache) getSeqs(ctx context.Context, items []string, getkey func(s string) string) (m map[string]int64, err error) { func (c *seqCache) getSeqs(ctx context.Context, items []string, getkey func(s string) string) (m map[string]int64, err error) {
m = make(map[string]int64, len(items)) m = make(map[string]int64, len(items))
reverseMap := make(map[string]string, len(items))
var lock sync.Mutex var lock sync.Mutex
keys := make([]string, len(items)) keys := make([]string, len(items))
for i, v := range items { for i, v := range items {
keys[i] = getkey(v) keys[i] = getkey(v)
reverseMap[getkey(v)] = v
} }
manager := NewRedisShardManager(c.rdb) manager := NewRedisShardManager(c.rdb)
@ -86,7 +88,7 @@ func (c *seqCache) getSeqs(ctx context.Context, items []string, getkey func(s st
val := stringutil.StringToInt64(strRes) val := stringutil.StringToInt64(strRes)
if val != 0 { if val != 0 {
lock.Lock() lock.Lock()
m[items[i]] = val m[reverseMap[keys[i]]] = val
lock.Unlock() lock.Unlock()
} }
} }

@ -29,8 +29,8 @@ import (
) )
const ( const (
notificationWorkerCount = 2 notificationWorkerCount = 20
notificationBufferSize = 200 notificationBufferSize = 20000
) )
func NewConversationLocalCache(client rpcclient.ConversationRpcClient, localCache *config.LocalCache, cli redis.UniversalClient) *ConversationLocalCache { func NewConversationLocalCache(client rpcclient.ConversationRpcClient, localCache *config.LocalCache, cli redis.UniversalClient) *ConversationLocalCache {
@ -99,14 +99,17 @@ func (c *ConversationLocalCache) GetSingleConversationRecvMsgOpt(ctx context.Con
} }
func (c *ConversationLocalCache) GetConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ([]*pbconversation.Conversation, error) { func (c *ConversationLocalCache) GetConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ([]*pbconversation.Conversation, error) {
conversations := make([]*pbconversation.Conversation, 0, len(conversationIDs)) var (
conversations = make([]*pbconversation.Conversation, 0, len(conversationIDs))
errChan = make(chan error, 1)
conversationsChan = make(chan *pbconversation.Conversation, len(conversationIDs))
wg sync.WaitGroup
)
errChan := make(chan error, len(conversationIDs))
conversationsChan := make(chan *pbconversation.Conversation, len(conversationIDs))
var wg sync.WaitGroup
wg.Add(len(conversationIDs)) wg.Add(len(conversationIDs))
for _, conversationID := range conversationIDs { for _, conversationID := range conversationIDs {
conversationID := conversationID
err := c.queue.Push(func() { err := c.queue.Push(func() {
defer wg.Done() defer wg.Done()
conversation, err := c.GetConversation(ctx, ownerUserID, conversationID) conversation, err := c.GetConversation(ctx, ownerUserID, conversationID)
@ -114,7 +117,10 @@ func (c *ConversationLocalCache) GetConversations(ctx context.Context, ownerUser
if errs.ErrRecordNotFound.Is(err) { if errs.ErrRecordNotFound.Is(err) {
return return
} }
errChan <- err select {
case errChan <- err:
default:
}
return return
} }
conversationsChan <- conversation conversationsChan <- conversation

Loading…
Cancel
Save