From 95df4194ca47a0f99e0f0bbf06a9263965c93d5a Mon Sep 17 00:00:00 2001 From: icey-yu <119291641+icey-yu@users.noreply.github.com> Date: Wed, 3 Jul 2024 14:25:10 +0800 Subject: [PATCH 1/3] Optimize get conversation seq (#2387) * feat:optimize GetConversationsHasReadAndMaxSeq * fix:get max seqs * fix:get max seqs * fix:get max seqs * fix:get max seqs --- pkg/common/storage/cache/redis/seq.go | 39 ++++++++++++++++++++++---- pkg/rpccache/conversation.go | 40 +++++++++++++++++++++------ 2 files changed, 64 insertions(+), 15 deletions(-) diff --git a/pkg/common/storage/cache/redis/seq.go b/pkg/common/storage/cache/redis/seq.go index 76dd921a5..09ad5b609 100644 --- a/pkg/common/storage/cache/redis/seq.go +++ b/pkg/common/storage/cache/redis/seq.go @@ -16,11 +16,13 @@ package redis import ( "context" + "errors" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/utils/stringutil" "github.com/redis/go-redis/v9" + "sync" ) func NewSeqCache(rdb redis.UniversalClient) cache.SeqCache { @@ -61,15 +63,40 @@ func (c *seqCache) getSeq(ctx context.Context, conversationID string, getkey fun func (c *seqCache) getSeqs(ctx context.Context, items []string, getkey func(s string) string) (m map[string]int64, err error) { m = make(map[string]int64, len(items)) + var ( + reverseMap = make(map[string]string, len(items)) + keys = make([]string, len(items)) + lock sync.Mutex + ) + for i, v := range items { - res, err := c.rdb.Get(ctx, getkey(v)).Result() - if err != nil && err != redis.Nil { - return nil, errs.Wrap(err) + keys[i] = getkey(v) + reverseMap[getkey(v)] = v + } + + manager := NewRedisShardManager(c.rdb) + if err = manager.ProcessKeysBySlot(ctx, keys, func(ctx context.Context, _ int64, keys []string) error { + res, err := c.rdb.MGet(ctx, keys...).Result() + if err != nil && !errors.Is(err, redis.Nil) { + return errs.Wrap(err) } - val := stringutil.StringToInt64(res) - if val != 0 { - m[items[i]] = val + + // len(res) <= len(items) + for i := range res { + strRes, ok := res[i].(string) + if !ok { + continue + } + val := stringutil.StringToInt64(strRes) + if val != 0 { + lock.Lock() + m[reverseMap[keys[i]]] = val + lock.Unlock() + } } + return nil + }); err != nil { + return nil, err } return m, nil diff --git a/pkg/rpccache/conversation.go b/pkg/rpccache/conversation.go index 4c00dd1f7..0109f1b1d 100644 --- a/pkg/rpccache/conversation.go +++ b/pkg/rpccache/conversation.go @@ -16,15 +16,19 @@ package rpccache import ( "context" - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" - "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" "github.com/openimsdk/open-im-server/v3/pkg/localcache" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" pbconversation "github.com/openimsdk/protocol/conversation" "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" "github.com/redis/go-redis/v9" + "golang.org/x/sync/errgroup" +) + +const ( + conversationWorkerCount = 20 ) func NewConversationLocalCache(client rpcclient.ConversationRpcClient, localCache *config.LocalCache, cli redis.UniversalClient) *ConversationLocalCache { @@ -90,15 +94,33 @@ func (c *ConversationLocalCache) GetSingleConversationRecvMsgOpt(ctx context.Con } func (c *ConversationLocalCache) GetConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ([]*pbconversation.Conversation, error) { - conversations := make([]*pbconversation.Conversation, 0, len(conversationIDs)) + var ( + conversations = make([]*pbconversation.Conversation, 0, len(conversationIDs)) + conversationsChan = make(chan *pbconversation.Conversation, len(conversationIDs)) + ) + + g, ctx := errgroup.WithContext(ctx) + g.SetLimit(conversationWorkerCount) + for _, conversationID := range conversationIDs { - conversation, err := c.GetConversation(ctx, ownerUserID, conversationID) - if err != nil { - if errs.ErrRecordNotFound.Is(err) { - continue + conversationID := conversationID + g.Go(func() error { + conversation, err := c.GetConversation(ctx, ownerUserID, conversationID) + if err != nil { + if errs.ErrRecordNotFound.Is(err) { + return nil + } + return err } - return nil, err - } + conversationsChan <- conversation + return nil + }) + } + if err := g.Wait(); err != nil { + return nil, err + } + close(conversationsChan) + for conversation := range conversationsChan { conversations = append(conversations, conversation) } return conversations, nil From 644eaf996c8ae0875bcd2a257621c97b821ce61b Mon Sep 17 00:00:00 2001 From: chao <48119764+withchao@users.noreply.github.com> Date: Wed, 3 Jul 2024 14:27:16 +0800 Subject: [PATCH 2/3] update gomake version (#2386) * fix: GroupApplicationAcceptedNotification * fix: GroupApplicationAcceptedNotification * fix: NotificationUserInfoUpdate * cicd: robot automated Change * fix: component * fix: getConversationInfo * feat: cron task * feat: cron task * feat: cron task * feat: cron task * feat: cron task * fix: minio config url recognition error * update gomake version * update gomake version --------- Co-authored-by: withchao --- Dockerfile | 2 +- go.mod | 2 +- go.sum | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/Dockerfile b/Dockerfile index 3f765805c..e082dd64c 100644 --- a/Dockerfile +++ b/Dockerfile @@ -43,7 +43,7 @@ COPY --from=builder $SERVER_DIR/start-config.yml $SERVER_DIR/ COPY --from=builder $SERVER_DIR/go.mod $SERVER_DIR/ COPY --from=builder $SERVER_DIR/go.sum $SERVER_DIR/ -RUN go get github.com/openimsdk/gomake@v0.0.13 +RUN go get github.com/openimsdk/gomake@v0.0.14-alpha.5 # Set the command to run when the container starts ENTRYPOINT ["sh", "-c", "mage start && tail -f /dev/null"] diff --git a/go.mod b/go.mod index 245214b93..365933de6 100644 --- a/go.mod +++ b/go.mod @@ -35,7 +35,7 @@ require ( github.com/hashicorp/golang-lru/v2 v2.0.7 github.com/kelindar/bitmap v1.5.2 github.com/likexian/gokit v0.25.13 - github.com/openimsdk/gomake v0.0.13 + github.com/openimsdk/gomake v0.0.14-alpha.5 github.com/redis/go-redis/v9 v9.4.0 github.com/robfig/cron/v3 v3.0.1 github.com/shirou/gopsutil v3.21.11+incompatible diff --git a/go.sum b/go.sum index 664f2366a..a5a620a14 100644 --- a/go.sum +++ b/go.sum @@ -268,8 +268,8 @@ github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU= github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y= github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= -github.com/openimsdk/gomake v0.0.13 h1:xLDe/moqgWpRoptHzI4packAWzs4C16b+sVY+txNJp0= -github.com/openimsdk/gomake v0.0.13/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= +github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y= +github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= github.com/openimsdk/protocol v0.0.69-alpha.17 h1:pEag4ZdlovE+AyLsw1VYFU/3sk6ayvGdPzgufQfKf9M= github.com/openimsdk/protocol v0.0.69-alpha.17/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= github.com/openimsdk/tools v0.0.49-alpha.28 h1:1CfdFxvKzyOIvgNMVMq4ZB2upAJ0evLbbigOhWQzhu8= From f231ea1f2132b96a91d333c467cabb7832cb8260 Mon Sep 17 00:00:00 2001 From: icey-yu <119291641+icey-yu@users.noreply.github.com> Date: Wed, 3 Jul 2024 15:39:59 +0800 Subject: [PATCH 3/3] fix:start (#2389) --- pkg/common/startrpc/start.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/common/startrpc/start.go b/pkg/common/startrpc/start.go index 069c92012..b531daa47 100644 --- a/pkg/common/startrpc/start.go +++ b/pkg/common/startrpc/start.go @@ -158,7 +158,6 @@ func Start[T any](ctx context.Context, discovery *config2.Discovery, prometheusC } return nil case <-netDone: - close(netDone) return netErr } }