diff --git a/test/data-conversion/kafka-conversation.go b/test/data-conversion/kafka-conversation.go index c8728eb53..4bf8a1110 100644 --- a/test/data-conversion/kafka-conversation.go +++ b/test/data-conversion/kafka-conversation.go @@ -73,6 +73,7 @@ func GetMessage() { fmt.Printf("fail to get list of partition:err%v\n", err) } fmt.Println(partitionList) + var ch chan int for partition := range partitionList { pc, err := consumer.ConsumePartition(topic, int32(partition), sarama.OffsetOldest) if err != nil { @@ -81,15 +82,15 @@ func GetMessage() { defer pc.AsyncClose() - wg.Add(1) - go func(sarama.PartitionConsumer) { - defer wg.Done() + //defer wg.Done() for msg := range pc.Messages() { fmt.Printf("Partition:%d, Offset:%d, Key:%s, Value:%s\n", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value)) } }(pc) - wg.Wait() + //wg.Add(1) + //wg.Wait() consumer.Close() } + _ = <-ch }