From a020e19ec0ad2668a0dbfdcd48ba4ec60af1479b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=85=B7=E8=B5=9E?= Date: Wed, 24 Aug 2022 21:10:42 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B5=81=E7=A8=8B=E6=9B=B4=E6=96=B0=E5=90=8E?= =?UTF-8?q?=E7=9A=84=E4=BB=A3=E7=A0=81=E6=95=B4=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../logic/online_history_msg_handler.go | 52 +++++++++---------- 1 file changed, 26 insertions(+), 26 deletions(-) diff --git a/internal/msg_transfer/logic/online_history_msg_handler.go b/internal/msg_transfer/logic/online_history_msg_handler.go index 58fa3dbda..a3cd12348 100644 --- a/internal/msg_transfer/logic/online_history_msg_handler.go +++ b/internal/msg_transfer/logic/online_history_msg_handler.go @@ -512,32 +512,32 @@ func (och *OnlineHistoryRedisConsumerHandler) ConsumeClaim(sess sarama.ConsumerG // return nil //} -func sendMessageToPush(message *pbMsg.MsgDataToMQ, pushToUserID string) { - log.Info(message.OperationID, "msg_transfer send message to push", "message", message.String()) - rpcPushMsg := pbPush.PushMsgReq{OperationID: message.OperationID, MsgData: message.MsgData, PushToUserID: pushToUserID} - mqPushMsg := pbMsg.PushMsgDataToMQ{OperationID: message.OperationID, MsgData: message.MsgData, PushToUserID: pushToUserID} - grpcConn := getcdv3.GetDefaultConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImPushName, message.OperationID) - if grpcConn == nil { - log.Error(rpcPushMsg.OperationID, "rpc dial failed", "push data", rpcPushMsg.String()) - pid, offset, err := producer.SendMessage(&mqPushMsg, mqPushMsg.PushToUserID, rpcPushMsg.OperationID) - if err != nil { - log.Error(mqPushMsg.OperationID, "kafka send failed", "send data", message.String(), "pid", pid, "offset", offset, "err", err.Error()) - } - return - } - msgClient := pbPush.NewPushMsgServiceClient(grpcConn) - _, err := msgClient.PushMsg(context.Background(), &rpcPushMsg) - if err != nil { - log.Error(rpcPushMsg.OperationID, "rpc send failed", rpcPushMsg.OperationID, "push data", rpcPushMsg.String(), "err", err.Error()) - pid, offset, err := producer.SendMessage(&mqPushMsg, mqPushMsg.PushToUserID, rpcPushMsg.OperationID) - if err != nil { - log.Error(message.OperationID, "kafka send failed", mqPushMsg.OperationID, "send data", mqPushMsg.String(), "pid", pid, "offset", offset, "err", err.Error()) - } - } else { - log.Info(message.OperationID, "rpc send success", rpcPushMsg.OperationID, "push data", rpcPushMsg.String()) - - } -} +//func sendMessageToPush(message *pbMsg.MsgDataToMQ, pushToUserID string) { +// log.Info(message.OperationID, "msg_transfer send message to push", "message", message.String()) +// rpcPushMsg := pbPush.PushMsgReq{OperationID: message.OperationID, MsgData: message.MsgData, PushToUserID: pushToUserID} +// mqPushMsg := pbMsg.PushMsgDataToMQ{OperationID: message.OperationID, MsgData: message.MsgData, PushToUserID: pushToUserID} +// grpcConn := getcdv3.GetDefaultConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImPushName, message.OperationID) +// if grpcConn == nil { +// log.Error(rpcPushMsg.OperationID, "rpc dial failed", "push data", rpcPushMsg.String()) +// pid, offset, err := producer.SendMessage(&mqPushMsg, mqPushMsg.PushToUserID, rpcPushMsg.OperationID) +// if err != nil { +// log.Error(mqPushMsg.OperationID, "kafka send failed", "send data", message.String(), "pid", pid, "offset", offset, "err", err.Error()) +// } +// return +// } +// msgClient := pbPush.NewPushMsgServiceClient(grpcConn) +// _, err := msgClient.PushMsg(context.Background(), &rpcPushMsg) +// if err != nil { +// log.Error(rpcPushMsg.OperationID, "rpc send failed", rpcPushMsg.OperationID, "push data", rpcPushMsg.String(), "err", err.Error()) +// pid, offset, err := producer.SendMessage(&mqPushMsg, mqPushMsg.PushToUserID, rpcPushMsg.OperationID) +// if err != nil { +// log.Error(message.OperationID, "kafka send failed", mqPushMsg.OperationID, "send data", mqPushMsg.String(), "pid", pid, "offset", offset, "err", err.Error()) +// } +// } else { +// log.Info(message.OperationID, "rpc send success", rpcPushMsg.OperationID, "push data", rpcPushMsg.String()) +// +// } +//} func sendMessageToPushMQ(message *pbMsg.MsgDataToMQ, pushToUserID string) { log.Info(message.OperationID, utils.GetSelfFuncName(), "msg ", message.String(), pushToUserID)