@ -59,6 +59,7 @@ func (ms *PushConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession,
for msg := range claim.Messages() {
log.InfoByKv("kafka get info to mysql", "", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value))
ms.msgHandle[msg.Topic](msg.Value)
sess.MarkMessage(msg, "")
}
return nil