diff --git a/pkg/rpccache/conversation.go b/pkg/rpccache/conversation.go index cd985cd88..0109f1b1d 100644 --- a/pkg/rpccache/conversation.go +++ b/pkg/rpccache/conversation.go @@ -23,14 +23,12 @@ import ( pbconversation "github.com/openimsdk/protocol/conversation" "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" - "github.com/openimsdk/tools/mq/memamq" "github.com/redis/go-redis/v9" - "sync" + "golang.org/x/sync/errgroup" ) const ( - notificationWorkerCount = 20 - notificationBufferSize = 20000 + conversationWorkerCount = 20 ) func NewConversationLocalCache(client rpcclient.ConversationRpcClient, localCache *config.LocalCache, cli redis.UniversalClient) *ConversationLocalCache { @@ -45,7 +43,6 @@ func NewConversationLocalCache(client rpcclient.ConversationRpcClient, localCach localcache.WithLocalSuccessTTL(lc.Success()), localcache.WithLocalFailedTTL(lc.Failed()), ), - queue: memamq.NewMemoryQueue(notificationWorkerCount, notificationBufferSize), } if lc.Enable() { go subscriberRedisDeleteCache(context.Background(), cli, lc.Topic, x.local.DelLocal) @@ -56,8 +53,6 @@ func NewConversationLocalCache(client rpcclient.ConversationRpcClient, localCach type ConversationLocalCache struct { client rpcclient.ConversationRpcClient local localcache.Cache[any] - - queue *memamq.MemoryQueue } func (c *ConversationLocalCache) GetConversationIDs(ctx context.Context, ownerUserID string) (val []string, err error) { @@ -101,48 +96,33 @@ func (c *ConversationLocalCache) GetSingleConversationRecvMsgOpt(ctx context.Con func (c *ConversationLocalCache) GetConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ([]*pbconversation.Conversation, error) { var ( conversations = make([]*pbconversation.Conversation, 0, len(conversationIDs)) - errChan = make(chan error, 1) conversationsChan = make(chan *pbconversation.Conversation, len(conversationIDs)) - wg sync.WaitGroup ) - wg.Add(len(conversationIDs)) + g, ctx := errgroup.WithContext(ctx) + g.SetLimit(conversationWorkerCount) for _, conversationID := range conversationIDs { conversationID := conversationID - err := c.queue.Push(func() { - defer wg.Done() + g.Go(func() error { conversation, err := c.GetConversation(ctx, ownerUserID, conversationID) if err != nil { if errs.ErrRecordNotFound.Is(err) { - return - } - select { - case errChan <- err: - default: + return nil } - return + return err } conversationsChan <- conversation + return nil }) - if err != nil { - // push err - return nil, errs.Wrap(err) - } } - wg.Wait() - close(errChan) - close(conversationsChan) - - err, ok := <-errChan - if ok { + if err := g.Wait(); err != nil { return nil, err } - + close(conversationsChan) for conversation := range conversationsChan { conversations = append(conversations, conversation) } - return conversations, nil }