diff --git a/tools/data-conversion/kafka-conversation.go b/tools/data-conversion/kafka-conversation.go index 83ec57a89..97c4249ec 100644 --- a/tools/data-conversion/kafka-conversation.go +++ b/tools/data-conversion/kafka-conversation.go @@ -18,6 +18,7 @@ import ( "context" "fmt" "github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient" + pbMsg "github.com/OpenIMSDK/protocol/msg" "github.com/OpenIMSDK/protocol/sdkws" openKeeper "github.com/OpenIMSDK/tools/discoveryregistry/zookeeper" "github.com/OpenIMSDK/tools/errs" @@ -103,12 +104,22 @@ func GetMessage() { defer wg.Done() for msg := range pc.Messages() { //Transfer([]*sarama.ConsumerMessage{msg}) - msgFromMQ := &sdkws.MsgData{} - err := proto.Unmarshal(msg.Value, msgFromMQ) + + //V2 + msgFromMQV2 := pbMsg.MsgDataToMQ{} + err := proto.Unmarshal(msg.Value, &msgFromMQV2) if err != nil { fmt.Printf("err:%s \n", err) } - fmt.Printf("msg:%s \n", msgFromMQ) + fmt.Printf("msg:%s \n", &msgFromMQV2) + + //V3 + //msgFromMQ := &sdkws.MsgData{} + //err = proto.Unmarshal(msg.Value, msgFromMQ) + //if err != nil { + // fmt.Printf("err:%s \n", err) + //} + //fmt.Printf("msg:%s \n", &msgFromMQ) //fmt.Printf("Partition:%d, Offset:%d, Key:%s, Value:%s\n", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value)) }