fix: kafka producer add default compression codec.

pull/2100/head
Gordon 2 years ago
parent bf9b2edda9
commit fdfb75a66f

@ -18,7 +18,6 @@ import (
"bytes" "bytes"
"context" "context"
"errors" "errors"
"fmt"
"strings" "strings"
"time" "time"
@ -68,7 +67,10 @@ func NewKafkaProducer(addr []string, topic string, producerConfig *ProducerConfi
configureProducerAck(&p, producerConfig.ProducerAck) configureProducerAck(&p, producerConfig.ProducerAck)
// Configure message compression // Configure message compression
configureCompression(&p, producerConfig.CompressType) err := configureCompression(&p, producerConfig.CompressType)
if err != nil {
return nil, err
}
// Get Kafka configuration from environment variables or fallback to config file // Get Kafka configuration from environment variables or fallback to config file
kafkaUsername := getEnvOrConfig("KAFKA_USERNAME", producerConfig.Username) kafkaUsername := getEnvOrConfig("KAFKA_USERNAME", producerConfig.Username)
@ -89,7 +91,6 @@ func NewKafkaProducer(addr []string, topic string, producerConfig *ProducerConfi
SetupTLSConfig(p.config, tlsConfig) SetupTLSConfig(p.config, tlsConfig)
// Create the producer with retries // Create the producer with retries
var err error
for i := 0; i <= maxRetry; i++ { for i := 0; i <= maxRetry; i++ {
p.producer, err = sarama.NewSyncProducer(p.addr, p.config) p.producer, err = sarama.NewSyncProducer(p.addr, p.config)
if err == nil { if err == nil {
@ -120,14 +121,18 @@ func configureProducerAck(p *Producer, ackConfig string) {
} }
// configureCompression configures the message compression type for the producer. // configureCompression configures the message compression type for the producer.
func configureCompression(p *Producer, compressType string) { func configureCompression(p *Producer, compressType string) error {
var compress = sarama.CompressionNone var compress sarama.CompressionCodec
err := compress.UnmarshalText(bytes.ToLower([]byte(compressType))) if compressType == "" {
if err != nil { compress = sarama.CompressionNone
fmt.Printf("Failed to configure compression: %v\n", err) } else {
return err := compress.UnmarshalText(bytes.ToLower([]byte(compressType)))
if err != nil {
return errs.Wrap(err)
}
} }
p.config.Producer.Compression = compress p.config.Producer.Compression = compress
return nil
} }
// GetMQHeaderWithContext extracts message queue headers from the context. // GetMQHeaderWithContext extracts message queue headers from the context.

Loading…
Cancel
Save