@ -23,11 +23,7 @@ import (
)
)
var (
var (
// msgInsertMysqlProcessed perometheus.Countr
msgInsertMysqlProcessed prometheus . Counter
msgInsertMysqlProcessed = promauto . NewCounter ( prometheus . CounterOpts {
Name : "insert_mysql_msg_total" ,
Help : "The total number of msg insert mysql events" ,
} )
)
)
type PersistentConsumerHandler struct {
type PersistentConsumerHandler struct {
@ -41,12 +37,12 @@ func (pc *PersistentConsumerHandler) Init() {
pc . persistentConsumerGroup = kfk . NewMConsumerGroup ( & kfk . MConsumerGroupConfig { KafkaVersion : sarama . V2_0_0_0 ,
pc . persistentConsumerGroup = kfk . NewMConsumerGroup ( & kfk . MConsumerGroupConfig { KafkaVersion : sarama . V2_0_0_0 ,
OffsetsInitial : sarama . OffsetNewest , IsReturnErr : false } , [ ] string { config . Config . Kafka . Ws2mschat . Topic } ,
OffsetsInitial : sarama . OffsetNewest , IsReturnErr : false } , [ ] string { config . Config . Kafka . Ws2mschat . Topic } ,
config . Config . Kafka . Ws2mschat . Addr , config . Config . Kafka . ConsumerGroupID . MsgToMySql )
config . Config . Kafka . Ws2mschat . Addr , config . Config . Kafka . ConsumerGroupID . MsgToMySql )
// if config.Config.Prometheus.Enable {
if config . Config . Prometheus . Enable {
// msgInsertMysqlProcessed = promauto.NewCounter(prometheus.CounterOpts{
msgInsertMysqlProcessed = promauto . NewCounter ( prometheus . CounterOpts {
// Name: "insert_mysql_msg_total",
Name : "insert_mysql_msg_total" ,
// Help: "The total number of msg insert mysql events",
Help : "The total number of msg insert mysql events" ,
// })
} )
// }
}
}
}
func ( pc * PersistentConsumerHandler ) handleChatWs2Mysql ( cMsg * sarama . ConsumerMessage , msgKey string , _ sarama . ConsumerGroupSession ) {
func ( pc * PersistentConsumerHandler ) handleChatWs2Mysql ( cMsg * sarama . ConsumerMessage , msgKey string , _ sarama . ConsumerGroupSession ) {