From a9153afc38f2b7e79c6959777e548deddc5deb4d Mon Sep 17 00:00:00 2001 From: "fengyun.rui" Date: Wed, 22 Nov 2023 17:31:37 +0800 Subject: [PATCH] perf: control ws write buffer (#1451) Signed-off-by: rfyiamcool --- internal/msggateway/init.go | 4 +++- internal/msggateway/long_conn.go | 9 +++++++-- internal/msggateway/n_ws_server.go | 5 ++++- internal/msggateway/options.go | 8 ++++++++ pkg/common/config/config.go | 1 + 5 files changed, 23 insertions(+), 4 deletions(-) diff --git a/internal/msggateway/init.go b/internal/msggateway/init.go index 12a6d3770..14c320c42 100644 --- a/internal/msggateway/init.go +++ b/internal/msggateway/init.go @@ -37,7 +37,9 @@ func RunWsAndServer(rpcPort, wsPort, prometheusPort int) error { WithPort(wsPort), WithMaxConnNum(int64(config.Config.LongConnSvr.WebsocketMaxConnNum)), WithHandshakeTimeout(time.Duration(config.Config.LongConnSvr.WebsocketTimeout)*time.Second), - WithMessageMaxMsgLength(config.Config.LongConnSvr.WebsocketMaxMsgLen)) + WithMessageMaxMsgLength(config.Config.LongConnSvr.WebsocketMaxMsgLen), + WithWriteBufferSize(config.Config.LongConnSvr.WebsocketWriteBufferSize), + ) if err != nil { return err } diff --git a/internal/msggateway/long_conn.go b/internal/msggateway/long_conn.go index 604619eb5..93e5cc33f 100644 --- a/internal/msggateway/long_conn.go +++ b/internal/msggateway/long_conn.go @@ -50,10 +50,11 @@ type GWebSocket struct { protocolType int conn *websocket.Conn handshakeTimeout time.Duration + writeBufferSize int } -func newGWebSocket(protocolType int, handshakeTimeout time.Duration) *GWebSocket { - return &GWebSocket{protocolType: protocolType, handshakeTimeout: handshakeTimeout} +func newGWebSocket(protocolType int, handshakeTimeout time.Duration, wbs int) *GWebSocket { + return &GWebSocket{protocolType: protocolType, handshakeTimeout: handshakeTimeout, writeBufferSize: wbs} } func (d *GWebSocket) Close() error { @@ -65,6 +66,10 @@ func (d *GWebSocket) GenerateLongConn(w http.ResponseWriter, r *http.Request) er HandshakeTimeout: d.handshakeTimeout, CheckOrigin: func(r *http.Request) bool { return true }, } + if d.writeBufferSize > 0 { // default is 4kb. + upgrader.WriteBufferSize = d.writeBufferSize + } + conn, err := upgrader.Upgrade(w, r, nil) if err != nil { return err diff --git a/internal/msggateway/n_ws_server.go b/internal/msggateway/n_ws_server.go index 10dd988d1..a249ff70f 100644 --- a/internal/msggateway/n_ws_server.go +++ b/internal/msggateway/n_ws_server.go @@ -68,6 +68,7 @@ type WsServer struct { onlineUserNum atomic.Int64 onlineUserConnNum atomic.Int64 handshakeTimeout time.Duration + writeBufferSize int validate *validator.Validate cache cache.MsgModel userClient *rpcclient.UserRpcClient @@ -137,6 +138,7 @@ func NewWsServer(opts ...Option) (*WsServer, error) { return &WsServer{ port: config.port, wsMaxConnNum: config.maxConnNum, + writeBufferSize: config.writeBufferSize, handshakeTimeout: config.handshakeTimeout, clientPool: sync.Pool{ New: func() interface{} { @@ -430,7 +432,8 @@ func (ws *WsServer) wsHandler(w http.ResponseWriter, r *http.Request) { httpError(connContext, errs.ErrTokenNotExist.Wrap()) return } - wsLongConn := newGWebSocket(WebSocket, ws.handshakeTimeout) + + wsLongConn := newGWebSocket(WebSocket, ws.handshakeTimeout, ws.writeBufferSize) err = wsLongConn.GenerateLongConn(w, r) if err != nil { httpError(connContext, err) diff --git a/internal/msggateway/options.go b/internal/msggateway/options.go index 24cbbe43f..6513ac5dc 100644 --- a/internal/msggateway/options.go +++ b/internal/msggateway/options.go @@ -27,6 +27,8 @@ type ( handshakeTimeout time.Duration // 允许消息最大长度 messageMaxMsgLength int + // websocket write buffer, default: 4096, 4kb. + writeBufferSize int } ) @@ -53,3 +55,9 @@ func WithMessageMaxMsgLength(length int) Option { opt.messageMaxMsgLength = length } } + +func WithWriteBufferSize(size int) Option { + return func(opt *configs) { + opt.writeBufferSize = size + } +} diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index fdb1cee00..1e9df7813 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -196,6 +196,7 @@ type configStruct struct { WebsocketMaxConnNum int `yaml:"websocketMaxConnNum"` WebsocketMaxMsgLen int `yaml:"websocketMaxMsgLen"` WebsocketTimeout int `yaml:"websocketTimeout"` + WebsocketWriteBufferSize int `yaml:"websocketWriteBufferSize"` } `yaml:"longConnSvr"` Push struct {