From d1af343b136e8d4fa5edbb5e497acd0cd9455220 Mon Sep 17 00:00:00 2001 From: "fengyun.rui" Date: Fri, 10 Nov 2023 22:04:16 +0800 Subject: [PATCH] fix: add kafka compress type and producer ack params (#1310) Signed-off-by: rfyiamcool --- pkg/common/config/config.go | 10 ++++++---- pkg/common/kafka/producer.go | 19 ++++++++++++++++++- 2 files changed, 24 insertions(+), 5 deletions(-) diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index d7cecc616..261e3d8c0 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -85,10 +85,12 @@ type configStruct struct { } `yaml:"redis"` Kafka struct { - Username string `yaml:"username"` - Password string `yaml:"password"` - Addr []string `yaml:"addr"` - TLS *struct { + Username string `yaml:"username"` + Password string `yaml:"password"` + ProducerAck string `yaml:"producerAck"` + CompressType string `yaml:"compressType"` + Addr []string `yaml:"addr"` + TLS *struct { CACrt string `yaml:"caCrt"` ClientCrt string `yaml:"clientCrt"` ClientKey string `yaml:"clientKey"` diff --git a/pkg/common/kafka/producer.go b/pkg/common/kafka/producer.go index 1766afa97..1dad33f9c 100644 --- a/pkg/common/kafka/producer.go +++ b/pkg/common/kafka/producer.go @@ -15,8 +15,10 @@ package kafka import ( + "bytes" "context" "errors" + "strings" "time" "github.com/OpenIMSDK/protocol/constant" @@ -49,8 +51,23 @@ func NewKafkaProducer(addr []string, topic string) *Producer { p.config = sarama.NewConfig() // Instantiate a sarama Config p.config.Producer.Return.Successes = true // Whether to enable the successes channel to be notified after the message is sent successfully p.config.Producer.Return.Errors = true - p.config.Producer.RequiredAcks = sarama.WaitForAll // Set producer Message Reply level 0 1 all p.config.Producer.Partitioner = sarama.NewHashPartitioner // Set the hash-key automatic hash partition. When sending a message, you must specify the key value of the message. If there is no key, the partition will be selected randomly + + var producerAck = sarama.WaitForAll // default: WaitForAll + switch strings.ToLower(config.Config.Kafka.ProducerAck) { + case "no_response": + producerAck = sarama.NoResponse + case "wait_for_local": + producerAck = sarama.WaitForLocal + case "wait_for_all": + producerAck = sarama.WaitForAll + } + p.config.Producer.RequiredAcks = producerAck + + var compress = sarama.CompressionNone // default: no compress + _ = compress.UnmarshalText(bytes.ToLower([]byte(config.Config.Kafka.CompressType))) + p.config.Producer.Compression = compress + if config.Config.Kafka.Username != "" && config.Config.Kafka.Password != "" { p.config.Net.SASL.Enable = true p.config.Net.SASL.User = config.Config.Kafka.Username