|
|
|
@ -11,8 +11,10 @@ import (
|
|
|
|
|
pbRelay "Open_IM/pkg/proto/relay"
|
|
|
|
|
"Open_IM/pkg/utils"
|
|
|
|
|
"bytes"
|
|
|
|
|
"compress/gzip"
|
|
|
|
|
"context"
|
|
|
|
|
"encoding/gob"
|
|
|
|
|
"io/ioutil"
|
|
|
|
|
"strings"
|
|
|
|
|
|
|
|
|
|
go_redis "github.com/go-redis/redis/v8"
|
|
|
|
@ -31,6 +33,7 @@ type UserConn struct {
|
|
|
|
|
w *sync.Mutex
|
|
|
|
|
platformID int32
|
|
|
|
|
PushedMaxSeq uint32
|
|
|
|
|
IsCompress bool
|
|
|
|
|
}
|
|
|
|
|
type WServer struct {
|
|
|
|
|
wsAddr string
|
|
|
|
@ -75,7 +78,11 @@ func (ws *WServer) wsHandler(w http.ResponseWriter, r *http.Request) {
|
|
|
|
|
log.Error(operationID, "upgrade http conn err", err.Error(), query)
|
|
|
|
|
return
|
|
|
|
|
} else {
|
|
|
|
|
newConn := &UserConn{conn, new(sync.Mutex), utils.StringToInt32(query["platformID"][0]), 0}
|
|
|
|
|
var isCompress = false
|
|
|
|
|
if r.Header.Get("compression") == "gzip" {
|
|
|
|
|
isCompress = true
|
|
|
|
|
}
|
|
|
|
|
newConn := &UserConn{conn, new(sync.Mutex), utils.StringToInt32(query["platformID"][0]), 0, isCompress}
|
|
|
|
|
userCount++
|
|
|
|
|
ws.addUserConn(query["sendID"][0], utils.StringToInt(query["platformID"][0]), newConn, query["token"][0], operationID)
|
|
|
|
|
go ws.readMsg(newConn)
|
|
|
|
@ -97,6 +104,23 @@ func (ws *WServer) readMsg(conn *UserConn) {
|
|
|
|
|
ws.delUserConn(conn)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
if conn.IsCompress {
|
|
|
|
|
buff := bytes.NewBuffer(msg)
|
|
|
|
|
reader, err := gzip.NewReader(buff)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.NewWarn("", "un gzip read failed")
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
msg, err = ioutil.ReadAll(reader)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.NewWarn("", "ReadAll failed")
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
err = reader.Close()
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.NewWarn("", "reader close failed")
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
ws.msgParse(conn, msg)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -110,6 +134,17 @@ func (ws *WServer) SetWriteTimeout(conn *UserConn, timeout int) {
|
|
|
|
|
func (ws *WServer) writeMsg(conn *UserConn, a int, msg []byte) error {
|
|
|
|
|
conn.w.Lock()
|
|
|
|
|
defer conn.w.Unlock()
|
|
|
|
|
if conn.IsCompress {
|
|
|
|
|
buff := bytes.NewBuffer(msg)
|
|
|
|
|
gz := gzip.NewWriter(buff)
|
|
|
|
|
if _, err := gz.Write(buff.Bytes()); err != nil {
|
|
|
|
|
return utils.Wrap(err, "")
|
|
|
|
|
}
|
|
|
|
|
if err := gz.Close(); err != nil {
|
|
|
|
|
return utils.Wrap(err, "")
|
|
|
|
|
}
|
|
|
|
|
msg = buff.Bytes()
|
|
|
|
|
}
|
|
|
|
|
conn.SetWriteDeadline(time.Now().Add(time.Duration(60) * time.Second))
|
|
|
|
|
return conn.WriteMessage(a, msg)
|
|
|
|
|
}
|
|
|
|
|