diff --git a/internal/msg_transfer/logic/offline_history_msg_handler.go b/internal/msg_transfer/logic/offline_history_msg_handler.go index 4179b1800..9355385d0 100644 --- a/internal/msg_transfer/logic/offline_history_msg_handler.go +++ b/internal/msg_transfer/logic/offline_history_msg_handler.go @@ -105,13 +105,15 @@ func (mc *OfflineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupS //} for msg := range claim.Messages() { if GetOnlineTopicStatus() == OnlineTopicVacancy { - log.NewDebug("", "offline kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value)) + log.NewDebug("", "vacancy offline kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value)) mc.msgHandle[msg.Topic](msg.Value, string(msg.Key)) sess.MarkMessage(msg, "") } else { select { case <-mc.cmdCh: + log.NewDebug("", "cmd offline kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value)) case <-time.After(time.Millisecond * time.Duration(100)): + log.NewDebug("", "timeout offline kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value)) } mc.msgHandle[msg.Topic](msg.Value, string(msg.Key)) sess.MarkMessage(msg, "")