diff --git a/pkg/common/kafka/consumer_group.go b/pkg/common/kafka/consumer_group.go index b4bd81660..38b8c041c 100644 --- a/pkg/common/kafka/consumer_group.go +++ b/pkg/common/kafka/consumer_group.go @@ -17,6 +17,7 @@ package kafka import ( "context" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" "github.com/OpenIMSDK/tools/log" "github.com/Shopify/sarama" @@ -35,11 +36,17 @@ type MConsumerGroupConfig struct { } func NewMConsumerGroup(consumerConfig *MConsumerGroupConfig, topics, addrs []string, groupID string) *MConsumerGroup { - config := sarama.NewConfig() - config.Version = consumerConfig.KafkaVersion - config.Consumer.Offsets.Initial = consumerConfig.OffsetsInitial - config.Consumer.Return.Errors = consumerConfig.IsReturnErr - consumerGroup, err := sarama.NewConsumerGroup(addrs, groupID, config) + consumerGroupConfig := sarama.NewConfig() + consumerGroupConfig.Version = consumerConfig.KafkaVersion + consumerGroupConfig.Consumer.Offsets.Initial = consumerConfig.OffsetsInitial + consumerGroupConfig.Consumer.Return.Errors = consumerConfig.IsReturnErr + if config.Config.Kafka.Username != "" && config.Config.Kafka.Password != "" { + consumerGroupConfig.Net.SASL.Enable = true + consumerGroupConfig.Net.SASL.User = config.Config.Kafka.Username + consumerGroupConfig.Net.SASL.Password = config.Config.Kafka.Password + } + SetupTLSConfig(consumerGroupConfig) + consumerGroup, err := sarama.NewConsumerGroup(addrs, groupID, consumerGroupConfig) if err != nil { panic(err.Error()) }