diff --git a/pkg/common/kafka/producer.go b/pkg/common/kafka/producer.go index 2841f7917..3b8ed5173 100644 --- a/pkg/common/kafka/producer.go +++ b/pkg/common/kafka/producer.go @@ -18,7 +18,6 @@ import ( "bytes" "context" "errors" - "fmt" "strings" "time" @@ -68,7 +67,10 @@ func NewKafkaProducer(addr []string, topic string, producerConfig *ProducerConfi configureProducerAck(&p, producerConfig.ProducerAck) // Configure message compression - configureCompression(&p, producerConfig.CompressType) + err := configureCompression(&p, producerConfig.CompressType) + if err != nil { + return nil, err + } // Get Kafka configuration from environment variables or fallback to config file kafkaUsername := getEnvOrConfig("KAFKA_USERNAME", producerConfig.Username) @@ -89,7 +91,6 @@ func NewKafkaProducer(addr []string, topic string, producerConfig *ProducerConfi SetupTLSConfig(p.config, tlsConfig) // Create the producer with retries - var err error for i := 0; i <= maxRetry; i++ { p.producer, err = sarama.NewSyncProducer(p.addr, p.config) if err == nil { @@ -120,14 +121,18 @@ func configureProducerAck(p *Producer, ackConfig string) { } // configureCompression configures the message compression type for the producer. -func configureCompression(p *Producer, compressType string) { - var compress = sarama.CompressionNone - err := compress.UnmarshalText(bytes.ToLower([]byte(compressType))) - if err != nil { - fmt.Printf("Failed to configure compression: %v\n", err) - return +func configureCompression(p *Producer, compressType string) error { + var compress sarama.CompressionCodec + if compressType == "" { + compress = sarama.CompressionNone + } else { + err := compress.UnmarshalText(bytes.ToLower([]byte(compressType))) + if err != nil { + return errs.Wrap(err) + } } p.config.Producer.Compression = compress + return nil } // GetMQHeaderWithContext extracts message queue headers from the context.