From eeb16d41162d8f5de11ce29afc5367ccb5e30a4e Mon Sep 17 00:00:00 2001 From: "fengyun.rui" Date: Wed, 15 Nov 2023 22:24:17 +0800 Subject: [PATCH] perf: broadcast msg to all gateway with concurrency (#1411) Signed-off-by: rfyiamcool --- internal/push/push_to_client.go | 50 +++++++++++++++++++++++++-------- pkg/common/config/config.go | 5 ++-- 2 files changed, 42 insertions(+), 13 deletions(-) diff --git a/internal/push/push_to_client.go b/internal/push/push_to_client.go index b4fb35d8e..75a1c1380 100644 --- a/internal/push/push_to_client.go +++ b/internal/push/push_to_client.go @@ -18,8 +18,9 @@ import ( "context" "encoding/json" "errors" + "sync" - "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" + "golang.org/x/sync/errgroup" "github.com/OpenIMSDK/protocol/constant" "github.com/OpenIMSDK/protocol/conversation" @@ -40,6 +41,7 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/db/controller" "github.com/openimsdk/open-im-server/v3/pkg/common/db/localcache" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" + "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" ) @@ -285,18 +287,44 @@ func (p *Pusher) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.MsgData, if err != nil { return nil, err } + + var ( + mu sync.Mutex + wg = errgroup.Group{} + input = &msggateway.OnlineBatchPushOneMsgReq{MsgData: msg, PushToUserIDs: pushToUserIDs} + maxWorkers = config.Config.Push.MaxConcurrentWorkers + ) + + if maxWorkers < 3 { + maxWorkers = 3 + } + + wg.SetLimit(maxWorkers) + // Online push message - for _, v := range conns { - msgClient := msggateway.NewMsgGatewayClient(v) - reply, err := msgClient.SuperGroupOnlineBatchPushOneMsg(ctx, &msggateway.OnlineBatchPushOneMsgReq{MsgData: msg, PushToUserIDs: pushToUserIDs}) - if err != nil { - continue - } - log.ZDebug(ctx, "push result", "reply", reply) - if reply != nil && reply.SinglePushResult != nil { - wsResults = append(wsResults, reply.SinglePushResult...) - } + for _, conn := range conns { + conn := conn // loop var safe + wg.Go(func() error { + msgClient := msggateway.NewMsgGatewayClient(conn) + reply, err := msgClient.SuperGroupOnlineBatchPushOneMsg(ctx, input) + if err != nil { + return nil + } + + log.ZDebug(ctx, "push result", "reply", reply) + if reply != nil && reply.SinglePushResult != nil { + mu.Lock() + wsResults = append(wsResults, reply.SinglePushResult...) + mu.Unlock() + } + + return nil + }) } + + _ = wg.Wait() + + // always return nil return wsResults, nil } diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index 94688b0fb..fdb1cee00 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -199,8 +199,9 @@ type configStruct struct { } `yaml:"longConnSvr"` Push struct { - Enable string `yaml:"enable"` - GeTui struct { + MaxConcurrentWorkers int `yaml:"maxConcurrentWorkers"` + Enable string `yaml:"enable"` + GeTui struct { PushUrl string `yaml:"pushUrl"` AppKey string `yaml:"appKey"` Intent string `yaml:"intent"`