From 3bc2e97e8b1d3d42f10f748d646566a3f7044a97 Mon Sep 17 00:00:00 2001 From: Gordon <1432970085@qq.com> Date: Wed, 11 May 2022 20:02:33 +0800 Subject: [PATCH] test --- .../logic/offline_history_msg_handler.go | 23 +++++++++++-------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/internal/msg_transfer/logic/offline_history_msg_handler.go b/internal/msg_transfer/logic/offline_history_msg_handler.go index 7a63965ff..a0416d7ed 100644 --- a/internal/msg_transfer/logic/offline_history_msg_handler.go +++ b/internal/msg_transfer/logic/offline_history_msg_handler.go @@ -103,18 +103,23 @@ func (mc *OfflineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupS // log.NewDebug("", "kafka get info to delay mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "offline") // //mc.msgHandle[msg.Topic](msg.Value, string(msg.Key)) //} + cmd := Cmd2Value{} +repeat: select { - case cmd := <-mc.cmdCh: - if cmd.Cmd == OnlineTopicVacancy { - for msg := range claim.Messages() { - if GetOnlineTopicStatus() == OnlineTopicVacancy { - log.NewDebug("", "kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value)) - mc.msgHandle[msg.Topic](msg.Value, string(msg.Key)) - sess.MarkMessage(msg, "") - } + case cmd = <-mc.cmdCh: + case <-time.After(time.Millisecond * time.Duration(1)): + goto repeat + } + if cmd.Cmd == OnlineTopicVacancy { + for msg := range claim.Messages() { + if GetOnlineTopicStatus() == OnlineTopicVacancy { + log.NewDebug("", "kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value)) + mc.msgHandle[msg.Topic](msg.Value, string(msg.Key)) + sess.MarkMessage(msg, "") + } else { + goto repeat } } - } return nil }