diff --git a/go.mod b/go.mod index 1019a8a07..a282f6fa7 100644 --- a/go.mod +++ b/go.mod @@ -16,7 +16,7 @@ require ( github.com/mitchellh/mapstructure v1.5.0 github.com/openimsdk/localcache v0.0.1 github.com/openimsdk/protocol v0.0.58-google - github.com/openimsdk/tools v0.0.46-alpha.14 + github.com/openimsdk/tools v0.0.46-alpha.16 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 github.com/sirupsen/logrus v1.9.3 // indirect @@ -32,7 +32,7 @@ require ( require github.com/google/uuid v1.6.0 require ( - github.com/IBM/sarama v1.42.2 + github.com/IBM/sarama v1.43.0 github.com/aliyun/aliyun-oss-go-sdk v3.0.2+incompatible github.com/go-redis/redis v6.15.9+incompatible github.com/redis/go-redis/v9 v9.4.0 @@ -61,7 +61,7 @@ require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/dustin/go-humanize v1.0.1 // indirect - github.com/eapache/go-resiliency v1.5.0 // indirect + github.com/eapache/go-resiliency v1.6.0 // indirect github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect github.com/eapache/queue v1.1.0 // indirect github.com/emirpasic/gods v1.12.0 // indirect @@ -96,7 +96,7 @@ require ( github.com/jinzhu/now v1.1.5 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/kevinburke/ssh_config v0.0.0-20190725054713-01f96b0aa0cd // indirect - github.com/klauspost/compress v1.17.4 // indirect + github.com/klauspost/compress v1.17.7 // indirect github.com/klauspost/cpuid/v2 v2.2.6 // indirect github.com/leodido/go-urn v1.4.0 // indirect github.com/lithammer/shortuuid v3.0.0+incompatible // indirect diff --git a/go.sum b/go.sum index 3985aad83..57ea6df3c 100644 --- a/go.sum +++ b/go.sum @@ -16,8 +16,8 @@ cloud.google.com/go/storage v1.36.0/go.mod h1:M6M/3V/D3KpzMTJyPOR/HU6n2Si5QdaXYE firebase.google.com/go v3.13.0+incompatible h1:3TdYC3DDi6aHn20qoRkxwGqNgdjtblwVAyRLQwGn/+4= firebase.google.com/go v3.13.0+incompatible/go.mod h1:xlah6XbEyW6tbfSklcfe5FHJIwjt8toICdV5Wh9ptHs= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= -github.com/IBM/sarama v1.42.2 h1:VoY4hVIZ+WQJ8G9KNY/SQlWguBQXQ9uvFPOnrcu8hEw= -github.com/IBM/sarama v1.42.2/go.mod h1:FLPGUGwYqEs62hq2bVG6Io2+5n+pS6s/WOXVKWSLFtE= +github.com/IBM/sarama v1.43.0 h1:YFFDn8mMI2QL0wOrG0J2sFoVIAFl7hS9JQi2YZsXtJc= +github.com/IBM/sarama v1.43.0/go.mod h1:zlE6HEbC/SMQ9mhEYaF7nNLYOUyrs0obySKCckWP9BM= github.com/QcloudApi/qcloud_sign_golang v0.0.0-20141224014652-e4130a326409/go.mod h1:1pk82RBxDY/JZnPQrtqHlUFfCctgdorsd9M06fMynOM= github.com/alcortesm/tgz v0.0.0-20161220082320-9c5fe88206d7 h1:uSoVVbwJiQipAclBbw+8quDsfcvFjOpI5iCf4p/cqCs= github.com/alcortesm/tgz v0.0.0-20161220082320-9c5fe88206d7/go.mod h1:6zEj6s6u/ghQa61ZWa/C2Aw3RkjiTBOix7dkqa1VLIs= @@ -61,8 +61,8 @@ github.com/dtm-labs/rockscache v0.1.1 h1:6S1vgaHvGqrLd8Ka4hRTKeKPV7v+tT0MSkTIX81 github.com/dtm-labs/rockscache v0.1.1/go.mod h1:c76WX0kyIibmQ2ACxUXvDvaLykoPakivMqIxt+UzE7A= github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= -github.com/eapache/go-resiliency v1.5.0 h1:dRsaR00whmQD+SgVKlq/vCRFNgtEb5yppyeVos3Yce0= -github.com/eapache/go-resiliency v1.5.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho= +github.com/eapache/go-resiliency v1.6.0 h1:CqGDTLtpwuWKn6Nj3uNUdflaq+/kIPsg0gfNzHton30= +github.com/eapache/go-resiliency v1.6.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho= github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 h1:Oy0F4ALJ04o5Qqpdz8XLIpNA3WM/iSIXqxtqo7UGVws= github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3/go.mod h1:YvSRo5mw33fLEx1+DlK6L2VV43tJt5Eyel9n9XBcR+0= github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= @@ -207,8 +207,8 @@ github.com/kevinburke/ssh_config v0.0.0-20190725054713-01f96b0aa0cd h1:Coekwdh0v github.com/kevinburke/ssh_config v0.0.0-20190725054713-01f96b0aa0cd/go.mod h1:CT57kijsi8u/K/BOFA39wgDQJ9CxiF4nAY/ojJ6r6mM= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= -github.com/klauspost/compress v1.17.4 h1:Ej5ixsIri7BrIjBkRZLTo6ghwrEtHFk7ijlczPW4fZ4= -github.com/klauspost/compress v1.17.4/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM= +github.com/klauspost/compress v1.17.7 h1:ehO88t2UGzQK66LMdE8tibEd1ErmzZjNEqWkjLAKQQg= +github.com/klauspost/compress v1.17.7/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= github.com/klauspost/cpuid/v2 v2.0.1/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/klauspost/cpuid/v2 v2.2.6 h1:ndNyv040zDGIDh8thGkXYjnFtiN02M1PVVF+JE/48xc= @@ -270,8 +270,8 @@ github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE= github.com/onsi/gomega v1.18.1/go.mod h1:0q+aL8jAiMXy9hbwj2mr5GziHiwhAIQpFmmtT5hitRs= github.com/openimsdk/protocol v0.0.58-google h1:cGNUVaXO9LqcFgIb4NvrtEOrv0spGecoQKyN8YWhyZs= github.com/openimsdk/protocol v0.0.58-google/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= -github.com/openimsdk/tools v0.0.46-alpha.14 h1:9znmFmI9sQn5mNLUXhtf84lVL/KpnQNoxBi6yiM1vZM= -github.com/openimsdk/tools v0.0.46-alpha.14/go.mod h1:NAHPJyNUJm0n0WaZfIRC5s6Np+timv+xKIn5I8SKYaM= +github.com/openimsdk/tools v0.0.46-alpha.16 h1:4ouPoTrCuyREF1UPBaka+Oge4x0XsICfNMoGxJuziKU= +github.com/openimsdk/tools v0.0.46-alpha.16/go.mod h1:hMH6pHDVhOXjA8NQ25P7mOtfRXb5lsPAv/uUDR8342Y= github.com/pelletier/go-buffruneio v0.2.0/go.mod h1:JkE26KsDizTr40EUHkXVtNPvgGtbSNq5BcowyYOWdKo= github.com/pelletier/go-toml/v2 v2.0.8 h1:0ctb6s9mE31h0/lhu+J6OPmVeDxJn+kYnJc2jZR9tGQ= github.com/pelletier/go-toml/v2 v2.0.8/go.mod h1:vuYfssBdrU2XDZ9bYydBu6t+6a6PYNcZljzZR9VXg+4= diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go index b956b0399..a2c7ba05d 100644 --- a/internal/msgtransfer/online_history_msg_handler.go +++ b/internal/msgtransfer/online_history_msg_handler.go @@ -81,6 +81,10 @@ type OnlineHistoryRedisConsumerHandler struct { } func NewOnlineHistoryRedisConsumerHandler(kafkaConf *config.Kafka, database controller.CommonMsgDatabase, conversationRpcClient *rpcclient.ConversationRpcClient, groupRpcClient *rpcclient.GroupRpcClient) (*OnlineHistoryRedisConsumerHandler, error) { + historyConsumerGroup, err := kafka.NewMConsumerGroup(kafkaConf.Config, kafkaConf.ConsumerGroupID.MsgToRedis, []string{kafkaConf.LatestMsgToRedis.Topic}) + if err != nil { + return nil, err + } var och OnlineHistoryRedisConsumerHandler och.msgDatabase = database och.msgDistributionCh = make(chan Cmd2Value) // no buffer channel @@ -89,35 +93,9 @@ func NewOnlineHistoryRedisConsumerHandler(kafkaConf *config.Kafka, database cont och.chArrays[i] = make(chan Cmd2Value, 50) go och.Run(i) } - och.conversationRpcClient = conversationRpcClient och.groupRpcClient = groupRpcClient - var err error - - var tlsConfig *kafka.TLSConfig - if kafkaConf.TLS != nil { - tlsConfig = &kafka.TLSConfig{ - CACrt: kafkaConf.TLS.CACrt, - ClientCrt: kafkaConf.TLS.ClientCrt, - ClientKey: kafkaConf.TLS.ClientKey, - ClientKeyPwd: kafkaConf.TLS.ClientKeyPwd, - InsecureSkipVerify: false, - } - } - - och.historyConsumerGroup, err = kafka.NewMConsumerGroup(&kafka.MConsumerGroupConfig{ - KafkaVersion: sarama.V2_0_0_0, - OffsetsInitial: sarama.OffsetNewest, - IsReturnErr: false, - UserName: kafkaConf.Username, - Password: kafkaConf.Password, - }, []string{kafkaConf.LatestMsgToRedis.Topic}, - kafkaConf.Addr, - kafkaConf.ConsumerGroupID.MsgToRedis, - tlsConfig, - ) - // statistics.NewStatistics(&och.singleMsgSuccessCount, config.Config.ModuleName.MsgTransferName, fmt.Sprintf("%d - // second singleMsgCount insert to mongo", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval) + och.historyConsumerGroup = historyConsumerGroup return &och, err } diff --git a/internal/msgtransfer/online_msg_to_mongo_handler.go b/internal/msgtransfer/online_msg_to_mongo_handler.go index fb4486f99..39a6a4906 100644 --- a/internal/msgtransfer/online_msg_to_mongo_handler.go +++ b/internal/msgtransfer/online_msg_to_mongo_handler.go @@ -33,27 +33,7 @@ type OnlineHistoryMongoConsumerHandler struct { } func NewOnlineHistoryMongoConsumerHandler(kafkaConf *config.Kafka, database controller.CommonMsgDatabase) (*OnlineHistoryMongoConsumerHandler, error) { - var tlsConfig *kfk.TLSConfig - if kafkaConf.TLS != nil { - tlsConfig = &kfk.TLSConfig{ - CACrt: kafkaConf.TLS.CACrt, - ClientCrt: kafkaConf.TLS.ClientCrt, - ClientKey: kafkaConf.TLS.ClientKey, - ClientKeyPwd: kafkaConf.TLS.ClientKeyPwd, - InsecureSkipVerify: false, - } - } - historyConsumerGroup, err := kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{ - KafkaVersion: sarama.V2_0_0_0, - OffsetsInitial: sarama.OffsetNewest, - IsReturnErr: false, - UserName: kafkaConf.Username, - Password: kafkaConf.Password, - }, []string{kafkaConf.MsgToMongo.Topic}, - kafkaConf.Addr, - kafkaConf.ConsumerGroupID.MsgToMongo, - tlsConfig, - ) + historyConsumerGroup, err := kfk.NewMConsumerGroup(kafkaConf.Config, kafkaConf.ConsumerGroupID.MsgToMongo, []string{kafkaConf.MsgToMongo.Topic}) if err != nil { return nil, err } diff --git a/internal/push/push_handler.go b/internal/push/push_handler.go index 702c257db..ef666d1fb 100644 --- a/internal/push/push_handler.go +++ b/internal/push/push_handler.go @@ -34,32 +34,14 @@ type ConsumerHandler struct { } func NewConsumerHandler(kafkaConf *config.Kafka, pusher *Pusher) (*ConsumerHandler, error) { - var consumerHandler ConsumerHandler - consumerHandler.pusher = pusher - var err error - var tlsConfig *kfk.TLSConfig - if kafkaConf.TLS != nil { - tlsConfig = &kfk.TLSConfig{ - CACrt: kafkaConf.TLS.CACrt, - ClientCrt: kafkaConf.TLS.ClientCrt, - ClientKey: kafkaConf.TLS.ClientKey, - ClientKeyPwd: kafkaConf.TLS.ClientKeyPwd, - InsecureSkipVerify: false, - } - } - consumerHandler.pushConsumerGroup, err = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{ - KafkaVersion: sarama.V2_0_0_0, - OffsetsInitial: sarama.OffsetNewest, - IsReturnErr: false, - UserName: kafkaConf.Username, - Password: kafkaConf.Password, - }, []string{kafkaConf.MsgToPush.Topic}, kafkaConf.Addr, - kafkaConf.ConsumerGroupID.MsgToPush, - tlsConfig) + pushConsumerGroup, err := kfk.NewMConsumerGroup(kafkaConf.Config, kafkaConf.ConsumerGroupID.MsgToPush, []string{kafkaConf.MsgToPush.Topic}) if err != nil { return nil, err } - return &consumerHandler, nil + return &ConsumerHandler{ + pushConsumerGroup: pushConsumerGroup, + pusher: pusher, + }, nil } func (c *ConsumerHandler) handleMs2PsChat(ctx context.Context, msg []byte) { diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index 33ff554e8..42aea0c5f 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -16,6 +16,7 @@ package config import ( "bytes" + "github.com/openimsdk/tools/db/kafka" "gopkg.in/yaml.v3" "time" @@ -82,18 +83,7 @@ type Redis struct { } type Kafka struct { - Username string `yaml:"username"` - Password string `yaml:"password"` - ProducerAck string `yaml:"producerAck"` - CompressType string `yaml:"compressType"` - Addr []string `yaml:"addr"` - TLS *struct { - CACrt string `yaml:"caCrt"` - ClientCrt string `yaml:"clientCrt"` - ClientKey string `yaml:"clientKey"` - ClientKeyPwd string `yaml:"clientKeyPwd"` - InsecureSkipVerify bool `yaml:"insecureSkipVerify"` - } `yaml:"tls"` + kafka.Config LatestMsgToRedis struct { Topic string `yaml:"topic"` } `yaml:"latestMsgToRedis"` diff --git a/pkg/common/db/controller/msg.go b/pkg/common/db/controller/msg.go index 9fefcaafb..485663c64 100644 --- a/pkg/common/db/controller/msg.go +++ b/pkg/common/db/controller/msg.go @@ -106,32 +106,19 @@ type CommonMsgDatabase interface { } func NewCommonMsgDatabase(msgDocModel relation.MsgDocModelInterface, cacheModel cache.MsgModel, kafkaConf *config.Kafka) (CommonMsgDatabase, error) { - producerConfig := &kafka.ProducerConfig{ - ProducerAck: kafkaConf.ProducerAck, - CompressType: kafkaConf.CompressType, - Username: kafkaConf.Username, - Password: kafkaConf.Password, - } - - var tlsConfig *kafka.TLSConfig - if kafkaConf.TLS != nil { - tlsConfig = &kafka.TLSConfig{ - CACrt: kafkaConf.TLS.CACrt, - ClientCrt: kafkaConf.TLS.ClientCrt, - ClientKey: kafkaConf.TLS.ClientKey, - ClientKeyPwd: kafkaConf.TLS.ClientKeyPwd, - InsecureSkipVerify: false, - } + conf, err := kafka.BuildProducerConfig(kafkaConf.Config) + if err != nil { + return nil, err } - producerToRedis, err := kafka.NewKafkaProducer(kafkaConf.Addr, kafkaConf.LatestMsgToRedis.Topic, producerConfig, tlsConfig) + producerToRedis, err := kafka.NewKafkaProducer(conf, kafkaConf.Config.Addr, kafkaConf.LatestMsgToRedis.Topic) if err != nil { return nil, err } - producerToMongo, err := kafka.NewKafkaProducer(kafkaConf.Addr, kafkaConf.MsgToMongo.Topic, producerConfig, tlsConfig) + producerToMongo, err := kafka.NewKafkaProducer(conf, kafkaConf.Config.Addr, kafkaConf.MsgToMongo.Topic) if err != nil { return nil, err } - producerToPush, err := kafka.NewKafkaProducer(kafkaConf.Addr, kafkaConf.MsgToPush.Topic, producerConfig, tlsConfig) + producerToPush, err := kafka.NewKafkaProducer(conf, kafkaConf.Config.Addr, kafkaConf.MsgToPush.Topic) if err != nil { return nil, err } diff --git a/pkg/common/kafka/consumer.go b/pkg/common/kafka/consumer.go deleted file mode 100644 index 90d386911..000000000 --- a/pkg/common/kafka/consumer.go +++ /dev/null @@ -1,71 +0,0 @@ -// Copyright © 2023 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package kafka - -import ( - "sync" - - "github.com/IBM/sarama" - "github.com/openimsdk/open-im-server/v3/pkg/common/config" - "github.com/openimsdk/tools/errs" -) - -type Consumer struct { - addr []string - WG sync.WaitGroup - Topic string - PartitionList []int32 - Consumer sarama.Consumer -} - -func NewKafkaConsumer(addr []string, topic string, config *config.GlobalConfig) (*Consumer, error) { - p := Consumer{} - p.Topic = topic - p.addr = addr - consumerConfig := sarama.NewConfig() - if config.Kafka.Username != "" && config.Kafka.Password != "" { - consumerConfig.Net.SASL.Enable = true - consumerConfig.Net.SASL.User = config.Kafka.Username - consumerConfig.Net.SASL.Password = config.Kafka.Password - } - var tlsConfig *TLSConfig - if config.Kafka.TLS != nil { - tlsConfig = &TLSConfig{ - CACrt: config.Kafka.TLS.CACrt, - ClientCrt: config.Kafka.TLS.ClientCrt, - ClientKey: config.Kafka.TLS.ClientKey, - ClientKeyPwd: config.Kafka.TLS.ClientKeyPwd, - InsecureSkipVerify: false, - } - } - err := SetupTLSConfig(consumerConfig, tlsConfig) - if err != nil { - return nil, err - } - consumer, err := sarama.NewConsumer(p.addr, consumerConfig) - if err != nil { - return nil, errs.WrapMsg(err, "NewKafkaConsumer: creating consumer failed") - } - p.Consumer = consumer - - partitionList, err := consumer.Partitions(p.Topic) - if err != nil { - return nil, errs.WrapMsg(err, "NewKafkaConsumer: getting partitions failed") - } - p.PartitionList = partitionList - - return &p, nil - -} diff --git a/pkg/common/kafka/consumer_group.go b/pkg/common/kafka/consumer_group.go index ce8d65ab9..3741c95d4 100644 --- a/pkg/common/kafka/consumer_group.go +++ b/pkg/common/kafka/consumer_group.go @@ -17,10 +17,8 @@ package kafka import ( "context" "errors" - "strings" - "github.com/IBM/sarama" - "github.com/openimsdk/tools/errs" + "github.com/openimsdk/tools/db/kafka" "github.com/openimsdk/tools/log" ) @@ -38,27 +36,19 @@ type MConsumerGroupConfig struct { Password string } -func NewMConsumerGroup(consumerConfig *MConsumerGroupConfig, topics, addrs []string, groupID string, tlsConfig *TLSConfig) (*MConsumerGroup, error) { - consumerGroupConfig := sarama.NewConfig() - consumerGroupConfig.Version = consumerConfig.KafkaVersion - consumerGroupConfig.Consumer.Offsets.Initial = consumerConfig.OffsetsInitial - consumerGroupConfig.Consumer.Return.Errors = consumerConfig.IsReturnErr - if consumerConfig.UserName != "" && consumerConfig.Password != "" { - consumerGroupConfig.Net.SASL.Enable = true - consumerGroupConfig.Net.SASL.User = consumerConfig.UserName - consumerGroupConfig.Net.SASL.Password = consumerConfig.Password +func NewMConsumerGroup(conf kafka.Config, groupID string, topics []string) (*MConsumerGroup, error) { + kfk, err := kafka.BuildConsumerGroupConfig(conf, sarama.OffsetNewest) + if err != nil { + return nil, err } - - SetupTLSConfig(consumerGroupConfig, tlsConfig) - consumerGroup, err := sarama.NewConsumerGroup(addrs, groupID, consumerGroupConfig) + group, err := kafka.NewConsumerGroup(kfk, conf.Addr, groupID) if err != nil { - return nil, errs.WrapMsg(err, strings.Join(topics, ","), strings.Join(addrs, ","), groupID, consumerConfig.UserName, consumerConfig.Password) + return nil, err } - return &MConsumerGroup{ - consumerGroup, - groupID, - topics, + ConsumerGroup: group, + groupID: groupID, + topics: topics, }, nil } diff --git a/pkg/common/kafka/doc.go b/pkg/common/kafka/doc.go deleted file mode 100644 index 880be4f6a..000000000 --- a/pkg/common/kafka/doc.go +++ /dev/null @@ -1,15 +0,0 @@ -// Copyright © 2024 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package kafka // import "github.com/openimsdk/open-im-server/v3/pkg/common/kafka" diff --git a/pkg/common/kafka/producer.go b/pkg/common/kafka/producer.go index 3b8ed5173..42bb14b4e 100644 --- a/pkg/common/kafka/producer.go +++ b/pkg/common/kafka/producer.go @@ -15,14 +15,11 @@ package kafka import ( - "bytes" "context" "errors" - "strings" - "time" - "github.com/IBM/sarama" "github.com/openimsdk/protocol/constant" + "github.com/openimsdk/tools/db/kafka" "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/mcontext" @@ -48,91 +45,21 @@ type ProducerConfig struct { Password string } -// NewKafkaProducer initializes a new Kafka producer. -func NewKafkaProducer(addr []string, topic string, producerConfig *ProducerConfig, tlsConfig *TLSConfig) (*Producer, error) { - p := Producer{ - addr: addr, - topic: topic, - config: sarama.NewConfig(), - } - - // Set producer return flags - p.config.Producer.Return.Successes = true - p.config.Producer.Return.Errors = true - - // Set partitioner strategy - p.config.Producer.Partitioner = sarama.NewHashPartitioner - - // Configure producer acknowledgement level - configureProducerAck(&p, producerConfig.ProducerAck) +func BuildProducerConfig(conf kafka.Config) (*sarama.Config, error) { + return kafka.BuildProducerConfig(conf) +} - // Configure message compression - err := configureCompression(&p, producerConfig.CompressType) +func NewKafkaProducer(kfk *sarama.Config, addr []string, topic string) (*Producer, error) { + producer, err := kafka.NewProducer(kfk, addr) if err != nil { return nil, err } - - // Get Kafka configuration from environment variables or fallback to config file - kafkaUsername := getEnvOrConfig("KAFKA_USERNAME", producerConfig.Username) - kafkaPassword := getEnvOrConfig("KAFKA_PASSWORD", producerConfig.Password) - kafkaAddr := getKafkaAddrFromEnv(addr) // Updated to use the new function - - // Configure SASL authentication if credentials are provided - if kafkaUsername != "" && kafkaPassword != "" { - p.config.Net.SASL.Enable = true - p.config.Net.SASL.User = kafkaUsername - p.config.Net.SASL.Password = kafkaPassword - } - - // Set the Kafka address - p.addr = kafkaAddr - - // Set up TLS configuration (if required) - SetupTLSConfig(p.config, tlsConfig) - - // Create the producer with retries - for i := 0; i <= maxRetry; i++ { - p.producer, err = sarama.NewSyncProducer(p.addr, p.config) - if err == nil { - return &p, errs.Wrap(err) - } - time.Sleep(1 * time.Second) // Wait before retrying - } - // Panic if unable to create producer after retries - if err != nil { - return nil, errs.Wrap(errors.New("failed to create Kafka producer: " + err.Error())) - } - - return &p, nil -} - -// configureProducerAck configures the producer's acknowledgement level. -func configureProducerAck(p *Producer, ackConfig string) { - switch strings.ToLower(ackConfig) { - case "no_response": - p.config.Producer.RequiredAcks = sarama.NoResponse - case "wait_for_local": - p.config.Producer.RequiredAcks = sarama.WaitForLocal - case "wait_for_all": - p.config.Producer.RequiredAcks = sarama.WaitForAll - default: - p.config.Producer.RequiredAcks = sarama.WaitForAll - } -} - -// configureCompression configures the message compression type for the producer. -func configureCompression(p *Producer, compressType string) error { - var compress sarama.CompressionCodec - if compressType == "" { - compress = sarama.CompressionNone - } else { - err := compress.UnmarshalText(bytes.ToLower([]byte(compressType))) - if err != nil { - return errs.Wrap(err) - } - } - p.config.Producer.Compression = compress - return nil + return &Producer{ + addr: addr, + topic: topic, + config: kfk, + producer: producer, + }, nil } // GetMQHeaderWithContext extracts message queue headers from the context. diff --git a/tools/component/component.go b/tools/component/component.go index c2b2e689d..22335131e 100644 --- a/tools/component/component.go +++ b/tools/component/component.go @@ -18,7 +18,6 @@ import ( "errors" "flag" "fmt" - "github.com/IBM/sarama" "github.com/openimsdk/open-im-server/v3/pkg/common/kafka" "gopkg.in/yaml.v2" "os" @@ -244,45 +243,36 @@ func checkKafka(config *config.GlobalConfig) error { } } - var tlsConfig *kafka.TLSConfig - if config.Kafka.TLS != nil { - tlsConfig = &kafka.TLSConfig{ - CACrt: config.Kafka.TLS.CACrt, - ClientCrt: config.Kafka.TLS.ClientCrt, - ClientKey: config.Kafka.TLS.ClientKey, - ClientKeyPwd: config.Kafka.TLS.ClientKeyPwd, - InsecureSkipVerify: config.Kafka.TLS.InsecureSkipVerify, - } - } - - _, err = kafka.NewMConsumerGroup(&kafka.MConsumerGroupConfig{ - KafkaVersion: sarama.V2_0_0_0, - OffsetsInitial: sarama.OffsetNewest, - IsReturnErr: false, - UserName: config.Kafka.Username, - Password: config.Kafka.Password, - }, []string{config.Kafka.LatestMsgToRedis.Topic}, - config.Kafka.Addr, config.Kafka.ConsumerGroupID.MsgToRedis, tlsConfig) - if err != nil { - return err + type Item struct { + Topic string + GroupID string } - _, err = kafka.NewMConsumerGroup(&kafka.MConsumerGroupConfig{ - KafkaVersion: sarama.V2_0_0_0, - OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false, - }, []string{config.Kafka.MsgToMongo.Topic}, - config.Kafka.Addr, config.Kafka.ConsumerGroupID.MsgToMongo, tlsConfig) - if err != nil { - return err + items := []Item{ + { + Topic: config.Kafka.LatestMsgToRedis.Topic, + GroupID: config.Kafka.ConsumerGroupID.MsgToRedis, + }, + + { + Topic: config.Kafka.MsgToMongo.Topic, + GroupID: config.Kafka.ConsumerGroupID.MsgToMongo, + }, + + { + Topic: config.Kafka.MsgToPush.Topic, + GroupID: config.Kafka.ConsumerGroupID.MsgToPush, + }, } - _, err = kafka.NewMConsumerGroup(&kafka.MConsumerGroupConfig{ - KafkaVersion: sarama.V2_0_0_0, - OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false, - }, []string{config.Kafka.MsgToPush.Topic}, config.Kafka.Addr, - config.Kafka.ConsumerGroupID.MsgToPush, tlsConfig) - if err != nil { - return err + for _, item := range items { + cg, err := kafka.NewMConsumerGroup(config.Kafka.Config, item.GroupID, []string{item.Topic}) + if err != nil { + return err + } + if err := cg.Close(); err != nil { + return err + } } return nil