diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go index 416474efa..2c470ef4a 100644 --- a/internal/msgtransfer/online_history_msg_handler.go +++ b/internal/msgtransfer/online_history_msg_handler.go @@ -242,7 +242,7 @@ func (och *OnlineHistoryRedisConsumerHandler) ConsumeClaim(sess sarama.ConsumerG func (och *OnlineHistoryRedisConsumerHandler) sendMessageToPushMQ(ctx context.Context, message *pbMsg.MsgDataToMQ, pushToUserID string) { mqPushMsg := pbMsg.PushMsgDataToMQ{MsgData: message.MsgData, SourceID: pushToUserID} - pid, offset, err := och.producerToPush.SendMessage(ctx, &mqPushMsg, mqPushMsg.SourceID) + pid, offset, err := och.producerToPush.SendMessage(ctx, mqPushMsg.SourceID, &mqPushMsg) if err != nil { log.Error(tracelog.GetOperationID(ctx), "kafka send failed", "send data", message.String(), "pid", pid, "offset", offset, "err", err.Error()) } @@ -251,7 +251,7 @@ func (och *OnlineHistoryRedisConsumerHandler) sendMessageToPushMQ(ctx context.Co func (och *OnlineHistoryRedisConsumerHandler) sendMessageToModifyMQ(ctx context.Context, aggregationID string, triggerID string, messages []*pbMsg.MsgDataToMQ) { if len(messages) > 0 { - pid, offset, err := och.producerToModify.SendMessage(ctx, &pbMsg.MsgDataToModifyByMQ{AggregationID: aggregationID, Messages: messages, TriggerID: triggerID}, aggregationID) + pid, offset, err := och.producerToModify.SendMessage(ctx, aggregationID, &pbMsg.MsgDataToModifyByMQ{AggregationID: aggregationID, Messages: messages, TriggerID: triggerID}) if err != nil { log.Error(triggerID, "kafka send failed", "send data", len(messages), "pid", pid, "offset", offset, "err", err.Error(), "key", aggregationID) } @@ -260,7 +260,7 @@ func (och *OnlineHistoryRedisConsumerHandler) sendMessageToModifyMQ(ctx context. func (och *OnlineHistoryRedisConsumerHandler) SendMessageToMongoCH(ctx context.Context, aggregationID string, triggerID string, messages []*pbMsg.MsgDataToMQ, lastSeq int64) { if len(messages) > 0 { - pid, offset, err := och.producerToMongo.SendMessage(ctx, &pbMsg.MsgDataToMongoByMQ{LastSeq: lastSeq, AggregationID: aggregationID, Messages: messages, TriggerID: triggerID}, aggregationID) + pid, offset, err := och.producerToMongo.SendMessage(ctx, aggregationID, &pbMsg.MsgDataToMongoByMQ{LastSeq: lastSeq, AggregationID: aggregationID, Messages: messages, TriggerID: triggerID}) if err != nil { log.Error(triggerID, "kafka send failed", "send data", len(messages), "pid", pid, "offset", offset, "err", err.Error(), "key", aggregationID) }