// Open-IM-Server/pkg/common/kafka/consumer.go (44 lines, 958 B)

package kafka
import (
	"Open_IM/pkg/common/config"
	"sync"

	"github.com/Shopify/sarama"
)
// Consumer bundles a sarama consumer with the topic it was created for and
// the partition IDs that were reported for that topic at construction time.
type Consumer struct {
	// addr holds the broker addresses handed to sarama.NewConsumer.
	addr []string
	// WG is not used by the constructor; presumably callers use it to wait
	// for their per-partition goroutines (verify against callers).
	WG sync.WaitGroup
	// Topic is the topic name whose partitions were looked up.
	Topic string
	// PartitionList is the result of Consumer.Partitions(Topic).
	PartitionList []int32
	// Consumer is the underlying sarama consumer.
	Consumer sarama.Consumer
}
func NewKafkaConsumer(addr []string, topic string) *Consumer {
p := Consumer{}
p.Topic = topic
p.addr = addr
2 years ago
consumerConfig := sarama.NewConfig()
2 years ago
if config.Config.Kafka.SASLUserName != "" && config.Config.Kafka.SASLPassword != "" {
2 years ago
consumerConfig.Net.SASL.Enable = true
2 years ago
consumerConfig.Net.SASL.User = config.Config.Kafka.SASLUserName
consumerConfig.Net.SASL.Password = config.Config.Kafka.SASLPassword
2 years ago
}
consumer, err := sarama.NewConsumer(p.addr, consumerConfig)
if err != nil {
panic(err.Error())
return nil
}
p.Consumer = consumer
partitionList, err := consumer.Partitions(p.Topic)
if err != nil {
panic(err.Error())
return nil
}
p.PartitionList = partitionList
return &p
}