merge code : Filter push data by time

pull/351/head
skiffer-git 2 years ago
parent 0f0e99b42e
commit 6ff8fc7f87

@ -65,6 +65,7 @@ func (ms *PushConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession,
for msg := range claim.Messages() { for msg := range claim.Messages() {
log.NewDebug("", "kafka get info to mysql", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value)) log.NewDebug("", "kafka get info to mysql", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value))
ms.msgHandle[msg.Topic](msg.Value) ms.msgHandle[msg.Topic](msg.Value)
sess.MarkMessage(msg, "")
} }
return nil return nil
} }

Loading…
Cancel
Save