pull/218/head
Gordon 3 years ago
parent 0793b2ab38
commit eb63ed926d

@ -14,6 +14,7 @@ import (
"github.com/Shopify/sarama" "github.com/Shopify/sarama"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
"strings" "strings"
"time"
) )
type fcb func(msg []byte, msgKey string) type fcb func(msg []byte, msgKey string)
@ -38,7 +39,7 @@ func (mc *HistoryConsumerHandler) Init() {
} }
func (mc *HistoryConsumerHandler) handleChatWs2Mongo(msg []byte, msgKey string) { func (mc *HistoryConsumerHandler) handleChatWs2Mongo(msg []byte, msgKey string) {
time := utils.GetCurrentTimestampByNano() now := time.Now()
msgFromMQ := pbMsg.MsgDataToMQ{} msgFromMQ := pbMsg.MsgDataToMQ{}
err := proto.Unmarshal(msg, &msgFromMQ) err := proto.Unmarshal(msg, &msgFromMQ)
if err != nil { if err != nil {
@ -62,13 +63,13 @@ func (mc *HistoryConsumerHandler) handleChatWs2Mongo(msg []byte, msgKey string)
return return
} }
mc.singleMsgCount++ mc.singleMsgCount++
log.NewDebug(msgFromMQ.OperationID, "sendMessageToPush cost time ", utils.GetCurrentTimestampByNano()-time) log.NewDebug(msgFromMQ.OperationID, "sendMessageToPush cost time ", time.Since(now))
} }
if !isSenderSync && msgKey == msgFromMQ.MsgData.SendID { if !isSenderSync && msgKey == msgFromMQ.MsgData.SendID {
} else { } else {
go sendMessageToPush(&msgFromMQ, msgKey) go sendMessageToPush(&msgFromMQ, msgKey)
} }
log.NewDebug(operationID, "saveUserChat cost time ", utils.GetCurrentTimestampByNano()-time) log.NewDebug(operationID, "saveUserChat cost time ", time.Since(now))
case constant.GroupChatType: case constant.GroupChatType:
log.NewDebug(msgFromMQ.OperationID, "msg_transfer msg type = GroupChatType", isHistory, isPersist) log.NewDebug(msgFromMQ.OperationID, "msg_transfer msg type = GroupChatType", isHistory, isPersist)
if isHistory { if isHistory {
@ -89,13 +90,13 @@ func (mc *HistoryConsumerHandler) handleChatWs2Mongo(msg []byte, msgKey string)
return return
} }
mc.singleMsgCount++ mc.singleMsgCount++
log.NewDebug(msgFromMQ.OperationID, "sendMessageToPush cost time ", utils.GetCurrentTimestampByNano()-time) log.NewDebug(msgFromMQ.OperationID, "sendMessageToPush cost time ", time.Since(now))
} }
if !isSenderSync && msgKey == msgFromMQ.MsgData.SendID { if !isSenderSync && msgKey == msgFromMQ.MsgData.SendID {
} else { } else {
go sendMessageToPush(&msgFromMQ, msgKey) go sendMessageToPush(&msgFromMQ, msgKey)
} }
log.NewDebug(operationID, "saveUserChat cost time ", utils.GetCurrentTimestampByNano()-time) log.NewDebug(operationID, "saveUserChat cost time ", time.Since(now))
default: default:
log.NewError(msgFromMQ.OperationID, "SessionType error", msgFromMQ.String()) log.NewError(msgFromMQ.OperationID, "SessionType error", msgFromMQ.String())
return return

@ -37,7 +37,7 @@ type AtContent struct {
func MsgToUser(pushMsg *pbPush.PushMsgReq) { func MsgToUser(pushMsg *pbPush.PushMsgReq) {
var wsResult []*pbRelay.SingleMsgToUser var wsResult []*pbRelay.SingleMsgToUser
isOfflinePush := utils.GetSwitchFromOptions(pushMsg.MsgData.Options, constant.IsOfflinePush) isOfflinePush := utils.GetSwitchFromOptions(pushMsg.MsgData.Options, constant.IsOfflinePush)
log.Debug("Get msg from msg_transfer And push msg", pushMsg.OperationID, "PushData", pushMsg.String()) log.Debug(pushMsg.OperationID, "Get msg from msg_transfer And push msg", pushMsg.String())
grpcCons := getcdv3.GetConn4Unique(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImOnlineMessageRelayName) grpcCons := getcdv3.GetConn4Unique(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImOnlineMessageRelayName)
//Online push message //Online push message
log.Debug("test", pushMsg.OperationID, "len grpc", len(grpcCons), "data", pushMsg.String()) log.Debug("test", pushMsg.OperationID, "len grpc", len(grpcCons), "data", pushMsg.String())
@ -45,14 +45,14 @@ func MsgToUser(pushMsg *pbPush.PushMsgReq) {
msgClient := pbRelay.NewOnlineMessageRelayServiceClient(v) msgClient := pbRelay.NewOnlineMessageRelayServiceClient(v)
reply, err := msgClient.OnlinePushMsg(context.Background(), &pbRelay.OnlinePushMsgReq{OperationID: pushMsg.OperationID, MsgData: pushMsg.MsgData, PushToUserID: pushMsg.PushToUserID}) reply, err := msgClient.OnlinePushMsg(context.Background(), &pbRelay.OnlinePushMsgReq{OperationID: pushMsg.OperationID, MsgData: pushMsg.MsgData, PushToUserID: pushMsg.PushToUserID})
if err != nil { if err != nil {
log.InfoByKv("push data to client rpc err", pushMsg.OperationID, "err", err) log.NewError("push data to client rpc err", pushMsg.OperationID, "err", err)
continue continue
} }
if reply != nil && reply.Resp != nil { if reply != nil && reply.Resp != nil {
wsResult = append(wsResult, reply.Resp...) wsResult = append(wsResult, reply.Resp...)
} }
} }
log.InfoByKv("push_result", pushMsg.OperationID, "result", wsResult, "sendData", pushMsg.MsgData) log.NewInfo(pushMsg.OperationID, "push_result", wsResult, "sendData", pushMsg.MsgData)
count++ count++
if isOfflinePush && pushMsg.PushToUserID != pushMsg.MsgData.SendID { if isOfflinePush && pushMsg.PushToUserID != pushMsg.MsgData.SendID {
for _, v := range wsResult { for _, v := range wsResult {

Loading…
Cancel
Save