You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
Open-IM-Server/src/msg_gateway/gate/rpc_server.go

193 lines
5.9 KiB

package gate
import (
"Open_IM/src/common/config"
"Open_IM/src/common/constant"
"Open_IM/src/common/log"
"Open_IM/src/grpc-etcdv3/getcdv3"
pbRelay "Open_IM/src/proto/relay"
"Open_IM/src/utils"
"bytes"
"context"
"encoding/gob"
"fmt"
"github.com/gorilla/websocket"
"google.golang.org/grpc"
"net"
"strings"
)
type RPCServer struct {
rpcPort int
rpcRegisterName string
etcdSchema string
etcdAddr []string
}
func (r *RPCServer) onInit(rpcPort int) {
r.rpcPort = rpcPort
r.rpcRegisterName = config.Config.RpcRegisterName.OpenImOnlineMessageRelayName
r.etcdSchema = config.Config.Etcd.EtcdSchema
r.etcdAddr = config.Config.Etcd.EtcdAddr
}
func (r *RPCServer) run() {
ip := utils.ServerIP
registerAddress := ip + ":" + utils.IntToString(r.rpcPort)
listener, err := net.Listen("tcp", registerAddress)
if err != nil {
log.ErrorByArgs(fmt.Sprintf("fail to listening consumer, err:%v\n", err))
return
}
defer listener.Close()
srv := grpc.NewServer()
defer srv.GracefulStop()
pbRelay.RegisterOnlineMessageRelayServiceServer(srv, r)
err = getcdv3.RegisterEtcd4Unique(r.etcdSchema, strings.Join(r.etcdAddr, ","), ip, r.rpcPort, r.rpcRegisterName, 10)
if err != nil {
log.ErrorByKv("register push message rpc to etcd err", "", "err", err.Error())
}
err = srv.Serve(listener)
if err != nil {
log.ErrorByKv("push message rpc listening err", "", "err", err.Error())
return
}
}
func (r *RPCServer) MsgToUser(_ context.Context, in *pbRelay.MsgToUserReq) (*pbRelay.MsgToUserResp, error) {
log.InfoByKv("PushMsgToUser is arriving", in.OperationID, "args", in.String())
var resp []*pbRelay.SingleMsgToUser
var RecvID string
msg := make(map[string]interface{})
mReply := make(map[string]interface{})
mReply["reqIdentifier"] = constant.WSPushMsg
mReply["errCode"] = 0
mReply["errMsg"] = ""
msg["sendID"] = in.SendID
msg["recvID"] = in.RecvID
msg["msgFrom"] = in.MsgFrom
msg["contentType"] = in.ContentType
msg["sessionType"] = in.SessionType
msg["senderNickName"] = in.SenderNickName
msg["senderFaceUrl"] = in.SenderFaceURL
msg["clientMsgID"] = in.ClientMsgID
msg["serverMsgID"] = in.ServerMsgID
msg["content"] = in.Content
msg["seq"] = in.RecvSeq
msg["sendTime"] = in.SendTime
msg["senderPlatformID"] = in.PlatformID
mReply["data"] = msg
var b bytes.Buffer
enc := gob.NewEncoder(&b)
err := enc.Encode(mReply)
if err != nil {
fmt.Println(err)
}
switch in.GetSessionType() {
case constant.SingleChatType:
RecvID = in.GetRecvID()
case constant.GroupChatType:
RecvID = strings.Split(in.GetRecvID(), " ")[0]
}
var tag bool
a := genUidPlatformArray(RecvID)
for _, v := range a {
if conn := ws.getUserConn(v); conn != nil {
UIDAndPID := strings.Split(v, " ")
tag = true
resultCode := sendMsgToUser(conn, b.Bytes(), in, UIDAndPID[1], UIDAndPID[0])
temp := &pbRelay.SingleMsgToUser{
ResultCode: resultCode,
RecvID: UIDAndPID[0],
RecvPlatFormID: utils.PlatformNameToID(UIDAndPID[1]),
}
resp = append(resp, temp)
}
}
if !tag {
log.NewError(in.OperationID, "push err ,ws conn not in map", in.String())
}
//for key, conn := range ws.wsUserToConn {
// UIDAndPID := strings.Split(key, " ")
// if UIDAndPID[0] == RecvID {
// tag = true
// resultCode := sendMsgToUser(conn, bMsg, in, UIDAndPID[1], UIDAndPID[0])
// temp := &pbRelay.SingleMsgToUser{
// ResultCode: resultCode,
// RecvID: UIDAndPID[0],
// RecvPlatFormID: utils.PlatformNameToID(UIDAndPID[1]),
// }
// resp = append(resp, temp)
// }
//}
//if !tag {
// log.NewError(in.OperationID, "push err ,ws conn not in map", in.String())
//}
//switch in.GetContentType() {
//case constant.SyncSenderMsg:
// log.InfoByKv("come sync", in.OperationID, "args", in.String())
// RecvID = in.GetSendID()
// if in.MsgFrom != constant.SysMsgType {
// for key, conn := range ws.wsUserToConn {
// UIDAndPID := strings.Split(key, " ")
// if UIDAndPID[0] == RecvID && utils.PlatformIDToName(in.GetPlatformID()) != UIDAndPID[1] {
// resultCode := sendMsgToUser(conn, bMsg, in, UIDAndPID[1], UIDAndPID[0])
// temp := &pbRelay.SingleMsgToUser{
// ResultCode: resultCode,
// RecvID: UIDAndPID[0],
// RecvPlatFormID: utils.PlatformNameToID(UIDAndPID[1]),
// }
// resp = append(resp, temp)
// }
//
// }
// }
//default:
// log.InfoByKv("not come sync", in.OperationID, "args", in.String())
// switch in.SessionType {
// case constant.SingleChatType:
// log.InfoByKv("come single", in.OperationID, "args", in.String())
// RecvID = in.GetRecvID()
// case constant.GroupChatType:
// RecvID = strings.Split(in.GetRecvID(), " ")[0]
// default:
// }
// log.InfoByKv("come for range", in.OperationID, "args", in.String())
//
// for key, conn := range ws.wsUserToConn {
// UIDAndPID := strings.Split(key, " ")
// if UIDAndPID[0] == RecvID {
// resultCode := sendMsgToUser(conn, bMsg, in, UIDAndPID[1], UIDAndPID[0])
// temp := &pbRelay.SingleMsgToUser{
// ResultCode: resultCode,
// RecvID: UIDAndPID[0],
// RecvPlatFormID: utils.PlatformNameToID(UIDAndPID[1]),
// }
// resp = append(resp, temp)
// }
// }
//}
return &pbRelay.MsgToUserResp{
Resp: resp,
}, nil
}
func sendMsgToUser(conn *UserConn, bMsg []byte, in *pbRelay.MsgToUserReq, RecvPlatForm, RecvID string) (ResultCode int64) {
err := ws.writeMsg(conn, websocket.BinaryMessage, bMsg)
if err != nil {
log.ErrorByKv("PushMsgToUser is failed By Ws", "", "Addr", conn.RemoteAddr().String(),
"error", err, "senderPlatform", utils.PlatformIDToName(in.PlatformID), "recvPlatform", RecvPlatForm, "args", in.String(), "recvID", RecvID)
ResultCode = -2
return ResultCode
} else {
log.InfoByKv("PushMsgToUser is success By Ws", in.OperationID, "args", in.String())
ResultCode = 0
return ResultCode
}
}
func genUidPlatformArray(uid string) (array []string) {
for i := 1; i <= utils.LinuxPlatformID; i++ {
array = append(array, uid+" "+utils.PlatformIDToName(int32(i)))
}
return array
}