From d14a2c15bce66ea484f3153dde49b715b50ee68d Mon Sep 17 00:00:00 2001 From: pluto <2631223275@qq.com> Date: Fri, 18 Aug 2023 17:59:41 +0800 Subject: [PATCH] kafka msg data conversion --- test/data-conversion/kafka-conversation.go | 24 +++++++++++++++------- 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/test/data-conversion/kafka-conversation.go b/test/data-conversion/kafka-conversation.go index 349c6c66f..9429615b1 100644 --- a/test/data-conversion/kafka-conversation.go +++ b/test/data-conversion/kafka-conversation.go @@ -17,6 +17,7 @@ package data_conversion import ( "fmt" "github.com/Shopify/sarama" + "sync" ) var ( @@ -27,6 +28,7 @@ var ( var consumer sarama.Consumer var producer sarama.SyncProducer +var wg sync.WaitGroup func init() { @@ -71,15 +73,23 @@ func GetMessage() { fmt.Printf("fail to get list of partition:err%v\n", err) } fmt.Println(partitionList) - for partition := range partitionList { // iterate over all partitions - // Create a corresponding partition consumer for each partition + for partition := range partitionList { pc, err := consumer.ConsumePartition(topic, int32(partition), sarama.OffsetNewest) if err != nil { - fmt.Printf("failed to start consumer for partition %d,err:%v\n", partition, err) - } - // Asynchronously consume information from each partition - for msg := range pc.Messages() { - fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v", msg.Partition, msg.Offset, msg.Key, msg.Value) + panic(err) } + + defer pc.AsyncClose() + + wg.Add(1) + + go func(sarama.PartitionConsumer) { + defer wg.Done() + for msg := range pc.Messages() { + fmt.Printf("Partition:%d, Offset:%d, Key:%s, Value:%s\n", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value)) + } + }(pc) + wg.Wait() + consumer.Close() } }