perf: broadcast msg to all gateway with concurrency (#1411)

Signed-off-by: rfyiamcool <rfyiamcool@163.com>
pull/1423/head
fengyun.rui 1 year ago committed by GitHub
parent ae048417ee
commit eeb16d4116
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -18,8 +18,9 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"errors" "errors"
"sync"
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" "golang.org/x/sync/errgroup"
"github.com/OpenIMSDK/protocol/constant" "github.com/OpenIMSDK/protocol/constant"
"github.com/OpenIMSDK/protocol/conversation" "github.com/OpenIMSDK/protocol/conversation"
@ -40,6 +41,7 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/db/controller" "github.com/openimsdk/open-im-server/v3/pkg/common/db/controller"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/localcache" "github.com/openimsdk/open-im-server/v3/pkg/common/db/localcache"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
) )
@ -285,18 +287,44 @@ func (p *Pusher) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.MsgData,
if err != nil { if err != nil {
return nil, err return nil, err
} }
var (
mu sync.Mutex
wg = errgroup.Group{}
input = &msggateway.OnlineBatchPushOneMsgReq{MsgData: msg, PushToUserIDs: pushToUserIDs}
maxWorkers = config.Config.Push.MaxConcurrentWorkers
)
if maxWorkers < 3 {
maxWorkers = 3
}
wg.SetLimit(maxWorkers)
// Online push message // Online push message
for _, v := range conns { for _, conn := range conns {
msgClient := msggateway.NewMsgGatewayClient(v) conn := conn // loop var safe
reply, err := msgClient.SuperGroupOnlineBatchPushOneMsg(ctx, &msggateway.OnlineBatchPushOneMsgReq{MsgData: msg, PushToUserIDs: pushToUserIDs}) wg.Go(func() error {
msgClient := msggateway.NewMsgGatewayClient(conn)
reply, err := msgClient.SuperGroupOnlineBatchPushOneMsg(ctx, input)
if err != nil { if err != nil {
continue return nil
} }
log.ZDebug(ctx, "push result", "reply", reply) log.ZDebug(ctx, "push result", "reply", reply)
if reply != nil && reply.SinglePushResult != nil { if reply != nil && reply.SinglePushResult != nil {
mu.Lock()
wsResults = append(wsResults, reply.SinglePushResult...) wsResults = append(wsResults, reply.SinglePushResult...)
mu.Unlock()
} }
return nil
})
} }
_ = wg.Wait()
// always return nil
return wsResults, nil return wsResults, nil
} }

@ -199,6 +199,7 @@ type configStruct struct {
} `yaml:"longConnSvr"` } `yaml:"longConnSvr"`
Push struct { Push struct {
MaxConcurrentWorkers int `yaml:"maxConcurrentWorkers"`
Enable string `yaml:"enable"` Enable string `yaml:"enable"`
GeTui struct { GeTui struct {
PushUrl string `yaml:"pushUrl"` PushUrl string `yaml:"pushUrl"`

Loading…
Cancel
Save