feat:data conversion

pull/1000/head
pluto 2 years ago
parent b1496dfc01
commit 26ffe569d4

@ -1,5 +1,61 @@
package main package main
import (
"fmt"
. "github.com/OpenIMSDK/Open-IM-Server/tools/conversion/common"
message "github.com/OpenIMSDK/Open-IM-Server/tools/conversion/msg"
"github.com/OpenIMSDK/Open-IM-Server/tools/conversion/mysql"
"sync"
)
var wg sync.WaitGroup
func main() { func main() {
fmt.Printf("start MySQL data conversion. \n")
wg.Add(1)
go func() {
defer wg.Done()
mysql.UserConversion()
}()
wg.Add(1)
go func() {
defer wg.Done()
mysql.FriendConversion()
}()
wg.Add(1)
go func() {
defer wg.Done()
mysql.GroupConversion()
}()
wg.Add(1)
go func() {
defer wg.Done()
mysql.GroupMemberConversion()
}()
wg.Add(1)
go func() {
defer wg.Done()
mysql.BlacksConversion()
}()
wg.Add(1)
go func() {
defer wg.Done()
mysql.RequestConversion()
}()
wg.Add(1)
go func() {
defer wg.Done()
mysql.BlacksConversion()
}()
wg.Add(1)
go func() {
defer wg.Done()
mysql.ChatLogsConversion()
}()
wg.Wait()
SuccessPrint(fmt.Sprintf("Successfully completed the MySQL conversion. \n"))
fmt.Printf("start message conversion. \n")
message.GetMessage()
SuccessPrint(fmt.Sprintf("Successfully completed the message conversion. \n"))
} }

