diff --git a/internal/api/msg.go b/internal/api/msg.go index 726889d56..c3d398975 100644 --- a/internal/api/msg.go +++ b/internal/api/msg.go @@ -170,6 +170,7 @@ func (m *MessageApi) getSendMsgReq(c *gin.Context, req apistruct.SendMsg) (sendM log.ZDebug(c, "getSendMsgReq", "req", req.Content) switch req.ContentType { case constant.Text: + data = apistruct.TextElem{} case constant.Picture: data = apistruct.PictureElem{} diff --git a/test/data-conversion/kafka-conversation.go b/test/data-conversion/kafka-conversation.go deleted file mode 100644 index e3a26e009..000000000 --- a/test/data-conversion/kafka-conversation.go +++ /dev/null @@ -1,87 +0,0 @@ -// Copyright © 2023 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package data_conversion - -import ( - "fmt" - - "github.com/Shopify/sarama" -) - -var ( - topic = "latestMsgToRedis" - addr = "127.0.0.1:9092" -) - -var consumer sarama.Consumer -var producer sarama.SyncProducer - -func init() { - - //Producer - config := sarama.NewConfig() // Instantiate a sarama Config - config.Producer.Return.Successes = true // Whether to enable the successes channel to be notified after the message is sent successfully - config.Producer.Return.Errors = true - config.Producer.RequiredAcks = sarama.WaitForAll // Set producer Message Reply level 0 1 all - config.Producer.Partitioner = sarama.NewHashPartitioner // Set the hash-key automatic hash partition. When sending a message, you must specify the key value of the message. If there is no key, the partition will be selected randomly - - client, err := sarama.NewSyncProducer([]string{addr}, config) - if err != nil { - fmt.Println("producer closed, err:", err) - } - producer = client - - //Consumer - consumerT, err := sarama.NewConsumer([]string{addr}, sarama.NewConfig()) - if err != nil { - fmt.Printf("fail to start consumer, err:%v\n", err) - } - consumer = consumerT -} - -func SendMessage() { - // construct a message - msg := &sarama.ProducerMessage{} - msg.Topic = topic - msg.Value = sarama.StringEncoder("this is a test log") - - // Send a message - pid, offset, err := producer.SendMessage(msg) - if err != nil { - fmt.Println("send msg failed, err:", err) - } - fmt.Printf("pid:%v offset:%v\n", pid, offset) -} - -func GetMessage() { - partitionList, err := consumer.Partitions(topic) // Get all partitions according to topic - if err != nil { - fmt.Printf("fail to get list of partition:err%v\n", err) - } - fmt.Println(partitionList) - for partition := range partitionList { // iterate over all partitions - // Create a corresponding partition consumer for each partition - pc, err := consumer.ConsumePartition(topic, int32(partition), sarama.OffsetNewest) - if err != nil { - fmt.Printf("failed to start consumer for partition %d,err:%v\n", partition, err) - } - // Asynchronously consume information from each partition - go func(sarama.PartitionConsumer) { - for msg := range pc.Messages() { - fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v", msg.Partition, msg.Offset, msg.Key, msg.Value) - } - }(pc) - } -} diff --git a/test/data-conversion/kafka_test.go b/test/data-conversion/kafka_test.go deleted file mode 100644 index e926c4541..000000000 --- a/test/data-conversion/kafka_test.go +++ /dev/null @@ -1,25 +0,0 @@ -// Copyright © 2023 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package data_conversion - -import "testing" - -func TestGetMessage(t *testing.T) { - GetMessage() -} - -func TestSendMessage(t *testing.T) { - SendMessage() -} diff --git a/test/data-conversion/mongodb-conversion.go b/test/data-conversion/mongodb-conversion.go deleted file mode 100644 index 975b28ead..000000000 --- a/test/data-conversion/mongodb-conversion.go +++ /dev/null @@ -1,15 +0,0 @@ -// Copyright © 2023 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package data_conversion diff --git a/test/data-conversion/mysql-conversion.go b/test/data-conversion/mysql-conversion.go deleted file mode 100644 index 08d7ab789..000000000 --- a/test/data-conversion/mysql-conversion.go +++ /dev/null @@ -1,228 +0,0 @@ -// Copyright © 2023 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package data_conversion - -import ( - "context" - "fmt" - "time" - - "github.com/OpenIMSDK/tools/log" - "gorm.io/driver/mysql" - "gorm.io/gorm" - - "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/relation" -) - -var ( - MysqlDb_v2 *gorm.DB - MysqlDb_v3 *gorm.DB -) - -const ( - username_v2 = "root" - password_v2 = "123456" - ip_v2 = "127.0.0.1:3306" - database_v2 = "openim_v2" -) - -const ( - username_v3 = "root" - password_v3 = "123456" - ip_v3 = "127.0.0.1:3306" - database_v3 = "openim_v3" -) - -func init() { - dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=True&loc=Local", - username_v2, - password_v2, - ip_v2, - database_v2, - ) - db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{}) - MysqlDb_v2 = db - if err != nil { - log.ZDebug(context.Background(), "err", err) - } - - dsn_v3 := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=True&loc=Local", - username_v3, - password_v3, - ip_v3, - database_v3, - ) - db_v3, err := gorm.Open(mysql.Open(dsn_v3), &gorm.Config{}) - MysqlDb_v3 = db_v3 - if err != nil { - log.ZDebug(context.Background(), "err", err) - } -} - -func UserConversion() { - var count int64 - var user relation.UserModel - MysqlDb_v2.Model(&user).Count(&count) - batchSize := 100 - offset := 0 - - for int64(offset) < count { - var results []relation.UserModel - MysqlDb_v2.Limit(batchSize).Offset(offset).Find(&results) - // Process query results - fmt.Println("============================batch data===================", offset, batchSize) - fmt.Println(results) - MysqlDb_v3.Create(results) - fmt.Println("======================================================") - offset += batchSize - } -} - -func FriendConversion() { - var count int64 - var friend relation.FriendModel - MysqlDb_v2.Model(&friend).Count(&count) - batchSize := 100 - offset := 0 - - for int64(offset) < count { - var results []relation.FriendModel - MysqlDb_v2.Limit(batchSize).Offset(offset).Find(&results) - // Process query results - fmt.Println("============================batch data===================", offset, batchSize) - fmt.Println(results) - MysqlDb_v3.Create(results) - fmt.Println("======================================================") - offset += batchSize - } -} - -func RequestConversion() { - var count int64 - var friendRequest relation.FriendRequestModel - MysqlDb_v2.Model(&friendRequest).Count(&count) - batchSize := 100 - offset := 0 - - for int64(offset) < count { - var results []relation.FriendRequestModel - MysqlDb_v2.Limit(batchSize).Offset(offset).Find(&results) - // Process query results - fmt.Println("============================batch data===================", offset, batchSize) - fmt.Println(results) - MysqlDb_v3.Create(results) - fmt.Println("======================================================") - offset += batchSize - } - - var groupRequest relation.GroupRequestModel - MysqlDb_v2.Model(&groupRequest).Count(&count) - batchSize = 100 - offset = 0 - - for int64(offset) < count { - var results []relation.GroupRequestModel - MysqlDb_v2.Limit(batchSize).Offset(offset).Find(&results) - // Process query results - fmt.Println("============================batch data===================", offset, batchSize) - fmt.Println(results) - MysqlDb_v3.Create(results) - fmt.Println("======================================================") - offset += batchSize - } -} - -func GroupConversion() { - var count int64 - var group relation.GroupModel - MysqlDb_v2.Model(&group).Count(&count) - batchSize := 100 - offset := 0 - - for int64(offset) < count { - var results []relation.GroupModel - MysqlDb_v2.Limit(batchSize).Offset(offset).Find(&results) - for i, val := range results { - temp := time.Date(1, time.January, 1, 0, 0, 0, 0, time.UTC) - if val.NotificationUpdateTime.Equal(temp) { - results[i].NotificationUpdateTime = time.Now() - //fmt.Println(val.NotificationUpdateTime) - } - } - // Process query results - fmt.Println("============================batch data===================", offset, batchSize) - fmt.Println(results) - MysqlDb_v3.Create(results) - fmt.Println("======================================================") - offset += batchSize - } -} - -func GroupMemberConversion() { - var count int64 - var groupMember relation.GroupMemberModel - MysqlDb_v2.Model(&groupMember).Count(&count) - batchSize := 100 - offset := 0 - - for int64(offset) < count { - var results []relation.GroupMemberModel - MysqlDb_v2.Limit(batchSize).Offset(offset).Find(&results) - // Process query results - fmt.Println("============================batch data===================", offset, batchSize) - fmt.Println(results) - MysqlDb_v3.Create(results) - fmt.Println("======================================================") - offset += batchSize - } -} - -func BlacksConversion() { - var count int64 - var black relation.BlackModel - MysqlDb_v2.Model(&black).Count(&count) - batchSize := 100 - offset := 0 - - for int64(offset) < count { - var results []relation.BlackModel - MysqlDb_v2.Limit(batchSize).Offset(offset).Find(&results) - // Process query results - fmt.Println("============================batch data===================", offset, batchSize) - fmt.Println(results) - MysqlDb_v3.Create(results) - fmt.Println("======================================================") - offset += batchSize - } -} - -func ChatLogsConversion() { - var count int64 - var chat relation.ChatLogModel - MysqlDb_v2.Model(&chat).Count(&count) - batchSize := 100 - offset := 0 - - for int64(offset) < count { - var results []relation.ChatLogModel - MysqlDb_v2.Limit(batchSize).Offset(offset).Find(&results) - // Process query results - fmt.Println("============================batch data===================", offset, batchSize) - //fmt.Println(results) - MysqlDb_v3.Create(results) - fmt.Println("======================================================") - offset += batchSize - } -} diff --git a/test/data-conversion/mysql_test.go b/test/data-conversion/mysql_test.go deleted file mode 100644 index 3e3f3ad06..000000000 --- a/test/data-conversion/mysql_test.go +++ /dev/null @@ -1,49 +0,0 @@ -// Copyright © 2023 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package data_conversion - -import "testing" - -// pass -func TestUserConversion(t *testing.T) { - UserConversion() -} - -// pass -func TestFriendConversion(t *testing.T) { - FriendConversion() -} - -// pass -func TestGroupConversion(t *testing.T) { - GroupConversion() - GroupMemberConversion() -} - -// pass -func TestBlacksConversion(t *testing.T) { - BlacksConversion() -} - -// pass -func TestRequestConversion(t *testing.T) { - RequestConversion() -} - -// pass -func TestChatLogsConversion(t *testing.T) { - // If the printed result is too long, the console will not display it, but it can run normally - ChatLogsConversion() -} diff --git a/test/data-conversion/redis-conversion.go b/test/data-conversion/redis-conversion.go deleted file mode 100644 index 975b28ead..000000000 --- a/test/data-conversion/redis-conversion.go +++ /dev/null @@ -1,15 +0,0 @@ -// Copyright © 2023 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package data_conversion diff --git a/tools/data-conversion/common/config.go b/tools/data-conversion/common/config.go index 1a09feedf..0d2812569 100644 --- a/tools/data-conversion/common/config.go +++ b/tools/data-conversion/common/config.go @@ -1,5 +1,7 @@ package common +// MySQL +// V2 const ( UsernameV2 = "root" PasswordV2 = "openIM123" @@ -7,9 +9,25 @@ const ( DatabaseV2 = "openIM_v2" ) +// V3 const ( UsernameV3 = "root" PasswordV3 = "openIM123" IpV3 = "43.134.63.160:13306" DatabaseV3 = "openIM_v3" ) + +// Kafka +const ( + Topic = "ws2ms_chat" + KafkaAddr = "127.0.0.1:9092" +) + +// Zookeeper +const ( + ZkAddr = "43.134.63.160:2181" + ZKSchema = "openim" + ZKUsername = "" + ZKPassword = "" + MsgRpcName = "Msg" +) diff --git a/tools/data-conversion/conversion.go b/tools/data-conversion/conversion.go index 08e247509..790580777 100644 --- a/tools/data-conversion/conversion.go +++ b/tools/data-conversion/conversion.go @@ -1 +1,5 @@ -package data_conversion +package main + +func main() { + +} diff --git a/tools/data-conversion/msg/kafka-conversation.go b/tools/data-conversion/msg/kafka-conversation.go index 9c6ef5217..055b038e0 100644 --- a/tools/data-conversion/msg/kafka-conversation.go +++ b/tools/data-conversion/msg/kafka-conversation.go @@ -18,6 +18,7 @@ import ( "context" "encoding/json" "fmt" + . "github.com/OpenIMSDK/Open-IM-Server/tools/conversion/common" pbmsg "github.com/OpenIMSDK/Open-IM-Server/tools/conversion/proto/msg" "github.com/OpenIMSDK/protocol/constant" msgv3 "github.com/OpenIMSDK/protocol/msg" @@ -33,20 +34,6 @@ import ( "time" ) -var ( - topic = "ws2ms_chat" - addr = "127.0.0.1:9092" - //addr = "43.128.72.19:9092" -) - -const ( - ZkAddr = "43.134.63.160:2181" - ZKSchema = "openim" - ZKUsername = "" - ZKPassword = "" - MsgName = "Msg" -) - var consumer sarama.Consumer var producerV2 sarama.SyncProducer var wg sync.WaitGroup @@ -62,14 +49,14 @@ func init() { config.Producer.RequiredAcks = sarama.WaitForAll // Set producer Message Reply level 0 1 all config.Producer.Partitioner = sarama.NewHashPartitioner // Set the hash-key automatic hash partition. When sending a message, you must specify the key value of the message. If there is no key, the partition will be selected randomly - client, err := sarama.NewSyncProducer([]string{addr}, config) + client, err := sarama.NewSyncProducer([]string{KafkaAddr}, config) if err != nil { fmt.Println("producer closed, err:", err) } producerV2 = client //Consumer - consumerT, err := sarama.NewConsumer([]string{addr}, sarama.NewConfig()) + consumerT, err := sarama.NewConsumer([]string{KafkaAddr}, sarama.NewConfig()) if err != nil { fmt.Printf("fail to start consumer, err:%v\n", err) } @@ -81,7 +68,7 @@ func init() { func SendMessage() { // construct a message msg := &sarama.ProducerMessage{} - msg.Topic = topic + msg.Topic = Topic msg.Value = sarama.StringEncoder("this is a test log") // Send a message @@ -93,7 +80,7 @@ func SendMessage() { } func GetMessage() { - partitionList, err := consumer.Partitions(topic) // Get all partitions according to topic + partitionList, err := consumer.Partitions(Topic) // Get all partitions according to topic if err != nil { fmt.Printf("fail to get list of partition:err%v\n", err) } @@ -105,7 +92,7 @@ func GetMessage() { } for partition := range partitionList { - pc, err := consumer.ConsumePartition(topic, int32(partition), sarama.OffsetOldest) + pc, err := consumer.ConsumePartition(Topic, int32(partition), sarama.OffsetOldest) if err != nil { panic(err) } @@ -329,7 +316,7 @@ func NewMessage() msgv3.MsgClient { fmt.Printf("discov, err:%s", err) } discov.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials())) - conn, err := discov.GetConn(context.Background(), MsgName) + conn, err := discov.GetConn(context.Background(), MsgRpcName) if err != nil { fmt.Printf("conn, err:%s", err) panic(err)