|
|
@ -226,6 +226,7 @@ func (rpc *rpcChat) encapsulateMsgData(msg *sdk_ws.MsgData) {
|
|
|
|
func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.SendMsgResp, error) {
|
|
|
|
func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.SendMsgResp, error) {
|
|
|
|
replay := pbChat.SendMsgResp{}
|
|
|
|
replay := pbChat.SendMsgResp{}
|
|
|
|
newTime := db.GetCurrentTimestampByMill()
|
|
|
|
newTime := db.GetCurrentTimestampByMill()
|
|
|
|
|
|
|
|
t1 := time.Now()
|
|
|
|
log.Info(pb.OperationID, "rpc sendMsg come here ", pb.String())
|
|
|
|
log.Info(pb.OperationID, "rpc sendMsg come here ", pb.String())
|
|
|
|
flag, errCode, errMsg := isMessageHasReadEnabled(pb)
|
|
|
|
flag, errCode, errMsg := isMessageHasReadEnabled(pb)
|
|
|
|
log.Info(pb.OperationID, "isMessageHasReadEnabled ", flag)
|
|
|
|
log.Info(pb.OperationID, "isMessageHasReadEnabled ", flag)
|
|
|
@ -233,15 +234,18 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S
|
|
|
|
return returnMsg(&replay, pb, errCode, errMsg, "", 0)
|
|
|
|
return returnMsg(&replay, pb, errCode, errMsg, "", 0)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
flag, errCode, errMsg, _ = messageVerification(pb)
|
|
|
|
flag, errCode, errMsg, _ = messageVerification(pb)
|
|
|
|
log.Info(pb.OperationID, "userRelationshipVerification ", flag)
|
|
|
|
log.Info(pb.OperationID, "messageVerification ", flag, " cost time: ", time.Since(t1))
|
|
|
|
if !flag {
|
|
|
|
if !flag {
|
|
|
|
return returnMsg(&replay, pb, errCode, errMsg, "", 0)
|
|
|
|
return returnMsg(&replay, pb, errCode, errMsg, "", 0)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
t1 = time.Now()
|
|
|
|
rpc.encapsulateMsgData(pb.MsgData)
|
|
|
|
rpc.encapsulateMsgData(pb.MsgData)
|
|
|
|
|
|
|
|
log.Info(pb.OperationID, "encapsulateMsgData ", " cost time: ", time.Since(t1))
|
|
|
|
msgToMQSingle := pbChat.MsgDataToMQ{Token: pb.Token, OperationID: pb.OperationID, MsgData: pb.MsgData}
|
|
|
|
msgToMQSingle := pbChat.MsgDataToMQ{Token: pb.Token, OperationID: pb.OperationID, MsgData: pb.MsgData}
|
|
|
|
// callback
|
|
|
|
// callback
|
|
|
|
|
|
|
|
t1 = time.Now()
|
|
|
|
callbackResp := callbackWordFilter(pb)
|
|
|
|
callbackResp := callbackWordFilter(pb)
|
|
|
|
log.Info(pb.OperationID, "callbackWordFilter ", callbackResp)
|
|
|
|
log.Info(pb.OperationID, "callbackWordFilter ", callbackResp, "cost time: ", time.Since(t1))
|
|
|
|
if callbackResp.ErrCode != 0 {
|
|
|
|
if callbackResp.ErrCode != 0 {
|
|
|
|
log.Error(pb.OperationID, utils.GetSelfFuncName(), "callbackWordFilter resp: ", callbackResp)
|
|
|
|
log.Error(pb.OperationID, utils.GetSelfFuncName(), "callbackWordFilter resp: ", callbackResp)
|
|
|
|
}
|
|
|
|
}
|
|
|
@ -256,7 +260,9 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S
|
|
|
|
switch pb.MsgData.SessionType {
|
|
|
|
switch pb.MsgData.SessionType {
|
|
|
|
case constant.SingleChatType:
|
|
|
|
case constant.SingleChatType:
|
|
|
|
// callback
|
|
|
|
// callback
|
|
|
|
|
|
|
|
t1 = time.Now()
|
|
|
|
callbackResp := callbackBeforeSendSingleMsg(pb)
|
|
|
|
callbackResp := callbackBeforeSendSingleMsg(pb)
|
|
|
|
|
|
|
|
log.Info(pb.OperationID, "callbackBeforeSendSingleMsg ", " cost time: ", time.Since(t1))
|
|
|
|
if callbackResp.ErrCode != 0 {
|
|
|
|
if callbackResp.ErrCode != 0 {
|
|
|
|
log.NewError(pb.OperationID, utils.GetSelfFuncName(), "callbackBeforeSendSingleMsg resp: ", callbackResp)
|
|
|
|
log.NewError(pb.OperationID, utils.GetSelfFuncName(), "callbackBeforeSendSingleMsg resp: ", callbackResp)
|
|
|
|
}
|
|
|
|
}
|
|
|
@ -267,28 +273,37 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S
|
|
|
|
log.NewDebug(pb.OperationID, utils.GetSelfFuncName(), "callbackBeforeSendSingleMsg result", "end rpc and return", callbackResp)
|
|
|
|
log.NewDebug(pb.OperationID, utils.GetSelfFuncName(), "callbackBeforeSendSingleMsg result", "end rpc and return", callbackResp)
|
|
|
|
return returnMsg(&replay, pb, int32(callbackResp.ErrCode), callbackResp.ErrMsg, "", 0)
|
|
|
|
return returnMsg(&replay, pb, int32(callbackResp.ErrCode), callbackResp.ErrMsg, "", 0)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
t1 = time.Now()
|
|
|
|
isSend := modifyMessageByUserMessageReceiveOpt(pb.MsgData.RecvID, pb.MsgData.SendID, constant.SingleChatType, pb)
|
|
|
|
isSend := modifyMessageByUserMessageReceiveOpt(pb.MsgData.RecvID, pb.MsgData.SendID, constant.SingleChatType, pb)
|
|
|
|
|
|
|
|
log.Info(pb.OperationID, "modifyMessageByUserMessageReceiveOpt ", " cost time: ", time.Since(t1))
|
|
|
|
if isSend {
|
|
|
|
if isSend {
|
|
|
|
msgToMQSingle.MsgData = pb.MsgData
|
|
|
|
msgToMQSingle.MsgData = pb.MsgData
|
|
|
|
log.NewInfo(msgToMQSingle.OperationID, msgToMQSingle)
|
|
|
|
log.NewInfo(msgToMQSingle.OperationID, msgToMQSingle)
|
|
|
|
|
|
|
|
t1 = time.Now()
|
|
|
|
err1 := rpc.sendMsgToKafka(&msgToMQSingle, msgToMQSingle.MsgData.RecvID, constant.OnlineStatus)
|
|
|
|
err1 := rpc.sendMsgToKafka(&msgToMQSingle, msgToMQSingle.MsgData.RecvID, constant.OnlineStatus)
|
|
|
|
|
|
|
|
log.Info(pb.OperationID, "sendMsgToKafka ", " cost time: ", time.Since(t1))
|
|
|
|
if err1 != nil {
|
|
|
|
if err1 != nil {
|
|
|
|
log.NewError(msgToMQSingle.OperationID, "kafka send msg err :RecvID", msgToMQSingle.MsgData.RecvID, msgToMQSingle.String(), err1.Error())
|
|
|
|
log.NewError(msgToMQSingle.OperationID, "kafka send msg err :RecvID", msgToMQSingle.MsgData.RecvID, msgToMQSingle.String(), err1.Error())
|
|
|
|
return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0)
|
|
|
|
return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if msgToMQSingle.MsgData.SendID != msgToMQSingle.MsgData.RecvID { //Filter messages sent to yourself
|
|
|
|
if msgToMQSingle.MsgData.SendID != msgToMQSingle.MsgData.RecvID { //Filter messages sent to yourself
|
|
|
|
|
|
|
|
t1 = time.Now()
|
|
|
|
err2 := rpc.sendMsgToKafka(&msgToMQSingle, msgToMQSingle.MsgData.SendID, constant.OnlineStatus)
|
|
|
|
err2 := rpc.sendMsgToKafka(&msgToMQSingle, msgToMQSingle.MsgData.SendID, constant.OnlineStatus)
|
|
|
|
|
|
|
|
log.Info(pb.OperationID, "sendMsgToKafka ", " cost time: ", time.Since(t1))
|
|
|
|
if err2 != nil {
|
|
|
|
if err2 != nil {
|
|
|
|
log.NewError(msgToMQSingle.OperationID, "kafka send msg err:SendID", msgToMQSingle.MsgData.SendID, msgToMQSingle.String())
|
|
|
|
log.NewError(msgToMQSingle.OperationID, "kafka send msg err:SendID", msgToMQSingle.MsgData.SendID, msgToMQSingle.String())
|
|
|
|
return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0)
|
|
|
|
return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
// callback
|
|
|
|
// callback
|
|
|
|
|
|
|
|
t1 = time.Now()
|
|
|
|
callbackResp = callbackAfterSendSingleMsg(pb)
|
|
|
|
callbackResp = callbackAfterSendSingleMsg(pb)
|
|
|
|
|
|
|
|
log.Info(pb.OperationID, "callbackAfterSendSingleMsg ", " cost time: ", time.Since(t1))
|
|
|
|
if callbackResp.ErrCode != 0 {
|
|
|
|
if callbackResp.ErrCode != 0 {
|
|
|
|
log.NewError(pb.OperationID, utils.GetSelfFuncName(), "callbackAfterSendSingleMsg resp: ", callbackResp)
|
|
|
|
log.NewError(pb.OperationID, utils.GetSelfFuncName(), "callbackAfterSendSingleMsg resp: ", callbackResp)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
log.Debug(pb.OperationID, "send msg cost time all: ", db.GetCurrentTimestampByMill()-newTime, pb.MsgData.ClientMsgID)
|
|
|
|
return returnMsg(&replay, pb, 0, "", msgToMQSingle.MsgData.ServerMsgID, msgToMQSingle.MsgData.SendTime)
|
|
|
|
return returnMsg(&replay, pb, 0, "", msgToMQSingle.MsgData.ServerMsgID, msgToMQSingle.MsgData.SendTime)
|
|
|
|
case constant.GroupChatType:
|
|
|
|
case constant.GroupChatType:
|
|
|
|
// callback
|
|
|
|
// callback
|
|
|
|