From f716b974cad466cd9fe258dd298f824448702028 Mon Sep 17 00:00:00 2001 From: pluto <2631223275@qq.com> Date: Wed, 23 Aug 2023 16:19:33 +0800 Subject: [PATCH] kafka msg data conversion --- tools/data-conversion/go.mod | 3 + .../data-conversion/kafka-conversation.go | 73 +++++++++++++-- {test => tools}/data-conversion/kafka_test.go | 0 tools/data-conversion/main.go | 5 + .../data-conversion/mongodb-conversion.go | 0 .../data-conversion/mysql-conversion.go | 92 +++++++++---------- {test => tools}/data-conversion/mysql_test.go | 0 .../data-conversion/redis-conversion.go | 0 8 files changed, 121 insertions(+), 52 deletions(-) create mode 100644 tools/data-conversion/go.mod rename {test => tools}/data-conversion/kafka-conversation.go (59%) rename {test => tools}/data-conversion/kafka_test.go (100%) create mode 100644 tools/data-conversion/main.go rename {test => tools}/data-conversion/mongodb-conversion.go (100%) rename {test => tools}/data-conversion/mysql-conversion.go (74%) rename {test => tools}/data-conversion/mysql_test.go (100%) rename {test => tools}/data-conversion/redis-conversion.go (100%) diff --git a/tools/data-conversion/go.mod b/tools/data-conversion/go.mod new file mode 100644 index 000000000..816309dd9 --- /dev/null +++ b/tools/data-conversion/go.mod @@ -0,0 +1,3 @@ +module github.com/OpenIMSDK/Open-IM-Server/tools/data-conversion + +go 1.20 diff --git a/test/data-conversion/kafka-conversation.go b/tools/data-conversion/kafka-conversation.go similarity index 59% rename from test/data-conversion/kafka-conversation.go rename to tools/data-conversion/kafka-conversation.go index 9de04c580..e37195afe 100644 --- a/test/data-conversion/kafka-conversation.go +++ b/tools/data-conversion/kafka-conversation.go @@ -15,9 +15,20 @@ package data_conversion import ( + "context" "fmt" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" + "github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient" + "github.com/OpenIMSDK/protocol/sdkws" + openKeeper "github.com/OpenIMSDK/tools/discoveryregistry/zookeeper" + "github.com/OpenIMSDK/tools/errs" + "github.com/OpenIMSDK/tools/log" "github.com/Shopify/sarama" + "google.golang.org/protobuf/proto" + "strconv" + "strings" "sync" + "time" ) var ( @@ -73,24 +84,74 @@ func GetMessage() { fmt.Printf("fail to get list of partition:err%v\n", err) } fmt.Println(partitionList) - var ch chan int + //var ch chan int for partition := range partitionList { pc, err := consumer.ConsumePartition(topic, int32(partition), sarama.OffsetOldest) if err != nil { panic(err) } - + wg.Add(1) defer pc.AsyncClose() go func(sarama.PartitionConsumer) { - //defer wg.Done() + defer wg.Done() for msg := range pc.Messages() { + Transfer([]*sarama.ConsumerMessage{msg}) fmt.Printf("Partition:%d, Offset:%d, Key:%s, Value:%s\n", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value)) } }(pc) - //wg.Add(1) - //wg.Wait() + } + wg.Wait() consumer.Close() - _ = <-ch + //_ = <-ch +} + +func Transfer(consumerMessages []*sarama.ConsumerMessage) { + for i := 0; i < len(consumerMessages); i++ { + msgFromMQ := &sdkws.MsgData{} + err := proto.Unmarshal(consumerMessages[i].Value, msgFromMQ) + if err != nil { + log.ZError(context.Background(), "msg_transfer Unmarshal msg err", err, string(consumerMessages[i].Value)) + continue + } + var arr []string + for i, header := range consumerMessages[i].Headers { + arr = append(arr, strconv.Itoa(i), string(header.Key), string(header.Value)) + } + log.ZInfo( + context.Background(), + "consumer.kafka.GetContextWithMQHeader", + "len", + len(consumerMessages[i].Headers), + "header", + strings.Join(arr, ", "), + ) + log.ZDebug( + context.Background(), + "single msg come to distribution center", + "message", + msgFromMQ, + "key", + string(consumerMessages[i].Key), + ) + } +} + +const ( + ZkAddr = "127.0.0.1:2181" + ZKSchema = "openim" + ZKUsername = "" + ZKPassword = "" +) + +func GetMsgRpcService() (rpcclient.MessageRpcClient, error) { + client, err := openKeeper.NewClient([]string{ZkAddr}, ZKSchema, + openKeeper.WithFreq(time.Hour), openKeeper.WithRoundRobin(), openKeeper.WithUserNameAndPassword(ZKPassword, + config.Config.Zookeeper.Password), openKeeper.WithTimeout(10), openKeeper.WithLogger(log.NewZkLogger())) + msgClient := rpcclient.NewMessageRpcClient(client) + if err != nil { + return msgClient, errs.Wrap(err) + } + return msgClient, nil } diff --git a/test/data-conversion/kafka_test.go b/tools/data-conversion/kafka_test.go similarity index 100% rename from test/data-conversion/kafka_test.go rename to tools/data-conversion/kafka_test.go diff --git a/tools/data-conversion/main.go b/tools/data-conversion/main.go new file mode 100644 index 000000000..da6b4f66e --- /dev/null +++ b/tools/data-conversion/main.go @@ -0,0 +1,5 @@ +package data_conversion + +func main() { + +} diff --git a/test/data-conversion/mongodb-conversion.go b/tools/data-conversion/mongodb-conversion.go similarity index 100% rename from test/data-conversion/mongodb-conversion.go rename to tools/data-conversion/mongodb-conversion.go diff --git a/test/data-conversion/mysql-conversion.go b/tools/data-conversion/mysql-conversion.go similarity index 74% rename from test/data-conversion/mysql-conversion.go rename to tools/data-conversion/mysql-conversion.go index 5c24dd6e2..bdde368b3 100644 --- a/test/data-conversion/mysql-conversion.go +++ b/tools/data-conversion/mysql-conversion.go @@ -25,45 +25,45 @@ import ( ) var ( - MysqlDb_v2 *gorm.DB - MysqlDb_v3 *gorm.DB + MysqldbV2 *gorm.DB + MysqldbV3 *gorm.DB ) const ( - username_v2 = "root" - password_v2 = "123456" - ip_v2 = "127.0.0.1:3306" - database_v2 = "openim_v2" + usernameV2 = "root" + passwordV2 = "123456" + ipV2 = "127.0.0.1:3306" + databaseV2 = "openim_v2" ) const ( - username_v3 = "root" - password_v3 = "123456" - ip_v3 = "127.0.0.1:3306" - database_v3 = "openim_v3" + usernameV3 = "root" + passwordV3 = "123456" + ipV3 = "127.0.0.1:3306" + databaseV3 = "openim_v3" ) func init() { dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=True&loc=Local", - username_v2, - password_v2, - ip_v2, - database_v2, + usernameV2, + passwordV2, + ipV2, + databaseV2, ) db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{}) - MysqlDb_v2 = db + MysqldbV2 = db if err != nil { log.ZDebug(context.Background(), "err", err) } - dsn_v3 := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=True&loc=Local", - username_v3, - password_v3, - ip_v3, - database_v3, + dsnV3 := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=True&loc=Local", + usernameV3, + passwordV3, + ipV3, + databaseV3, ) - db_v3, err := gorm.Open(mysql.Open(dsn_v3), &gorm.Config{}) - MysqlDb_v3 = db_v3 + dbV3, err := gorm.Open(mysql.Open(dsnV3), &gorm.Config{}) + MysqldbV3 = dbV3 if err != nil { log.ZDebug(context.Background(), "err", err) } @@ -72,17 +72,17 @@ func init() { func UserConversion() { var count int64 var user relation.UserModel - MysqlDb_v2.Model(&user).Count(&count) + MysqldbV2.Model(&user).Count(&count) batchSize := 100 offset := 0 for int64(offset) < count { var results []relation.UserModel - MysqlDb_v2.Limit(batchSize).Offset(offset).Find(&results) + MysqldbV2.Limit(batchSize).Offset(offset).Find(&results) // Process query results fmt.Println("============================batch data===================", offset, batchSize) fmt.Println(results) - MysqlDb_v3.Create(results) + MysqldbV3.Create(results) fmt.Println("======================================================") offset += batchSize } @@ -91,17 +91,17 @@ func UserConversion() { func FriendConversion() { var count int64 var friend relation.FriendModel - MysqlDb_v2.Model(&friend).Count(&count) + MysqldbV2.Model(&friend).Count(&count) batchSize := 100 offset := 0 for int64(offset) < count { var results []relation.FriendModel - MysqlDb_v2.Limit(batchSize).Offset(offset).Find(&results) + MysqldbV2.Limit(batchSize).Offset(offset).Find(&results) // Process query results fmt.Println("============================batch data===================", offset, batchSize) fmt.Println(results) - MysqlDb_v3.Create(results) + MysqldbV3.Create(results) fmt.Println("======================================================") offset += batchSize } @@ -110,33 +110,33 @@ func FriendConversion() { func RequestConversion() { var count int64 var friendRequest relation.FriendRequestModel - MysqlDb_v2.Model(&friendRequest).Count(&count) + MysqldbV2.Model(&friendRequest).Count(&count) batchSize := 100 offset := 0 for int64(offset) < count { var results []relation.FriendRequestModel - MysqlDb_v2.Limit(batchSize).Offset(offset).Find(&results) + MysqldbV2.Limit(batchSize).Offset(offset).Find(&results) // Process query results fmt.Println("============================batch data===================", offset, batchSize) fmt.Println(results) - MysqlDb_v3.Create(results) + MysqldbV3.Create(results) fmt.Println("======================================================") offset += batchSize } var groupRequest relation.GroupRequestModel - MysqlDb_v2.Model(&groupRequest).Count(&count) + MysqldbV2.Model(&groupRequest).Count(&count) batchSize = 100 offset = 0 for int64(offset) < count { var results []relation.GroupRequestModel - MysqlDb_v2.Limit(batchSize).Offset(offset).Find(&results) + MysqldbV2.Limit(batchSize).Offset(offset).Find(&results) // Process query results fmt.Println("============================batch data===================", offset, batchSize) fmt.Println(results) - MysqlDb_v3.Create(results) + MysqldbV3.Create(results) fmt.Println("======================================================") offset += batchSize } @@ -145,13 +145,13 @@ func RequestConversion() { func GroupConversion() { var count int64 var group relation.GroupModel - MysqlDb_v2.Model(&group).Count(&count) + MysqldbV2.Model(&group).Count(&count) batchSize := 100 offset := 0 for int64(offset) < count { var results []relation.GroupModel - MysqlDb_v2.Limit(batchSize).Offset(offset).Find(&results) + MysqldbV2.Limit(batchSize).Offset(offset).Find(&results) for i, val := range results { temp := time.Date(1, time.January, 1, 0, 0, 0, 0, time.UTC) if val.NotificationUpdateTime.Equal(temp) { @@ -162,7 +162,7 @@ func GroupConversion() { // Process query results fmt.Println("============================batch data===================", offset, batchSize) fmt.Println(results) - MysqlDb_v3.Create(results) + MysqldbV3.Create(results) fmt.Println("======================================================") offset += batchSize } @@ -171,17 +171,17 @@ func GroupConversion() { func GroupMemberConversion() { var count int64 var groupMember relation.GroupMemberModel - MysqlDb_v2.Model(&groupMember).Count(&count) + MysqldbV2.Model(&groupMember).Count(&count) batchSize := 100 offset := 0 for int64(offset) < count { var results []relation.GroupMemberModel - MysqlDb_v2.Limit(batchSize).Offset(offset).Find(&results) + MysqldbV2.Limit(batchSize).Offset(offset).Find(&results) // Process query results fmt.Println("============================batch data===================", offset, batchSize) fmt.Println(results) - MysqlDb_v3.Create(results) + MysqldbV3.Create(results) fmt.Println("======================================================") offset += batchSize } @@ -190,17 +190,17 @@ func GroupMemberConversion() { func BlacksConversion() { var count int64 var black relation.BlackModel - MysqlDb_v2.Model(&black).Count(&count) + MysqldbV2.Model(&black).Count(&count) batchSize := 100 offset := 0 for int64(offset) < count { var results []relation.BlackModel - MysqlDb_v2.Limit(batchSize).Offset(offset).Find(&results) + MysqldbV2.Limit(batchSize).Offset(offset).Find(&results) // Process query results fmt.Println("============================batch data===================", offset, batchSize) fmt.Println(results) - MysqlDb_v3.Create(results) + MysqldbV3.Create(results) fmt.Println("======================================================") offset += batchSize } @@ -209,17 +209,17 @@ func BlacksConversion() { func ChatLogsConversion() { var count int64 var chat relation.ChatLogModel - MysqlDb_v2.Model(&chat).Count(&count) + MysqldbV2.Model(&chat).Count(&count) batchSize := 100 offset := 0 for int64(offset) < count { var results []relation.ChatLogModel - MysqlDb_v2.Limit(batchSize).Offset(offset).Find(&results) + MysqldbV2.Limit(batchSize).Offset(offset).Find(&results) // Process query results fmt.Println("============================batch data===================", offset, batchSize) //fmt.Println(results) - MysqlDb_v3.Create(results) + MysqldbV3.Create(results) fmt.Println("======================================================") offset += batchSize } diff --git a/test/data-conversion/mysql_test.go b/tools/data-conversion/mysql_test.go similarity index 100% rename from test/data-conversion/mysql_test.go rename to tools/data-conversion/mysql_test.go diff --git a/test/data-conversion/redis-conversion.go b/tools/data-conversion/redis-conversion.go similarity index 100% rename from test/data-conversion/redis-conversion.go rename to tools/data-conversion/redis-conversion.go