From 459ea0120089c080d596b49afd2775065c6ca480 Mon Sep 17 00:00:00 2001 From: luhaoling <2198702716@qq.com> Date: Mon, 26 Feb 2024 18:03:25 +0800 Subject: [PATCH] fix: fix the TLS.CACrt is nil error --- .../msgtransfer/online_history_msg_handler.go | 17 +++++++++++------ .../msgtransfer/online_msg_to_mongo_handler.go | 15 +++++++++------ internal/push/push_handler.go | 15 +++++++++------ pkg/common/config/parse.go | 2 +- pkg/common/db/controller/msg.go | 16 ++++++++++------ pkg/common/kafka/consumer.go | 15 +++++++++------ 6 files changed, 49 insertions(+), 31 deletions(-) diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go index 0d449a26c..1b57c33c9 100644 --- a/internal/msgtransfer/online_history_msg_handler.go +++ b/internal/msgtransfer/online_history_msg_handler.go @@ -101,13 +101,18 @@ func NewOnlineHistoryRedisConsumerHandler( och.conversationRpcClient = conversationRpcClient och.groupRpcClient = groupRpcClient var err error - tlsConfig := &kafka.TLSConfig{ - CACrt: config.Kafka.TLS.CACrt, - ClientCrt: config.Kafka.TLS.ClientCrt, - ClientKey: config.Kafka.TLS.ClientKey, - ClientKeyPwd: config.Kafka.TLS.ClientKeyPwd, - InsecureSkipVerify: false, + + var tlsConfig *kafka.TLSConfig + if config.Kafka.TLS != nil { + tlsConfig = &kafka.TLSConfig{ + CACrt: config.Kafka.TLS.CACrt, + ClientCrt: config.Kafka.TLS.ClientCrt, + ClientKey: config.Kafka.TLS.ClientKey, + ClientKeyPwd: config.Kafka.TLS.ClientKeyPwd, + InsecureSkipVerify: false, + } } + och.historyConsumerGroup, err = kafka.NewMConsumerGroup(&kafka.MConsumerGroupConfig{ KafkaVersion: sarama.V2_0_0_0, OffsetsInitial: sarama.OffsetNewest, diff --git a/internal/msgtransfer/online_msg_to_mongo_handler.go b/internal/msgtransfer/online_msg_to_mongo_handler.go index b9a91d30c..80b4b200b 100644 --- a/internal/msgtransfer/online_msg_to_mongo_handler.go +++ b/internal/msgtransfer/online_msg_to_mongo_handler.go @@ -35,12 +35,15 @@ type OnlineHistoryMongoConsumerHandler struct { } func NewOnlineHistoryMongoConsumerHandler(config *config.GlobalConfig, database controller.CommonMsgDatabase) (*OnlineHistoryMongoConsumerHandler, error) { - tlsConfig := &kfk.TLSConfig{ - CACrt: config.Kafka.TLS.CACrt, - ClientCrt: config.Kafka.TLS.ClientCrt, - ClientKey: config.Kafka.TLS.ClientKey, - ClientKeyPwd: config.Kafka.TLS.ClientKeyPwd, - InsecureSkipVerify: false, + var tlsConfig *kfk.TLSConfig + if config.Kafka.TLS != nil { + tlsConfig = &kfk.TLSConfig{ + CACrt: config.Kafka.TLS.CACrt, + ClientCrt: config.Kafka.TLS.ClientCrt, + ClientKey: config.Kafka.TLS.ClientKey, + ClientKeyPwd: config.Kafka.TLS.ClientKeyPwd, + InsecureSkipVerify: false, + } } historyConsumerGroup, err := kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{ KafkaVersion: sarama.V2_0_0_0, diff --git a/internal/push/push_handler.go b/internal/push/push_handler.go index 2d7bb9f65..0f2c3606d 100644 --- a/internal/push/push_handler.go +++ b/internal/push/push_handler.go @@ -39,12 +39,15 @@ func NewConsumerHandler(config *config.GlobalConfig, pusher *Pusher) (*ConsumerH var consumerHandler ConsumerHandler consumerHandler.pusher = pusher var err error - tlsConfig := &kfk.TLSConfig{ - CACrt: config.Kafka.TLS.CACrt, - ClientCrt: config.Kafka.TLS.ClientCrt, - ClientKey: config.Kafka.TLS.ClientKey, - ClientKeyPwd: config.Kafka.TLS.ClientKeyPwd, - InsecureSkipVerify: false, + var tlsConfig *kfk.TLSConfig + if config.Kafka.TLS != nil { + tlsConfig = &kfk.TLSConfig{ + CACrt: config.Kafka.TLS.CACrt, + ClientCrt: config.Kafka.TLS.ClientCrt, + ClientKey: config.Kafka.TLS.ClientKey, + ClientKeyPwd: config.Kafka.TLS.ClientKeyPwd, + InsecureSkipVerify: false, + } } consumerHandler.pushConsumerGroup, err = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{ KafkaVersion: sarama.V2_0_0_0, diff --git a/pkg/common/config/parse.go b/pkg/common/config/parse.go index 1ed15e627..866982feb 100644 --- a/pkg/common/config/parse.go +++ b/pkg/common/config/parse.go @@ -120,5 +120,5 @@ func InitConfig(config *GlobalConfig, configFolderPath string) error { return err } - return initConfig(config.Notification, NotificationFileName, configFolderPath) + return initConfig(&config.Notification, NotificationFileName, configFolderPath) } diff --git a/pkg/common/db/controller/msg.go b/pkg/common/db/controller/msg.go index 2716a716b..5bdb8c3da 100644 --- a/pkg/common/db/controller/msg.go +++ b/pkg/common/db/controller/msg.go @@ -133,12 +133,16 @@ func NewCommonMsgDatabase(msgDocModel unrelationtb.MsgDocModelInterface, cacheMo Username: config.Kafka.Username, Password: config.Kafka.Password, } - tlsConfig := &kafka.TLSConfig{ - CACrt: config.Kafka.TLS.CACrt, - ClientCrt: config.Kafka.TLS.ClientCrt, - ClientKey: config.Kafka.TLS.ClientKey, - ClientKeyPwd: config.Kafka.TLS.ClientKeyPwd, - InsecureSkipVerify: config.Kafka.TLS.InsecureSkipVerify, + + var tlsConfig *kafka.TLSConfig + if config.Kafka.TLS != nil { + tlsConfig = &kafka.TLSConfig{ + CACrt: config.Kafka.TLS.CACrt, + ClientCrt: config.Kafka.TLS.ClientCrt, + ClientKey: config.Kafka.TLS.ClientKey, + ClientKeyPwd: config.Kafka.TLS.ClientKeyPwd, + InsecureSkipVerify: false, + } } producerToRedis, err := kafka.NewKafkaProducer(config.Kafka.Addr, config.Kafka.LatestMsgToRedis.Topic, producerConfig, tlsConfig) if err != nil { diff --git a/pkg/common/kafka/consumer.go b/pkg/common/kafka/consumer.go index 49cf056b8..435b8b780 100644 --- a/pkg/common/kafka/consumer.go +++ b/pkg/common/kafka/consumer.go @@ -40,12 +40,15 @@ func NewKafkaConsumer(addr []string, topic string, config *config.GlobalConfig) consumerConfig.Net.SASL.User = config.Kafka.Username consumerConfig.Net.SASL.Password = config.Kafka.Password } - tlsConfig := &TLSConfig{ - CACrt: config.Kafka.TLS.CACrt, - ClientCrt: config.Kafka.TLS.ClientCrt, - ClientKey: config.Kafka.TLS.ClientKey, - ClientKeyPwd: config.Kafka.TLS.ClientKeyPwd, - InsecureSkipVerify: config.Kafka.TLS.InsecureSkipVerify, + var tlsConfig *TLSConfig + if config.Kafka.TLS != nil { + tlsConfig = &TLSConfig{ + CACrt: config.Kafka.TLS.CACrt, + ClientCrt: config.Kafka.TLS.ClientCrt, + ClientKey: config.Kafka.TLS.ClientKey, + ClientKeyPwd: config.Kafka.TLS.ClientKeyPwd, + InsecureSkipVerify: false, + } } SetupTLSConfig(consumerConfig, tlsConfig) consumer, err := sarama.NewConsumer(p.addr, consumerConfig)