diff --git a/tools/data-conversion/conversion.go b/tools/data-conversion/conversion.go index 790580777..fa50f645d 100644 --- a/tools/data-conversion/conversion.go +++ b/tools/data-conversion/conversion.go @@ -1,5 +1,61 @@ package main +import ( + "fmt" + . "github.com/OpenIMSDK/Open-IM-Server/tools/conversion/common" + message "github.com/OpenIMSDK/Open-IM-Server/tools/conversion/msg" + "github.com/OpenIMSDK/Open-IM-Server/tools/conversion/mysql" + "sync" +) + +var wg sync.WaitGroup + func main() { + fmt.Printf("start MySQL data conversion. \n") + wg.Add(1) + go func() { + defer wg.Done() + mysql.UserConversion() + }() + wg.Add(1) + go func() { + defer wg.Done() + mysql.FriendConversion() + }() + wg.Add(1) + go func() { + defer wg.Done() + mysql.GroupConversion() + }() + wg.Add(1) + go func() { + defer wg.Done() + mysql.GroupMemberConversion() + }() + wg.Add(1) + go func() { + defer wg.Done() + mysql.BlacksConversion() + }() + wg.Add(1) + go func() { + defer wg.Done() + mysql.RequestConversion() + }() + wg.Add(1) + go func() { + defer wg.Done() + mysql.BlacksConversion() + }() + wg.Add(1) + go func() { + defer wg.Done() + mysql.ChatLogsConversion() + }() + wg.Wait() + SuccessPrint(fmt.Sprintf("Successfully completed the MySQL conversion. \n")) + fmt.Printf("start message conversion. \n") + message.GetMessage() + SuccessPrint(fmt.Sprintf("Successfully completed the message conversion. \n")) } diff --git a/tools/data-conversion/msg/kafka-conversation.go b/tools/data-conversion/msg/kafka-conversation.go index 055b038e0..69670287e 100644 --- a/tools/data-conversion/msg/kafka-conversation.go +++ b/tools/data-conversion/msg/kafka-conversation.go @@ -51,14 +51,14 @@ func init() { client, err := sarama.NewSyncProducer([]string{KafkaAddr}, config) if err != nil { - fmt.Println("producer closed, err:", err) + ErrorPrint(fmt.Sprintln("producer closed, err:", err)) } producerV2 = client //Consumer consumerT, err := sarama.NewConsumer([]string{KafkaAddr}, sarama.NewConfig()) if err != nil { - fmt.Printf("fail to start consumer, err:%v\n", err) + ErrorPrint(fmt.Sprintf("fail to start consumer, err:%v\n", err)) } consumer = consumerT @@ -74,27 +74,23 @@ func SendMessage() { // Send a message pid, offset, err := producerV2.SendMessage(msg) if err != nil { - fmt.Println("send msg failed, err:", err) + ErrorPrint(fmt.Sprintln("send msg failed, err:", err)) } - fmt.Printf("pid:%v offset:%v\n", pid, offset) + SuccessPrint(fmt.Sprintf("pid:%v offset:%v\n", pid, offset)) } func GetMessage() { partitionList, err := consumer.Partitions(Topic) // Get all partitions according to topic if err != nil { - fmt.Printf("fail to get list of partition:err%v\n", err) + ErrorPrint(fmt.Sprintf("fail to get list of partition:err%v\n", err)) } fmt.Println(partitionList) //var ch chan int - if err != nil { - fmt.Printf("rpc err:%s", err) - } - for partition := range partitionList { pc, err := consumer.ConsumePartition(Topic, int32(partition), sarama.OffsetOldest) if err != nil { - panic(err) + ErrorPrint(fmt.Sprintf("fail to get partition:err%v\n", err)) } wg.Add(1) defer pc.AsyncClose() @@ -140,9 +136,9 @@ func Transfer(consumerMessages []*sarama.ConsumerMessage) { msgFromMQV2 := pbmsg.MsgDataToMQ{} err := proto.Unmarshal(consumerMessages[i].Value, &msgFromMQV2) if err != nil { - fmt.Printf("err:%s \n", err) + ErrorPrint(fmt.Sprintf("err:%s \n", err)) } - fmt.Printf("msg:%s \n", &msgFromMQV2) + SuccessPrint(fmt.Sprintf("msg:%s \n", &msgFromMQV2)) //fmt.Printf("rpcClient:%s \n", msgRpcClient) if msgFromMQV2.MsgData.ContentType == constant.Text { text := string(msgFromMQV2.MsgData.Content) @@ -151,7 +147,7 @@ func Transfer(consumerMessages []*sarama.ConsumerMessage) { } msgFromMQV2.MsgData.Content, err = json.Marshal(textElem) if err != nil { - fmt.Printf("test err: %s \n", err) + ErrorPrint(fmt.Sprintf("test err: %s \n", err)) } } if msgFromMQV2.MsgData.SessionType == constant.SingleChatType || msgFromMQV2.MsgData.SessionType == constant.NotificationChatType { @@ -193,9 +189,9 @@ func Transfer(consumerMessages []*sarama.ConsumerMessage) { ctx := context.WithValue(context.Background(), "operationID", msgFromMQV2.OperationID) resp, err := msgRpcClient.SendMsg(ctx, &msgv3.SendMsgReq{MsgData: msgData}) if err != nil { - fmt.Printf("resp err: %s \n", err) + ErrorPrint(fmt.Sprintf("resp err: %s \n", err)) } - fmt.Printf("resp: %s \n", resp) + SuccessPrint(fmt.Sprintf("resp: %s \n", resp)) } else if msgFromMQV2.MsgData.SessionType == constant.GroupChatType { if string(consumerMessages[i].Key) != msgFromMQV2.MsgData.SendID { continue @@ -238,9 +234,9 @@ func Transfer(consumerMessages []*sarama.ConsumerMessage) { ctx := context.WithValue(context.Background(), "operationID", msgFromMQV2.OperationID) resp, err := msgRpcClient.SendMsg(ctx, &msgv3.SendMsgReq{MsgData: msgData}) if err != nil { - fmt.Printf("resp err: %s \n", err) + ErrorPrint(fmt.Sprintf("resp err: %s \n", err)) } - fmt.Printf("resp: %s \n", resp) + SuccessPrint(fmt.Sprintf("resp: %s \n", resp)) } else if msgFromMQV2.MsgData.SessionType == constant.SuperGroupChatType { if msgFromMQV2.MsgData.ContentType < constant.ContentTypeBegin || msgFromMQV2.MsgData.ContentType > constant.AdvancedText { continue @@ -280,9 +276,9 @@ func Transfer(consumerMessages []*sarama.ConsumerMessage) { ctx := context.WithValue(context.Background(), "operationID", msgFromMQV2.OperationID) resp, err := msgRpcClient.SendMsg(ctx, &msgv3.SendMsgReq{MsgData: msgData}) if err != nil { - fmt.Printf("resp err: %s \n", err) + ErrorPrint(fmt.Sprintf("resp err: %s \n", err)) } - fmt.Printf("resp: %s \n", resp) + SuccessPrint(fmt.Sprintf("resp: %s \n", resp)) } fmt.Printf("\n\n\n") } @@ -313,13 +309,12 @@ func NewMessage() msgv3.MsgClient { openKeeper.WithLogger(log.NewZkLogger()), ) if err != nil { - fmt.Printf("discov, err:%s", err) + ErrorPrint(fmt.Sprintf("discov, err:%s", err)) } discov.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials())) conn, err := discov.GetConn(context.Background(), MsgRpcName) if err != nil { - fmt.Printf("conn, err:%s", err) - panic(err) + ErrorPrint(fmt.Sprintf("conn, err:%s", err)) } client := msgv3.NewMsgClient(conn) return client