diff --git a/go.mod b/go.mod index 6d8177ab9..55bfb6173 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,7 @@ require ( github.com/mitchellh/mapstructure v1.5.0 github.com/openimsdk/localcache v0.0.1 github.com/openimsdk/protocol v0.0.58-google - github.com/openimsdk/tools v0.0.47-alpha.9 + github.com/openimsdk/tools v0.0.47-alpha.10 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 github.com/stretchr/testify v1.8.4 diff --git a/go.sum b/go.sum index 318a2a757..0ba17e372 100644 --- a/go.sum +++ b/go.sum @@ -259,8 +259,8 @@ github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE= github.com/onsi/gomega v1.18.1/go.mod h1:0q+aL8jAiMXy9hbwj2mr5GziHiwhAIQpFmmtT5hitRs= github.com/openimsdk/protocol v0.0.58-google h1:cGNUVaXO9LqcFgIb4NvrtEOrv0spGecoQKyN8YWhyZs= github.com/openimsdk/protocol v0.0.58-google/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= -github.com/openimsdk/tools v0.0.47-alpha.9 h1:s+P5+PVam26SSJeWrvoBslbjQYp5/SHdbBBgLV3UMW4= -github.com/openimsdk/tools v0.0.47-alpha.9/go.mod h1:mUsH+ANKbdmhUih43ijJHvuYcU8owm7X3kdFH7FsIec= +github.com/openimsdk/tools v0.0.47-alpha.10 h1:bel44PB4xcC1uO+1y/LYhgsPmAGpxrlNd8JaFL4yc50= +github.com/openimsdk/tools v0.0.47-alpha.10/go.mod h1:mUsH+ANKbdmhUih43ijJHvuYcU8owm7X3kdFH7FsIec= github.com/pelletier/go-toml/v2 v2.0.8 h1:0ctb6s9mE31h0/lhu+J6OPmVeDxJn+kYnJc2jZR9tGQ= github.com/pelletier/go-toml/v2 v2.0.8/go.mod h1:vuYfssBdrU2XDZ9bYydBu6t+6a6PYNcZljzZR9VXg+4= github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= diff --git a/go.work b/go.work index 889f20cd8..848da501c 100644 --- a/go.work +++ b/go.work @@ -1,4 +1,4 @@ -go 1.19 +go 1.21 use ( . diff --git a/internal/api/route.go b/internal/api/route.go index bd361288d..d584c517e 100644 --- a/internal/api/route.go +++ b/internal/api/route.go @@ -18,6 +18,7 @@ import ( "context" "errors" "fmt" + "github.com/openimsdk/tools/db/redisutil" "net" "net/http" "os" @@ -56,7 +57,7 @@ func Start(ctx context.Context, config *config.GlobalConfig, port int, proPort i wrappedErr := errs.WrapMsg(errors.New("port or proPort is empty"), "validation error", "port", port, "proPort", proPort) return wrappedErr } - rdb, err := cache.NewRedis(ctx, &config.Redis) + rdb, err := redisutil.NewRedisClient(ctx, config.Redis.Build()) if err != nil { return err } diff --git a/internal/msggateway/hub_server.go b/internal/msggateway/hub_server.go index c4db01b33..59a323152 100644 --- a/internal/msggateway/hub_server.go +++ b/internal/msggateway/hub_server.go @@ -16,6 +16,7 @@ package msggateway import ( "context" + "github.com/openimsdk/tools/db/redisutil" "github.com/openimsdk/open-im-server/v3/pkg/authverify" "github.com/openimsdk/open-im-server/v3/pkg/common/config" @@ -32,7 +33,7 @@ import ( ) func (s *Server) InitServer(ctx context.Context, config *config.GlobalConfig, disCov discovery.SvcDiscoveryRegistry, server *grpc.Server) error { - rdb, err := cache.NewRedis(ctx, &config.Redis) + rdb, err := redisutil.NewRedisClient(ctx, config.Redis.Build()) if err != nil { return err } diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go index 5a59ddeb0..0c4d55415 100644 --- a/internal/msgtransfer/online_history_msg_handler.go +++ b/internal/msgtransfer/online_history_msg_handler.go @@ -26,7 +26,6 @@ import ( "github.com/go-redis/redis" "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/db/controller" - "github.com/openimsdk/open-im-server/v3/pkg/common/kafka" "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" "github.com/openimsdk/protocol/constant" @@ -34,6 +33,7 @@ import ( "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/mcontext" + "github.com/openimsdk/tools/mq/kafka" "github.com/openimsdk/tools/utils/idutil" "github.com/openimsdk/tools/utils/stringutil" "google.golang.org/protobuf/proto" diff --git a/internal/msgtransfer/online_msg_to_mongo_handler.go b/internal/msgtransfer/online_msg_to_mongo_handler.go index 699211545..703a9ba91 100644 --- a/internal/msgtransfer/online_msg_to_mongo_handler.go +++ b/internal/msgtransfer/online_msg_to_mongo_handler.go @@ -20,20 +20,20 @@ import ( "github.com/IBM/sarama" "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/db/controller" - kfk "github.com/openimsdk/open-im-server/v3/pkg/common/kafka" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" pbmsg "github.com/openimsdk/protocol/msg" "github.com/openimsdk/tools/log" + "github.com/openimsdk/tools/mq/kafka" "google.golang.org/protobuf/proto" ) type OnlineHistoryMongoConsumerHandler struct { - historyConsumerGroup *kfk.MConsumerGroup + historyConsumerGroup *kafka.MConsumerGroup msgDatabase controller.CommonMsgDatabase } func NewOnlineHistoryMongoConsumerHandler(kafkaConf *config.Kafka, database controller.CommonMsgDatabase) (*OnlineHistoryMongoConsumerHandler, error) { - historyConsumerGroup, err := kfk.NewMConsumerGroup(&kafkaConf.Config, kafkaConf.ConsumerGroupID.MsgToMongo, []string{kafkaConf.MsgToMongo.Topic}) + historyConsumerGroup, err := kafka.NewMConsumerGroup(&kafkaConf.Config, kafkaConf.ConsumerGroupID.MsgToMongo, []string{kafkaConf.MsgToMongo.Topic}) if err != nil { return nil, err } diff --git a/internal/push/push_handler.go b/internal/push/push_handler.go index 9df2199de..d994fdcde 100644 --- a/internal/push/push_handler.go +++ b/internal/push/push_handler.go @@ -19,23 +19,23 @@ import ( "github.com/IBM/sarama" "github.com/openimsdk/open-im-server/v3/pkg/common/config" - kfk "github.com/openimsdk/open-im-server/v3/pkg/common/kafka" "github.com/openimsdk/protocol/constant" pbchat "github.com/openimsdk/protocol/msg" pbpush "github.com/openimsdk/protocol/push" "github.com/openimsdk/tools/log" + "github.com/openimsdk/tools/mq/kafka" "github.com/openimsdk/tools/utils/datautil" "github.com/openimsdk/tools/utils/timeutil" "google.golang.org/protobuf/proto" ) type ConsumerHandler struct { - pushConsumerGroup *kfk.MConsumerGroup + pushConsumerGroup *kafka.MConsumerGroup pusher *Pusher } func NewConsumerHandler(kafkaConf *config.Kafka, pusher *Pusher) (*ConsumerHandler, error) { - pushConsumerGroup, err := kfk.NewMConsumerGroup(&kafkaConf.Config, kafkaConf.ConsumerGroupID.MsgToPush, []string{kafkaConf.MsgToPush.Topic}) + pushConsumerGroup, err := kafka.NewMConsumerGroup(&kafkaConf.Config, kafkaConf.ConsumerGroupID.MsgToPush, []string{kafkaConf.MsgToPush.Topic}) if err != nil { return nil, err } @@ -84,11 +84,11 @@ func (c *ConsumerHandler) handleMs2PsChat(ctx context.Context, msg []byte) { } } -func (ConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil } -func (ConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil } -func (c *ConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, - claim sarama.ConsumerGroupClaim, -) error { +func (*ConsumerHandler) Setup(sarama.ConsumerGroupSession) error { return nil } + +func (*ConsumerHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil } + +func (c *ConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { for msg := range claim.Messages() { ctx := c.pushConsumerGroup.GetContextFromMsg(msg) c.handleMs2PsChat(ctx, msg.Value) diff --git a/internal/push/push_rpc_server.go b/internal/push/push_rpc_server.go index 9881aa76a..afb141091 100644 --- a/internal/push/push_rpc_server.go +++ b/internal/push/push_rpc_server.go @@ -16,6 +16,7 @@ package push import ( "context" + "github.com/openimsdk/tools/db/redisutil" "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" @@ -35,7 +36,7 @@ type pushServer struct { } func Start(ctx context.Context, config *config.GlobalConfig, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error { - rdb, err := cache.NewRedis(ctx, &config.Redis) + rdb, err := redisutil.NewRedisClient(ctx, config.Redis.Build()) if err != nil { return err } diff --git a/internal/tools/cron_task.go b/internal/tools/cron_task.go index 19a5b283f..cee89f691 100644 --- a/internal/tools/cron_task.go +++ b/internal/tools/cron_task.go @@ -16,13 +16,13 @@ package tools import ( "context" + "github.com/openimsdk/tools/db/redisutil" "os" "os/signal" "syscall" "time" "github.com/openimsdk/open-im-server/v3/pkg/common/config" - "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" "github.com/redis/go-redis/v9" @@ -40,7 +40,7 @@ func StartTask(ctx context.Context, config *config.GlobalConfig) error { msgTool.convertTools() - rdb, err := cache.NewRedis(ctx, &config.Redis) + rdb, err := redisutil.NewRedisClient(ctx, config.Redis.Build()) if err != nil { return err } diff --git a/pkg/common/db/controller/msg.go b/pkg/common/db/controller/msg.go index 46a5d3f28..959a20096 100644 --- a/pkg/common/db/controller/msg.go +++ b/pkg/common/db/controller/msg.go @@ -25,13 +25,13 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/db/mgo" "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation" - "github.com/openimsdk/open-im-server/v3/pkg/common/kafka" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/protocol/constant" pbmsg "github.com/openimsdk/protocol/msg" "github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" + "github.com/openimsdk/tools/mq/kafka" "github.com/openimsdk/tools/utils/datautil" "github.com/openimsdk/tools/utils/timeutil" "github.com/redis/go-redis/v9" diff --git a/pkg/common/kafka/consumer_group.go b/pkg/common/kafka/consumer_group.go deleted file mode 100644 index 6f8e67e3f..000000000 --- a/pkg/common/kafka/consumer_group.go +++ /dev/null @@ -1,78 +0,0 @@ -// Copyright © 2023 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package kafka - -import ( - "context" - "errors" - - "github.com/IBM/sarama" - "github.com/openimsdk/tools/log" - kfk "github.com/openimsdk/tools/mq/kafka" -) - -type MConsumerGroup struct { - sarama.ConsumerGroup - groupID string - topics []string -} - -type MConsumerGroupConfig struct { - KafkaVersion sarama.KafkaVersion - OffsetsInitial int64 - IsReturnErr bool - UserName string - Password string -} - -func NewMConsumerGroup(conf *kfk.Config, groupID string, topics []string) (*MConsumerGroup, error) { - config, err := kfk.BuildConsumerGroupConfig(conf, sarama.OffsetNewest) - if err != nil { - return nil, err - } - group, err := kfk.NewConsumerGroup(config, conf.Addr, groupID) - if err != nil { - return nil, err - } - return &MConsumerGroup{ - ConsumerGroup: group, - groupID: groupID, - topics: topics, - }, nil -} - -func (mc *MConsumerGroup) GetContextFromMsg(cMsg *sarama.ConsumerMessage) context.Context { - return GetContextWithMQHeader(cMsg.Headers) -} - -func (mc *MConsumerGroup) RegisterHandleAndConsumer(ctx context.Context, handler sarama.ConsumerGroupHandler) { - log.ZDebug(ctx, "register consumer group", "groupID", mc.groupID) - for { - err := mc.ConsumerGroup.Consume(ctx, mc.topics, handler) - if errors.Is(err, sarama.ErrClosedConsumerGroup) { - return - } - if errors.Is(err, context.Canceled) { - return - } - if err != nil { - log.ZWarn(ctx, "consume err", err, "topic", mc.topics, "groupID", mc.groupID) - } - } -} - -func (mc *MConsumerGroup) Close() error { - return mc.ConsumerGroup.Close() -} diff --git a/pkg/common/kafka/doc.go b/pkg/common/kafka/doc.go deleted file mode 100644 index 579a1779a..000000000 --- a/pkg/common/kafka/doc.go +++ /dev/null @@ -1 +0,0 @@ -package kafka // import "github.com/openimsdk/open-im-server/v3/pkg/common/kafka" diff --git a/pkg/common/kafka/producer.go b/pkg/common/kafka/producer.go deleted file mode 100644 index be806aa5f..000000000 --- a/pkg/common/kafka/producer.go +++ /dev/null @@ -1,128 +0,0 @@ -// Copyright © 2023 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package kafka - -import ( - "context" - "errors" - - "github.com/IBM/sarama" - "github.com/openimsdk/protocol/constant" - "github.com/openimsdk/tools/errs" - "github.com/openimsdk/tools/log" - "github.com/openimsdk/tools/mcontext" - kfk "github.com/openimsdk/tools/mq/kafka" - "google.golang.org/protobuf/proto" -) - -var errEmptyMsg = errors.New("kafka binary msg is empty") - -// Producer represents a Kafka producer. -type Producer struct { - addr []string - topic string - config *sarama.Config - producer sarama.SyncProducer -} - -type ProducerConfig struct { - ProducerAck string - CompressType string - Username string - Password string -} - -func BuildProducerConfig(conf kfk.Config) (*sarama.Config, error) { - return kfk.BuildProducerConfig(conf) -} - -func NewKafkaProducer(config *sarama.Config, addr []string, topic string) (*Producer, error) { - producer, err := kfk.NewProducer(config, addr) - if err != nil { - return nil, err - } - return &Producer{ - addr: addr, - topic: topic, - config: config, - producer: producer, - }, nil -} - -// GetMQHeaderWithContext extracts message queue headers from the context. -func GetMQHeaderWithContext(ctx context.Context) ([]sarama.RecordHeader, error) { - operationID, opUserID, platform, connID, err := mcontext.GetCtxInfos(ctx) - if err != nil { - return nil, err - } - return []sarama.RecordHeader{ - {Key: []byte(constant.OperationID), Value: []byte(operationID)}, - {Key: []byte(constant.OpUserID), Value: []byte(opUserID)}, - {Key: []byte(constant.OpUserPlatform), Value: []byte(platform)}, - {Key: []byte(constant.ConnID), Value: []byte(connID)}, - }, nil -} - -// GetContextWithMQHeader creates a context from message queue headers. -func GetContextWithMQHeader(header []*sarama.RecordHeader) context.Context { - var values []string - for _, recordHeader := range header { - values = append(values, string(recordHeader.Value)) - } - return mcontext.WithMustInfoCtx(values) // Attach extracted values to context -} - -// SendMessage sends a message to the Kafka topic configured in the Producer. -func (p *Producer) SendMessage(ctx context.Context, key string, msg proto.Message) (int32, int64, error) { - log.ZDebug(ctx, "SendMessage", "msg", msg, "topic", p.topic, "key", key) - - // Marshal the protobuf message - bMsg, err := proto.Marshal(msg) - if err != nil { - return 0, 0, errs.WrapMsg(err, "kafka proto Marshal err") - } - if len(bMsg) == 0 { - return 0, 0, errs.WrapMsg(errEmptyMsg, "kafka proto Marshal err") - } - - // Prepare Kafka message - kMsg := &sarama.ProducerMessage{ - Topic: p.topic, - Key: sarama.StringEncoder(key), - Value: sarama.ByteEncoder(bMsg), - } - - // Validate message key and value - if kMsg.Key.Length() == 0 || kMsg.Value.Length() == 0 { - return 0, 0, errs.Wrap(errEmptyMsg) - } - - // Attach context metadata as headers - header, err := GetMQHeaderWithContext(ctx) - if err != nil { - return 0, 0, err - } - kMsg.Headers = header - - // Send the message - partition, offset, err := p.producer.SendMessage(kMsg) - if err != nil { - log.ZWarn(ctx, "p.producer.SendMessage error", err) - return 0, 0, errs.Wrap(err) - } - - log.ZDebug(ctx, "ByteEncoder SendMessage end", "key", kMsg.Key, "key length", kMsg.Value.Length()) - return partition, offset, nil -}