@ -18,6 +18,7 @@ import (
"context"
"context"
"encoding/json"
"encoding/json"
"fmt"
"fmt"
. "github.com/OpenIMSDK/Open-IM-Server/tools/conversion/common"
pbmsg "github.com/OpenIMSDK/Open-IM-Server/tools/conversion/proto/msg"
pbmsg "github.com/OpenIMSDK/Open-IM-Server/tools/conversion/proto/msg"
"github.com/OpenIMSDK/protocol/constant"
"github.com/OpenIMSDK/protocol/constant"
msgv3 "github.com/OpenIMSDK/protocol/msg"
msgv3 "github.com/OpenIMSDK/protocol/msg"
@ -33,20 +34,6 @@ import (
"time"
"time"
)
)
var (
topic = "ws2ms_chat"
addr = "127.0.0.1:9092"
//addr = "43.128.72.19:9092"
)
const (
ZkAddr = "43.134.63.160:2181"
ZKSchema = "openim"
ZKUsername = ""
ZKPassword = ""
MsgName = "Msg"
)
var consumer sarama . Consumer
var consumer sarama . Consumer
var producerV2 sarama . SyncProducer
var producerV2 sarama . SyncProducer
var wg sync . WaitGroup
var wg sync . WaitGroup
@ -62,14 +49,14 @@ func init() {
config . Producer . RequiredAcks = sarama . WaitForAll // Set producer Message Reply level 0 1 all
config . Producer . RequiredAcks = sarama . WaitForAll // Set producer Message Reply level 0 1 all
config . Producer . Partitioner = sarama . NewHashPartitioner // Set the hash-key automatic hash partition. When sending a message, you must specify the key value of the message. If there is no key, the partition will be selected randomly
config . Producer . Partitioner = sarama . NewHashPartitioner // Set the hash-key automatic hash partition. When sending a message, you must specify the key value of the message. If there is no key, the partition will be selected randomly
client , err := sarama . NewSyncProducer ( [ ] string { addr} , config )
client , err := sarama . NewSyncProducer ( [ ] string { K afkaA ddr} , config )
if err != nil {
if err != nil {
fmt . Println ( "producer closed, err:" , err )
fmt . Println ( "producer closed, err:" , err )
}
}
producerV2 = client
producerV2 = client
//Consumer
//Consumer
consumerT , err := sarama . NewConsumer ( [ ] string { addr} , sarama . NewConfig ( ) )
consumerT , err := sarama . NewConsumer ( [ ] string { K afkaA ddr} , sarama . NewConfig ( ) )
if err != nil {
if err != nil {
fmt . Printf ( "fail to start consumer, err:%v\n" , err )
fmt . Printf ( "fail to start consumer, err:%v\n" , err )
}
}
@ -81,7 +68,7 @@ func init() {
func SendMessage ( ) {
func SendMessage ( ) {
// construct a message
// construct a message
msg := & sarama . ProducerMessage { }
msg := & sarama . ProducerMessage { }
msg . Topic = t opic
msg . Topic = T opic
msg . Value = sarama . StringEncoder ( "this is a test log" )
msg . Value = sarama . StringEncoder ( "this is a test log" )
// Send a message
// Send a message
@ -93,7 +80,7 @@ func SendMessage() {
}
}
func GetMessage ( ) {
func GetMessage ( ) {
partitionList , err := consumer . Partitions ( t opic) // Get all partitions according to topic
partitionList , err := consumer . Partitions ( T opic) // Get all partitions according to topic
if err != nil {
if err != nil {
fmt . Printf ( "fail to get list of partition:err%v\n" , err )
fmt . Printf ( "fail to get list of partition:err%v\n" , err )
}
}
@ -105,7 +92,7 @@ func GetMessage() {
}
}
for partition := range partitionList {
for partition := range partitionList {
pc , err := consumer . ConsumePartition ( t opic, int32 ( partition ) , sarama . OffsetOldest )
pc , err := consumer . ConsumePartition ( T opic, int32 ( partition ) , sarama . OffsetOldest )
if err != nil {
if err != nil {
panic ( err )
panic ( err )
}
}
@ -329,7 +316,7 @@ func NewMessage() msgv3.MsgClient {
fmt . Printf ( "discov, err:%s" , err )
fmt . Printf ( "discov, err:%s" , err )
}
}
discov . AddOption ( mw . GrpcClient ( ) , grpc . WithTransportCredentials ( insecure . NewCredentials ( ) ) )
discov . AddOption ( mw . GrpcClient ( ) , grpc . WithTransportCredentials ( insecure . NewCredentials ( ) ) )
conn , err := discov . GetConn ( context . Background ( ) , Msg Name)
conn , err := discov . GetConn ( context . Background ( ) , Msg Rpc Name)
if err != nil {
if err != nil {
fmt . Printf ( "conn, err:%s" , err )
fmt . Printf ( "conn, err:%s" , err )
panic ( err )
panic ( err )