diff --git a/tools/data-conversion/kafka-conversation.go b/tools/data-conversion/kafka-conversation.go index f146a77f4..465349124 100644 --- a/tools/data-conversion/kafka-conversation.go +++ b/tools/data-conversion/kafka-conversation.go @@ -88,7 +88,7 @@ func GetMessage() { } fmt.Println(partitionList) //var ch chan int - msgRpcClient, err := GetMsgRpcService() + //msgRpcClient, err := GetMsgRpcService() if err != nil { fmt.Printf("rpc err:%s", err) } @@ -104,15 +104,15 @@ func GetMessage() { go func(sarama.PartitionConsumer) { defer wg.Done() for msg := range pc.Messages() { - Transfer([]*sarama.ConsumerMessage{msg}, msgRpcClient) + //Transfer([]*sarama.ConsumerMessage{msg}, msgRpcClient) //V2 - //msgFromMQV2 := pbMsg.MsgDataToMQ{} - //err := proto.Unmarshal(msg.Value, &msgFromMQV2) - //if err != nil { - // fmt.Printf("err:%s \n", err) - //} - //fmt.Printf("msg:%s \n", &msgFromMQV2) + msgFromMQV2 := pbMsg.MsgDataToMQ{} + err := proto.Unmarshal(msg.Value, &msgFromMQV2) + if err != nil { + fmt.Printf("err:%s \n", err) + } + fmt.Printf("msg:%s \n", &msgFromMQV2) //V3 //msgFromMQ := &sdkws.MsgData{}