concurrent consumption of messages

pull/232/head
Gordon 3 years ago
parent 143e2908ad
commit a2888b009d

@ -32,7 +32,8 @@ func (pc *PersistentConsumerHandler) Init() {
} }
func (pc *PersistentConsumerHandler) handleChatWs2Mysql(msg []byte, msgKey string) { func (pc *PersistentConsumerHandler) handleChatWs2Mysql(cMsg *sarama.ConsumerMessage, msgKey string, _ sarama.ConsumerGroupSession) {
msg := cMsg.Value
log.NewInfo("msg come here mysql!!!", "", "msg", string(msg)) log.NewInfo("msg come here mysql!!!", "", "msg", string(msg))
var tag bool var tag bool
msgFromMQ := pbMsg.MsgDataToMQ{} msgFromMQ := pbMsg.MsgDataToMQ{}
@ -71,7 +72,7 @@ func (pc *PersistentConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSessi
claim sarama.ConsumerGroupClaim) error { claim sarama.ConsumerGroupClaim) error {
for msg := range claim.Messages() { for msg := range claim.Messages() {
log.NewDebug("", "kafka get info to mysql", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value)) log.NewDebug("", "kafka get info to mysql", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value))
pc.msgHandle[msg.Topic](msg.Value, string(msg.Key)) pc.msgHandle[msg.Topic](msg, string(msg.Key), sess)
sess.MarkMessage(msg, "") sess.MarkMessage(msg, "")
} }
return nil return nil

Loading…
Cancel
Save