diff --git a/pkg/common/kafka/consumer_group.go b/pkg/common/kafka/consumer_group.go index 01b84e85e..6da75972d 100644 --- a/pkg/common/kafka/consumer_group.go +++ b/pkg/common/kafka/consumer_group.go @@ -24,17 +24,13 @@ type MConsumerGroupConfig struct { IsReturnErr bool } -func NewMConsumerGroup(consumerConfig *MConsumerGroupConfig, topics, addr []string, groupID string) *MConsumerGroup { +func NewMConsumerGroup(consumerConfig *MConsumerGroupConfig, topics, addrs []string, groupID string) *MConsumerGroup { config := sarama.NewConfig() config.Version = consumerConfig.KafkaVersion config.Consumer.Offsets.Initial = consumerConfig.OffsetsInitial config.Consumer.Return.Errors = consumerConfig.IsReturnErr - fmt.Println("init address is ", addr, "topics is ", topics) - client, err := sarama.NewClient(addr, config) - if err != nil { - panic(err.Error()) - } - consumerGroup, err := sarama.NewConsumerGroupFromClient(groupID, client) + fmt.Println("init address is ", addrs, "topics is ", topics) + consumerGroup, err := sarama.NewConsumerGroup(addrs, groupID, config) if err != nil { panic(err.Error()) }