From 303162f8a5c4f6827a74144d4ce71158d31c82bb Mon Sep 17 00:00:00 2001 From: Wang Zhi Date: Fri, 25 Aug 2023 21:00:53 +0800 Subject: [PATCH] Add TLS config for Kafka (#956) * feat: add TLS utility. * chore: rename pkg/tls/tls.go to pkg/common/tls/tls.go . * feat: add util for kafka TLS config. * feat: setup TLS config for kafka consumer. * feat: add TLS config to kafka consumer group. * feat: add TLS config for kafka producer. * chore: add TLS config for kafka. * feat: add TLS config for kafka checker. --- pkg/common/config/config.go | 13 ++++-- pkg/common/kafka/consumer.go | 1 + pkg/common/kafka/consumer_group.go | 17 ++++--- pkg/common/kafka/producer.go | 1 + pkg/common/kafka/util.go | 20 +++++++++ pkg/common/tls/tls.go | 71 ++++++++++++++++++++++++++++++ tools/component/component.go | 2 + 7 files changed, 117 insertions(+), 8 deletions(-) create mode 100644 pkg/common/kafka/util.go create mode 100644 pkg/common/tls/tls.go diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index 754753b3b..c6dd41419 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -81,9 +81,16 @@ type configStruct struct { } `yaml:"redis"` Kafka struct { - Username string `yaml:"username"` - Password string `yaml:"password"` - Addr []string `yaml:"addr"` + Username string `yaml:"username"` + Password string `yaml:"password"` + Addr []string `yaml:"addr"` + TLS *struct { + CACrt string `yaml:"caCrt"` + ClientCrt string `yaml:"clientCrt"` + ClientKey string `yaml:"clientKey"` + ClientKeyPwd string `yaml:"clientKeyPwd"` + InsecureSkipVerify bool `yaml:"insecureSkipVerify"` + } `yaml:"tls"` LatestMsgToRedis struct { Topic string `yaml:"topic"` } `yaml:"latestMsgToRedis"` diff --git a/pkg/common/kafka/consumer.go b/pkg/common/kafka/consumer.go index 67bc3977b..e253ec5e0 100644 --- a/pkg/common/kafka/consumer.go +++ b/pkg/common/kafka/consumer.go @@ -40,6 +40,7 @@ func NewKafkaConsumer(addr []string, topic string) *Consumer { consumerConfig.Net.SASL.User = config.Config.Kafka.Username consumerConfig.Net.SASL.Password = config.Config.Kafka.Password } + SetupTLSConfig(consumerConfig) consumer, err := sarama.NewConsumer(p.addr, consumerConfig) if err != nil { panic(err.Error()) diff --git a/pkg/common/kafka/consumer_group.go b/pkg/common/kafka/consumer_group.go index b4bd81660..38b8c041c 100644 --- a/pkg/common/kafka/consumer_group.go +++ b/pkg/common/kafka/consumer_group.go @@ -17,6 +17,7 @@ package kafka import ( "context" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" "github.com/OpenIMSDK/tools/log" "github.com/Shopify/sarama" @@ -35,11 +36,17 @@ type MConsumerGroupConfig struct { } func NewMConsumerGroup(consumerConfig *MConsumerGroupConfig, topics, addrs []string, groupID string) *MConsumerGroup { - config := sarama.NewConfig() - config.Version = consumerConfig.KafkaVersion - config.Consumer.Offsets.Initial = consumerConfig.OffsetsInitial - config.Consumer.Return.Errors = consumerConfig.IsReturnErr - consumerGroup, err := sarama.NewConsumerGroup(addrs, groupID, config) + consumerGroupConfig := sarama.NewConfig() + consumerGroupConfig.Version = consumerConfig.KafkaVersion + consumerGroupConfig.Consumer.Offsets.Initial = consumerConfig.OffsetsInitial + consumerGroupConfig.Consumer.Return.Errors = consumerConfig.IsReturnErr + if config.Config.Kafka.Username != "" && config.Config.Kafka.Password != "" { + consumerGroupConfig.Net.SASL.Enable = true + consumerGroupConfig.Net.SASL.User = config.Config.Kafka.Username + consumerGroupConfig.Net.SASL.Password = config.Config.Kafka.Password + } + SetupTLSConfig(consumerGroupConfig) + consumerGroup, err := sarama.NewConsumerGroup(addrs, groupID, consumerGroupConfig) if err != nil { panic(err.Error()) } diff --git a/pkg/common/kafka/producer.go b/pkg/common/kafka/producer.go index 938757d40..b7ec32714 100644 --- a/pkg/common/kafka/producer.go +++ b/pkg/common/kafka/producer.go @@ -60,6 +60,7 @@ func NewKafkaProducer(addr []string, topic string) *Producer { } p.addr = addr p.topic = topic + SetupTLSConfig(p.config) var producer sarama.SyncProducer var err error for i := 0; i <= maxRetry; i++ { diff --git a/pkg/common/kafka/util.go b/pkg/common/kafka/util.go new file mode 100644 index 000000000..833757fb8 --- /dev/null +++ b/pkg/common/kafka/util.go @@ -0,0 +1,20 @@ +package kafka + +import ( + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/tls" + "github.com/Shopify/sarama" +) + +// SetupTLSConfig set up the TLS config from config file. +func SetupTLSConfig(cfg *sarama.Config) { + if config.Config.Kafka.TLS != nil { + cfg.Net.TLS.Enable = true + cfg.Net.TLS.Config = tls.NewTLSConfig( + config.Config.Kafka.TLS.ClientCrt, + config.Config.Kafka.TLS.ClientKey, + config.Config.Kafka.TLS.CACrt, + []byte(config.Config.Kafka.TLS.ClientKeyPwd), + ) + } +} diff --git a/pkg/common/tls/tls.go b/pkg/common/tls/tls.go new file mode 100644 index 000000000..66d3a0e2b --- /dev/null +++ b/pkg/common/tls/tls.go @@ -0,0 +1,71 @@ +package tls + +import ( + "crypto/tls" + "crypto/x509" + "encoding/pem" + "errors" + "os" + + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" +) + + +func decryptPEM(data []byte, passphrase []byte) ([]byte, error) { + if len(passphrase) == 0 { + return data, nil + } + b, _ := pem.Decode(data) + d, err := x509.DecryptPEMBlock(b, passphrase) + if err != nil { + return nil, err + } + return pem.EncodeToMemory(&pem.Block{ + Type: b.Type, + Bytes: d, + }), nil +} + +func readEncryptablePEMBlock(path string, pwd []byte) ([]byte, error) { + data, err := os.ReadFile(path) + if err != nil { + return nil, err + } + return decryptPEM(data, pwd) +} + +// NewTLSConfig setup the TLS config from general config file. +func NewTLSConfig(clientCertFile, clientKeyFile, caCertFile string, keyPwd []byte) *tls.Config { + tlsConfig := tls.Config{} + + if clientCertFile != "" && clientKeyFile != "" { + certPEMBlock, err := os.ReadFile(clientCertFile) + if err != nil { + panic(err) + } + keyPEMBlock, err := readEncryptablePEMBlock(clientKeyFile, keyPwd) + if err != nil { + panic(err) + } + cert, err := tls.X509KeyPair(certPEMBlock, keyPEMBlock) + if err != nil { + panic(err) + } + tlsConfig.Certificates = []tls.Certificate{cert} + } + + caCert, err := os.ReadFile(caCertFile) + if err != nil { + panic(err) + } + caCertPool := x509.NewCertPool() + ok := caCertPool.AppendCertsFromPEM(caCert) + if !ok { + panic(errors.New("not a valid CA cert")) + } + tlsConfig.RootCAs = caCertPool + + tlsConfig.InsecureSkipVerify = config.Config.Kafka.TLS.InsecureSkipVerify + + return &tlsConfig +} diff --git a/tools/component/component.go b/tools/component/component.go index 295ac44b1..be454f900 100644 --- a/tools/component/component.go +++ b/tools/component/component.go @@ -39,6 +39,7 @@ import ( "gorm.io/gorm" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/kafka" "github.com/minio/minio-go/v7/pkg/credentials" ) @@ -274,6 +275,7 @@ func checkKafka() error { cfg.Net.SASL.User = config.Config.Kafka.Username cfg.Net.SASL.Password = config.Config.Kafka.Password } + kafka.SetupTLSConfig(cfg) kafkaClient, err := sarama.NewClient(config.Config.Kafka.Addr, cfg) if err != nil { return errs.Wrap(err)