diff --git a/internal/msg_transfer/logic/offline_history_msg_handler.go b/internal/msg_transfer/logic/offline_history_msg_handler.go index c4e78f96d..4d1618853 100644 --- a/internal/msg_transfer/logic/offline_history_msg_handler.go +++ b/internal/msg_transfer/logic/offline_history_msg_handler.go @@ -98,19 +98,24 @@ func (OfflineHistoryConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error func (OfflineHistoryConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil } func (mc *OfflineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { // a instance in the consumer group - log.NewDebug("", "new session msg come", claim.HighWaterMarkOffset(), claim.Topic(), claim.Partition()) - select { - case cmd := <-mc.cmdCh: - if cmd.Cmd == OnlineTopicVacancy { - for msg := range claim.Messages() { - if GetOnlineTopicStatus() == OnlineTopicVacancy { - log.NewDebug("", "kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value)) - mc.msgHandle[msg.Topic](msg.Value, string(msg.Key)) - sess.MarkMessage(msg, "") - } - } - } + log.NewDebug("", "new session msg come", claim.HighWaterMarkOffset(), claim.Topic(), claim.Partition()) + for msg := range claim.Messages() { + log.NewDebug("", "kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value), "offline") + mc.msgHandle[msg.Topic](msg.Value, string(msg.Key)) } + //select { + //case cmd := <-mc.cmdCh: + // if cmd.Cmd == OnlineTopicVacancy { + // for msg := range claim.Messages() { + // if GetOnlineTopicStatus() == OnlineTopicVacancy { + // log.NewDebug("", "kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value)) + // mc.msgHandle[msg.Topic](msg.Value, string(msg.Key)) + // sess.MarkMessage(msg, "") + // } + // } + // } + // + //} return nil }