kafka msg data conversion

pull/1000/head
pluto 2 years ago
parent 07307d0bef
commit f716b974ca

@ -0,0 +1,3 @@
module github.com/OpenIMSDK/Open-IM-Server/tools/data-conversion
go 1.20

@ -15,9 +15,20 @@
package data_conversion
import (
"context"
"fmt"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
"github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient"
"github.com/OpenIMSDK/protocol/sdkws"
openKeeper "github.com/OpenIMSDK/tools/discoveryregistry/zookeeper"
"github.com/OpenIMSDK/tools/errs"
"github.com/OpenIMSDK/tools/log"
"github.com/Shopify/sarama"
"google.golang.org/protobuf/proto"
"strconv"
"strings"
"sync"
"time"
)
var (
@ -73,24 +84,74 @@ func GetMessage() {
fmt.Printf("fail to get list of partition:err%v\n", err)
}
fmt.Println(partitionList)
var ch chan int
//var ch chan int
for partition := range partitionList {
pc, err := consumer.ConsumePartition(topic, int32(partition), sarama.OffsetOldest)
if err != nil {
panic(err)
}
wg.Add(1)
defer pc.AsyncClose()
go func(sarama.PartitionConsumer) {
//defer wg.Done()
defer wg.Done()
for msg := range pc.Messages() {
Transfer([]*sarama.ConsumerMessage{msg})
fmt.Printf("Partition:%d, Offset:%d, Key:%s, Value:%s\n", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
}
}(pc)
//wg.Add(1)
//wg.Wait()
}
wg.Wait()
consumer.Close()
_ = <-ch
//_ = <-ch
}
func Transfer(consumerMessages []*sarama.ConsumerMessage) {
for i := 0; i < len(consumerMessages); i++ {
msgFromMQ := &sdkws.MsgData{}
err := proto.Unmarshal(consumerMessages[i].Value, msgFromMQ)
if err != nil {
log.ZError(context.Background(), "msg_transfer Unmarshal msg err", err, string(consumerMessages[i].Value))
continue
}
var arr []string
for i, header := range consumerMessages[i].Headers {
arr = append(arr, strconv.Itoa(i), string(header.Key), string(header.Value))
}
log.ZInfo(
context.Background(),
"consumer.kafka.GetContextWithMQHeader",
"len",
len(consumerMessages[i].Headers),
"header",
strings.Join(arr, ", "),
)
log.ZDebug(
context.Background(),
"single msg come to distribution center",
"message",
msgFromMQ,
"key",
string(consumerMessages[i].Key),
)
}
}
const (
ZkAddr = "127.0.0.1:2181"
ZKSchema = "openim"
ZKUsername = ""
ZKPassword = ""
)
func GetMsgRpcService() (rpcclient.MessageRpcClient, error) {
client, err := openKeeper.NewClient([]string{ZkAddr}, ZKSchema,
openKeeper.WithFreq(time.Hour), openKeeper.WithRoundRobin(), openKeeper.WithUserNameAndPassword(ZKPassword,
config.Config.Zookeeper.Password), openKeeper.WithTimeout(10), openKeeper.WithLogger(log.NewZkLogger()))
msgClient := rpcclient.NewMessageRpcClient(client)
if err != nil {
return msgClient, errs.Wrap(err)
}
return msgClient, nil
}

@ -0,0 +1,5 @@
package data_conversion
func main() {
}

@ -25,45 +25,45 @@ import (
)
var (
MysqlDb_v2 *gorm.DB
MysqlDb_v3 *gorm.DB
MysqldbV2 *gorm.DB
MysqldbV3 *gorm.DB
)
const (
username_v2 = "root"
password_v2 = "123456"
ip_v2 = "127.0.0.1:3306"
database_v2 = "openim_v2"
usernameV2 = "root"
passwordV2 = "123456"
ipV2 = "127.0.0.1:3306"
databaseV2 = "openim_v2"
)
const (
username_v3 = "root"
password_v3 = "123456"
ip_v3 = "127.0.0.1:3306"
database_v3 = "openim_v3"
usernameV3 = "root"
passwordV3 = "123456"
ipV3 = "127.0.0.1:3306"
databaseV3 = "openim_v3"
)
func init() {
dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=True&loc=Local",
username_v2,
password_v2,
ip_v2,
database_v2,
usernameV2,
passwordV2,
ipV2,
databaseV2,
)
db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{})
MysqlDb_v2 = db
MysqldbV2 = db
if err != nil {
log.ZDebug(context.Background(), "err", err)
}
dsn_v3 := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=True&loc=Local",
username_v3,
password_v3,
ip_v3,
database_v3,
dsnV3 := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=True&loc=Local",
usernameV3,
passwordV3,
ipV3,
databaseV3,
)
db_v3, err := gorm.Open(mysql.Open(dsn_v3), &gorm.Config{})
MysqlDb_v3 = db_v3
dbV3, err := gorm.Open(mysql.Open(dsnV3), &gorm.Config{})
MysqldbV3 = dbV3
if err != nil {
log.ZDebug(context.Background(), "err", err)
}
@ -72,17 +72,17 @@ func init() {
func UserConversion() {
var count int64
var user relation.UserModel
MysqlDb_v2.Model(&user).Count(&count)
MysqldbV2.Model(&user).Count(&count)
batchSize := 100
offset := 0
for int64(offset) < count {
var results []relation.UserModel
MysqlDb_v2.Limit(batchSize).Offset(offset).Find(&results)
MysqldbV2.Limit(batchSize).Offset(offset).Find(&results)
// Process query results
fmt.Println("============================batch data===================", offset, batchSize)
fmt.Println(results)
MysqlDb_v3.Create(results)
MysqldbV3.Create(results)
fmt.Println("======================================================")
offset += batchSize
}
@ -91,17 +91,17 @@ func UserConversion() {
func FriendConversion() {
var count int64
var friend relation.FriendModel
MysqlDb_v2.Model(&friend).Count(&count)
MysqldbV2.Model(&friend).Count(&count)
batchSize := 100
offset := 0
for int64(offset) < count {
var results []relation.FriendModel
MysqlDb_v2.Limit(batchSize).Offset(offset).Find(&results)
MysqldbV2.Limit(batchSize).Offset(offset).Find(&results)
// Process query results
fmt.Println("============================batch data===================", offset, batchSize)
fmt.Println(results)
MysqlDb_v3.Create(results)
MysqldbV3.Create(results)
fmt.Println("======================================================")
offset += batchSize
}
@ -110,33 +110,33 @@ func FriendConversion() {
func RequestConversion() {
var count int64
var friendRequest relation.FriendRequestModel
MysqlDb_v2.Model(&friendRequest).Count(&count)
MysqldbV2.Model(&friendRequest).Count(&count)
batchSize := 100
offset := 0
for int64(offset) < count {
var results []relation.FriendRequestModel
MysqlDb_v2.Limit(batchSize).Offset(offset).Find(&results)
MysqldbV2.Limit(batchSize).Offset(offset).Find(&results)
// Process query results
fmt.Println("============================batch data===================", offset, batchSize)
fmt.Println(results)
MysqlDb_v3.Create(results)
MysqldbV3.Create(results)
fmt.Println("======================================================")
offset += batchSize
}
var groupRequest relation.GroupRequestModel
MysqlDb_v2.Model(&groupRequest).Count(&count)
MysqldbV2.Model(&groupRequest).Count(&count)
batchSize = 100
offset = 0
for int64(offset) < count {
var results []relation.GroupRequestModel
MysqlDb_v2.Limit(batchSize).Offset(offset).Find(&results)
MysqldbV2.Limit(batchSize).Offset(offset).Find(&results)
// Process query results
fmt.Println("============================batch data===================", offset, batchSize)
fmt.Println(results)
MysqlDb_v3.Create(results)
MysqldbV3.Create(results)
fmt.Println("======================================================")
offset += batchSize
}
@ -145,13 +145,13 @@ func RequestConversion() {
func GroupConversion() {
var count int64
var group relation.GroupModel
MysqlDb_v2.Model(&group).Count(&count)
MysqldbV2.Model(&group).Count(&count)
batchSize := 100
offset := 0
for int64(offset) < count {
var results []relation.GroupModel
MysqlDb_v2.Limit(batchSize).Offset(offset).Find(&results)
MysqldbV2.Limit(batchSize).Offset(offset).Find(&results)
for i, val := range results {
temp := time.Date(1, time.January, 1, 0, 0, 0, 0, time.UTC)
if val.NotificationUpdateTime.Equal(temp) {
@ -162,7 +162,7 @@ func GroupConversion() {
// Process query results
fmt.Println("============================batch data===================", offset, batchSize)
fmt.Println(results)
MysqlDb_v3.Create(results)
MysqldbV3.Create(results)
fmt.Println("======================================================")
offset += batchSize
}
@ -171,17 +171,17 @@ func GroupConversion() {
func GroupMemberConversion() {
var count int64
var groupMember relation.GroupMemberModel
MysqlDb_v2.Model(&groupMember).Count(&count)
MysqldbV2.Model(&groupMember).Count(&count)
batchSize := 100
offset := 0
for int64(offset) < count {
var results []relation.GroupMemberModel
MysqlDb_v2.Limit(batchSize).Offset(offset).Find(&results)
MysqldbV2.Limit(batchSize).Offset(offset).Find(&results)
// Process query results
fmt.Println("============================batch data===================", offset, batchSize)
fmt.Println(results)
MysqlDb_v3.Create(results)
MysqldbV3.Create(results)
fmt.Println("======================================================")
offset += batchSize
}
@ -190,17 +190,17 @@ func GroupMemberConversion() {
func BlacksConversion() {
var count int64
var black relation.BlackModel
MysqlDb_v2.Model(&black).Count(&count)
MysqldbV2.Model(&black).Count(&count)
batchSize := 100
offset := 0
for int64(offset) < count {
var results []relation.BlackModel
MysqlDb_v2.Limit(batchSize).Offset(offset).Find(&results)
MysqldbV2.Limit(batchSize).Offset(offset).Find(&results)
// Process query results
fmt.Println("============================batch data===================", offset, batchSize)
fmt.Println(results)
MysqlDb_v3.Create(results)
MysqldbV3.Create(results)
fmt.Println("======================================================")
offset += batchSize
}
@ -209,17 +209,17 @@ func BlacksConversion() {
func ChatLogsConversion() {
var count int64
var chat relation.ChatLogModel
MysqlDb_v2.Model(&chat).Count(&count)
MysqldbV2.Model(&chat).Count(&count)
batchSize := 100
offset := 0
for int64(offset) < count {
var results []relation.ChatLogModel
MysqlDb_v2.Limit(batchSize).Offset(offset).Find(&results)
MysqldbV2.Limit(batchSize).Offset(offset).Find(&results)
// Process query results
fmt.Println("============================batch data===================", offset, batchSize)
//fmt.Println(results)
MysqlDb_v3.Create(results)
MysqldbV3.Create(results)
fmt.Println("======================================================")
offset += batchSize
}
Loading…
Cancel
Save