diff --git a/test/data-conversion/kafka-conversation.go b/test/data-conversion/kafka-conversation.go index 349c6c66f..9429615b1 100644 --- a/test/data-conversion/kafka-conversation.go +++ b/test/data-conversion/kafka-conversation.go @@ -17,6 +17,7 @@ package data_conversion import ( "fmt" "github.com/Shopify/sarama" + "sync" ) var ( @@ -27,6 +28,7 @@ var ( var consumer sarama.Consumer var producer sarama.SyncProducer +var wg sync.WaitGroup func init() { @@ -71,15 +73,23 @@ func GetMessage() { fmt.Printf("fail to get list of partition:err%v\n", err) } fmt.Println(partitionList) - for partition := range partitionList { // iterate over all partitions - // Create a corresponding partition consumer for each partition + for partition := range partitionList { pc, err := consumer.ConsumePartition(topic, int32(partition), sarama.OffsetNewest) if err != nil { - fmt.Printf("failed to start consumer for partition %d,err:%v\n", partition, err) - } - // Asynchronously consume information from each partition - for msg := range pc.Messages() { - fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v", msg.Partition, msg.Offset, msg.Key, msg.Value) + panic(err) } + + defer pc.AsyncClose() + + wg.Add(1) + + go func(sarama.PartitionConsumer) { + defer wg.Done() + for msg := range pc.Messages() { + fmt.Printf("Partition:%d, Offset:%d, Key:%s, Value:%s\n", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value)) + } + }(pc) + wg.Wait() + consumer.Close() } }