From 37d9b18fb69da3dbe6b6b2daa1ed0c81581b85fc Mon Sep 17 00:00:00 2001 From: pluto <2631223275@qq.com> Date: Wed, 23 Aug 2023 16:48:48 +0800 Subject: [PATCH] tools for conversion --- tools/data-conversion/kafka-conversation.go | 27 ++++++++++++--------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/tools/data-conversion/kafka-conversation.go b/tools/data-conversion/kafka-conversation.go index 02f761b0b..4d9b3547a 100644 --- a/tools/data-conversion/kafka-conversation.go +++ b/tools/data-conversion/kafka-conversation.go @@ -17,7 +17,6 @@ package data_conversion import ( "context" "fmt" - "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" "github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient" "github.com/OpenIMSDK/protocol/sdkws" openKeeper "github.com/OpenIMSDK/tools/discoveryregistry/zookeeper" @@ -37,6 +36,13 @@ var ( //addr = "43.128.72.19:9092" ) +const ( + ZkAddr = "127.0.0.1:2181" + ZKSchema = "openim" + ZKUsername = "" + ZKPassword = "" +) + var consumer sarama.Consumer var producer sarama.SyncProducer var wg sync.WaitGroup @@ -96,7 +102,13 @@ func GetMessage() { go func(sarama.PartitionConsumer) { defer wg.Done() for msg := range pc.Messages() { - Transfer([]*sarama.ConsumerMessage{msg}) + //Transfer([]*sarama.ConsumerMessage{msg}) + msgFromMQ := &sdkws.MsgData{} + err := proto.Unmarshal(msg.Value, msgFromMQ) + if err != nil { + fmt.Printf("err:%s", err) + } + fmt.Printf("msg:%s", msgFromMQ) //fmt.Printf("Partition:%d, Offset:%d, Key:%s, Value:%s\n", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value)) } }(pc) @@ -138,17 +150,10 @@ func Transfer(consumerMessages []*sarama.ConsumerMessage) { } } -const ( - ZkAddr = "127.0.0.1:2181" - ZKSchema = "openim" - ZKUsername = "" - ZKPassword = "" -) - func GetMsgRpcService() (rpcclient.MessageRpcClient, error) { client, err := openKeeper.NewClient([]string{ZkAddr}, ZKSchema, - openKeeper.WithFreq(time.Hour), openKeeper.WithRoundRobin(), openKeeper.WithUserNameAndPassword(ZKPassword, - config.Config.Zookeeper.Password), openKeeper.WithTimeout(10), openKeeper.WithLogger(log.NewZkLogger())) + openKeeper.WithFreq(time.Hour), openKeeper.WithRoundRobin(), openKeeper.WithUserNameAndPassword(ZKUsername, + ZKPassword), openKeeper.WithTimeout(10), openKeeper.WithLogger(log.NewZkLogger())) msgClient := rpcclient.NewMessageRpcClient(client) if err != nil { return msgClient, errs.Wrap(err)