流程更新后的代码整改

pull/270/head
酷赞 3 years ago
parent a815ed3989
commit a020e19ec0

@ -512,32 +512,32 @@ func (och *OnlineHistoryRedisConsumerHandler) ConsumeClaim(sess sarama.ConsumerG
// return nil // return nil
//} //}
func sendMessageToPush(message *pbMsg.MsgDataToMQ, pushToUserID string) { //func sendMessageToPush(message *pbMsg.MsgDataToMQ, pushToUserID string) {
log.Info(message.OperationID, "msg_transfer send message to push", "message", message.String()) // log.Info(message.OperationID, "msg_transfer send message to push", "message", message.String())
rpcPushMsg := pbPush.PushMsgReq{OperationID: message.OperationID, MsgData: message.MsgData, PushToUserID: pushToUserID} // rpcPushMsg := pbPush.PushMsgReq{OperationID: message.OperationID, MsgData: message.MsgData, PushToUserID: pushToUserID}
mqPushMsg := pbMsg.PushMsgDataToMQ{OperationID: message.OperationID, MsgData: message.MsgData, PushToUserID: pushToUserID} // mqPushMsg := pbMsg.PushMsgDataToMQ{OperationID: message.OperationID, MsgData: message.MsgData, PushToUserID: pushToUserID}
grpcConn := getcdv3.GetDefaultConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImPushName, message.OperationID) // grpcConn := getcdv3.GetDefaultConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImPushName, message.OperationID)
if grpcConn == nil { // if grpcConn == nil {
log.Error(rpcPushMsg.OperationID, "rpc dial failed", "push data", rpcPushMsg.String()) // log.Error(rpcPushMsg.OperationID, "rpc dial failed", "push data", rpcPushMsg.String())
pid, offset, err := producer.SendMessage(&mqPushMsg, mqPushMsg.PushToUserID, rpcPushMsg.OperationID) // pid, offset, err := producer.SendMessage(&mqPushMsg, mqPushMsg.PushToUserID, rpcPushMsg.OperationID)
if err != nil { // if err != nil {
log.Error(mqPushMsg.OperationID, "kafka send failed", "send data", message.String(), "pid", pid, "offset", offset, "err", err.Error()) // log.Error(mqPushMsg.OperationID, "kafka send failed", "send data", message.String(), "pid", pid, "offset", offset, "err", err.Error())
} // }
return // return
} // }
msgClient := pbPush.NewPushMsgServiceClient(grpcConn) // msgClient := pbPush.NewPushMsgServiceClient(grpcConn)
_, err := msgClient.PushMsg(context.Background(), &rpcPushMsg) // _, err := msgClient.PushMsg(context.Background(), &rpcPushMsg)
if err != nil { // if err != nil {
log.Error(rpcPushMsg.OperationID, "rpc send failed", rpcPushMsg.OperationID, "push data", rpcPushMsg.String(), "err", err.Error()) // log.Error(rpcPushMsg.OperationID, "rpc send failed", rpcPushMsg.OperationID, "push data", rpcPushMsg.String(), "err", err.Error())
pid, offset, err := producer.SendMessage(&mqPushMsg, mqPushMsg.PushToUserID, rpcPushMsg.OperationID) // pid, offset, err := producer.SendMessage(&mqPushMsg, mqPushMsg.PushToUserID, rpcPushMsg.OperationID)
if err != nil { // if err != nil {
log.Error(message.OperationID, "kafka send failed", mqPushMsg.OperationID, "send data", mqPushMsg.String(), "pid", pid, "offset", offset, "err", err.Error()) // log.Error(message.OperationID, "kafka send failed", mqPushMsg.OperationID, "send data", mqPushMsg.String(), "pid", pid, "offset", offset, "err", err.Error())
} // }
} else { // } else {
log.Info(message.OperationID, "rpc send success", rpcPushMsg.OperationID, "push data", rpcPushMsg.String()) // log.Info(message.OperationID, "rpc send success", rpcPushMsg.OperationID, "push data", rpcPushMsg.String())
//
} // }
} //}
func sendMessageToPushMQ(message *pbMsg.MsgDataToMQ, pushToUserID string) { func sendMessageToPushMQ(message *pbMsg.MsgDataToMQ, pushToUserID string) {
log.Info(message.OperationID, utils.GetSelfFuncName(), "msg ", message.String(), pushToUserID) log.Info(message.OperationID, utils.GetSelfFuncName(), "msg ", message.String(), pushToUserID)

Loading…
Cancel
Save