diff --git a/internal/api/msg.go b/internal/api/msg.go index 726889d56..c3d398975 100644 --- a/internal/api/msg.go +++ b/internal/api/msg.go @@ -170,6 +170,7 @@ func (m *MessageApi) getSendMsgReq(c *gin.Context, req apistruct.SendMsg) (sendM log.ZDebug(c, "getSendMsgReq", "req", req.Content) switch req.ContentType { case constant.Text: + data = apistruct.TextElem{} case constant.Picture: data = apistruct.PictureElem{} diff --git a/tools/data-conversion/common/config.go b/tools/data-conversion/common/config.go index 1a09feedf..0d2812569 100644 --- a/tools/data-conversion/common/config.go +++ b/tools/data-conversion/common/config.go @@ -1,5 +1,7 @@ package common +// MySQL +// V2 const ( UsernameV2 = "root" PasswordV2 = "openIM123" @@ -7,9 +9,25 @@ const ( DatabaseV2 = "openIM_v2" ) +// V3 const ( UsernameV3 = "root" PasswordV3 = "openIM123" IpV3 = "43.134.63.160:13306" DatabaseV3 = "openIM_v3" ) + +// Kafka +const ( + Topic = "ws2ms_chat" + KafkaAddr = "127.0.0.1:9092" +) + +// Zookeeper +const ( + ZkAddr = "43.134.63.160:2181" + ZKSchema = "openim" + ZKUsername = "" + ZKPassword = "" + MsgRpcName = "Msg" +) diff --git a/tools/data-conversion/conversion.go b/tools/data-conversion/conversion.go index 08e247509..790580777 100644 --- a/tools/data-conversion/conversion.go +++ b/tools/data-conversion/conversion.go @@ -1 +1,5 @@ -package data_conversion +package main + +func main() { + +} diff --git a/tools/data-conversion/msg/kafka-conversation.go b/tools/data-conversion/msg/kafka-conversation.go index 9c6ef5217..055b038e0 100644 --- a/tools/data-conversion/msg/kafka-conversation.go +++ b/tools/data-conversion/msg/kafka-conversation.go @@ -18,6 +18,7 @@ import ( "context" "encoding/json" "fmt" + . "github.com/OpenIMSDK/Open-IM-Server/tools/conversion/common" pbmsg "github.com/OpenIMSDK/Open-IM-Server/tools/conversion/proto/msg" "github.com/OpenIMSDK/protocol/constant" msgv3 "github.com/OpenIMSDK/protocol/msg" @@ -33,20 +34,6 @@ import ( "time" ) -var ( - topic = "ws2ms_chat" - addr = "127.0.0.1:9092" - //addr = "43.128.72.19:9092" -) - -const ( - ZkAddr = "43.134.63.160:2181" - ZKSchema = "openim" - ZKUsername = "" - ZKPassword = "" - MsgName = "Msg" -) - var consumer sarama.Consumer var producerV2 sarama.SyncProducer var wg sync.WaitGroup @@ -62,14 +49,14 @@ func init() { config.Producer.RequiredAcks = sarama.WaitForAll // Set producer Message Reply level 0 1 all config.Producer.Partitioner = sarama.NewHashPartitioner // Set the hash-key automatic hash partition. When sending a message, you must specify the key value of the message. If there is no key, the partition will be selected randomly - client, err := sarama.NewSyncProducer([]string{addr}, config) + client, err := sarama.NewSyncProducer([]string{KafkaAddr}, config) if err != nil { fmt.Println("producer closed, err:", err) } producerV2 = client //Consumer - consumerT, err := sarama.NewConsumer([]string{addr}, sarama.NewConfig()) + consumerT, err := sarama.NewConsumer([]string{KafkaAddr}, sarama.NewConfig()) if err != nil { fmt.Printf("fail to start consumer, err:%v\n", err) } @@ -81,7 +68,7 @@ func init() { func SendMessage() { // construct a message msg := &sarama.ProducerMessage{} - msg.Topic = topic + msg.Topic = Topic msg.Value = sarama.StringEncoder("this is a test log") // Send a message @@ -93,7 +80,7 @@ func SendMessage() { } func GetMessage() { - partitionList, err := consumer.Partitions(topic) // Get all partitions according to topic + partitionList, err := consumer.Partitions(Topic) // Get all partitions according to topic if err != nil { fmt.Printf("fail to get list of partition:err%v\n", err) } @@ -105,7 +92,7 @@ func GetMessage() { } for partition := range partitionList { - pc, err := consumer.ConsumePartition(topic, int32(partition), sarama.OffsetOldest) + pc, err := consumer.ConsumePartition(Topic, int32(partition), sarama.OffsetOldest) if err != nil { panic(err) } @@ -329,7 +316,7 @@ func NewMessage() msgv3.MsgClient { fmt.Printf("discov, err:%s", err) } discov.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials())) - conn, err := discov.GetConn(context.Background(), MsgName) + conn, err := discov.GetConn(context.Background(), MsgRpcName) if err != nil { fmt.Printf("conn, err:%s", err) panic(err)