From f373669f026cfa1ceeb96efe388cf80dc3d131e6 Mon Sep 17 00:00:00 2001 From: Gordon <1432970085@qq.com> Date: Wed, 11 May 2022 19:31:32 +0800 Subject: [PATCH] test --- internal/msg_transfer/logic/offline_history_msg_handler.go | 5 ++--- internal/msg_transfer/logic/online_history_msg_handler.go | 1 + 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/msg_transfer/logic/offline_history_msg_handler.go b/internal/msg_transfer/logic/offline_history_msg_handler.go index 16e6181ac..a62bba4e2 100644 --- a/internal/msg_transfer/logic/offline_history_msg_handler.go +++ b/internal/msg_transfer/logic/offline_history_msg_handler.go @@ -98,10 +98,9 @@ func (OfflineHistoryConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error func (OfflineHistoryConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil } func (mc *OfflineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { // a instance in the consumer group - - log.NewDebug("", "new session msg come", claim.HighWaterMarkOffset(), claim.Topic(), claim.Partition()) + log.NewDebug("", "offline new session msg come", claim.HighWaterMarkOffset(), claim.Topic(), claim.Partition()) for msg := range claim.Messages() { - log.NewDebug("", "kafka get info to delay mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", "offline") + log.NewDebug("", "kafka get info to delay mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "offline") //mc.msgHandle[msg.Topic](msg.Value, string(msg.Key)) } //select { diff --git a/internal/msg_transfer/logic/online_history_msg_handler.go b/internal/msg_transfer/logic/online_history_msg_handler.go index 0286a348b..755ca4433 100644 --- a/internal/msg_transfer/logic/online_history_msg_handler.go +++ b/internal/msg_transfer/logic/online_history_msg_handler.go @@ -133,6 +133,7 @@ func (OnlineHistoryConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error func (OnlineHistoryConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil } func (och *OnlineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { // a instance in the consumer group + log.NewDebug("", "online new session msg come", claim.HighWaterMarkOffset(), claim.Topic(), claim.Partition()) och.TriggerCmd(OnlineTopicBusy) SetOnlineTopicStatus(OnlineTopicBusy) for msg := range claim.Messages() {