proto modify

test-errcode
wangchuxiao 2 years ago
parent a862fa3960
commit e07c9077ca

@ -17,7 +17,6 @@ import (
"github.com/go-playground/validator/v10" "github.com/go-playground/validator/v10"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
"github.com/mitchellh/mapstructure" "github.com/mitchellh/mapstructure"
"net/http"
) )
var _ context.Context // 解决goland编辑器bug var _ context.Context // 解决goland编辑器bug
@ -27,12 +26,11 @@ func NewMsg(zk *openKeeper.ZkClient) *Msg {
} }
type Msg struct { type Msg struct {
zk *openKeeper.ZkClient zk *openKeeper.ZkClient
validate *validator.Validate
} }
var validate *validator.Validate func () SetOptions(options map[string]bool, value bool) {
func SetOptions(options map[string]bool, value bool) {
utils.SetSwitchFromOptions(options, constant.IsHistory, value) utils.SetSwitchFromOptions(options, constant.IsHistory, value)
utils.SetSwitchFromOptions(options, constant.IsPersistent, value) utils.SetSwitchFromOptions(options, constant.IsPersistent, value)
utils.SetSwitchFromOptions(options, constant.IsSenderSync, value) utils.SetSwitchFromOptions(options, constant.IsSenderSync, value)
@ -240,7 +238,7 @@ func (o *Msg) ManagementSendMsg(c *gin.Context) {
log.Info(params.OperationID, "", "api ManagementSendMsg call end..., [data: %s] [reply: %s]", pbData.String(), RpcResp.String()) log.Info(params.OperationID, "", "api ManagementSendMsg call end..., [data: %s] [reply: %s]", pbData.String(), RpcResp.String())
resp := apistruct.ManagementSendMsgResp{ResultList: sdkws.UserSendMsgResp{ServerMsgID: RpcResp.ServerMsgID, ClientMsgID: RpcResp.ClientMsgID, SendTime: RpcResp.SendTime}} resp := apistruct.ManagementSendMsgResp{ResultList: sdkws.UserSendMsgResp{ServerMsgID: RpcResp.ServerMsgID, ClientMsgID: RpcResp.ClientMsgID, SendTime: RpcResp.SendTime}}
log.Info(params.OperationID, "ManagementSendMsg return", resp) log.Info(params.OperationID, "ManagementSendMsg return", resp)
c.JSON(http.StatusOK, resp) apiresp.GinSuccess(c, resp)
} }
func (o *Msg) ManagementBatchSendMsg(c *gin.Context) { func (o *Msg) ManagementBatchSendMsg(c *gin.Context) {

@ -7,6 +7,7 @@ import (
sdkws "OpenIM/pkg/proto/sdkws" sdkws "OpenIM/pkg/proto/sdkws"
"OpenIM/pkg/utils" "OpenIM/pkg/utils"
"context" "context"
"google.golang.org/grpc"
"strings" "strings"
) )
@ -74,28 +75,22 @@ func (r *RPCServer) GetSingleUserMsgForPush(operationID string, msgData *sdkws.M
} }
func (r *RPCServer) GetSingleUserMsg(operationID string, currentMsgSeq uint32, userID string) []*sdkws.MsgData { func (r *RPCServer) GetSingleUserMsg(operationID string, currentMsgSeq uint32, userID string) []*sdkws.MsgData {
seqList, err := r.GenPullSeqList(currentMsgSeq, operationID, userID) seqs, err := r.GenPullSeqList(currentMsgSeq, operationID, userID)
if err != nil { if err != nil {
log.Error(operationID, "GenPullSeqList failed ", err.Error(), currentMsgSeq, userID) log.Error(operationID, "GenPullSeqList failed ", err.Error(), currentMsgSeq, userID)
return nil return nil
} }
if len(seqList) == 0 { if len(seqs) == 0 {
log.Error(operationID, "GenPullSeqList len == 0 ", currentMsgSeq, userID) log.Error(operationID, "GenPullSeqList len == 0 ", currentMsgSeq, userID)
return nil return nil
} }
rpcReq := sdkws.PullMessageBySeqListReq{} rpcReq := sdkws.PullMessageBySeqsReq{}
rpcReq.SeqList = seqList //rpcReq.Seqs = seqs
rpcReq.UserID = userID rpcReq.UserID = userID
rpcReq.OperationID = operationID var grpcConn *grpc.ClientConn
grpcConn := rpc.GetDefaultConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImMsgName, rpcReq.OperationID)
if grpcConn == nil {
errMsg := "getcdv3.GetDefaultConn == nil"
log.NewError(rpcReq.OperationID, errMsg)
return nil
}
msgClient := pbChat.NewMsgClient(grpcConn) msgClient := pbChat.NewMsgClient(grpcConn)
reply, err := msgClient.PullMessageBySeqList(context.Background(), &rpcReq) reply, err := msgClient.PullMessageBySeqs(context.Background(), &rpcReq)
if err != nil { if err != nil {
log.Error(operationID, "PullMessageBySeqList failed ", err.Error(), rpcReq.String()) log.Error(operationID, "PullMessageBySeqList failed ", err.Error(), rpcReq.String())
return nil return nil

@ -13,12 +13,10 @@ import (
"bytes" "bytes"
"context" "context"
"encoding/gob" "encoding/gob"
"runtime"
"strings"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
"google.golang.org/grpc" "google.golang.org/grpc"
"runtime"
) )
func (ws *WServer) msgParse(conn *UserConn, binaryMsg []byte) { func (ws *WServer) msgParse(conn *UserConn, binaryMsg []byte) {
@ -81,33 +79,18 @@ func (ws *WServer) getSeqReq(conn *UserConn, m *Req) {
log.Info(m.OperationID, "argsValidate ", isPass, errCode, errMsg) log.Info(m.OperationID, "argsValidate ", isPass, errCode, errMsg)
if isPass { if isPass {
rpcReq := sdkws.GetMaxAndMinSeqReq{} rpcReq := sdkws.GetMaxAndMinSeqReq{}
rpcReq.GroupIDList = data.(sdkws.GetMaxAndMinSeqReq).GroupIDList rpcReq.GroupIDs = data.(sdkws.GetMaxAndMinSeqReq).GroupIDs
rpcReq.UserID = m.SendID rpcReq.UserID = m.SendID
rpcReq.OperationID = m.OperationID log.Debug(m.OperationID, "Ws call success to getMaxAndMinSeq", m.SendID, m.ReqIdentifier, m.MsgIncr, data.(sdkws.GetMaxAndMinSeqReq).GroupIDs)
log.Debug(m.OperationID, "Ws call success to getMaxAndMinSeq", m.SendID, m.ReqIdentifier, m.MsgIncr, data.(sdkws.GetMaxAndMinSeqReq).GroupIDList) var grpcConn *grpc.ClientConn
grpcConn := rpc.GetDefaultConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImMsgName, rpcReq.OperationID)
if grpcConn == nil {
errMsg := rpcReq.OperationID + "getcdv3.GetDefaultConn == nil"
nReply.ErrCode = 500
nReply.ErrMsg = errMsg
log.NewError(rpcReq.OperationID, errMsg)
ws.getSeqResp(conn, m, nReply)
return
}
msgClient := pbChat.NewMsgClient(grpcConn) msgClient := pbChat.NewMsgClient(grpcConn)
rpcReply, err := msgClient.GetMaxAndMinSeq(context.Background(), &rpcReq) rpcReply, err := msgClient.GetMaxAndMinSeq(context.Background(), &rpcReq)
if err != nil { if err != nil {
nReply.ErrCode = 500
nReply.ErrMsg = err.Error()
log.Error(rpcReq.OperationID, "rpc call failed to GetMaxAndMinSeq ", nReply.String())
ws.getSeqResp(conn, m, nReply) ws.getSeqResp(conn, m, nReply)
} else { } else {
log.NewInfo(rpcReq.OperationID, "rpc call success to getSeqReq", rpcReply.String())
ws.getSeqResp(conn, m, rpcReply) ws.getSeqResp(conn, m, rpcReply)
} }
} else { } else {
nReply.ErrCode = errCode
nReply.ErrMsg = errMsg
log.Error(m.OperationID, "argsValidate failed send resp: ", nReply.String()) log.Error(m.OperationID, "argsValidate failed send resp: ", nReply.String())
ws.getSeqResp(conn, m, nReply) ws.getSeqResp(conn, m, nReply)
} }
@ -119,8 +102,6 @@ func (ws *WServer) getSeqResp(conn *UserConn, m *Req, pb *sdkws.GetMaxAndMinSeqR
mReply := Resp{ mReply := Resp{
ReqIdentifier: m.ReqIdentifier, ReqIdentifier: m.ReqIdentifier,
MsgIncr: m.MsgIncr, MsgIncr: m.MsgIncr,
ErrCode: pb.GetErrCode(),
ErrMsg: pb.GetErrMsg(),
OperationID: m.OperationID, OperationID: m.OperationID,
Data: b, Data: b,
} }
@ -131,52 +112,44 @@ func (ws *WServer) getSeqResp(conn *UserConn, m *Req, pb *sdkws.GetMaxAndMinSeqR
func (ws *WServer) pullMsgBySeqListReq(conn *UserConn, m *Req) { func (ws *WServer) pullMsgBySeqListReq(conn *UserConn, m *Req) {
log.NewInfo(m.OperationID, "Ws call success to pullMsgBySeqListReq start", m.SendID, m.ReqIdentifier, m.MsgIncr, string(m.Data)) log.NewInfo(m.OperationID, "Ws call success to pullMsgBySeqListReq start", m.SendID, m.ReqIdentifier, m.MsgIncr, string(m.Data))
nReply := new(sdkws.PullMessageBySeqListResp) nReply := new(sdkws.PullMessageBySeqsResp)
isPass, errCode, errMsg, data := ws.argsValidate(m, constant.WSPullMsgBySeqList, m.OperationID) isPass, _, _, data := ws.argsValidate(m, constant.WSPullMsgBySeqList, m.OperationID)
if isPass { if isPass {
rpcReq := sdkws.PullMessageBySeqListReq{} rpcReq := sdkws.PullMessageBySeqsReq{}
rpcReq.SeqList = data.(sdkws.PullMessageBySeqListReq).SeqList rpcReq.Seqs = data.(sdkws.PullMessageBySeqsReq).Seqs
rpcReq.UserID = m.SendID rpcReq.UserID = m.SendID
rpcReq.OperationID = m.OperationID rpcReq.GroupSeqs = data.(sdkws.PullMessageBySeqsReq).GroupSeqs
rpcReq.GroupSeqList = data.(sdkws.PullMessageBySeqListReq).GroupSeqList log.NewInfo(m.OperationID, "Ws call success to pullMsgBySeqListReq middle", m.SendID, m.ReqIdentifier, m.MsgIncr, data.(sdkws.PullMessageBySeqsReq).Seqs)
log.NewInfo(m.OperationID, "Ws call success to pullMsgBySeqListReq middle", m.SendID, m.ReqIdentifier, m.MsgIncr, data.(sdkws.PullMessageBySeqListReq).SeqList) var grpcConn *grpc.ClientConn
grpcConn := rpc.GetDefaultConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImMsgName, m.OperationID)
//grpcConn := rpc.GetDefaultConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImMsgName, m.OperationID)
if grpcConn == nil { if grpcConn == nil {
errMsg := rpcReq.OperationID + "getcdv3.GetDefaultConn == nil"
nReply.ErrCode = 500
nReply.ErrMsg = errMsg
log.NewError(rpcReq.OperationID, errMsg)
ws.pullMsgBySeqListResp(conn, m, nReply) ws.pullMsgBySeqListResp(conn, m, nReply)
return return
} }
msgClient := pbChat.NewMsgClient(grpcConn) msgClient := pbChat.NewMsgClient(grpcConn)
maxSizeOption := grpc.MaxCallRecvMsgSize(1024 * 1024 * 20) maxSizeOption := grpc.MaxCallRecvMsgSize(1024 * 1024 * 20)
reply, err := msgClient.PullMessageBySeqList(context.Background(), &rpcReq, maxSizeOption) reply, err := msgClient.PullMessageBySeqs(context.Background(), &rpcReq, maxSizeOption)
if err != nil { if err != nil {
log.NewError(rpcReq.OperationID, "pullMsgBySeqListReq err", err.Error())
nReply.ErrCode = 200
nReply.ErrMsg = err.Error()
ws.pullMsgBySeqListResp(conn, m, nReply) ws.pullMsgBySeqListResp(conn, m, nReply)
} else { } else {
log.NewInfo(rpcReq.OperationID, "rpc call success to pullMsgBySeqListReq", reply.String(), len(reply.List)) //log.NewInfo(rpcReq.OperationID, "rpc call success to pullMsgBySeqListReq", reply.String(), len(reply.List))
ws.pullMsgBySeqListResp(conn, m, reply) ws.pullMsgBySeqListResp(conn, m, reply)
} }
} else { } else {
nReply.ErrCode = errCode
nReply.ErrMsg = errMsg
ws.pullMsgBySeqListResp(conn, m, nReply) ws.pullMsgBySeqListResp(conn, m, nReply)
} }
} }
func (ws *WServer) pullMsgBySeqListResp(conn *UserConn, m *Req, pb *sdkws.PullMessageBySeqListResp) { func (ws *WServer) pullMsgBySeqListResp(conn *UserConn, m *Req, pb *sdkws.PullMessageBySeqsResp) {
log.NewInfo(m.OperationID, "pullMsgBySeqListResp come here ", pb.String()) log.NewInfo(m.OperationID, "pullMsgBySeqListResp come here ", pb.String())
c, _ := proto.Marshal(pb) c, _ := proto.Marshal(pb)
mReply := Resp{ mReply := Resp{
ReqIdentifier: m.ReqIdentifier, ReqIdentifier: m.ReqIdentifier,
MsgIncr: m.MsgIncr, MsgIncr: m.MsgIncr,
ErrCode: pb.GetErrCode(), //ErrCode: pb.GetErrCode(),
ErrMsg: pb.GetErrMsg(), //ErrMsg: pb.GetErrMsg(),
OperationID: m.OperationID, OperationID: m.OperationID,
Data: c, Data: c,
} }
log.NewInfo(m.OperationID, "pullMsgBySeqListResp all data is ", mReply.ReqIdentifier, mReply.MsgIncr, mReply.ErrCode, mReply.ErrMsg, log.NewInfo(m.OperationID, "pullMsgBySeqListResp all data is ", mReply.ReqIdentifier, mReply.MsgIncr, mReply.ErrCode, mReply.ErrMsg,
len(mReply.Data)) len(mReply.Data))
@ -188,22 +161,24 @@ func (ws *WServer) userLogoutReq(conn *UserConn, m *Req) {
rpcReq := push.DelUserPushTokenReq{} rpcReq := push.DelUserPushTokenReq{}
rpcReq.UserID = m.SendID rpcReq.UserID = m.SendID
rpcReq.PlatformID = conn.PlatformID rpcReq.PlatformID = conn.PlatformID
rpcReq.OperationID = m.OperationID //rpcReq.OperationID = m.OperationID
grpcConn := rpc.GetDefaultConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImPushName, m.OperationID) var grpcConn *grpc.ClientConn
//grpcConn := rpc.GetDefaultConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImPushName, m.OperationID)
if grpcConn == nil { if grpcConn == nil {
errMsg := rpcReq.OperationID + "getcdv3.GetDefaultConn == nil" //errMsg := rpcReq.OperationID + "getcdv3.GetDefaultConn == nil"
log.NewError(rpcReq.OperationID, errMsg) //log.NewError(rpcReq.OperationID, errMsg)
ws.userLogoutResp(conn, m) ws.userLogoutResp(conn, m)
return return
} }
msgClient := push.NewPushMsgServiceClient(grpcConn) msgClient := push.NewPushMsgServiceClient(grpcConn)
reply, err := msgClient.DelUserPushToken(context.Background(), &rpcReq) _, err := msgClient.DelUserPushToken(context.Background(), &rpcReq)
if err != nil { if err != nil {
log.NewError(rpcReq.OperationID, "DelUserPushToken err", err.Error()) //log.NewError(rpcReq.OperationID, "DelUserPushToken err", err.Error())
ws.userLogoutResp(conn, m) ws.userLogoutResp(conn, m)
} else { } else {
log.NewInfo(rpcReq.OperationID, "rpc call success to DelUserPushToken", reply.String()) //log.NewInfo(rpcReq.OperationID, "rpc call success to DelUserPushToken", reply.String())
ws.userLogoutResp(conn, m) ws.userLogoutResp(conn, m)
} }
ws.userLogoutResp(conn, m) ws.userLogoutResp(conn, m)
@ -225,39 +200,41 @@ func (ws *WServer) sendMsgReq(conn *UserConn, m *Req) {
log.NewInfo(m.OperationID, "Ws call success to sendMsgReq start", m.MsgIncr, m.ReqIdentifier, m.SendID) log.NewInfo(m.OperationID, "Ws call success to sendMsgReq start", m.MsgIncr, m.ReqIdentifier, m.SendID)
nReply := new(pbChat.SendMsgResp) nReply := new(pbChat.SendMsgResp)
isPass, errCode, errMsg, pData := ws.argsValidate(m, constant.WSSendMsg, m.OperationID) isPass, _, _, pData := ws.argsValidate(m, constant.WSSendMsg, m.OperationID)
if isPass { if isPass {
data := pData.(sdkws.MsgData) data := pData.(sdkws.MsgData)
pbData := pbChat.SendMsgReq{ pbData := pbChat.SendMsgReq{
Token: m.Token, //Token: m.Token,
OperationID: m.OperationID, //OperationID: m.OperationID,
MsgData: &data, MsgData: &data,
} }
log.NewInfo(m.OperationID, "Ws call success to sendMsgReq middle", m.ReqIdentifier, m.SendID, m.MsgIncr, data.String()) log.NewInfo(m.OperationID, "Ws call success to sendMsgReq middle", m.ReqIdentifier, m.SendID, m.MsgIncr, data.String())
etcdConn := rpc.GetDefaultConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImMsgName, m.OperationID) var grpcConn *grpc.ClientConn
if etcdConn == nil {
//etcdConn := rpc.GetDefaultConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImMsgName, m.OperationID)
if grpcConn == nil {
errMsg := m.OperationID + "getcdv3.GetDefaultConn == nil" errMsg := m.OperationID + "getcdv3.GetDefaultConn == nil"
nReply.ErrCode = 500 //nReply.ErrCode = 500
nReply.ErrMsg = errMsg //nReply.ErrMsg = errMsg
log.NewError(m.OperationID, errMsg) log.NewError(m.OperationID, errMsg)
ws.sendMsgResp(conn, m, nReply) ws.sendMsgResp(conn, m, nReply)
return return
} }
client := pbChat.NewMsgClient(etcdConn) client := pbChat.NewMsgClient(grpcConn)
reply, err := client.SendMsg(context.Background(), &pbData) reply, err := client.SendMsg(context.Background(), &pbData)
if err != nil { if err != nil {
log.NewError(pbData.OperationID, "UserSendMsg err", err.Error()) //log.NewError(pbData.OperationID, "UserSendMsg err", err.Error())
nReply.ErrCode = 200 //nReply.ErrCode = 200
nReply.ErrMsg = err.Error() //nReply.ErrMsg = err.Error()
ws.sendMsgResp(conn, m, nReply) ws.sendMsgResp(conn, m, nReply)
} else { } else {
log.NewInfo(pbData.OperationID, "rpc call success to sendMsgReq", reply.String()) //log.NewInfo(pbData.OperationID, "rpc call success to sendMsgReq", reply.String())
ws.sendMsgResp(conn, m, reply) ws.sendMsgResp(conn, m, reply)
} }
} else { } else {
nReply.ErrCode = errCode //nReply.ErrCode = errCode
nReply.ErrMsg = errMsg //nReply.ErrMsg = errMsg
ws.sendMsgResp(conn, m, nReply) ws.sendMsgResp(conn, m, nReply)
} }
@ -271,8 +248,6 @@ func (ws *WServer) sendMsgResp(conn *UserConn, m *Req, pb *pbChat.SendMsgResp) {
mReply := Resp{ mReply := Resp{
ReqIdentifier: m.ReqIdentifier, ReqIdentifier: m.ReqIdentifier,
MsgIncr: m.MsgIncr, MsgIncr: m.MsgIncr,
ErrCode: pb.GetErrCode(),
ErrMsg: pb.GetErrMsg(),
OperationID: m.OperationID, OperationID: m.OperationID,
Data: b, Data: b,
} }
@ -282,20 +257,21 @@ func (ws *WServer) sendMsgResp(conn *UserConn, m *Req, pb *pbChat.SendMsgResp) {
func (ws *WServer) sendSignalMsgReq(conn *UserConn, m *Req) { func (ws *WServer) sendSignalMsgReq(conn *UserConn, m *Req) {
log.NewInfo(m.OperationID, "Ws call success to sendSignalMsgReq start", m.MsgIncr, m.ReqIdentifier, m.SendID, string(m.Data)) log.NewInfo(m.OperationID, "Ws call success to sendSignalMsgReq start", m.MsgIncr, m.ReqIdentifier, m.SendID, string(m.Data))
nReply := new(pbChat.SendMsgResp) //nReply := new(pbChat.SendMsgResp)
isPass, errCode, errMsg, pData := ws.argsValidate(m, constant.WSSendSignalMsg, m.OperationID) isPass, errCode, errMsg, pData := ws.argsValidate(m, constant.WSSendSignalMsg, m.OperationID)
if isPass { if isPass {
signalResp := pbRtc.SignalResp{} signalResp := sdkws.SignalResp{}
etcdConn := rpc.GetDefaultConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImRtcName, m.OperationID) var grpcConn *grpc.ClientConn
if etcdConn == nil { //etcdConn := rpc.GetDefaultConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImRtcName, m.OperationID)
if grpcConn == nil {
errMsg := m.OperationID + "getcdv3.GetDefaultConn == nil" errMsg := m.OperationID + "getcdv3.GetDefaultConn == nil"
log.NewError(m.OperationID, errMsg) log.NewError(m.OperationID, errMsg)
ws.sendSignalMsgResp(conn, 204, errMsg, m, &signalResp) ws.sendSignalMsgResp(conn, 204, errMsg, m, &signalResp)
return return
} }
rtcClient := pbRtc.NewRtcServiceClient(etcdConn) rtcClient := pbRtc.NewRtcServiceClient(grpcConn)
req := &pbRtc.SignalMessageAssembleReq{ req := &pbRtc.SignalMessageAssembleReq{
SignalReq: pData.(*pbRtc.SignalReq), SignalReq: pData.(*sdkws.SignalReq),
OperationID: m.OperationID, OperationID: m.OperationID,
} }
respPb, err := rtcClient.SignalMessageAssemble(context.Background(), req) respPb, err := rtcClient.SignalMessageAssemble(context.Background(), req)
@ -310,40 +286,41 @@ func (ws *WServer) sendSignalMsgReq(conn *UserConn, m *Req) {
log.NewInfo(m.OperationID, utils.GetSelfFuncName(), respPb.String()) log.NewInfo(m.OperationID, utils.GetSelfFuncName(), respPb.String())
if respPb.IsPass { if respPb.IsPass {
pbData := pbChat.SendMsgReq{ pbData := pbChat.SendMsgReq{
Token: m.Token, //Token: m.Token,
OperationID: m.OperationID, //OperationID: m.OperationID,
MsgData: &msgData, MsgData: &msgData,
} }
log.NewInfo(m.OperationID, utils.GetSelfFuncName(), "pbData: ", pbData) log.NewInfo(m.OperationID, utils.GetSelfFuncName(), "pbData: ", pbData)
log.NewInfo(m.OperationID, "Ws call success to sendSignalMsgReq middle", m.ReqIdentifier, m.SendID, m.MsgIncr, msgData) log.NewInfo(m.OperationID, "Ws call success to sendSignalMsgReq middle", m.ReqIdentifier, m.SendID, m.MsgIncr, msgData)
etcdConn := rpc.GetDefaultConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImMsgName, m.OperationID) var grpcConn *grpc.ClientConn
if etcdConn == nil { //etcdConn := rpc.GetDefaultConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImMsgName, m.OperationID)
if grpcConn == nil {
errMsg := m.OperationID + "getcdv3.GetDefaultConn == nil" errMsg := m.OperationID + "getcdv3.GetDefaultConn == nil"
log.NewError(m.OperationID, errMsg) log.NewError(m.OperationID, errMsg)
ws.sendSignalMsgResp(conn, 200, errMsg, m, &signalResp) ws.sendSignalMsgResp(conn, 200, errMsg, m, &signalResp)
return return
} }
client := pbChat.NewMsgClient(etcdConn) client := pbChat.NewMsgClient(grpcConn)
reply, err := client.SendMsg(context.Background(), &pbData) _, err := client.SendMsg(context.Background(), &pbData)
if err != nil { if err != nil {
log.NewError(pbData.OperationID, utils.GetSelfFuncName(), "rpc sendMsg err", err.Error()) //log.NewError(pbData.OperationID, utils.GetSelfFuncName(), "rpc sendMsg err", err.Error())
nReply.ErrCode = 200 //nReply.ErrCode = 200
nReply.ErrMsg = err.Error() //nReply.ErrMsg = err.Error()
ws.sendSignalMsgResp(conn, 200, err.Error(), m, &signalResp) ws.sendSignalMsgResp(conn, 200, err.Error(), m, &signalResp)
} else { } else {
log.NewInfo(pbData.OperationID, "rpc call success to sendMsgReq", reply.String(), signalResp.String(), m) //log.NewInfo(pbData.OperationID, "rpc call success to sendMsgReq", reply.String(), signalResp.String(), m)
ws.sendSignalMsgResp(conn, 0, "", m, &signalResp) ws.sendSignalMsgResp(conn, 0, "", m, &signalResp)
} }
} else { } else {
log.NewError(m.OperationID, utils.GetSelfFuncName(), respPb.IsPass, respPb.CommonResp.ErrCode, respPb.CommonResp.ErrMsg) //log.NewError(m.OperationID, utils.GetSelfFuncName(), respPb.IsPass, respPb.CommonResp.ErrCode, respPb.CommonResp.ErrMsg)
ws.sendSignalMsgResp(conn, respPb.CommonResp.ErrCode, respPb.CommonResp.ErrMsg, m, &signalResp) //ws.sendSignalMsgResp(conn, respPb.CommonResp.ErrCode, respPb.CommonResp.ErrMsg, m, &signalResp)
} }
} else { } else {
ws.sendSignalMsgResp(conn, errCode, errMsg, m, nil) ws.sendSignalMsgResp(conn, errCode, errMsg, m, nil)
} }
} }
func (ws *WServer) sendSignalMsgResp(conn *UserConn, errCode int32, errMsg string, m *Req, pb *pbRtc.SignalResp) { func (ws *WServer) sendSignalMsgResp(conn *UserConn, errCode int32, errMsg string, m *Req, pb *sdkws.SignalResp) {
// := make(map[string]interface{}) // := make(map[string]interface{})
log.Debug(m.OperationID, "sendSignalMsgResp is", pb.String()) log.Debug(m.OperationID, "sendSignalMsgResp is", pb.String())
b, _ := proto.Marshal(pb) b, _ := proto.Marshal(pb)
@ -386,19 +363,19 @@ func (ws *WServer) sendErrMsg(conn *UserConn, errCode int32, errMsg string, reqI
} }
func SetTokenKicked(userID string, platformID int, operationID string) { func SetTokenKicked(userID string, platformID int, operationID string) {
m, err := db.DB.GetTokenMapByUidPid(userID, constant.PlatformIDToName(platformID)) //m, err := db.DB.GetTokenMapByUidPid(userID, constant.PlatformIDToName(platformID))
if err != nil { //if err != nil {
log.Error(operationID, "GetTokenMapByUidPid failed ", err.Error(), userID, constant.PlatformIDToName(platformID)) // log.Error(operationID, "GetTokenMapByUidPid failed ", err.Error(), userID, constant.PlatformIDToName(platformID))
return // return
} //}
for k, _ := range m { //for k, _ := range m {
m[k] = constant.KickedToken // m[k] = constant.KickedToken
} //}
err = db.DB.SetTokenMapByUidPid(userID, platformID, m) //err = db.DB.SetTokenMapByUidPid(userID, platformID, m)
if err != nil { //if err != nil {
log.Error(operationID, "SetTokenMapByUidPid failed ", err.Error(), userID, constant.PlatformIDToName(platformID)) // log.Error(operationID, "SetTokenMapByUidPid failed ", err.Error(), userID, constant.PlatformIDToName(platformID))
return // return
} //}
} }
func (ws *WServer) setUserDeviceBackground(conn *UserConn, m *Req) { func (ws *WServer) setUserDeviceBackground(conn *UserConn, m *Req) {
@ -406,10 +383,10 @@ func (ws *WServer) setUserDeviceBackground(conn *UserConn, m *Req) {
if isPass { if isPass {
req := pData.(*sdkws.SetAppBackgroundStatusReq) req := pData.(*sdkws.SetAppBackgroundStatusReq)
conn.IsBackground = req.IsBackground conn.IsBackground = req.IsBackground
callbackResp := callbackUserOnline(m.OperationID, conn.userID, int(conn.PlatformID), conn.token, conn.IsBackground, conn.connID) //callbackResp := callbackUserOnline(m.OperationID, conn.userID, int(conn.PlatformID), conn.token, conn.IsBackground, conn.connID)
if callbackResp.ErrCode != 0 { //if callbackResp.ErrCode != 0 {
log.NewError(m.OperationID, utils.GetSelfFuncName(), "callbackUserOffline failed", callbackResp) // log.NewError(m.OperationID, utils.GetSelfFuncName(), "callbackUserOffline failed", callbackResp)
} //}
log.NewInfo(m.OperationID, "SetUserDeviceBackground", "success", *conn, req.IsBackground) log.NewInfo(m.OperationID, "SetUserDeviceBackground", "success", *conn, req.IsBackground)
} }
ws.setUserDeviceBackgroundResp(conn, m, errCode, errMsg) ws.setUserDeviceBackgroundResp(conn, m, errCode, errMsg)

Loading…
Cancel
Save