@ -16,7 +16,10 @@ package msggateway
import (
"context"
"encoding/json"
"errors"
"fmt"
"github.com/OpenIMSDK/tools/apiresp"
"net/http"
"os"
"os/signal"
@ -422,84 +425,102 @@ func (ws *WsServer) unregisterClient(client *Client) {
)
}
func ( ws * WsServer ) wsHandler ( w http . ResponseWriter , r * http . Request ) {
connContext := newContext ( w , r )
func ( ws * WsServer ) ParseWSArgs ( r * http . Request ) ( args * WSArgs , err error ) {
var v WSArgs
defer func ( ) {
args = & v
} ( )
query := r . URL . Query ( )
v . MsgResp , _ = strconv . ParseBool ( query . Get ( MsgResp ) )
if ws . onlineUserConnNum . Load ( ) >= ws . wsMaxConnNum {
httpError ( connContext , errs . ErrConnOverMaxNumLimit )
return
return nil , errs . ErrConnOverMaxNumLimit . Wrap ( "over max conn num limit" )
}
var (
token string
userID string
platformIDStr string
exists bool
compression bool
)
token , exists = connContext . Query ( Token )
if ! exists {
httpError ( connContext , errs . ErrConnArgsErr )
return
if v . Token = query . Get ( Token ) ; v . Token == "" {
return nil , errs . ErrConnArgsErr . Wrap ( "token is empty" )
}
userID , exists = connContext . Query ( WsUserID )
if ! exists {
httpError ( connContext , errs . ErrConnArgsErr )
return
if v . UserID = query . Get ( WsUserID ) ; v . UserID == "" {
return nil , errs . ErrConnArgsErr . Wrap ( "sendID is empty" )
}
platformIDStr , exists = connContext . Query ( PlatformID )
if ! exists {
httpError ( connContext , errs . ErrConnArgsErr )
return
platformIDStr := query . Get ( PlatformID )
if platformIDStr == "" {
return nil , errs . ErrConnArgsErr . Wrap ( "platformID is empty" )
}
platformID , err := strconv . Atoi ( platformIDStr )
if err != nil {
httpError ( connContext , errs . ErrConnArgsErr )
return
return nil , errs . ErrConnArgsErr . Wrap ( "platformID is not int" )
}
if err = authverify . WsVerifyToken ( token , userID , platformID ) ; err != nil {
httpError ( connContext , err )
return
v . PlatformID = platformID
if err = authverify . WsVerifyToken ( v . Token , v . UserID , platformID ) ; err != nil {
return nil , err
}
if query . Get ( Compression ) == GzipCompressionProtocol {
v . Compression = true
}
m , err := ws . cache . GetTokensWithoutError ( context . Background ( ) , userID , platformID )
if r . Header . Get ( Compression ) == GzipCompressionProtocol {
v . Compression = true
}
m , err := ws . cache . GetTokensWithoutError ( context . Background ( ) , v . UserID , platformID )
if err != nil {
httpError ( connContext , err )
return
return nil , err
}
if v , ok := m [ t oken] ; ok {
if v , ok := m [ v. T oken] ; ok {
switch v {
case constant . NormalToken :
case constant . KickedToken :
httpError ( connContext , errs . ErrTokenKicked . Wrap ( ) )
return
return nil , errs . ErrTokenKicked . Wrap ( )
default :
httpError ( connContext , errs . ErrTokenUnknown . Wrap ( ) )
return
return nil , errs . ErrTokenUnknown . Wrap ( fmt . Sprintf ( "token status is %d" , v ) )
}
} else {
httpError ( connContext , errs . ErrTokenNotExist . Wrap ( ) )
return
return nil , errs . ErrTokenNotExist . Wrap ( )
}
return & v , nil
}
wsLongConn := newGWebSocket ( WebSocket , ws . handshakeTimeout , ws . writeBufferSize )
err = wsLongConn . GenerateLongConn ( w , r )
if err != nil {
type WSArgs struct {
Token string
UserID string
PlatformID int
Compression bool
MsgResp bool
}
func ( ws * WsServer ) wsHandler ( w http . ResponseWriter , r * http . Request ) {
connContext := newContext ( w , r )
args , pErr := ws . ParseWSArgs ( r )
var wsLongConn * GWebSocket
if args . MsgResp {
wsLongConn = newGWebSocket ( WebSocket , ws . handshakeTimeout , ws . writeBufferSize )
if err := wsLongConn . GenerateLongConn ( w , r ) ; err != nil {
httpError ( connContext , err )
return
}
compressProtoc , exists := connContext . Query ( Compression )
if exists {
if compressProtoc == GzipCompressionProtocol {
compression = true
data , err := json . Marshal ( apiresp . ParseError ( pErr ) )
if err != nil {
_ = wsLongConn . Close ( )
return
}
if err := wsLongConn . WriteMessage ( MessageText , data ) ; err != nil {
_ = wsLongConn . Close ( )
return
}
if pErr != nil {
_ = wsLongConn . Close ( )
return
}
compressProtoc , exists = connContext . GetHeader ( Compression )
if exists {
if compressProtoc == GzipCompressionProtocol {
compression = true
} else {
if pErr != nil {
httpError ( connContext , pErr )
return
}
wsLongConn = newGWebSocket ( WebSocket , ws . handshakeTimeout , ws . writeBufferSize )
if err := wsLongConn . GenerateLongConn ( w , r ) ; err != nil {
httpError ( connContext , err )
return
}
}
client := ws . clientPool . Get ( ) . ( * Client )
client . ResetClient ( connContext , wsLongConn , connContext . GetBackground ( ) , compression , ws , token )
client . ResetClient ( connContext , wsLongConn , connContext . GetBackground ( ) , args. Compression , ws , args . T oken)
ws . registerChan <- client
go client . readMessage ( )
}