|
|
@ -1,22 +1,24 @@
|
|
|
|
package new
|
|
|
|
package new
|
|
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
import (
|
|
|
|
"bytes"
|
|
|
|
"Open_IM/pkg/common/constant"
|
|
|
|
|
|
|
|
"Open_IM/pkg/utils"
|
|
|
|
"errors"
|
|
|
|
"errors"
|
|
|
|
|
|
|
|
"fmt"
|
|
|
|
|
|
|
|
"github.com/go-playground/validator/v10"
|
|
|
|
"github.com/gorilla/websocket"
|
|
|
|
"github.com/gorilla/websocket"
|
|
|
|
"net/http"
|
|
|
|
"net/http"
|
|
|
|
"open_im_sdk/pkg/utils"
|
|
|
|
|
|
|
|
"sync"
|
|
|
|
"sync"
|
|
|
|
"sync/atomic"
|
|
|
|
"sync/atomic"
|
|
|
|
"time"
|
|
|
|
"time"
|
|
|
|
)
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
var bufferPool = sync.Pool{
|
|
|
|
var bufferPool = sync.Pool{
|
|
|
|
New: func() interface{} {
|
|
|
|
New: func() interface{} {
|
|
|
|
return make([]byte, 1000)
|
|
|
|
return make([]byte, 1000)
|
|
|
|
},
|
|
|
|
},
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
type LongConnServer interface {
|
|
|
|
type LongConnServer interface {
|
|
|
|
Run() error
|
|
|
|
Run() error
|
|
|
|
}
|
|
|
|
}
|
|
|
@ -29,7 +31,7 @@ type Server struct {
|
|
|
|
}
|
|
|
|
}
|
|
|
|
type WsServer struct {
|
|
|
|
type WsServer struct {
|
|
|
|
port int
|
|
|
|
port int
|
|
|
|
wsMaxConnNum int
|
|
|
|
wsMaxConnNum int64
|
|
|
|
wsUpGrader *websocket.Upgrader
|
|
|
|
wsUpGrader *websocket.Upgrader
|
|
|
|
registerChan chan *Client
|
|
|
|
registerChan chan *Client
|
|
|
|
unregisterChan chan *Client
|
|
|
|
unregisterChan chan *Client
|
|
|
@ -37,8 +39,12 @@ type WsServer struct {
|
|
|
|
clientPool sync.Pool
|
|
|
|
clientPool sync.Pool
|
|
|
|
onlineUserNum int64
|
|
|
|
onlineUserNum int64
|
|
|
|
onlineUserConnNum int64
|
|
|
|
onlineUserConnNum int64
|
|
|
|
compressor Compressor
|
|
|
|
gzipCompressor Compressor
|
|
|
|
|
|
|
|
encoder Encoder
|
|
|
|
handler MessageHandler
|
|
|
|
handler MessageHandler
|
|
|
|
|
|
|
|
handshakeTimeout time.Duration
|
|
|
|
|
|
|
|
readBufferSize, WriteBufferSize int
|
|
|
|
|
|
|
|
validate *validator.Validate
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func newWsServer(opts ...Option) (*WsServer, error) {
|
|
|
|
func newWsServer(opts ...Option) (*WsServer, error) {
|
|
|
@ -53,16 +59,16 @@ func newWsServer(opts ...Option) (*WsServer, error) {
|
|
|
|
return &WsServer{
|
|
|
|
return &WsServer{
|
|
|
|
port: config.port,
|
|
|
|
port: config.port,
|
|
|
|
wsMaxConnNum: config.maxConnNum,
|
|
|
|
wsMaxConnNum: config.maxConnNum,
|
|
|
|
wsUpGrader: &websocket.Upgrader{
|
|
|
|
handshakeTimeout: config.handshakeTimeout,
|
|
|
|
HandshakeTimeout: config.handshakeTimeout,
|
|
|
|
readBufferSize: config.messageMaxMsgLength,
|
|
|
|
ReadBufferSize: config.messageMaxMsgLength,
|
|
|
|
|
|
|
|
CheckOrigin: func(r *http.Request) bool { return true },
|
|
|
|
|
|
|
|
},
|
|
|
|
|
|
|
|
clientPool: sync.Pool{
|
|
|
|
clientPool: sync.Pool{
|
|
|
|
New: func() interface{} {
|
|
|
|
New: func() interface{} {
|
|
|
|
return new(Client)
|
|
|
|
return new(Client)
|
|
|
|
},
|
|
|
|
},
|
|
|
|
},
|
|
|
|
},
|
|
|
|
|
|
|
|
validate: validator.New(),
|
|
|
|
|
|
|
|
clients: newUserMap(),
|
|
|
|
|
|
|
|
handler: NewGrpcHandler(),
|
|
|
|
}, nil
|
|
|
|
}, nil
|
|
|
|
}
|
|
|
|
}
|
|
|
|
func (ws *WsServer) Run() error {
|
|
|
|
func (ws *WsServer) Run() error {
|
|
|
@ -72,53 +78,115 @@ func (ws *WsServer) Run() error {
|
|
|
|
select {
|
|
|
|
select {
|
|
|
|
case client = <-ws.registerChan:
|
|
|
|
case client = <-ws.registerChan:
|
|
|
|
ws.registerClient(client)
|
|
|
|
ws.registerClient(client)
|
|
|
|
case client = <-h.unregisterChan:
|
|
|
|
case client = <-ws.unregisterChan:
|
|
|
|
h.unregisterClient(client)
|
|
|
|
ws.unregisterClient(client)
|
|
|
|
case msg = <-h.readChan:
|
|
|
|
|
|
|
|
h.messageHandler(msg)
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
}()
|
|
|
|
|
|
|
|
http.HandleFunc("/", ws.wsHandler) //Get request from client to handle by wsHandler
|
|
|
|
|
|
|
|
return http.ListenAndServe(":"+utils.IntToString(ws.port), nil) //Start listening
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func (ws *WsServer) registerClient(client *Client) {
|
|
|
|
func (ws *WsServer) registerClient(client *Client) {
|
|
|
|
var (
|
|
|
|
var (
|
|
|
|
ok bool
|
|
|
|
userOK bool
|
|
|
|
|
|
|
|
clientOK bool
|
|
|
|
cli *Client
|
|
|
|
cli *Client
|
|
|
|
)
|
|
|
|
)
|
|
|
|
|
|
|
|
cli, userOK,clientOK = ws.clients.Get(client.userID,client.platformID)
|
|
|
|
if cli, ok = h.clients.Get(client.key); ok == false {
|
|
|
|
if !userOK {
|
|
|
|
h.clients.Set(client.key, client)
|
|
|
|
ws.clients.Set(client.userID,client)
|
|
|
|
atomic.AddInt64(&h.onlineConnections, 1)
|
|
|
|
atomic.AddInt64(&ws.onlineUserNum, 1)
|
|
|
|
fmt.Println("R在线用户数量:", h.onlineConnections)
|
|
|
|
atomic.AddInt64(&ws.onlineUserConnNum, 1)
|
|
|
|
return
|
|
|
|
fmt.Println("R在线用户数量:", ws.onlineUserNum)
|
|
|
|
|
|
|
|
fmt.Println("R在线用户连接数量:", ws.onlineUserConnNum)
|
|
|
|
|
|
|
|
}else{
|
|
|
|
|
|
|
|
if clientOK {//已经有同平台的连接存在
|
|
|
|
|
|
|
|
ws.clients.Set(client.userID,client)
|
|
|
|
|
|
|
|
ws.multiTerminalLoginChecker(cli)
|
|
|
|
|
|
|
|
}else{
|
|
|
|
|
|
|
|
ws.clients.Set(client.userID,client)
|
|
|
|
|
|
|
|
atomic.AddInt64(&ws.onlineUserConnNum, 1)
|
|
|
|
|
|
|
|
fmt.Println("R在线用户数量:", ws.onlineUserNum)
|
|
|
|
|
|
|
|
fmt.Println("R在线用户连接数量:", ws.onlineUserConnNum)
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
if client.onlineAt > cli.onlineAt {
|
|
|
|
}
|
|
|
|
h.clients.Set(client.key, client)
|
|
|
|
|
|
|
|
h.close(cli)
|
|
|
|
func (ws *WsServer) multiTerminalLoginChecker(client *Client) {
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
func (ws *WsServer) unregisterClient(client *Client) {
|
|
|
|
|
|
|
|
isDeleteUser:=ws.clients.delete(client.userID,client.platformID)
|
|
|
|
|
|
|
|
if isDeleteUser {
|
|
|
|
|
|
|
|
atomic.AddInt64(&ws.onlineUserNum, -1)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
h.close(client)
|
|
|
|
atomic.AddInt64(&ws.onlineUserConnNum, -1)
|
|
|
|
|
|
|
|
fmt.Println("R在线用户数量:", ws.onlineUserNum)
|
|
|
|
|
|
|
|
fmt.Println("R在线用户连接数量:", ws.onlineUserConnNum)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
http.HandleFunc("/", ws.wsHandler) //Get request from client to handle by wsHandler
|
|
|
|
|
|
|
|
return http.ListenAndServe(":"+utils.IntToString(ws.port), nil) //Start listening
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
func (ws *WsServer) wsHandler(w http.ResponseWriter, r *http.Request) {
|
|
|
|
func (ws *WsServer) wsHandler(w http.ResponseWriter, r *http.Request) {
|
|
|
|
context := newContext(w, r)
|
|
|
|
context := newContext(w, r)
|
|
|
|
if isPass, compression := ws.headerCheck(w, r, operationID); isPass {
|
|
|
|
if ws.onlineUserConnNum >= ws.wsMaxConnNum {
|
|
|
|
conn, err := ws.wsUpGrader.Upgrade(w, r, nil) //Conn is obtained through the upgraded escalator
|
|
|
|
httpError(context, constant.ErrConnOverMaxNumLimit)
|
|
|
|
|
|
|
|
return
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
var (
|
|
|
|
|
|
|
|
token string
|
|
|
|
|
|
|
|
userID string
|
|
|
|
|
|
|
|
platformID string
|
|
|
|
|
|
|
|
exists bool
|
|
|
|
|
|
|
|
compression bool
|
|
|
|
|
|
|
|
compressor Compressor
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
token, exists = context.Query(TOKEN)
|
|
|
|
|
|
|
|
if !exists {
|
|
|
|
|
|
|
|
httpError(context, constant.ErrConnArgsErr)
|
|
|
|
|
|
|
|
return
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
userID, exists = context.Query(USERID)
|
|
|
|
|
|
|
|
if !exists {
|
|
|
|
|
|
|
|
httpError(context, constant.ErrConnArgsErr)
|
|
|
|
|
|
|
|
return
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
platformID, exists = context.Query(PLATFORM_ID)
|
|
|
|
|
|
|
|
if !exists {
|
|
|
|
|
|
|
|
httpError(context, constant.ErrConnArgsErr)
|
|
|
|
|
|
|
|
return
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
err := tokenverify.WsVerifyToken(token, userID, platformID)
|
|
|
|
if err != nil {
|
|
|
|
if err != nil {
|
|
|
|
log.Error(operationID, "upgrade http conn err", err.Error(), query)
|
|
|
|
httpError(context, err)
|
|
|
|
return
|
|
|
|
return
|
|
|
|
} else {
|
|
|
|
|
|
|
|
newConn := &UserConn{conn, new(sync.Mutex), utils.StringToInt32(query["platformID"][0]), 0, compression, query["sendID"][0], false, query["token"][0], conn.RemoteAddr().String() + "_" + strconv.Itoa(int(utils.GetCurrentTimestampByMill()))}
|
|
|
|
|
|
|
|
userCount++
|
|
|
|
|
|
|
|
ws.addUserConn(query["sendID"][0], utils.StringToInt(query["platformID"][0]), newConn, query["token"][0], newConn.connID, operationID)
|
|
|
|
|
|
|
|
go ws.readMsg(newConn)
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
wsLongConn:=newGWebSocket(constant.WebSocket,ws.handshakeTimeout,ws.readBufferSize)
|
|
|
|
log.Error(operationID, "headerCheck failed ")
|
|
|
|
err = wsLongConn.GenerateLongConn(w, r)
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
|
|
httpError(context, err)
|
|
|
|
|
|
|
|
return
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
compressProtoc, exists := context.Query(COMPRESSION)
|
|
|
|
|
|
|
|
if exists {
|
|
|
|
|
|
|
|
if compressProtoc==GZIP_COMPRESSION_PROTOCAL{
|
|
|
|
|
|
|
|
compression = true
|
|
|
|
|
|
|
|
compressor = ws.gzipCompressor
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
compressProtoc, exists = context.GetHeader(COMPRESSION)
|
|
|
|
|
|
|
|
if exists {
|
|
|
|
|
|
|
|
if compressProtoc==GZIP_COMPRESSION_PROTOCAL {
|
|
|
|
|
|
|
|
compression = true
|
|
|
|
|
|
|
|
compressor = ws.gzipCompressor
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
client:=ws.clientPool.Get().(*Client)
|
|
|
|
|
|
|
|
client.ResetClient(context,wsLongConn,compression,compressor,ws.encoder,ws.handler,ws.unregisterChan,ws.validate)
|
|
|
|
|
|
|
|
ws.registerChan <- client
|
|
|
|
|
|
|
|
go client.readMessage()
|
|
|
|
}
|
|
|
|
}
|
|
|
|