parent
1e54235263
commit
aeee3f33b1
@ -0,0 +1,33 @@
|
|||||||
|
// Copyright © 2024 OpenIM open source community. All rights reserved.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package kafka
|
||||||
|
|
||||||
|
type TLSConfig struct {
|
||||||
|
EnableTLS bool `yaml:"enableTLS"`
|
||||||
|
CACrt string `yaml:"caCrt"`
|
||||||
|
ClientCrt string `yaml:"clientCrt"`
|
||||||
|
ClientKey string `yaml:"clientKey"`
|
||||||
|
ClientKeyPwd string `yaml:"clientKeyPwd"`
|
||||||
|
InsecureSkipVerify bool `yaml:"insecureSkipVerify"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type Config struct {
|
||||||
|
Username string `yaml:"username"`
|
||||||
|
Password string `yaml:"password"`
|
||||||
|
ProducerAck string `yaml:"producerAck"`
|
||||||
|
CompressType string `yaml:"compressType"`
|
||||||
|
Addr []string `yaml:"addr"`
|
||||||
|
TLS TLSConfig `yaml:"tls"`
|
||||||
|
}
|
@ -0,0 +1,68 @@
|
|||||||
|
// Copyright © 2023 OpenIM. All rights reserved.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package kafka
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
|
||||||
|
"github.com/IBM/sarama"
|
||||||
|
"github.com/openimsdk/tools/log"
|
||||||
|
)
|
||||||
|
|
||||||
|
type MConsumerGroup struct {
|
||||||
|
sarama.ConsumerGroup
|
||||||
|
groupID string
|
||||||
|
topics []string
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewMConsumerGroup(conf *Config, groupID string, topics []string, autoCommitEnable bool) (*MConsumerGroup, error) {
|
||||||
|
config, err := BuildConsumerGroupConfig(conf, sarama.OffsetNewest, autoCommitEnable)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
group, err := NewConsumerGroup(config, conf.Addr, groupID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &MConsumerGroup{
|
||||||
|
ConsumerGroup: group,
|
||||||
|
groupID: groupID,
|
||||||
|
topics: topics,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mc *MConsumerGroup) GetContextFromMsg(cMsg *sarama.ConsumerMessage) context.Context {
|
||||||
|
return GetContextWithMQHeader(cMsg.Headers)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mc *MConsumerGroup) RegisterHandleAndConsumer(ctx context.Context, handler sarama.ConsumerGroupHandler) {
|
||||||
|
for {
|
||||||
|
err := mc.ConsumerGroup.Consume(ctx, mc.topics, handler)
|
||||||
|
if errors.Is(err, sarama.ErrClosedConsumerGroup) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if errors.Is(err, context.Canceled) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
log.ZWarn(ctx, "consume err", err, "topic", mc.topics, "groupID", mc.groupID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mc *MConsumerGroup) Close() error {
|
||||||
|
return mc.ConsumerGroup.Close()
|
||||||
|
}
|
@ -0,0 +1,82 @@
|
|||||||
|
// Copyright © 2023 OpenIM. All rights reserved.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package kafka
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"github.com/IBM/sarama"
|
||||||
|
"github.com/openimsdk/tools/errs"
|
||||||
|
"google.golang.org/protobuf/proto"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Producer represents a Kafka producer.
|
||||||
|
type Producer struct {
|
||||||
|
addr []string
|
||||||
|
topic string
|
||||||
|
config *sarama.Config
|
||||||
|
producer sarama.SyncProducer
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewKafkaProducer(config *sarama.Config, addr []string, topic string) (*Producer, error) {
|
||||||
|
producer, err := NewProducer(config, addr)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &Producer{
|
||||||
|
addr: addr,
|
||||||
|
topic: topic,
|
||||||
|
config: config,
|
||||||
|
producer: producer,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// SendMessage sends a message to the Kafka topic configured in the Producer.
|
||||||
|
func (p *Producer) SendMessage(ctx context.Context, key string, msg proto.Message) (int32, int64, error) {
|
||||||
|
// Marshal the protobuf message
|
||||||
|
bMsg, err := proto.Marshal(msg)
|
||||||
|
if err != nil {
|
||||||
|
return 0, 0, errs.WrapMsg(err, "kafka proto Marshal err")
|
||||||
|
}
|
||||||
|
if len(bMsg) == 0 {
|
||||||
|
return 0, 0, errs.WrapMsg(errEmptyMsg, "kafka proto Marshal err")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Prepare Kafka message
|
||||||
|
kMsg := &sarama.ProducerMessage{
|
||||||
|
Topic: p.topic,
|
||||||
|
Key: sarama.StringEncoder(key),
|
||||||
|
Value: sarama.ByteEncoder(bMsg),
|
||||||
|
}
|
||||||
|
|
||||||
|
// Validate message key and value
|
||||||
|
if kMsg.Key.Length() == 0 || kMsg.Value.Length() == 0 {
|
||||||
|
return 0, 0, errs.Wrap(errEmptyMsg)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Attach context metadata as headers
|
||||||
|
header, err := GetMQHeaderWithContext(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return 0, 0, err
|
||||||
|
}
|
||||||
|
kMsg.Headers = header
|
||||||
|
|
||||||
|
// Send the message
|
||||||
|
partition, offset, err := p.producer.SendMessage(kMsg)
|
||||||
|
if err != nil {
|
||||||
|
return 0, 0, errs.WrapMsg(err, "p.producer.SendMessage error")
|
||||||
|
}
|
||||||
|
|
||||||
|
return partition, offset, nil
|
||||||
|
}
|
@ -0,0 +1,85 @@
|
|||||||
|
package kafka
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"github.com/IBM/sarama"
|
||||||
|
"github.com/openimsdk/tools/errs"
|
||||||
|
)
|
||||||
|
|
||||||
|
func BuildConsumerGroupConfig(conf *Config, initial int64, autoCommitEnable bool) (*sarama.Config, error) {
|
||||||
|
kfk := sarama.NewConfig()
|
||||||
|
kfk.Version = sarama.V2_0_0_0
|
||||||
|
kfk.Consumer.Offsets.Initial = initial
|
||||||
|
kfk.Consumer.Offsets.AutoCommit.Enable = autoCommitEnable
|
||||||
|
kfk.Consumer.Return.Errors = false
|
||||||
|
if conf.Username != "" || conf.Password != "" {
|
||||||
|
kfk.Net.SASL.Enable = true
|
||||||
|
kfk.Net.SASL.User = conf.Username
|
||||||
|
kfk.Net.SASL.Password = conf.Password
|
||||||
|
}
|
||||||
|
if conf.TLS.EnableTLS {
|
||||||
|
tls, err := newTLSConfig(conf.TLS.ClientCrt, conf.TLS.ClientKey, conf.TLS.CACrt, []byte(conf.TLS.ClientKeyPwd), conf.TLS.InsecureSkipVerify)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
kfk.Net.TLS.Config = tls
|
||||||
|
kfk.Net.TLS.Enable = true
|
||||||
|
}
|
||||||
|
return kfk, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewConsumerGroup(conf *sarama.Config, addr []string, groupID string) (sarama.ConsumerGroup, error) {
|
||||||
|
cg, err := sarama.NewConsumerGroup(addr, groupID, conf)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errs.WrapMsg(err, "NewConsumerGroup failed", "addr", addr, "groupID", groupID, "conf", *conf)
|
||||||
|
}
|
||||||
|
return cg, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func BuildProducerConfig(conf Config) (*sarama.Config, error) {
|
||||||
|
kfk := sarama.NewConfig()
|
||||||
|
kfk.Producer.Return.Successes = true
|
||||||
|
kfk.Producer.Return.Errors = true
|
||||||
|
kfk.Producer.Partitioner = sarama.NewHashPartitioner
|
||||||
|
if conf.Username != "" || conf.Password != "" {
|
||||||
|
kfk.Net.SASL.Enable = true
|
||||||
|
kfk.Net.SASL.User = conf.Username
|
||||||
|
kfk.Net.SASL.Password = conf.Password
|
||||||
|
}
|
||||||
|
switch strings.ToLower(conf.ProducerAck) {
|
||||||
|
case "no_response":
|
||||||
|
kfk.Producer.RequiredAcks = sarama.NoResponse
|
||||||
|
case "wait_for_local":
|
||||||
|
kfk.Producer.RequiredAcks = sarama.WaitForLocal
|
||||||
|
case "wait_for_all":
|
||||||
|
kfk.Producer.RequiredAcks = sarama.WaitForAll
|
||||||
|
default:
|
||||||
|
kfk.Producer.RequiredAcks = sarama.WaitForAll
|
||||||
|
}
|
||||||
|
if conf.CompressType == "" {
|
||||||
|
kfk.Producer.Compression = sarama.CompressionNone
|
||||||
|
} else {
|
||||||
|
if err := kfk.Producer.Compression.UnmarshalText(bytes.ToLower([]byte(conf.CompressType))); err != nil {
|
||||||
|
return nil, errs.WrapMsg(err, "UnmarshalText failed", "compressType", conf.CompressType)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if conf.TLS.EnableTLS {
|
||||||
|
tls, err := newTLSConfig(conf.TLS.ClientCrt, conf.TLS.ClientKey, conf.TLS.CACrt, []byte(conf.TLS.ClientKeyPwd), conf.TLS.InsecureSkipVerify)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
kfk.Net.TLS.Config = tls
|
||||||
|
kfk.Net.TLS.Enable = true
|
||||||
|
}
|
||||||
|
return kfk, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewProducer(conf *sarama.Config, addr []string) (sarama.SyncProducer, error) {
|
||||||
|
producer, err := sarama.NewSyncProducer(addr, conf)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errs.WrapMsg(err, "NewSyncProducer failed", "addr", addr, "conf", *conf)
|
||||||
|
}
|
||||||
|
return producer, nil
|
||||||
|
}
|
@ -0,0 +1,83 @@
|
|||||||
|
// Copyright © 2024 OpenIM open source community. All rights reserved.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package kafka
|
||||||
|
|
||||||
|
import (
|
||||||
|
"crypto/tls"
|
||||||
|
"crypto/x509"
|
||||||
|
"encoding/pem"
|
||||||
|
"os"
|
||||||
|
|
||||||
|
"github.com/openimsdk/tools/errs"
|
||||||
|
)
|
||||||
|
|
||||||
|
// decryptPEM decrypts a PEM block using a password.
|
||||||
|
func decryptPEM(data []byte, passphrase []byte) ([]byte, error) {
|
||||||
|
if len(passphrase) == 0 {
|
||||||
|
return data, nil
|
||||||
|
}
|
||||||
|
b, _ := pem.Decode(data)
|
||||||
|
d, err := x509.DecryptPEMBlock(b, passphrase)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errs.WrapMsg(err, "DecryptPEMBlock failed")
|
||||||
|
}
|
||||||
|
return pem.EncodeToMemory(&pem.Block{
|
||||||
|
Type: b.Type,
|
||||||
|
Bytes: d,
|
||||||
|
}), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func readEncryptablePEMBlock(path string, pwd []byte) ([]byte, error) {
|
||||||
|
data, err := os.ReadFile(path)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errs.WrapMsg(err, "ReadFile failed", "path", path)
|
||||||
|
}
|
||||||
|
return decryptPEM(data, pwd)
|
||||||
|
}
|
||||||
|
|
||||||
|
// newTLSConfig setup the TLS config from general config file.
|
||||||
|
func newTLSConfig(clientCertFile, clientKeyFile, caCertFile string, keyPwd []byte, insecureSkipVerify bool) (*tls.Config, error) {
|
||||||
|
var tlsConfig tls.Config
|
||||||
|
if clientCertFile != "" && clientKeyFile != "" {
|
||||||
|
certPEMBlock, err := os.ReadFile(clientCertFile)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errs.WrapMsg(err, "ReadFile failed", "clientCertFile", clientCertFile)
|
||||||
|
}
|
||||||
|
keyPEMBlock, err := readEncryptablePEMBlock(clientKeyFile, keyPwd)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
cert, err := tls.X509KeyPair(certPEMBlock, keyPEMBlock)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errs.WrapMsg(err, "X509KeyPair failed")
|
||||||
|
}
|
||||||
|
tlsConfig.Certificates = []tls.Certificate{cert}
|
||||||
|
}
|
||||||
|
|
||||||
|
if caCertFile != "" {
|
||||||
|
caCert, err := os.ReadFile(caCertFile)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errs.WrapMsg(err, "ReadFile failed", "caCertFile", caCertFile)
|
||||||
|
}
|
||||||
|
caCertPool := x509.NewCertPool()
|
||||||
|
if ok := caCertPool.AppendCertsFromPEM(caCert); !ok {
|
||||||
|
return nil, errs.New("AppendCertsFromPEM failed")
|
||||||
|
}
|
||||||
|
tlsConfig.RootCAs = caCertPool
|
||||||
|
}
|
||||||
|
tlsConfig.InsecureSkipVerify = insecureSkipVerify
|
||||||
|
return &tlsConfig, nil
|
||||||
|
}
|
@ -0,0 +1,34 @@
|
|||||||
|
package kafka
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"github.com/IBM/sarama"
|
||||||
|
"github.com/openimsdk/protocol/constant"
|
||||||
|
"github.com/openimsdk/tools/mcontext"
|
||||||
|
)
|
||||||
|
|
||||||
|
var errEmptyMsg = errors.New("kafka binary msg is empty")
|
||||||
|
|
||||||
|
// GetMQHeaderWithContext extracts message queue headers from the context.
|
||||||
|
func GetMQHeaderWithContext(ctx context.Context) ([]sarama.RecordHeader, error) {
|
||||||
|
operationID, opUserID, platform, connID, err := mcontext.GetCtxInfos(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return []sarama.RecordHeader{
|
||||||
|
{Key: []byte(constant.OperationID), Value: []byte(operationID)},
|
||||||
|
{Key: []byte(constant.OpUserID), Value: []byte(opUserID)},
|
||||||
|
{Key: []byte(constant.OpUserPlatform), Value: []byte(platform)},
|
||||||
|
{Key: []byte(constant.ConnID), Value: []byte(connID)},
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetContextWithMQHeader creates a context from message queue headers.
|
||||||
|
func GetContextWithMQHeader(header []*sarama.RecordHeader) context.Context {
|
||||||
|
var values []string
|
||||||
|
for _, recordHeader := range header {
|
||||||
|
values = append(values, string(recordHeader.Value))
|
||||||
|
}
|
||||||
|
return mcontext.WithMustInfoCtx(values) // Attach extracted values to context
|
||||||
|
}
|
@ -0,0 +1,79 @@
|
|||||||
|
// Copyright © 2024 OpenIM open source community. All rights reserved.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package kafka
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/IBM/sarama"
|
||||||
|
"github.com/openimsdk/tools/errs"
|
||||||
|
)
|
||||||
|
|
||||||
|
func CheckTopics(ctx context.Context, conf *Config, topics []string) error {
|
||||||
|
kfk, err := BuildConsumerGroupConfig(conf, sarama.OffsetNewest, false)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
cli, err := sarama.NewClient(conf.Addr, kfk)
|
||||||
|
if err != nil {
|
||||||
|
return errs.WrapMsg(err, "NewClient failed", "config: ", fmt.Sprintf("%+v", conf))
|
||||||
|
}
|
||||||
|
defer cli.Close()
|
||||||
|
|
||||||
|
existingTopics, err := cli.Topics()
|
||||||
|
if err != nil {
|
||||||
|
return errs.WrapMsg(err, "Failed to list topics")
|
||||||
|
}
|
||||||
|
|
||||||
|
existingTopicsMap := make(map[string]bool)
|
||||||
|
for _, t := range existingTopics {
|
||||||
|
existingTopicsMap[t] = true
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, topic := range topics {
|
||||||
|
if !existingTopicsMap[topic] {
|
||||||
|
return errs.New("topic not exist", "topic", topic).Wrap()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func CheckHealth(ctx context.Context, conf *Config) error {
|
||||||
|
kfk, err := BuildConsumerGroupConfig(conf, sarama.OffsetNewest, false)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
cli, err := sarama.NewClient(conf.Addr, kfk)
|
||||||
|
if err != nil {
|
||||||
|
return errs.WrapMsg(err, "NewClient failed", "config: ", fmt.Sprintf("%+v", conf))
|
||||||
|
}
|
||||||
|
defer cli.Close()
|
||||||
|
|
||||||
|
// Get broker list
|
||||||
|
brokers := cli.Brokers()
|
||||||
|
if len(brokers) == 0 {
|
||||||
|
return errs.New("no brokers found").Wrap()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if all brokers are reachable
|
||||||
|
for _, broker := range brokers {
|
||||||
|
if err := broker.Open(kfk); err != nil {
|
||||||
|
return errs.WrapMsg(err, "failed to open broker", "broker", broker.Addr())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
Loading…
Reference in new issue