|
|
|
@ -31,15 +31,14 @@ import (
|
|
|
|
|
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
const (
|
|
|
|
|
maxRetry = 10 // Maximum number of retries for producer creation
|
|
|
|
|
)
|
|
|
|
|
const maxRetry = 10 // number of retries
|
|
|
|
|
|
|
|
|
|
var errEmptyMsg = errors.New("binary msg is empty")
|
|
|
|
|
var errEmptyMsg = errors.New("kafka binary msg is empty")
|
|
|
|
|
|
|
|
|
|
// Producer represents a Kafka producer.
|
|
|
|
|
type Producer struct {
|
|
|
|
|
topic string
|
|
|
|
|
addr []string
|
|
|
|
|
topic string
|
|
|
|
|
config *sarama.Config
|
|
|
|
|
producer sarama.SyncProducer
|
|
|
|
|
}
|
|
|
|
@ -68,7 +67,7 @@ func NewKafkaProducer(addr []string, topic string) *Producer {
|
|
|
|
|
// Get Kafka configuration from environment variables or fallback to config file
|
|
|
|
|
kafkaUsername := getEnvOrConfig("KAFKA_USERNAME", config.Config.Kafka.Username)
|
|
|
|
|
kafkaPassword := getEnvOrConfig("KAFKA_PASSWORD", config.Config.Kafka.Password)
|
|
|
|
|
kafkaAddr := getEnvOrConfig("KAFKA_ADDRESS", addr[0]) // Assuming addr[0] contains address from config
|
|
|
|
|
kafkaAddr := getKafkaAddrFromEnv(addr) // Updated to use the new function
|
|
|
|
|
|
|
|
|
|
// Configure SASL authentication if credentials are provided
|
|
|
|
|
if kafkaUsername != "" && kafkaPassword != "" {
|
|
|
|
@ -78,7 +77,7 @@ func NewKafkaProducer(addr []string, topic string) *Producer {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Set the Kafka address
|
|
|
|
|
p.addr = []string{kafkaAddr}
|
|
|
|
|
p.addr = kafkaAddr
|
|
|
|
|
|
|
|
|
|
// Set up TLS configuration (if required)
|
|
|
|
|
SetupTLSConfig(p.config)
|
|
|
|
|