|
|
@ -31,7 +31,7 @@ func (ms *PushConsumerHandler) Init() {
|
|
|
|
config.Config.Kafka.ConsumerGroupID.MsgToPush)
|
|
|
|
config.Config.Kafka.ConsumerGroupID.MsgToPush)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
func (ms *PushConsumerHandler) handleMs2PsChat(msg []byte) {
|
|
|
|
func (ms *PushConsumerHandler) handleMs2PsChat(msg []byte) {
|
|
|
|
log.InfoByKv("msg come from kafka And push!!!", "", "msg", string(msg))
|
|
|
|
log.NewDebug("", "msg come from kafka And push!!!", "msg", string(msg))
|
|
|
|
msgFromMQ := pbChat.PushMsgDataToMQ{}
|
|
|
|
msgFromMQ := pbChat.PushMsgDataToMQ{}
|
|
|
|
if err := proto.Unmarshal(msg, &msgFromMQ); err != nil {
|
|
|
|
if err := proto.Unmarshal(msg, &msgFromMQ); err != nil {
|
|
|
|
log.ErrorByKv("push Unmarshal msg err", "", "msg", string(msg), "err", err.Error())
|
|
|
|
log.ErrorByKv("push Unmarshal msg err", "", "msg", string(msg), "err", err.Error())
|
|
|
@ -45,7 +45,7 @@ func (PushConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return
|
|
|
|
func (ms *PushConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession,
|
|
|
|
func (ms *PushConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession,
|
|
|
|
claim sarama.ConsumerGroupClaim) error {
|
|
|
|
claim sarama.ConsumerGroupClaim) error {
|
|
|
|
for msg := range claim.Messages() {
|
|
|
|
for msg := range claim.Messages() {
|
|
|
|
log.InfoByKv("kafka get info to mysql", "", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value))
|
|
|
|
log.NewDebug("", "kafka get info to mysql", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value))
|
|
|
|
ms.msgHandle[msg.Topic](msg.Value)
|
|
|
|
ms.msgHandle[msg.Topic](msg.Value)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
return nil
|
|
|
|