diff --git a/internal/msg_transfer/logic/online_history_msg_handler.go b/internal/msg_transfer/logic/online_history_msg_handler.go index f6f0b04bc..84b33cb4f 100644 --- a/internal/msg_transfer/logic/online_history_msg_handler.go +++ b/internal/msg_transfer/logic/online_history_msg_handler.go @@ -547,7 +547,7 @@ func sendMessageToPush(message *pbMsg.MsgDataToMQ, pushToUserID string) { grpcConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImPushName) if grpcConn == nil { log.Error(rpcPushMsg.OperationID, "rpc dial failed", "push data", rpcPushMsg.String()) - pid, offset, err := producer.SendMessage(&mqPushMsg, mqPushMsg.PushToUserID) + pid, offset, err := producer.SendMessage(&mqPushMsg, mqPushMsg.PushToUserID, rpcPushMsg.OperationID) if err != nil { log.Error(mqPushMsg.OperationID, "kafka send failed", "send data", message.String(), "pid", pid, "offset", offset, "err", err.Error()) } @@ -557,7 +557,7 @@ func sendMessageToPush(message *pbMsg.MsgDataToMQ, pushToUserID string) { _, err := msgClient.PushMsg(context.Background(), &rpcPushMsg) if err != nil { log.Error(rpcPushMsg.OperationID, "rpc send failed", rpcPushMsg.OperationID, "push data", rpcPushMsg.String(), "err", err.Error()) - pid, offset, err := producer.SendMessage(&mqPushMsg, mqPushMsg.PushToUserID) + pid, offset, err := producer.SendMessage(&mqPushMsg, mqPushMsg.PushToUserID, rpcPushMsg.OperationID) if err != nil { log.Error(message.OperationID, "kafka send failed", mqPushMsg.OperationID, "send data", mqPushMsg.String(), "pid", pid, "offset", offset, "err", err.Error()) } diff --git a/internal/rpc/msg/send_msg.go b/internal/rpc/msg/send_msg.go index 7897d3f3d..1000c6771 100644 --- a/internal/rpc/msg/send_msg.go +++ b/internal/rpc/msg/send_msg.go @@ -376,7 +376,7 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S func (rpc *rpcChat) sendMsgToKafka(m *pbChat.MsgDataToMQ, key string, status string) error { switch status { case constant.OnlineStatus: - pid, offset, err := rpc.onlineProducer.SendMessage(m, key) + pid, offset, err := rpc.onlineProducer.SendMessage(m, key, m.OperationID) if err != nil { log.Error(m.OperationID, "kafka send failed", "send data", m.String(), "pid", pid, "offset", offset, "err", err.Error(), "key", key, status) } else { @@ -384,7 +384,7 @@ func (rpc *rpcChat) sendMsgToKafka(m *pbChat.MsgDataToMQ, key string, status str } return err case constant.OfflineStatus: - pid, offset, err := rpc.onlineProducer.SendMessage(m, key) + pid, offset, err := rpc.onlineProducer.SendMessage(m, key, m.OperationID) if err != nil { log.Error(m.OperationID, "kafka send failed", "send data", m.String(), "pid", pid, "offset", offset, "err", err.Error(), "key", key, status) } diff --git a/pkg/common/kafka/producer.go b/pkg/common/kafka/producer.go index 1c7eeb6fb..3d8ca02f5 100644 --- a/pkg/common/kafka/producer.go +++ b/pkg/common/kafka/producer.go @@ -33,19 +33,22 @@ func NewKafkaProducer(addr []string, topic string) *Producer { return &p } -func (p *Producer) SendMessage(m proto.Message, key string) (int32, int64, error) { +func (p *Producer) SendMessage(m proto.Message, key string, operationID string) (int32, int64, error) { + log2.Info(operationID, "SendMessage", "key ", key, m.String(), p.producer) kMsg := &sarama.ProducerMessage{} kMsg.Topic = p.topic kMsg.Key = sarama.StringEncoder(key) bMsg, err := proto.Marshal(m) if err != nil { - log2.Error("", "", "proto marshal err = %s", err.Error()) + log2.Error(operationID, "", "proto marshal err = %s", err.Error()) return -1, -1, err } if len(bMsg) == 0 { return 0, 0, errors.New("msg content is nil") } kMsg.Value = sarama.ByteEncoder(bMsg) - - return p.producer.SendMessage(kMsg) + log2.Info(operationID, "ByteEncoder SendMessage begin", "key ", kMsg, p.producer) + a, b, c := p.producer.SendMessage(kMsg) + log2.Info(operationID, "ByteEncoder SendMessage end", "key ", kMsg, p.producer) + return a, b, c }