|
|
|
@ -51,14 +51,14 @@ func init() {
|
|
|
|
|
|
|
|
|
|
client, err := sarama.NewSyncProducer([]string{KafkaAddr}, config)
|
|
|
|
|
if err != nil {
|
|
|
|
|
fmt.Println("producer closed, err:", err)
|
|
|
|
|
ErrorPrint(fmt.Sprintln("producer closed, err:", err))
|
|
|
|
|
}
|
|
|
|
|
producerV2 = client
|
|
|
|
|
|
|
|
|
|
//Consumer
|
|
|
|
|
consumerT, err := sarama.NewConsumer([]string{KafkaAddr}, sarama.NewConfig())
|
|
|
|
|
if err != nil {
|
|
|
|
|
fmt.Printf("fail to start consumer, err:%v\n", err)
|
|
|
|
|
ErrorPrint(fmt.Sprintf("fail to start consumer, err:%v\n", err))
|
|
|
|
|
}
|
|
|
|
|
consumer = consumerT
|
|
|
|
|
|
|
|
|
@ -74,27 +74,23 @@ func SendMessage() {
|
|
|
|
|
// Send a message
|
|
|
|
|
pid, offset, err := producerV2.SendMessage(msg)
|
|
|
|
|
if err != nil {
|
|
|
|
|
fmt.Println("send msg failed, err:", err)
|
|
|
|
|
ErrorPrint(fmt.Sprintln("send msg failed, err:", err))
|
|
|
|
|
}
|
|
|
|
|
fmt.Printf("pid:%v offset:%v\n", pid, offset)
|
|
|
|
|
SuccessPrint(fmt.Sprintf("pid:%v offset:%v\n", pid, offset))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func GetMessage() {
|
|
|
|
|
partitionList, err := consumer.Partitions(Topic) // Get all partitions according to topic
|
|
|
|
|
if err != nil {
|
|
|
|
|
fmt.Printf("fail to get list of partition:err%v\n", err)
|
|
|
|
|
ErrorPrint(fmt.Sprintf("fail to get list of partition:err%v\n", err))
|
|
|
|
|
}
|
|
|
|
|
fmt.Println(partitionList)
|
|
|
|
|
//var ch chan int
|
|
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
fmt.Printf("rpc err:%s", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for partition := range partitionList {
|
|
|
|
|
pc, err := consumer.ConsumePartition(Topic, int32(partition), sarama.OffsetOldest)
|
|
|
|
|
if err != nil {
|
|
|
|
|
panic(err)
|
|
|
|
|
ErrorPrint(fmt.Sprintf("fail to get partition:err%v\n", err))
|
|
|
|
|
}
|
|
|
|
|
wg.Add(1)
|
|
|
|
|
defer pc.AsyncClose()
|
|
|
|
@ -140,9 +136,9 @@ func Transfer(consumerMessages []*sarama.ConsumerMessage) {
|
|
|
|
|
msgFromMQV2 := pbmsg.MsgDataToMQ{}
|
|
|
|
|
err := proto.Unmarshal(consumerMessages[i].Value, &msgFromMQV2)
|
|
|
|
|
if err != nil {
|
|
|
|
|
fmt.Printf("err:%s \n", err)
|
|
|
|
|
ErrorPrint(fmt.Sprintf("err:%s \n", err))
|
|
|
|
|
}
|
|
|
|
|
fmt.Printf("msg:%s \n", &msgFromMQV2)
|
|
|
|
|
SuccessPrint(fmt.Sprintf("msg:%s \n", &msgFromMQV2))
|
|
|
|
|
//fmt.Printf("rpcClient:%s \n", msgRpcClient)
|
|
|
|
|
if msgFromMQV2.MsgData.ContentType == constant.Text {
|
|
|
|
|
text := string(msgFromMQV2.MsgData.Content)
|
|
|
|
@ -151,7 +147,7 @@ func Transfer(consumerMessages []*sarama.ConsumerMessage) {
|
|
|
|
|
}
|
|
|
|
|
msgFromMQV2.MsgData.Content, err = json.Marshal(textElem)
|
|
|
|
|
if err != nil {
|
|
|
|
|
fmt.Printf("test err: %s \n", err)
|
|
|
|
|
ErrorPrint(fmt.Sprintf("test err: %s \n", err))
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if msgFromMQV2.MsgData.SessionType == constant.SingleChatType || msgFromMQV2.MsgData.SessionType == constant.NotificationChatType {
|
|
|
|
@ -193,9 +189,9 @@ func Transfer(consumerMessages []*sarama.ConsumerMessage) {
|
|
|
|
|
ctx := context.WithValue(context.Background(), "operationID", msgFromMQV2.OperationID)
|
|
|
|
|
resp, err := msgRpcClient.SendMsg(ctx, &msgv3.SendMsgReq{MsgData: msgData})
|
|
|
|
|
if err != nil {
|
|
|
|
|
fmt.Printf("resp err: %s \n", err)
|
|
|
|
|
ErrorPrint(fmt.Sprintf("resp err: %s \n", err))
|
|
|
|
|
}
|
|
|
|
|
fmt.Printf("resp: %s \n", resp)
|
|
|
|
|
SuccessPrint(fmt.Sprintf("resp: %s \n", resp))
|
|
|
|
|
} else if msgFromMQV2.MsgData.SessionType == constant.GroupChatType {
|
|
|
|
|
if string(consumerMessages[i].Key) != msgFromMQV2.MsgData.SendID {
|
|
|
|
|
continue
|
|
|
|
@ -238,9 +234,9 @@ func Transfer(consumerMessages []*sarama.ConsumerMessage) {
|
|
|
|
|
ctx := context.WithValue(context.Background(), "operationID", msgFromMQV2.OperationID)
|
|
|
|
|
resp, err := msgRpcClient.SendMsg(ctx, &msgv3.SendMsgReq{MsgData: msgData})
|
|
|
|
|
if err != nil {
|
|
|
|
|
fmt.Printf("resp err: %s \n", err)
|
|
|
|
|
ErrorPrint(fmt.Sprintf("resp err: %s \n", err))
|
|
|
|
|
}
|
|
|
|
|
fmt.Printf("resp: %s \n", resp)
|
|
|
|
|
SuccessPrint(fmt.Sprintf("resp: %s \n", resp))
|
|
|
|
|
} else if msgFromMQV2.MsgData.SessionType == constant.SuperGroupChatType {
|
|
|
|
|
if msgFromMQV2.MsgData.ContentType < constant.ContentTypeBegin || msgFromMQV2.MsgData.ContentType > constant.AdvancedText {
|
|
|
|
|
continue
|
|
|
|
@ -280,9 +276,9 @@ func Transfer(consumerMessages []*sarama.ConsumerMessage) {
|
|
|
|
|
ctx := context.WithValue(context.Background(), "operationID", msgFromMQV2.OperationID)
|
|
|
|
|
resp, err := msgRpcClient.SendMsg(ctx, &msgv3.SendMsgReq{MsgData: msgData})
|
|
|
|
|
if err != nil {
|
|
|
|
|
fmt.Printf("resp err: %s \n", err)
|
|
|
|
|
ErrorPrint(fmt.Sprintf("resp err: %s \n", err))
|
|
|
|
|
}
|
|
|
|
|
fmt.Printf("resp: %s \n", resp)
|
|
|
|
|
SuccessPrint(fmt.Sprintf("resp: %s \n", resp))
|
|
|
|
|
}
|
|
|
|
|
fmt.Printf("\n\n\n")
|
|
|
|
|
}
|
|
|
|
@ -313,13 +309,12 @@ func NewMessage() msgv3.MsgClient {
|
|
|
|
|
openKeeper.WithLogger(log.NewZkLogger()),
|
|
|
|
|
)
|
|
|
|
|
if err != nil {
|
|
|
|
|
fmt.Printf("discov, err:%s", err)
|
|
|
|
|
ErrorPrint(fmt.Sprintf("discov, err:%s", err))
|
|
|
|
|
}
|
|
|
|
|
discov.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()))
|
|
|
|
|
conn, err := discov.GetConn(context.Background(), MsgRpcName)
|
|
|
|
|
if err != nil {
|
|
|
|
|
fmt.Printf("conn, err:%s", err)
|
|
|
|
|
panic(err)
|
|
|
|
|
ErrorPrint(fmt.Sprintf("conn, err:%s", err))
|
|
|
|
|
}
|
|
|
|
|
client := msgv3.NewMsgClient(conn)
|
|
|
|
|
return client
|
|
|
|
|