@ -51,14 +51,14 @@ func init() {
client, err := sarama.NewSyncProducer([]string{KafkaAddr}, config) client, err := sarama.NewSyncProducer([]string{KafkaAddr}, config)
if err != nil { if err != nil {
fmt.Println("producer closed, err:", err) ErrorPrint(fmt.Sprintln("producer closed, err:", err))
} }
producerV2 = client producerV2 = client
//Consumer //Consumer
consumerT, err := sarama.NewConsumer([]string{KafkaAddr}, sarama.NewConfig()) consumerT, err := sarama.NewConsumer([]string{KafkaAddr}, sarama.NewConfig())
if err != nil { if err != nil {
fmt.Printf("fail to start consumer, err:%v\n", err) ErrorPrint(fmt.Sprintf("fail to start consumer, err:%v\n", err))
} }
consumer = consumerT consumer = consumerT
@ -74,27 +74,23 @@ func SendMessage() {
// Send a message // Send a message
pid, offset, err := producerV2.SendMessage(msg) pid, offset, err := producerV2.SendMessage(msg)
if err != nil { if err != nil {
fmt.Println("send msg failed, err:", err) ErrorPrint(fmt.Sprintln("send msg failed, err:", err))
} }
fmt.Printf("pid:%v offset:%v\n", pid, offset) SuccessPrint(fmt.Sprintf("pid:%v offset:%v\n", pid, offset))
} }
func GetMessage() { func GetMessage() {
partitionList, err := consumer.Partitions(Topic) // Get all partitions according to topic partitionList, err := consumer.Partitions(Topic) // Get all partitions according to topic
if err != nil { if err != nil {
fmt.Printf("fail to get list of partition:err%v\n", err) ErrorPrint(fmt.Sprintf("fail to get list of partition:err%v\n", err))
} }
fmt.Println(partitionList) fmt.Println(partitionList)
//var ch chan int //var ch chan int
if err != nil {
fmt.Printf("rpc err:%s", err)
}
for partition := range partitionList { for partition := range partitionList {
pc, err := consumer.ConsumePartition(Topic, int32(partition), sarama.OffsetOldest) pc, err := consumer.ConsumePartition(Topic, int32(partition), sarama.OffsetOldest)
if err != nil { if err != nil {
panic(err) ErrorPrint(fmt.Sprintf("fail to get partition:err%v\n", err))
} }
wg.Add(1) wg.Add(1)
defer pc.AsyncClose() defer pc.AsyncClose()
@ -140,9 +136,9 @@ func Transfer(consumerMessages []*sarama.ConsumerMessage) {
msgFromMQV2 := pbmsg.MsgDataToMQ{} msgFromMQV2 := pbmsg.MsgDataToMQ{}
err := proto.Unmarshal(consumerMessages[i].Value, &msgFromMQV2) err := proto.Unmarshal(consumerMessages[i].Value, &msgFromMQV2)
if err != nil { if err != nil {
fmt.Printf("err:%s \n", err) ErrorPrint(fmt.Sprintf("err:%s \n", err))
} }
fmt.Printf("msg:%s \n", &msgFromMQV2) SuccessPrint(fmt.Sprintf("msg:%s \n", &msgFromMQV2))
//fmt.Printf("rpcClient:%s \n", msgRpcClient) //fmt.Printf("rpcClient:%s \n", msgRpcClient)
if msgFromMQV2.MsgData.ContentType == constant.Text { if msgFromMQV2.MsgData.ContentType == constant.Text {
text := string(msgFromMQV2.MsgData.Content) text := string(msgFromMQV2.MsgData.Content)
@ -151,7 +147,7 @@ func Transfer(consumerMessages []*sarama.ConsumerMessage) {
} }
msgFromMQV2.MsgData.Content, err = json.Marshal(textElem) msgFromMQV2.MsgData.Content, err = json.Marshal(textElem)
if err != nil { if err != nil {
fmt.Printf("test err: %s \n", err) ErrorPrint(fmt.Sprintf("test err: %s \n", err))
} }
} }
if msgFromMQV2.MsgData.SessionType == constant.SingleChatType || msgFromMQV2.MsgData.SessionType == constant.NotificationChatType { if msgFromMQV2.MsgData.SessionType == constant.SingleChatType || msgFromMQV2.MsgData.SessionType == constant.NotificationChatType {
@ -193,9 +189,9 @@ func Transfer(consumerMessages []*sarama.ConsumerMessage) {
ctx := context.WithValue(context.Background(), "operationID", msgFromMQV2.OperationID) ctx := context.WithValue(context.Background(), "operationID", msgFromMQV2.OperationID)
resp, err := msgRpcClient.SendMsg(ctx, &msgv3.SendMsgReq{MsgData: msgData}) resp, err := msgRpcClient.SendMsg(ctx, &msgv3.SendMsgReq{MsgData: msgData})
if err != nil { if err != nil {
fmt.Printf("resp err: %s \n", err) ErrorPrint(fmt.Sprintf("resp err: %s \n", err))
} }
fmt.Printf("resp: %s \n", resp) SuccessPrint(fmt.Sprintf("resp: %s \n", resp))
} else if msgFromMQV2.MsgData.SessionType == constant.GroupChatType { } else if msgFromMQV2.MsgData.SessionType == constant.GroupChatType {
if string(consumerMessages[i].Key) != msgFromMQV2.MsgData.SendID { if string(consumerMessages[i].Key) != msgFromMQV2.MsgData.SendID {
continue continue
@ -238,9 +234,9 @@ func Transfer(consumerMessages []*sarama.ConsumerMessage) {
ctx := context.WithValue(context.Background(), "operationID", msgFromMQV2.OperationID) ctx := context.WithValue(context.Background(), "operationID", msgFromMQV2.OperationID)
resp, err := msgRpcClient.SendMsg(ctx, &msgv3.SendMsgReq{MsgData: msgData}) resp, err := msgRpcClient.SendMsg(ctx, &msgv3.SendMsgReq{MsgData: msgData})
if err != nil { if err != nil {
fmt.Printf("resp err: %s \n", err) ErrorPrint(fmt.Sprintf("resp err: %s \n", err))
} }
fmt.Printf("resp: %s \n", resp) SuccessPrint(fmt.Sprintf("resp: %s \n", resp))
} else if msgFromMQV2.MsgData.SessionType == constant.SuperGroupChatType { } else if msgFromMQV2.MsgData.SessionType == constant.SuperGroupChatType {
if msgFromMQV2.MsgData.ContentType < constant.ContentTypeBegin || msgFromMQV2.MsgData.ContentType > constant.AdvancedText { if msgFromMQV2.MsgData.ContentType < constant.ContentTypeBegin || msgFromMQV2.MsgData.ContentType > constant.AdvancedText {
continue continue
@ -280,9 +276,9 @@ func Transfer(consumerMessages []*sarama.ConsumerMessage) {
ctx := context.WithValue(context.Background(), "operationID", msgFromMQV2.OperationID) ctx := context.WithValue(context.Background(), "operationID", msgFromMQV2.OperationID)
resp, err := msgRpcClient.SendMsg(ctx, &msgv3.SendMsgReq{MsgData: msgData}) resp, err := msgRpcClient.SendMsg(ctx, &msgv3.SendMsgReq{MsgData: msgData})
if err != nil { if err != nil {
fmt.Printf("resp err: %s \n", err) ErrorPrint(fmt.Sprintf("resp err: %s \n", err))
} }
fmt.Printf("resp: %s \n", resp) SuccessPrint(fmt.Sprintf("resp: %s \n", resp))
} }
fmt.Printf("\n\n\n") fmt.Printf("\n\n\n")
} }
@ -313,13 +309,12 @@ func NewMessage() msgv3.MsgClient {
openKeeper.WithLogger(log.NewZkLogger()), openKeeper.WithLogger(log.NewZkLogger()),
) )
if err != nil { if err != nil {
fmt.Printf("discov, err:%s", err) ErrorPrint(fmt.Sprintf("discov, err:%s", err))
} }
discov.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials())) discov.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()))
conn, err := discov.GetConn(context.Background(), MsgRpcName) conn, err := discov.GetConn(context.Background(), MsgRpcName)
if err != nil { if err != nil {
fmt.Printf("conn, err:%s", err) ErrorPrint(fmt.Sprintf("conn, err:%s", err))
panic(err)
} }
client := msgv3.NewMsgClient(conn) client := msgv3.NewMsgClient(conn)
return client return client

Loading…
Cancel
Save