feat: kafka

Branch: pull/2100/head
Author: withchao
Parent: cf635918ae
Commit: de4f90c763

@@ -16,7 +16,7 @@ require (
github.com/mitchellh/mapstructure v1.5.0
github.com/openimsdk/localcache v0.0.1
github.com/openimsdk/protocol v0.0.58-google
github.com/openimsdk/tools v0.0.46-alpha.14
github.com/openimsdk/tools v0.0.46-alpha.16
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.18.0
github.com/sirupsen/logrus v1.9.3 // indirect
@@ -32,7 +32,7 @@ require (
require github.com/google/uuid v1.6.0
require (
github.com/IBM/sarama v1.42.2
github.com/IBM/sarama v1.43.0
github.com/aliyun/aliyun-oss-go-sdk v3.0.2+incompatible
github.com/go-redis/redis v6.15.9+incompatible
github.com/redis/go-redis/v9 v9.4.0
@@ -61,7 +61,7 @@ require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/eapache/go-resiliency v1.5.0 // indirect
github.com/eapache/go-resiliency v1.6.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/emirpasic/gods v1.12.0 // indirect
@@ -96,7 +96,7 @@ require (
github.com/jinzhu/now v1.1.5 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/kevinburke/ssh_config v0.0.0-20190725054713-01f96b0aa0cd // indirect
github.com/klauspost/compress v1.17.4 // indirect
github.com/klauspost/compress v1.17.7 // indirect
github.com/klauspost/cpuid/v2 v2.2.6 // indirect
github.com/leodido/go-urn v1.4.0 // indirect
github.com/lithammer/shortuuid v3.0.0+incompatible // indirect

@@ -16,8 +16,8 @@ cloud.google.com/go/storage v1.36.0/go.mod h1:M6M/3V/D3KpzMTJyPOR/HU6n2Si5QdaXYEsng2BgIBM=
firebase.google.com/go v3.13.0+incompatible h1:3TdYC3DDi6aHn20qoRkxwGqNgdjtblwVAyRLQwGn/+4=
firebase.google.com/go v3.13.0+incompatible/go.mod h1:xlah6XbEyW6tbfSklcfe5FHJIwjt8toICdV5Wh9ptHs=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/IBM/sarama v1.42.2 h1:VoY4hVIZ+WQJ8G9KNY/SQlWguBQXQ9uvFPOnrcu8hEw=
github.com/IBM/sarama v1.42.2/go.mod h1:FLPGUGwYqEs62hq2bVG6Io2+5n+pS6s/WOXVKWSLFtE=
github.com/IBM/sarama v1.43.0 h1:YFFDn8mMI2QL0wOrG0J2sFoVIAFl7hS9JQi2YZsXtJc=
github.com/IBM/sarama v1.43.0/go.mod h1:zlE6HEbC/SMQ9mhEYaF7nNLYOUyrs0obySKCckWP9BM=
github.com/QcloudApi/qcloud_sign_golang v0.0.0-20141224014652-e4130a326409/go.mod h1:1pk82RBxDY/JZnPQrtqHlUFfCctgdorsd9M06fMynOM=
github.com/alcortesm/tgz v0.0.0-20161220082320-9c5fe88206d7 h1:uSoVVbwJiQipAclBbw+8quDsfcvFjOpI5iCf4p/cqCs=
github.com/alcortesm/tgz v0.0.0-20161220082320-9c5fe88206d7/go.mod h1:6zEj6s6u/ghQa61ZWa/C2Aw3RkjiTBOix7dkqa1VLIs=
@@ -61,8 +61,8 @@ github.com/dtm-labs/rockscache v0.1.1 h1:6S1vgaHvGqrLd8Ka4hRTKeKPV7v+tT0MSkTIX81
github.com/dtm-labs/rockscache v0.1.1/go.mod h1:c76WX0kyIibmQ2ACxUXvDvaLykoPakivMqIxt+UzE7A=
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
github.com/eapache/go-resiliency v1.5.0 h1:dRsaR00whmQD+SgVKlq/vCRFNgtEb5yppyeVos3Yce0=
github.com/eapache/go-resiliency v1.5.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho=
github.com/eapache/go-resiliency v1.6.0 h1:CqGDTLtpwuWKn6Nj3uNUdflaq+/kIPsg0gfNzHton30=
github.com/eapache/go-resiliency v1.6.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho=
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 h1:Oy0F4ALJ04o5Qqpdz8XLIpNA3WM/iSIXqxtqo7UGVws=
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3/go.mod h1:YvSRo5mw33fLEx1+DlK6L2VV43tJt5Eyel9n9XBcR+0=
github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc=
@@ -207,8 +207,8 @@ github.com/kevinburke/ssh_config v0.0.0-20190725054713-01f96b0aa0cd h1:Coekwdh0v
github.com/kevinburke/ssh_config v0.0.0-20190725054713-01f96b0aa0cd/go.mod h1:CT57kijsi8u/K/BOFA39wgDQJ9CxiF4nAY/ojJ6r6mM=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.17.4 h1:Ej5ixsIri7BrIjBkRZLTo6ghwrEtHFk7ijlczPW4fZ4=
github.com/klauspost/compress v1.17.4/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM=
github.com/klauspost/compress v1.17.7 h1:ehO88t2UGzQK66LMdE8tibEd1ErmzZjNEqWkjLAKQQg=
github.com/klauspost/compress v1.17.7/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/klauspost/cpuid/v2 v2.0.1/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/klauspost/cpuid/v2 v2.2.6 h1:ndNyv040zDGIDh8thGkXYjnFtiN02M1PVVF+JE/48xc=
@@ -270,8 +270,8 @@ github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE=
github.com/onsi/gomega v1.18.1/go.mod h1:0q+aL8jAiMXy9hbwj2mr5GziHiwhAIQpFmmtT5hitRs=
github.com/openimsdk/protocol v0.0.58-google h1:cGNUVaXO9LqcFgIb4NvrtEOrv0spGecoQKyN8YWhyZs=
github.com/openimsdk/protocol v0.0.58-google/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
github.com/openimsdk/tools v0.0.46-alpha.14 h1:9znmFmI9sQn5mNLUXhtf84lVL/KpnQNoxBi6yiM1vZM=
github.com/openimsdk/tools v0.0.46-alpha.14/go.mod h1:NAHPJyNUJm0n0WaZfIRC5s6Np+timv+xKIn5I8SKYaM=
github.com/openimsdk/tools v0.0.46-alpha.16 h1:4ouPoTrCuyREF1UPBaka+Oge4x0XsICfNMoGxJuziKU=
github.com/openimsdk/tools v0.0.46-alpha.16/go.mod h1:hMH6pHDVhOXjA8NQ25P7mOtfRXb5lsPAv/uUDR8342Y=
github.com/pelletier/go-buffruneio v0.2.0/go.mod h1:JkE26KsDizTr40EUHkXVtNPvgGtbSNq5BcowyYOWdKo=
github.com/pelletier/go-toml/v2 v2.0.8 h1:0ctb6s9mE31h0/lhu+J6OPmVeDxJn+kYnJc2jZR9tGQ=
github.com/pelletier/go-toml/v2 v2.0.8/go.mod h1:vuYfssBdrU2XDZ9bYydBu6t+6a6PYNcZljzZR9VXg+4=

@@ -81,6 +81,10 @@ type OnlineHistoryRedisConsumerHandler struct {
}
func NewOnlineHistoryRedisConsumerHandler(kafkaConf *config.Kafka, database controller.CommonMsgDatabase, conversationRpcClient *rpcclient.ConversationRpcClient, groupRpcClient *rpcclient.GroupRpcClient) (*OnlineHistoryRedisConsumerHandler, error) {
historyConsumerGroup, err := kafka.NewMConsumerGroup(kafkaConf.Config, kafkaConf.ConsumerGroupID.MsgToRedis, []string{kafkaConf.LatestMsgToRedis.Topic})
if err != nil {
return nil, err
}
var och OnlineHistoryRedisConsumerHandler
och.msgDatabase = database
och.msgDistributionCh = make(chan Cmd2Value) // no buffer channel
@@ -89,35 +93,9 @@ func NewOnlineHistoryRedisConsumerHandler(kafkaConf *config.Kafka, database cont
och.chArrays[i] = make(chan Cmd2Value, 50)
go och.Run(i)
}
och.conversationRpcClient = conversationRpcClient
och.groupRpcClient = groupRpcClient
var err error
var tlsConfig *kafka.TLSConfig
if kafkaConf.TLS != nil {
tlsConfig = &kafka.TLSConfig{
CACrt: kafkaConf.TLS.CACrt,
ClientCrt: kafkaConf.TLS.ClientCrt,
ClientKey: kafkaConf.TLS.ClientKey,
ClientKeyPwd: kafkaConf.TLS.ClientKeyPwd,
InsecureSkipVerify: false,
}
}
och.historyConsumerGroup, err = kafka.NewMConsumerGroup(&kafka.MConsumerGroupConfig{
KafkaVersion: sarama.V2_0_0_0,
OffsetsInitial: sarama.OffsetNewest,
IsReturnErr: false,
UserName: kafkaConf.Username,
Password: kafkaConf.Password,
}, []string{kafkaConf.LatestMsgToRedis.Topic},
kafkaConf.Addr,
kafkaConf.ConsumerGroupID.MsgToRedis,
tlsConfig,
)
// statistics.NewStatistics(&och.singleMsgSuccessCount, config.Config.ModuleName.MsgTransferName, fmt.Sprintf("%d
// second singleMsgCount insert to mongo", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval)
och.historyConsumerGroup = historyConsumerGroup
return &och, err
}

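Note: the hunk above replaces the hand-rolled sarama and TLS setup with a single call to the wrapper's new constructor. A minimal wiring sketch, assuming only the signature visible in this diff (NewMConsumerGroup(kafka.Config, groupID, topics)) and that kafka.Config carries at least the broker addresses; broker address, group ID, and topic below are placeholders:

package main

import (
	"fmt"

	kfk "github.com/openimsdk/open-im-server/v3/pkg/common/kafka"
	"github.com/openimsdk/tools/db/kafka"
)

func main() {
	// Only Addr is confirmed by the diff; other kafka.Config fields (auth, TLS) are assumed.
	conf := kafka.Config{Addr: []string{"localhost:9092"}}
	cg, err := kfk.NewMConsumerGroup(conf, "msgToRedisGroup", []string{"latestMsgToRedis"})
	if err != nil {
		fmt.Println("create consumer group:", err)
		return
	}
	defer cg.Close()
	fmt.Println("consumer group ready")
}
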
@@ -33,27 +33,7 @@ type OnlineHistoryMongoConsumerHandler struct {
}
func NewOnlineHistoryMongoConsumerHandler(kafkaConf *config.Kafka, database controller.CommonMsgDatabase) (*OnlineHistoryMongoConsumerHandler, error) {
var tlsConfig *kfk.TLSConfig
if kafkaConf.TLS != nil {
tlsConfig = &kfk.TLSConfig{
CACrt: kafkaConf.TLS.CACrt,
ClientCrt: kafkaConf.TLS.ClientCrt,
ClientKey: kafkaConf.TLS.ClientKey,
ClientKeyPwd: kafkaConf.TLS.ClientKeyPwd,
InsecureSkipVerify: false,
}
}
historyConsumerGroup, err := kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{
KafkaVersion: sarama.V2_0_0_0,
OffsetsInitial: sarama.OffsetNewest,
IsReturnErr: false,
UserName: kafkaConf.Username,
Password: kafkaConf.Password,
}, []string{kafkaConf.MsgToMongo.Topic},
kafkaConf.Addr,
kafkaConf.ConsumerGroupID.MsgToMongo,
tlsConfig,
)
historyConsumerGroup, err := kfk.NewMConsumerGroup(kafkaConf.Config, kafkaConf.ConsumerGroupID.MsgToMongo, []string{kafkaConf.MsgToMongo.Topic})
if err != nil {
return nil, err
}

@@ -34,32 +34,14 @@ type ConsumerHandler struct {
}
func NewConsumerHandler(kafkaConf *config.Kafka, pusher *Pusher) (*ConsumerHandler, error) {
var consumerHandler ConsumerHandler
consumerHandler.pusher = pusher
var err error
var tlsConfig *kfk.TLSConfig
if kafkaConf.TLS != nil {
tlsConfig = &kfk.TLSConfig{
CACrt: kafkaConf.TLS.CACrt,
ClientCrt: kafkaConf.TLS.ClientCrt,
ClientKey: kafkaConf.TLS.ClientKey,
ClientKeyPwd: kafkaConf.TLS.ClientKeyPwd,
InsecureSkipVerify: false,
}
}
consumerHandler.pushConsumerGroup, err = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{
KafkaVersion: sarama.V2_0_0_0,
OffsetsInitial: sarama.OffsetNewest,
IsReturnErr: false,
UserName: kafkaConf.Username,
Password: kafkaConf.Password,
}, []string{kafkaConf.MsgToPush.Topic}, kafkaConf.Addr,
kafkaConf.ConsumerGroupID.MsgToPush,
tlsConfig)
pushConsumerGroup, err := kfk.NewMConsumerGroup(kafkaConf.Config, kafkaConf.ConsumerGroupID.MsgToPush, []string{kafkaConf.MsgToPush.Topic})
if err != nil {
return nil, err
}
return &consumerHandler, nil
return &ConsumerHandler{
pushConsumerGroup: pushConsumerGroup,
pusher: pusher,
}, nil
}
func (c *ConsumerHandler) handleMs2PsChat(ctx context.Context, msg []byte) {

@@ -16,6 +16,7 @@ package config
import (
"bytes"
"github.com/openimsdk/tools/db/kafka"
"gopkg.in/yaml.v3"
"time"
@@ -82,18 +83,7 @@ type Redis struct {
}
type Kafka struct {
Username string `yaml:"username"`
Password string `yaml:"password"`
ProducerAck string `yaml:"producerAck"`
CompressType string `yaml:"compressType"`
Addr []string `yaml:"addr"`
TLS *struct {
CACrt string `yaml:"caCrt"`
ClientCrt string `yaml:"clientCrt"`
ClientKey string `yaml:"clientKey"`
ClientKeyPwd string `yaml:"clientKeyPwd"`
InsecureSkipVerify bool `yaml:"insecureSkipVerify"`
} `yaml:"tls"`
kafka.Config
LatestMsgToRedis struct {
Topic string `yaml:"topic"`
} `yaml:"latestMsgToRedis"`

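Note: config.Kafka now embeds the tools kafka.Config instead of declaring the connection settings itself. Below is a sketch of the assumed shape of that embedded struct, reconstructed from the fields this hunk deletes; the real definition lives in github.com/openimsdk/tools/db/kafka and may differ.

// Assumed shape only, reconstructed from the fields removed from config.Kafka above.
package kafkaconf

type TLSConfig struct {
	CACrt              string `yaml:"caCrt"`
	ClientCrt          string `yaml:"clientCrt"`
	ClientKey          string `yaml:"clientKey"`
	ClientKeyPwd       string `yaml:"clientKeyPwd"`
	InsecureSkipVerify bool   `yaml:"insecureSkipVerify"`
}

type Config struct {
	Username     string     `yaml:"username"`
	Password     string     `yaml:"password"`
	ProducerAck  string     `yaml:"producerAck"`
	CompressType string     `yaml:"compressType"`
	Addr         []string   `yaml:"addr"`
	TLS          *TLSConfig `yaml:"tls"`
}

Because the field is anonymous, go-yaml should keep reading the existing keys (username, password, addr, tls, ...) directly under the kafka section, provided the tools struct declares matching yaml tags.
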
@@ -106,32 +106,19 @@ type CommonMsgDatabase interface {
}
func NewCommonMsgDatabase(msgDocModel relation.MsgDocModelInterface, cacheModel cache.MsgModel, kafkaConf *config.Kafka) (CommonMsgDatabase, error) {
producerConfig := &kafka.ProducerConfig{
ProducerAck: kafkaConf.ProducerAck,
CompressType: kafkaConf.CompressType,
Username: kafkaConf.Username,
Password: kafkaConf.Password,
}
var tlsConfig *kafka.TLSConfig
if kafkaConf.TLS != nil {
tlsConfig = &kafka.TLSConfig{
CACrt: kafkaConf.TLS.CACrt,
ClientCrt: kafkaConf.TLS.ClientCrt,
ClientKey: kafkaConf.TLS.ClientKey,
ClientKeyPwd: kafkaConf.TLS.ClientKeyPwd,
InsecureSkipVerify: false,
}
conf, err := kafka.BuildProducerConfig(kafkaConf.Config)
if err != nil {
return nil, err
}
producerToRedis, err := kafka.NewKafkaProducer(kafkaConf.Addr, kafkaConf.LatestMsgToRedis.Topic, producerConfig, tlsConfig)
producerToRedis, err := kafka.NewKafkaProducer(conf, kafkaConf.Config.Addr, kafkaConf.LatestMsgToRedis.Topic)
if err != nil {
return nil, err
}
producerToMongo, err := kafka.NewKafkaProducer(kafkaConf.Addr, kafkaConf.MsgToMongo.Topic, producerConfig, tlsConfig)
producerToMongo, err := kafka.NewKafkaProducer(conf, kafkaConf.Config.Addr, kafkaConf.MsgToMongo.Topic)
if err != nil {
return nil, err
}
producerToPush, err := kafka.NewKafkaProducer(kafkaConf.Addr, kafkaConf.MsgToPush.Topic, producerConfig, tlsConfig)
producerToPush, err := kafka.NewKafkaProducer(conf, kafkaConf.Config.Addr, kafkaConf.MsgToPush.Topic)
if err != nil {
return nil, err
}

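Note: the database layer now builds one *sarama.Config via BuildProducerConfig and reuses it for all three producers. A sketch under the signatures visible in this diff (BuildProducerConfig(kafka.Config) (*sarama.Config, error); NewKafkaProducer(*sarama.Config, []string, string)); broker address and topic names are placeholders:

package main

import (
	"fmt"

	kfk "github.com/openimsdk/open-im-server/v3/pkg/common/kafka"
	"github.com/openimsdk/tools/db/kafka"
)

func main() {
	// Only Addr is confirmed by the diff; ack/compression/auth fields are assumed.
	conf := kafka.Config{Addr: []string{"localhost:9092"}}
	sc, err := kfk.BuildProducerConfig(conf)
	if err != nil {
		fmt.Println("build producer config:", err)
		return
	}
	// One shared *sarama.Config, one producer per destination topic.
	for _, topic := range []string{"latestMsgToRedis", "msgToMongo", "msgToPush"} {
		if _, err := kfk.NewKafkaProducer(sc, conf.Addr, topic); err != nil {
			fmt.Println("producer for", topic+":", err)
			return
		}
	}
	fmt.Println("producers ready")
}
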
@@ -1,71 +0,0 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package kafka
import (
"sync"
"github.com/IBM/sarama"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/tools/errs"
)
type Consumer struct {
addr []string
WG sync.WaitGroup
Topic string
PartitionList []int32
Consumer sarama.Consumer
}
func NewKafkaConsumer(addr []string, topic string, config *config.GlobalConfig) (*Consumer, error) {
p := Consumer{}
p.Topic = topic
p.addr = addr
consumerConfig := sarama.NewConfig()
if config.Kafka.Username != "" && config.Kafka.Password != "" {
consumerConfig.Net.SASL.Enable = true
consumerConfig.Net.SASL.User = config.Kafka.Username
consumerConfig.Net.SASL.Password = config.Kafka.Password
}
var tlsConfig *TLSConfig
if config.Kafka.TLS != nil {
tlsConfig = &TLSConfig{
CACrt: config.Kafka.TLS.CACrt,
ClientCrt: config.Kafka.TLS.ClientCrt,
ClientKey: config.Kafka.TLS.ClientKey,
ClientKeyPwd: config.Kafka.TLS.ClientKeyPwd,
InsecureSkipVerify: false,
}
}
err := SetupTLSConfig(consumerConfig, tlsConfig)
if err != nil {
return nil, err
}
consumer, err := sarama.NewConsumer(p.addr, consumerConfig)
if err != nil {
return nil, errs.WrapMsg(err, "NewKafkaConsumer: creating consumer failed")
}
p.Consumer = consumer
partitionList, err := consumer.Partitions(p.Topic)
if err != nil {
return nil, errs.WrapMsg(err, "NewKafkaConsumer: getting partitions failed")
}
p.PartitionList = partitionList
return &p, nil
}

@@ -17,10 +17,8 @@ package kafka
import (
"context"
"errors"
"strings"
"github.com/IBM/sarama"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/db/kafka"
"github.com/openimsdk/tools/log"
)
@@ -38,27 +36,19 @@ type MConsumerGroupConfig struct {
Password string
}
func NewMConsumerGroup(consumerConfig *MConsumerGroupConfig, topics, addrs []string, groupID string, tlsConfig *TLSConfig) (*MConsumerGroup, error) {
consumerGroupConfig := sarama.NewConfig()
consumerGroupConfig.Version = consumerConfig.KafkaVersion
consumerGroupConfig.Consumer.Offsets.Initial = consumerConfig.OffsetsInitial
consumerGroupConfig.Consumer.Return.Errors = consumerConfig.IsReturnErr
if consumerConfig.UserName != "" && consumerConfig.Password != "" {
consumerGroupConfig.Net.SASL.Enable = true
consumerGroupConfig.Net.SASL.User = consumerConfig.UserName
consumerGroupConfig.Net.SASL.Password = consumerConfig.Password
func NewMConsumerGroup(conf kafka.Config, groupID string, topics []string) (*MConsumerGroup, error) {
kfk, err := kafka.BuildConsumerGroupConfig(conf, sarama.OffsetNewest)
if err != nil {
return nil, err
}
SetupTLSConfig(consumerGroupConfig, tlsConfig)
consumerGroup, err := sarama.NewConsumerGroup(addrs, groupID, consumerGroupConfig)
group, err := kafka.NewConsumerGroup(kfk, conf.Addr, groupID)
if err != nil {
return nil, errs.WrapMsg(err, strings.Join(topics, ","), strings.Join(addrs, ","), groupID, consumerConfig.UserName, consumerConfig.Password)
return nil, err
}
return &MConsumerGroup{
consumerGroup,
groupID,
topics,
ConsumerGroup: group,
groupID: groupID,
topics: topics,
}, nil
}

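Note: for reference, the inline setup this hunk removes, reconstructed as a standalone sarama sketch; kafka.BuildConsumerGroupConfig and kafka.NewConsumerGroup from the tools module are presumed to cover the same ground. Broker address, group ID, and credentials below are placeholders.

package main

import (
	"fmt"

	"github.com/IBM/sarama"
)

func main() {
	username, password := "", "" // SASL credentials, if any
	cfg := sarama.NewConfig()
	cfg.Version = sarama.V2_0_0_0
	cfg.Consumer.Offsets.Initial = sarama.OffsetNewest
	cfg.Consumer.Return.Errors = false
	if username != "" && password != "" {
		cfg.Net.SASL.Enable = true
		cfg.Net.SASL.User = username
		cfg.Net.SASL.Password = password
	}
	group, err := sarama.NewConsumerGroup([]string{"localhost:9092"}, "msgToRedis", cfg)
	if err != nil {
		fmt.Println("new consumer group:", err)
		return
	}
	defer group.Close()
	fmt.Println("consumer group created")
}
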
@@ -1,15 +0,0 @@
// Copyright © 2024 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package kafka // import "github.com/openimsdk/open-im-server/v3/pkg/common/kafka"

@@ -15,14 +15,11 @@
package kafka
import (
"bytes"
"context"
"errors"
"strings"
"time"
"github.com/IBM/sarama"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/tools/db/kafka"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mcontext"
@@ -48,91 +45,21 @@ type ProducerConfig struct {
Password string
}
// NewKafkaProducer initializes a new Kafka producer.
func NewKafkaProducer(addr []string, topic string, producerConfig *ProducerConfig, tlsConfig *TLSConfig) (*Producer, error) {
p := Producer{
addr: addr,
topic: topic,
config: sarama.NewConfig(),
}
// Set producer return flags
p.config.Producer.Return.Successes = true
p.config.Producer.Return.Errors = true
// Set partitioner strategy
p.config.Producer.Partitioner = sarama.NewHashPartitioner
// Configure producer acknowledgement level
configureProducerAck(&p, producerConfig.ProducerAck)
func BuildProducerConfig(conf kafka.Config) (*sarama.Config, error) {
return kafka.BuildProducerConfig(conf)
}
// Configure message compression
err := configureCompression(&p, producerConfig.CompressType)
func NewKafkaProducer(kfk *sarama.Config, addr []string, topic string) (*Producer, error) {
producer, err := kafka.NewProducer(kfk, addr)
if err != nil {
return nil, err
}
// Get Kafka configuration from environment variables or fallback to config file
kafkaUsername := getEnvOrConfig("KAFKA_USERNAME", producerConfig.Username)
kafkaPassword := getEnvOrConfig("KAFKA_PASSWORD", producerConfig.Password)
kafkaAddr := getKafkaAddrFromEnv(addr) // Updated to use the new function
// Configure SASL authentication if credentials are provided
if kafkaUsername != "" && kafkaPassword != "" {
p.config.Net.SASL.Enable = true
p.config.Net.SASL.User = kafkaUsername
p.config.Net.SASL.Password = kafkaPassword
}
// Set the Kafka address
p.addr = kafkaAddr
// Set up TLS configuration (if required)
SetupTLSConfig(p.config, tlsConfig)
// Create the producer with retries
for i := 0; i <= maxRetry; i++ {
p.producer, err = sarama.NewSyncProducer(p.addr, p.config)
if err == nil {
return &p, errs.Wrap(err)
}
time.Sleep(1 * time.Second) // Wait before retrying
}
// Panic if unable to create producer after retries
if err != nil {
return nil, errs.Wrap(errors.New("failed to create Kafka producer: " + err.Error()))
}
return &p, nil
}
// configureProducerAck configures the producer's acknowledgement level.
func configureProducerAck(p *Producer, ackConfig string) {
switch strings.ToLower(ackConfig) {
case "no_response":
p.config.Producer.RequiredAcks = sarama.NoResponse
case "wait_for_local":
p.config.Producer.RequiredAcks = sarama.WaitForLocal
case "wait_for_all":
p.config.Producer.RequiredAcks = sarama.WaitForAll
default:
p.config.Producer.RequiredAcks = sarama.WaitForAll
}
}
// configureCompression configures the message compression type for the producer.
func configureCompression(p *Producer, compressType string) error {
var compress sarama.CompressionCodec
if compressType == "" {
compress = sarama.CompressionNone
} else {
err := compress.UnmarshalText(bytes.ToLower([]byte(compressType)))
if err != nil {
return errs.Wrap(err)
}
}
p.config.Producer.Compression = compress
return nil
return &Producer{
addr: addr,
topic: topic,
config: kfk,
producer: producer,
}, nil
}
// GetMQHeaderWithContext extracts message queue headers from the context.

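Note: the deleted configureProducerAck and configureCompression helpers mapped the producerAck and compressType strings onto sarama settings. The standalone sketch below preserves that mapping for reference; the tools BuildProducerConfig is assumed to do something equivalent.

package main

import (
	"bytes"
	"fmt"
	"strings"

	"github.com/IBM/sarama"
)

// producerAcks mirrors the removed configureProducerAck switch.
func producerAcks(ack string) sarama.RequiredAcks {
	switch strings.ToLower(ack) {
	case "no_response":
		return sarama.NoResponse
	case "wait_for_local":
		return sarama.WaitForLocal
	case "wait_for_all":
		return sarama.WaitForAll
	default:
		return sarama.WaitForAll
	}
}

// compression mirrors the removed configureCompression helper.
func compression(name string) (sarama.CompressionCodec, error) {
	var codec sarama.CompressionCodec // zero value is CompressionNone
	if name == "" {
		return codec, nil
	}
	err := codec.UnmarshalText(bytes.ToLower([]byte(name)))
	return codec, err
}

func main() {
	cfg := sarama.NewConfig()
	cfg.Producer.Return.Successes = true
	cfg.Producer.Return.Errors = true
	cfg.Producer.Partitioner = sarama.NewHashPartitioner
	cfg.Producer.RequiredAcks = producerAcks("wait_for_all")
	codec, err := compression("gzip")
	if err != nil {
		fmt.Println("compression:", err)
		return
	}
	cfg.Producer.Compression = codec
	fmt.Println("acks:", cfg.Producer.RequiredAcks, "compression:", cfg.Producer.Compression)
}
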
@@ -18,7 +18,6 @@ import (
"errors"
"flag"
"fmt"
"github.com/IBM/sarama"
"github.com/openimsdk/open-im-server/v3/pkg/common/kafka"
"gopkg.in/yaml.v2"
"os"
@@ -244,45 +243,36 @@ func checkKafka(config *config.GlobalConfig) error {
}
}
var tlsConfig *kafka.TLSConfig
if config.Kafka.TLS != nil {
tlsConfig = &kafka.TLSConfig{
CACrt: config.Kafka.TLS.CACrt,
ClientCrt: config.Kafka.TLS.ClientCrt,
ClientKey: config.Kafka.TLS.ClientKey,
ClientKeyPwd: config.Kafka.TLS.ClientKeyPwd,
InsecureSkipVerify: config.Kafka.TLS.InsecureSkipVerify,
}
}
_, err = kafka.NewMConsumerGroup(&kafka.MConsumerGroupConfig{
KafkaVersion: sarama.V2_0_0_0,
OffsetsInitial: sarama.OffsetNewest,
IsReturnErr: false,
UserName: config.Kafka.Username,
Password: config.Kafka.Password,
}, []string{config.Kafka.LatestMsgToRedis.Topic},
config.Kafka.Addr, config.Kafka.ConsumerGroupID.MsgToRedis, tlsConfig)
if err != nil {
return err
type Item struct {
Topic string
GroupID string
}
_, err = kafka.NewMConsumerGroup(&kafka.MConsumerGroupConfig{
KafkaVersion: sarama.V2_0_0_0,
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false,
}, []string{config.Kafka.MsgToMongo.Topic},
config.Kafka.Addr, config.Kafka.ConsumerGroupID.MsgToMongo, tlsConfig)
if err != nil {
return err
items := []Item{
{
Topic: config.Kafka.LatestMsgToRedis.Topic,
GroupID: config.Kafka.ConsumerGroupID.MsgToRedis,
},
{
Topic: config.Kafka.MsgToMongo.Topic,
GroupID: config.Kafka.ConsumerGroupID.MsgToMongo,
},
{
Topic: config.Kafka.MsgToPush.Topic,
GroupID: config.Kafka.ConsumerGroupID.MsgToPush,
},
}
_, err = kafka.NewMConsumerGroup(&kafka.MConsumerGroupConfig{
KafkaVersion: sarama.V2_0_0_0,
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false,
}, []string{config.Kafka.MsgToPush.Topic}, config.Kafka.Addr,
config.Kafka.ConsumerGroupID.MsgToPush, tlsConfig)
if err != nil {
return err
for _, item := range items {
cg, err := kafka.NewMConsumerGroup(config.Kafka.Config, item.GroupID, []string{item.Topic})
if err != nil {
return err
}
if err := cg.Close(); err != nil {
return err
}
}
return nil
