kafka msg data conversion

pull/953/head
pluto 2 years ago
parent 3c97d57d9d
commit a5f0ed768e

@ -22,6 +22,7 @@ import (
var ( var (
topic = "latestMsgToRedis" topic = "latestMsgToRedis"
addr = "127.0.0.1:9092" addr = "127.0.0.1:9092"
//addr = "43.128.72.19:9092"
) )
var consumer sarama.Consumer var consumer sarama.Consumer
@ -77,10 +78,8 @@ func GetMessage() {
fmt.Printf("failed to start consumer for partition %d,err:%v\n", partition, err) fmt.Printf("failed to start consumer for partition %d,err:%v\n", partition, err)
} }
// Asynchronously consume information from each partition // Asynchronously consume information from each partition
go func(sarama.PartitionConsumer) {
for msg := range pc.Messages() { for msg := range pc.Messages() {
fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v", msg.Partition, msg.Offset, msg.Key, msg.Value) fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v", msg.Partition, msg.Offset, msg.Key, msg.Value)
} }
}(pc)
} }
} }

Loading…
Cancel
Save