From b4d7b9c50c3cae574c3234a2a8bcebf1e3a62456 Mon Sep 17 00:00:00 2001 From: pluto <2631223275@qq.com> Date: Tue, 22 Aug 2023 15:53:14 +0800 Subject: [PATCH] kafka msg data conversion --- test/data-conversion/kafka-conversation.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/test/data-conversion/kafka-conversation.go b/test/data-conversion/kafka-conversation.go index c8728eb53..4bf8a1110 100644 --- a/test/data-conversion/kafka-conversation.go +++ b/test/data-conversion/kafka-conversation.go @@ -73,6 +73,7 @@ func GetMessage() { fmt.Printf("fail to get list of partition:err%v\n", err) } fmt.Println(partitionList) + var ch chan int for partition := range partitionList { pc, err := consumer.ConsumePartition(topic, int32(partition), sarama.OffsetOldest) if err != nil { @@ -81,15 +82,15 @@ func GetMessage() { defer pc.AsyncClose() - wg.Add(1) - go func(sarama.PartitionConsumer) { - defer wg.Done() + //defer wg.Done() for msg := range pc.Messages() { fmt.Printf("Partition:%d, Offset:%d, Key:%s, Value:%s\n", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value)) } }(pc) - wg.Wait() + //wg.Add(1) + //wg.Wait() consumer.Close() } + _ = <-ch }