feat: add TLS config to kafka consumer group.

pull/956/head
Wang Zhi 2 years ago committed by GitHub
parent 6d28a6f612
commit 500edc4568
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -17,6 +17,7 @@ package kafka
import ( import (
"context" "context"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
"github.com/OpenIMSDK/tools/log" "github.com/OpenIMSDK/tools/log"
"github.com/Shopify/sarama" "github.com/Shopify/sarama"
@ -35,11 +36,17 @@ type MConsumerGroupConfig struct {
} }
func NewMConsumerGroup(consumerConfig *MConsumerGroupConfig, topics, addrs []string, groupID string) *MConsumerGroup { func NewMConsumerGroup(consumerConfig *MConsumerGroupConfig, topics, addrs []string, groupID string) *MConsumerGroup {
config := sarama.NewConfig() consumerGroupConfig := sarama.NewConfig()
config.Version = consumerConfig.KafkaVersion consumerGroupConfig.Version = consumerConfig.KafkaVersion
config.Consumer.Offsets.Initial = consumerConfig.OffsetsInitial consumerGroupConfig.Consumer.Offsets.Initial = consumerConfig.OffsetsInitial
config.Consumer.Return.Errors = consumerConfig.IsReturnErr consumerGroupConfig.Consumer.Return.Errors = consumerConfig.IsReturnErr
consumerGroup, err := sarama.NewConsumerGroup(addrs, groupID, config) if config.Config.Kafka.Username != "" && config.Config.Kafka.Password != "" {
consumerGroupConfig.Net.SASL.Enable = true
consumerGroupConfig.Net.SASL.User = config.Config.Kafka.Username
consumerGroupConfig.Net.SASL.Password = config.Config.Kafka.Password
}
SetupTLSConfig(consumerGroupConfig)
consumerGroup, err := sarama.NewConsumerGroup(addrs, groupID, consumerGroupConfig)
if err != nil { if err != nil {
panic(err.Error()) panic(err.Error())
} }

Loading…
Cancel
Save