feat: extract kafka

pull/2148/head
withchao 2 years ago
parent 53b64ae7c8
commit 519a9153e3

@@ -17,7 +17,7 @@ require (
github.com/mitchellh/mapstructure v1.5.0
github.com/openimsdk/localcache v0.0.1
github.com/openimsdk/protocol v0.0.58-google
github.com/openimsdk/tools v0.0.47-alpha.9
github.com/openimsdk/tools v0.0.47-alpha.10
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.18.0
github.com/stretchr/testify v1.8.4

@@ -259,8 +259,8 @@ github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE=
github.com/onsi/gomega v1.18.1/go.mod h1:0q+aL8jAiMXy9hbwj2mr5GziHiwhAIQpFmmtT5hitRs=
github.com/openimsdk/protocol v0.0.58-google h1:cGNUVaXO9LqcFgIb4NvrtEOrv0spGecoQKyN8YWhyZs=
github.com/openimsdk/protocol v0.0.58-google/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
github.com/openimsdk/tools v0.0.47-alpha.9 h1:s+P5+PVam26SSJeWrvoBslbjQYp5/SHdbBBgLV3UMW4=
github.com/openimsdk/tools v0.0.47-alpha.9/go.mod h1:mUsH+ANKbdmhUih43ijJHvuYcU8owm7X3kdFH7FsIec=
github.com/openimsdk/tools v0.0.47-alpha.10 h1:bel44PB4xcC1uO+1y/LYhgsPmAGpxrlNd8JaFL4yc50=
github.com/openimsdk/tools v0.0.47-alpha.10/go.mod h1:mUsH+ANKbdmhUih43ijJHvuYcU8owm7X3kdFH7FsIec=
github.com/pelletier/go-toml/v2 v2.0.8 h1:0ctb6s9mE31h0/lhu+J6OPmVeDxJn+kYnJc2jZR9tGQ=
github.com/pelletier/go-toml/v2 v2.0.8/go.mod h1:vuYfssBdrU2XDZ9bYydBu6t+6a6PYNcZljzZR9VXg+4=
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=

@@ -1,4 +1,4 @@
go 1.19
go 1.21
use (
.

@@ -18,6 +18,7 @@ import (
"context"
"errors"
"fmt"
"github.com/openimsdk/tools/db/redisutil"
"net"
"net/http"
"os"
@@ -56,7 +57,7 @@ func Start(ctx context.Context, config *config.GlobalConfig, port int, proPort i
wrappedErr := errs.WrapMsg(errors.New("port or proPort is empty"), "validation error", "port", port, "proPort", proPort)
return wrappedErr
}
rdb, err := cache.NewRedis(ctx, &config.Redis)
rdb, err := redisutil.NewRedisClient(ctx, config.Redis.Build())
if err != nil {
return err
}

@@ -16,6 +16,7 @@ package msggateway
import (
"context"
"github.com/openimsdk/tools/db/redisutil"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
@@ -32,7 +33,7 @@ import (
)
func (s *Server) InitServer(ctx context.Context, config *config.GlobalConfig, disCov discovery.SvcDiscoveryRegistry, server *grpc.Server) error {
rdb, err := cache.NewRedis(ctx, &config.Redis)
rdb, err := redisutil.NewRedisClient(ctx, config.Redis.Build())
if err != nil {
return err
}

@@ -26,7 +26,6 @@ import (
"github.com/go-redis/redis"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/controller"
"github.com/openimsdk/open-im-server/v3/pkg/common/kafka"
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
"github.com/openimsdk/protocol/constant"
@@ -34,6 +33,7 @@ import (
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mcontext"
"github.com/openimsdk/tools/mq/kafka"
"github.com/openimsdk/tools/utils/idutil"
"github.com/openimsdk/tools/utils/stringutil"
"google.golang.org/protobuf/proto"

@@ -20,20 +20,20 @@ import (
"github.com/IBM/sarama"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/controller"
kfk "github.com/openimsdk/open-im-server/v3/pkg/common/kafka"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
pbmsg "github.com/openimsdk/protocol/msg"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mq/kafka"
"google.golang.org/protobuf/proto"
)
type OnlineHistoryMongoConsumerHandler struct {
historyConsumerGroup *kfk.MConsumerGroup
historyConsumerGroup *kafka.MConsumerGroup
msgDatabase controller.CommonMsgDatabase
}
func NewOnlineHistoryMongoConsumerHandler(kafkaConf *config.Kafka, database controller.CommonMsgDatabase) (*OnlineHistoryMongoConsumerHandler, error) {
historyConsumerGroup, err := kfk.NewMConsumerGroup(&kafkaConf.Config, kafkaConf.ConsumerGroupID.MsgToMongo, []string{kafkaConf.MsgToMongo.Topic})
historyConsumerGroup, err := kafka.NewMConsumerGroup(&kafkaConf.Config, kafkaConf.ConsumerGroupID.MsgToMongo, []string{kafkaConf.MsgToMongo.Topic})
if err != nil {
return nil, err
}

@@ -19,23 +19,23 @@ import (
"github.com/IBM/sarama"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
kfk "github.com/openimsdk/open-im-server/v3/pkg/common/kafka"
"github.com/openimsdk/protocol/constant"
pbchat "github.com/openimsdk/protocol/msg"
pbpush "github.com/openimsdk/protocol/push"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mq/kafka"
"github.com/openimsdk/tools/utils/datautil"
"github.com/openimsdk/tools/utils/timeutil"
"google.golang.org/protobuf/proto"
)
type ConsumerHandler struct {
pushConsumerGroup *kfk.MConsumerGroup
pushConsumerGroup *kafka.MConsumerGroup
pusher *Pusher
}
func NewConsumerHandler(kafkaConf *config.Kafka, pusher *Pusher) (*ConsumerHandler, error) {
pushConsumerGroup, err := kfk.NewMConsumerGroup(&kafkaConf.Config, kafkaConf.ConsumerGroupID.MsgToPush, []string{kafkaConf.MsgToPush.Topic})
pushConsumerGroup, err := kafka.NewMConsumerGroup(&kafkaConf.Config, kafkaConf.ConsumerGroupID.MsgToPush, []string{kafkaConf.MsgToPush.Topic})
if err != nil {
return nil, err
}
@@ -84,11 +84,11 @@ func (c *ConsumerHandler) handleMs2PsChat(ctx context.Context, msg []byte) {
}
}
func (ConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
func (ConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
func (c *ConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession,
claim sarama.ConsumerGroupClaim,
) error {
func (*ConsumerHandler) Setup(sarama.ConsumerGroupSession) error { return nil }
func (*ConsumerHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil }
func (c *ConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for msg := range claim.Messages() {
ctx := c.pushConsumerGroup.GetContextFromMsg(msg)
c.handleMs2PsChat(ctx, msg.Value)
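
Note on the change above: the push and mongo handlers now take their *kafka.MConsumerGroup from github.com/openimsdk/tools/mq/kafka instead of the in-repo pkg/common/kafka wrapper (deleted further down in this diff). A minimal, self-contained sketch of that post-extraction wiring, built only from the calls visible in this diff; the Config.Addr field and the RegisterHandleAndConsumer method are assumed to match the removed wrapper, and the broker, group, and topic names are placeholders:

package main

import (
	"context"
	"fmt"

	"github.com/IBM/sarama"
	"github.com/openimsdk/tools/mq/kafka"
)

// printHandler stands in for the real handlers in this PR: it rebuilds the
// request context from the MQ headers and logs each message.
type printHandler struct {
	group *kafka.MConsumerGroup
}

func (*printHandler) Setup(sarama.ConsumerGroupSession) error   { return nil }
func (*printHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil }

func (h *printHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for msg := range claim.Messages() {
		ctx := h.group.GetContextFromMsg(msg) // same call the push handler makes above
		_ = ctx                               // the real handlers hand this ctx to their processing code
		fmt.Println("got message:", string(msg.Value))
		sess.MarkMessage(msg, "")
	}
	return nil
}

func main() {
	// Addr is the only Config field the removed wrapper read; broker, group and
	// topic names here are placeholders, not values taken from this PR.
	conf := &kafka.Config{Addr: []string{"localhost:9092"}}
	group, err := kafka.NewMConsumerGroup(conf, "demo-group", []string{"demo-topic"})
	if err != nil {
		panic(err)
	}
	h := &printHandler{group: group}
	// Assumed to keep the removed wrapper's blocking semantics: consume until
	// the context is cancelled or the group is closed.
	group.RegisterHandleAndConsumer(context.Background(), h)
}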

@@ -16,6 +16,7 @@ package push
import (
"context"
"github.com/openimsdk/tools/db/redisutil"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
@@ -35,7 +36,7 @@ type pushServer struct {
}
func Start(ctx context.Context, config *config.GlobalConfig, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error {
rdb, err := cache.NewRedis(ctx, &config.Redis)
rdb, err := redisutil.NewRedisClient(ctx, config.Redis.Build())
if err != nil {
return err
}

@@ -16,13 +16,13 @@ package tools
import (
"context"
"github.com/openimsdk/tools/db/redisutil"
"os"
"os/signal"
"syscall"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"github.com/redis/go-redis/v9"
@@ -40,7 +40,7 @@ func StartTask(ctx context.Context, config *config.GlobalConfig) error {
msgTool.convertTools()
rdb, err := cache.NewRedis(ctx, &config.Redis)
rdb, err := redisutil.NewRedisClient(ctx, config.Redis.Build())
if err != nil {
return err
}

@@ -25,13 +25,13 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/mgo"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation"
"github.com/openimsdk/open-im-server/v3/pkg/common/kafka"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
"github.com/openimsdk/protocol/constant"
pbmsg "github.com/openimsdk/protocol/msg"
"github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mq/kafka"
"github.com/openimsdk/tools/utils/datautil"
"github.com/openimsdk/tools/utils/timeutil"
"github.com/redis/go-redis/v9"

@@ -1,78 +0,0 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package kafka
import (
"context"
"errors"
"github.com/IBM/sarama"
"github.com/openimsdk/tools/log"
kfk "github.com/openimsdk/tools/mq/kafka"
)
type MConsumerGroup struct {
sarama.ConsumerGroup
groupID string
topics []string
}
type MConsumerGroupConfig struct {
KafkaVersion sarama.KafkaVersion
OffsetsInitial int64
IsReturnErr bool
UserName string
Password string
}
func NewMConsumerGroup(conf *kfk.Config, groupID string, topics []string) (*MConsumerGroup, error) {
config, err := kfk.BuildConsumerGroupConfig(conf, sarama.OffsetNewest)
if err != nil {
return nil, err
}
group, err := kfk.NewConsumerGroup(config, conf.Addr, groupID)
if err != nil {
return nil, err
}
return &MConsumerGroup{
ConsumerGroup: group,
groupID: groupID,
topics: topics,
}, nil
}
func (mc *MConsumerGroup) GetContextFromMsg(cMsg *sarama.ConsumerMessage) context.Context {
return GetContextWithMQHeader(cMsg.Headers)
}
func (mc *MConsumerGroup) RegisterHandleAndConsumer(ctx context.Context, handler sarama.ConsumerGroupHandler) {
log.ZDebug(ctx, "register consumer group", "groupID", mc.groupID)
for {
err := mc.ConsumerGroup.Consume(ctx, mc.topics, handler)
if errors.Is(err, sarama.ErrClosedConsumerGroup) {
return
}
if errors.Is(err, context.Canceled) {
return
}
if err != nil {
log.ZWarn(ctx, "consume err", err, "topic", mc.topics, "groupID", mc.groupID)
}
}
}
func (mc *MConsumerGroup) Close() error {
return mc.ConsumerGroup.Close()
}

@@ -1 +0,0 @@
package kafka // import "github.com/openimsdk/open-im-server/v3/pkg/common/kafka"

@@ -1,128 +0,0 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package kafka
import (
"context"
"errors"
"github.com/IBM/sarama"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mcontext"
kfk "github.com/openimsdk/tools/mq/kafka"
"google.golang.org/protobuf/proto"
)
var errEmptyMsg = errors.New("kafka binary msg is empty")
// Producer represents a Kafka producer.
type Producer struct {
addr []string
topic string
config *sarama.Config
producer sarama.SyncProducer
}
type ProducerConfig struct {
ProducerAck string
CompressType string
Username string
Password string
}
func BuildProducerConfig(conf kfk.Config) (*sarama.Config, error) {
return kfk.BuildProducerConfig(conf)
}
func NewKafkaProducer(config *sarama.Config, addr []string, topic string) (*Producer, error) {
producer, err := kfk.NewProducer(config, addr)
if err != nil {
return nil, err
}
return &Producer{
addr: addr,
topic: topic,
config: config,
producer: producer,
}, nil
}
// GetMQHeaderWithContext extracts message queue headers from the context.
func GetMQHeaderWithContext(ctx context.Context) ([]sarama.RecordHeader, error) {
operationID, opUserID, platform, connID, err := mcontext.GetCtxInfos(ctx)
if err != nil {
return nil, err
}
return []sarama.RecordHeader{
{Key: []byte(constant.OperationID), Value: []byte(operationID)},
{Key: []byte(constant.OpUserID), Value: []byte(opUserID)},
{Key: []byte(constant.OpUserPlatform), Value: []byte(platform)},
{Key: []byte(constant.ConnID), Value: []byte(connID)},
}, nil
}
// GetContextWithMQHeader creates a context from message queue headers.
func GetContextWithMQHeader(header []*sarama.RecordHeader) context.Context {
var values []string
for _, recordHeader := range header {
values = append(values, string(recordHeader.Value))
}
return mcontext.WithMustInfoCtx(values) // Attach extracted values to context
}
// SendMessage sends a message to the Kafka topic configured in the Producer.
func (p *Producer) SendMessage(ctx context.Context, key string, msg proto.Message) (int32, int64, error) {
log.ZDebug(ctx, "SendMessage", "msg", msg, "topic", p.topic, "key", key)
// Marshal the protobuf message
bMsg, err := proto.Marshal(msg)
if err != nil {
return 0, 0, errs.WrapMsg(err, "kafka proto Marshal err")
}
if len(bMsg) == 0 {
return 0, 0, errs.WrapMsg(errEmptyMsg, "kafka proto Marshal err")
}
// Prepare Kafka message
kMsg := &sarama.ProducerMessage{
Topic: p.topic,
Key: sarama.StringEncoder(key),
Value: sarama.ByteEncoder(bMsg),
}
// Validate message key and value
if kMsg.Key.Length() == 0 || kMsg.Value.Length() == 0 {
return 0, 0, errs.Wrap(errEmptyMsg)
}
// Attach context metadata as headers
header, err := GetMQHeaderWithContext(ctx)
if err != nil {
return 0, 0, err
}
kMsg.Headers = header
// Send the message
partition, offset, err := p.producer.SendMessage(kMsg)
if err != nil {
log.ZWarn(ctx, "p.producer.SendMessage error", err)
return 0, 0, errs.Wrap(err)
}
log.ZDebug(ctx, "ByteEncoder SendMessage end", "key", kMsg.Key, "key length", kMsg.Value.Length())
return partition, offset, nil
}
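
With producer.go deleted as well, producer construction moves to the extracted package. The sketch below reuses only the two calls the removed wrapper delegated to (kafka.BuildProducerConfig, kafka.NewProducer) and the Addr field it read; the broker address, topic, and payload are placeholders, and header propagation (GetMQHeaderWithContext) is assumed to have moved into the tools package along with the rest:

package main

import (
	"github.com/IBM/sarama"
	"github.com/openimsdk/tools/mq/kafka"
)

// newSyncProducer does what the removed NewKafkaProducer did, minus the local
// wrapper struct: build a sarama config via the tools package and open a
// SyncProducer against the configured brokers.
func newSyncProducer(conf kafka.Config) (sarama.SyncProducer, error) {
	sc, err := kafka.BuildProducerConfig(conf) // same helper the deleted code delegated to
	if err != nil {
		return nil, err
	}
	return kafka.NewProducer(sc, conf.Addr)
}

func main() {
	// Broker address is a placeholder; Addr mirrors the field the old wrapper used.
	p, err := newSyncProducer(kafka.Config{Addr: []string{"localhost:9092"}})
	if err != nil {
		panic(err)
	}
	defer p.Close()

	// Plain sarama send, equivalent to the core of the deleted SendMessage.
	_, _, err = p.SendMessage(&sarama.ProducerMessage{
		Topic: "demo-topic",
		Key:   sarama.StringEncoder("demo-key"),
		Value: sarama.ByteEncoder([]byte("hello")),
	})
	if err != nil {
		panic(err)
	}
}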