You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
Open-IM-Server/internal/msggateway/new/client.go

203 lines
5.5 KiB

package new
import (
"Open_IM/pkg/common/constant"
"Open_IM/pkg/utils"
"context"
"errors"
"fmt"
"github.com/go-playground/validator/v10"
"runtime/debug"
"sync"
)
const (
// MessageText is for UTF-8 encoded text messages like JSON.
MessageText = iota + 1
// MessageBinary is for binary messages like protobufs.
MessageBinary
// CloseMessage denotes a close control message. The optional message
// payload contains a numeric code and text. Use the FormatCloseMessage
// function to format a close message payload.
CloseMessage = 8
// PingMessage denotes a ping control message. The optional message payload
// is UTF-8 encoded text.
PingMessage = 9
// PongMessage denotes a pong control message. The optional message payload
// is UTF-8 encoded text.
PongMessage = 10
)
type Client struct {
w *sync.Mutex
conn LongConn
platformID int
isCompress bool
userID string
isBackground bool
connID string
onlineAt int64 // 上线时间戳(毫秒)
handler MessageHandler
unregisterChan chan *Client
compressor Compressor
encoder Encoder
validate *validator.Validate
closed bool
}
func newClient(ctx *UserConnContext, conn LongConn, isCompress bool, compressor Compressor, encoder Encoder,
handler MessageHandler, unregisterChan chan *Client, validate *validator.Validate) *Client {
return &Client{
w: new(sync.Mutex),
conn: conn,
platformID: utils.StringToInt(ctx.GetPlatformID()),
isCompress: isCompress,
userID: ctx.GetUserID(),
compressor: compressor,
encoder: encoder,
connID: ctx.GetConnID(),
onlineAt: utils.GetCurrentTimestampByMill(),
handler: handler,
unregisterChan: unregisterChan,
validate: validate,
}
}
func (c *Client) ResetClient(ctx *UserConnContext, conn LongConn, isCompress bool, compressor Compressor, encoder Encoder,
handler MessageHandler, unregisterChan chan *Client, validate *validator.Validate) {
c.w = new(sync.Mutex)
c.conn = conn
c.platformID = utils.StringToInt(ctx.GetPlatformID())
c.isCompress = isCompress
c.userID = ctx.GetUserID()
c.compressor = compressor
c.encoder = encoder
c.connID = ctx.GetConnID()
c.onlineAt = utils.GetCurrentTimestampByMill()
c.handler = handler
c.unregisterChan = unregisterChan
c.validate = validate
}
func (c *Client) readMessage() {
defer func() {
if r := recover(); r != nil {
fmt.Println("socket have panic err:", r, string(debug.Stack()))
}
//c.close()
}()
var returnErr error
for {
messageType, message, returnErr := c.conn.ReadMessage()
if returnErr != nil {
break
}
if c.closed == true { //连接刚置位已经关闭,但是协程还没退出的场景
break
}
switch messageType {
case PingMessage:
case PongMessage:
case CloseMessage:
return
case MessageText:
case MessageBinary:
if len(message) == 0 {
continue
}
returnErr = c.handleMessage(message)
if returnErr != nil {
break
}
}
}
}
func (c *Client) handleMessage(message []byte) error {
if c.IsCompress {
var decompressErr error
message, decompressErr = c.compressor.DeCompress(message)
if decompressErr != nil {
return utils.Wrap(decompressErr, "")
}
}
var binaryReq Req
err := c.encoder.Decode(message, &binaryReq)
if err != nil {
return utils.Wrap(err, "")
}
if err := c.validate.Struct(binaryReq); err != nil {
return utils.Wrap(err, "")
}
if binaryReq.SendID != c.userID {
return errors.New("exception conn userID not same to req userID")
}
ctx := context.Background()
ctx = context.WithValue(ctx, c.connID, binaryReq.OperationID)
ctx = context.WithValue(ctx, OPERATION_ID, binaryReq.OperationID)
ctx = context.WithValue(ctx, "userID", binaryReq.SendID)
var messageErr error
var resp []byte
switch binaryReq.ReqIdentifier {
case constant.WSGetNewestSeq:
resp, messageErr = c.handler.GetSeq(ctx, binaryReq)
case constant.WSSendMsg:
resp, messageErr = c.handler.SendMessage(ctx, binaryReq)
case constant.WSSendSignalMsg:
resp, messageErr = c.handler.SendSignalMessage(ctx, binaryReq)
case constant.WSPullMsgBySeqList:
resp, messageErr = c.handler.PullMessageBySeqList(ctx, binaryReq)
case constant.WsLogoutMsg:
resp, messageErr = c.handler.UserLogout(ctx, binaryReq)
case constant.WsSetBackgroundStatus:
resp, messageErr = c.handler.SetUserDeviceBackground(ctx, binaryReq)
default:
return errors.New(fmt.Sprintf("ReqIdentifier failed,sendID:%d,msgIncr:%s,reqIdentifier:%s", binaryReq.SendID, binaryReq.MsgIncr, binaryReq.ReqIdentifier))
}
c.replyMessage(binaryReq, messageErr, resp)
return nil
}
func (c *Client) close() {
c.w.Lock()
defer c.w.Unlock()
c.conn.Close()
c.unregisterChan <- c
}
func (c *Client) replyMessage(binaryReq Req, err error, resp []byte) {
mReply := Resp{
ReqIdentifier: binaryReq.ReqIdentifier,
MsgIncr: binaryReq.MsgIncr,
OperationID: binaryReq.OperationID,
Data: resp,
}
_ = c.writeMsg(mReply)
}
func (c *Client) writeMsg(resp Resp) error {
c.w.Lock()
defer c.w.Unlock()
if c.closed == true {
return nil
}
encodedBuf := bufferPool.Get().([]byte)
resultBuf := bufferPool.Get().([]byte)
encodeBuf, err := c.encoder.Encode(resp)
if err != nil {
return utils.Wrap(err, "")
}
_ = c.conn.SetWriteTimeout(60)
if c.isCompress {
var compressErr error
resultBuf, compressErr = c.compressor.Compress(encodeBuf)
if compressErr != nil {
return utils.Wrap(compressErr, "")
}
return c.conn.WriteMessage(MessageBinary, resultBuf)
} else {
return c.conn.WriteMessage(MessageBinary, encodedBuf)
}
}