package msggateway

import (
	"context"
	"errors"
	"fmt"
	"runtime/debug"
	"sync"

	"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
	"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
	"github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext"
	"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
	"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
	"github.com/golang/protobuf/proto"
)

// Message type identifiers reported by LongConn.ReadMessage and accepted by
// LongConn.WriteMessage.
const (
	// MessageText is for UTF-8 encoded text messages like JSON.
	MessageText = iota + 1
	// MessageBinary is for binary messages like protobufs.
	MessageBinary
	// CloseMessage denotes a close control message. The optional message
	// payload contains a numeric code and text. Use the FormatCloseMessage
	// function to format a close message payload.
	CloseMessage = 8

	// PingMessage denotes a ping control message. The optional message payload
	// is UTF-8 encoded text.
	PingMessage = 9

	// PongMessage denotes a pong control message. The optional message payload
	// is UTF-8 encoded text.
	PongMessage = 10
)

// Client holds the state of a single user connection served by the gateway.
type Client struct {
	// w serializes writeMsg and close.
	w *sync.Mutex
	// conn is the underlying long-lived connection.
	conn LongConn
	// platformID is parsed from the connection context.
	platformID int
	// isCompress reports whether payloads are compressed on the wire.
	isCompress bool
	userID     string
	// isBackground is updated by setAppBackgroundStatus.
	isBackground bool
	ctx          *UserConnContext
	// onlineAt is the time the client came online (milliseconds timestamp).
	onlineAt       int64
	longConnServer LongConnServer
	// closed is set by close.
	// NOTE(review): readMessage reads it without holding w — confirm whether
	// that unsynchronized read is acceptable.
	closed bool
}

// newClient builds a Client for conn using the user and platform identifiers
// carried by ctx.
//
// NOTE(review): longConnServer is left nil here; handleMessage, writeMsg and
// close all call through it, so callers presumably assign it (or call
// ResetClient) before use — confirm.
func newClient(ctx *UserConnContext, conn LongConn, isCompress bool) *Client {
	return &Client{
		w:          new(sync.Mutex),
		conn:       conn,
		platformID: utils.StringToInt(ctx.GetPlatformID()),
		isCompress: isCompress,
		userID:     ctx.GetUserID(),
		ctx:        ctx,
		onlineAt:   utils.GetCurrentTimestampByMill(),
	}
}

// ResetClient re-initializes c for a new connection so that an existing
// Client value can be reused. Every per-connection field is overwritten,
// including the closed and isBackground flags: a stale closed flag would make
// readMessage exit immediately and writeMsg silently drop every message.
func (c *Client) ResetClient(ctx *UserConnContext, conn LongConn, isCompress bool, longConnServer LongConnServer) {
	c.w = new(sync.Mutex)
	c.conn = conn
	c.platformID = utils.StringToInt(ctx.GetPlatformID())
	c.isCompress = isCompress
	c.userID = ctx.GetUserID()
	c.isBackground = false
	c.ctx = ctx
	c.onlineAt = utils.GetCurrentTimestampByMill()
	c.longConnServer = longConnServer
	c.closed = false
}

// readMessage reads from the connection until a read error occurs, the client
// is flagged closed, a close message arrives, or a binary message fails to be
// handled. On exit (including after a recovered panic) the client is closed.
func (c *Client) readMessage() {
	defer func() {
		if r := recover(); r != nil {
			fmt.Println("socket have panic err:", r, string(debug.Stack()))
		}
		c.close()
	}()
	for {
		messageType, message, readErr := c.conn.ReadMessage()
		if readErr != nil {
			return
		}
		if c.closed {
			// The connection has just been flagged closed but this goroutine
			// has not exited yet.
			return
		}
		switch messageType {
		case PingMessage, PongMessage, MessageText:
			// Ignored: only binary messages are processed.
		case CloseMessage:
			return
		case MessageBinary:
			if len(message) == 0 {
				continue
			}
			if err := c.handleMessage(message); err != nil {
				log.ZError(context.Background(), "handleMessage failed", err)
				// A bare break here would only leave the switch; return so the
				// read loop stops and the deferred close runs.
				return
			}
		}
	}
}

