|
|
@ -28,14 +28,12 @@ func NewKafkaConsumer(addr []string, topic string) *Consumer {
|
|
|
|
consumer, err := sarama.NewConsumer(p.addr, consumerConfig)
|
|
|
|
consumer, err := sarama.NewConsumer(p.addr, consumerConfig)
|
|
|
|
if err != nil {
|
|
|
|
if err != nil {
|
|
|
|
panic(err.Error())
|
|
|
|
panic(err.Error())
|
|
|
|
return nil
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
p.Consumer = consumer
|
|
|
|
p.Consumer = consumer
|
|
|
|
|
|
|
|
|
|
|
|
partitionList, err := consumer.Partitions(p.Topic)
|
|
|
|
partitionList, err := consumer.Partitions(p.Topic)
|
|
|
|
if err != nil {
|
|
|
|
if err != nil {
|
|
|
|
panic(err.Error())
|
|
|
|
panic(err.Error())
|
|
|
|
return nil
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
p.PartitionList = partitionList
|
|
|
|
p.PartitionList = partitionList
|
|
|
|
|
|
|
|
|
|
|
|