pull/232/head
skiffer-git 2 years ago
parent d1293dd047
commit 96f09c45f5

@ -547,7 +547,7 @@ func sendMessageToPush(message *pbMsg.MsgDataToMQ, pushToUserID string) {
grpcConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImPushName)
if grpcConn == nil {
log.Error(rpcPushMsg.OperationID, "rpc dial failed", "push data", rpcPushMsg.String())
pid, offset, err := producer.SendMessage(&mqPushMsg, mqPushMsg.PushToUserID)
pid, offset, err := producer.SendMessage(&mqPushMsg, mqPushMsg.PushToUserID, rpcPushMsg.OperationID)
if err != nil {
log.Error(mqPushMsg.OperationID, "kafka send failed", "send data", message.String(), "pid", pid, "offset", offset, "err", err.Error())
}
@ -557,7 +557,7 @@ func sendMessageToPush(message *pbMsg.MsgDataToMQ, pushToUserID string) {
_, err := msgClient.PushMsg(context.Background(), &rpcPushMsg)
if err != nil {
log.Error(rpcPushMsg.OperationID, "rpc send failed", rpcPushMsg.OperationID, "push data", rpcPushMsg.String(), "err", err.Error())
pid, offset, err := producer.SendMessage(&mqPushMsg, mqPushMsg.PushToUserID)
pid, offset, err := producer.SendMessage(&mqPushMsg, mqPushMsg.PushToUserID, rpcPushMsg.OperationID)
if err != nil {
log.Error(message.OperationID, "kafka send failed", mqPushMsg.OperationID, "send data", mqPushMsg.String(), "pid", pid, "offset", offset, "err", err.Error())
}

@ -376,7 +376,7 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S
func (rpc *rpcChat) sendMsgToKafka(m *pbChat.MsgDataToMQ, key string, status string) error {
switch status {
case constant.OnlineStatus:
pid, offset, err := rpc.onlineProducer.SendMessage(m, key)
pid, offset, err := rpc.onlineProducer.SendMessage(m, key, m.OperationID)
if err != nil {
log.Error(m.OperationID, "kafka send failed", "send data", m.String(), "pid", pid, "offset", offset, "err", err.Error(), "key", key, status)
} else {
@ -384,7 +384,7 @@ func (rpc *rpcChat) sendMsgToKafka(m *pbChat.MsgDataToMQ, key string, status str
}
return err
case constant.OfflineStatus:
pid, offset, err := rpc.onlineProducer.SendMessage(m, key)
pid, offset, err := rpc.onlineProducer.SendMessage(m, key, m.OperationID)
if err != nil {
log.Error(m.OperationID, "kafka send failed", "send data", m.String(), "pid", pid, "offset", offset, "err", err.Error(), "key", key, status)
}

@ -33,19 +33,22 @@ func NewKafkaProducer(addr []string, topic string) *Producer {
return &p
}
func (p *Producer) SendMessage(m proto.Message, key string) (int32, int64, error) {
func (p *Producer) SendMessage(m proto.Message, key string, operationID string) (int32, int64, error) {
log2.Info(operationID, "SendMessage", "key ", key, m.String(), p.producer)
kMsg := &sarama.ProducerMessage{}
kMsg.Topic = p.topic
kMsg.Key = sarama.StringEncoder(key)
bMsg, err := proto.Marshal(m)
if err != nil {
log2.Error("", "", "proto marshal err = %s", err.Error())
log2.Error(operationID, "", "proto marshal err = %s", err.Error())
return -1, -1, err
}
if len(bMsg) == 0 {
return 0, 0, errors.New("msg content is nil")
}
kMsg.Value = sarama.ByteEncoder(bMsg)
return p.producer.SendMessage(kMsg)
log2.Info(operationID, "ByteEncoder SendMessage begin", "key ", kMsg, p.producer)
a, b, c := p.producer.SendMessage(kMsg)
log2.Info(operationID, "ByteEncoder SendMessage end", "key ", kMsg, p.producer)
return a, b, c
}

Loading…
Cancel
Save