// handleMessage decompresses (when enabled), decodes and validates a binary
// request, dispatches it on its ReqIdentifier and replies to the client.
//
// It returns an error when the payload cannot be decompressed, decoded or
// validated, when the request's SendID differs from the connection's user ID,
// or when the ReqIdentifier is unknown. Errors returned by the dispatched
// handler are passed to replyMessage and do not cause a non-nil return.
func (c *Client) handleMessage(message []byte) error {
	if c.isCompress {
		var decompressErr error
		message, decompressErr = c.longConnServer.DeCompress(message)
		if decompressErr != nil {
			return utils.Wrap(decompressErr, "")
		}
	}
	var binaryReq Req
	if err := c.longConnServer.Decode(message, &binaryReq); err != nil {
		return utils.Wrap(err, "")
	}
	if err := c.longConnServer.Validate(binaryReq); err != nil {
		return utils.Wrap(err, "")
	}
	if binaryReq.SendID != c.userID {
		return errors.New("exception conn userID not same to req userID")
	}
	ctx := mcontext.WithMustInfoCtx([]string{binaryReq.OperationID, binaryReq.SendID, constant.PlatformIDToName(c.platformID), c.ctx.GetConnID()})
	var (
		messageErr error
		resp       []byte
	)
	switch binaryReq.ReqIdentifier {
	case WSGetNewestSeq:
		resp, messageErr = c.longConnServer.GetSeq(ctx, binaryReq)
		if messageErr != nil {
			// Only log at error level when the call actually failed.
			log.ZError(ctx, "WSGetNewestSeq", messageErr, "resp", resp)
		}
	case WSSendMsg:
		resp, messageErr = c.longConnServer.SendMessage(ctx, binaryReq)
	case WSSendSignalMsg:
		resp, messageErr = c.longConnServer.SendSignalMessage(ctx, binaryReq)
	case WSPullMsgBySeqList:
		resp, messageErr = c.longConnServer.PullMessageBySeqList(ctx, binaryReq)
	case WsLogoutMsg:
		resp, messageErr = c.longConnServer.UserLogout(ctx, binaryReq)
	case WsSetBackgroundStatus:
		resp, messageErr = c.setAppBackgroundStatus(ctx, binaryReq)
	default:
		// SendID is a string (it is compared with c.userID above); %v is used
		// for the remaining fields so the message is well-formed whatever
		// their declared types are.
		return fmt.Errorf("ReqIdentifier failed,sendID:%s,msgIncr:%v,reqIdentifier:%v", binaryReq.SendID, binaryReq.MsgIncr, binaryReq.ReqIdentifier)
	}
	c.replyMessage(&binaryReq, messageErr, resp)
	return nil
}

// setAppBackgroundStatus forwards req to the server's SetUserDeviceBackground
// and, on success, records the returned background flag on the client.
func (c *Client) setAppBackgroundStatus(ctx context.Context, req Req) ([]byte, error) {
	resp, isBackground, messageErr := c.longConnServer.SetUserDeviceBackground(ctx, req)
	if messageErr != nil {
		return nil, messageErr
	}
	c.isBackground = isBackground
	// TODO: callback
	return resp, nil
}

// close marks the client closed, closes the underlying connection and
// unregisters the client from its server, all while holding the write lock.
func (c *Client) close() {
	c.w.Lock()
	defer c.w.Unlock()
	c.closed = true
	c.conn.Close()
	c.longConnServer.UnRegister(c)
}

// replyMessage sends resp back to the client, echoing the request's
// identifier, message increment and operation ID.
//
// NOTE(review): err is currently not propagated to the client or logged, and
// the write error is discarded — confirm whether Resp should carry the
// handler error.
func (c *Client) replyMessage(binaryReq *Req, err error, resp []byte) {
	mReply := Resp{
		ReqIdentifier: binaryReq.ReqIdentifier,
		MsgIncr:       binaryReq.MsgIncr,
		OperationID:   binaryReq.OperationID,
		Data:          resp,
	}
	// Best effort: a failed reply write is deliberately not reported here.
	_ = c.writeMsg(mReply)
}

// PushMessage marshals msgData and writes it to the client as a WSPushMsg
// response tagged with the operation ID found in ctx. It returns the marshal
// or write error, if any.
func (c *Client) PushMessage(ctx context.Context, msgData *sdkws.MsgData) error {
	data, err := proto.Marshal(msgData)
	if err != nil {
		return err
	}
	resp := Resp{
		ReqIdentifier: WSPushMsg,
		OperationID:   mcontext.GetOperationID(ctx),
		Data:          data,
	}
	return c.writeMsg(resp)
}

// KickOnlineMessage is currently a no-op that always returns nil.
func (c *Client) KickOnlineMessage(ctx context.Context) error {
	return nil
}

// writeMsg encodes resp, compresses it when compression is enabled, and
// writes it to the connection as a binary message. Writes are serialized by
// the client mutex. If the client is already closed nothing is written and
// nil is returned.
func (c *Client) writeMsg(resp Resp) error {
	c.w.Lock()
	defer c.w.Unlock()
	if c.closed {
		return nil
	}
	encodedBuf, err := c.longConnServer.Encode(resp)
	if err != nil {
		return utils.Wrap(err, "")
	}
	// NOTE(review): the unit of this timeout value is defined by LongConn —
	// confirm. The error is deliberately ignored; a subsequent write failure
	// is reported by WriteMessage.
	_ = c.conn.SetWriteTimeout(60)
	if c.isCompress {
		compressedBuf, compressErr := c.longConnServer.Compress(encodedBuf)
		if compressErr != nil {
			return utils.Wrap(compressErr, "")
		}
		return c.conn.WriteMessage(MessageBinary, compressedBuf)
	}
	// Write the encoded response itself, not an unrelated scratch buffer.
	return c.conn.WriteMessage(MessageBinary, encodedBuf)
}