feat: implement kafka producer and consumer.

pull/2600/head
Monet Lee 1 year ago
parent 0a71bef823
commit d4669d9ab0

@ -22,6 +22,8 @@ toRedisGroupID: redis
toMongoGroupID: mongo
# Consumer group ID for push notifications topic
toPushGroupID: push
# Consumer group ID for offline push notifications topic
toOfflinePushGroupID: offlinePush
# TLS (Transport Layer Security) configuration
tls:
# Enable or disable TLS

@ -7,9 +7,7 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo"
pbpush "github.com/openimsdk/protocol/push"
"github.com/openimsdk/tools/db/mongoutil"
"github.com/openimsdk/tools/db/redisutil"
"github.com/openimsdk/tools/discovery"
"google.golang.org/grpc"
@ -25,7 +23,6 @@ type pushServer struct {
type Config struct {
RpcConfig config.Push
RedisConfig config.Redis
MongodbConfig config.Mongo
KafkaConfig config.Kafka
NotificationConfig config.Notification
Share config.Share
@ -49,10 +46,6 @@ func (p pushServer) DelUserPushToken(ctx context.Context,
}
func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error {
mgocli, err := mongoutil.NewMongoDB(ctx, config.MongodbConfig.Build())
if err != nil {
return err
}
rdb, err := redisutil.NewRedisClient(ctx, config.RedisConfig.Build())
if err != nil {
return err
@ -62,29 +55,9 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
if err != nil {
return err
}
database := controller.NewPushDatabase(cacheModel)
msgModel := redis.NewMsgCache(rdb)
msgDocModel, err := mgo.NewMsgMongo(mgocli.GetDB())
if err != nil {
return err
}
seqConversation, err := mgo.NewSeqConversationMongo(mgocli.GetDB())
if err != nil {
return err
}
seqConversationCache := redis.NewSeqConversationCacheRedis(rdb, seqConversation)
seqUser, err := mgo.NewSeqUserMongo(mgocli.GetDB())
if err != nil {
return err
}
seqUserCache := redis.NewSeqUserCacheRedis(rdb, seqUser)
msgDatabase, err := controller.NewCommonMsgDatabase(msgDocModel, msgModel, seqUserCache, seqConversationCache, &config.KafkaConfig)
if err != nil {
return err
}
database := controller.NewPushDatabase(cacheModel, &config.KafkaConfig)
consumer, err := NewConsumerHandler(config, msgDatabase, offlinePusher, rdb, client)
consumer, err := NewConsumerHandler(config, database, offlinePusher, rdb, client)
if err != nil {
return err
}

@ -33,7 +33,7 @@ type ConsumerHandler struct {
pushConsumerGroup *kafka.MConsumerGroup
offlinePusher offlinepush.OfflinePusher
onlinePusher OnlinePusher
msgDatabase controller.CommonMsgDatabase
pushDatabase controller.PushDatabase
onlineCache *rpccache.OnlineCache
groupLocalCache *rpccache.GroupLocalCache
conversationLocalCache *rpccache.ConversationLocalCache
@ -44,15 +44,16 @@ type ConsumerHandler struct {
config *Config
}
func NewConsumerHandler(config *Config, database controller.CommonMsgDatabase, offlinePusher offlinepush.OfflinePusher, rdb redis.UniversalClient,
func NewConsumerHandler(config *Config, database controller.PushDatabase, offlinePusher offlinepush.OfflinePusher, rdb redis.UniversalClient,
client discovery.SvcDiscoveryRegistry) (*ConsumerHandler, error) {
var consumerHandler ConsumerHandler
var err error
consumerHandler.pushConsumerGroup, err = kafka.NewMConsumerGroup(config.KafkaConfig.Build(), config.KafkaConfig.ToPushGroupID,
[]string{config.KafkaConfig.ToPushTopic}, true)
[]string{config.KafkaConfig.ToPushTopic, config.KafkaConfig.ToOfflinePushTopic}, true)
if err != nil {
return nil, err
}
userRpcClient := rpcclient.NewUserRpcClient(client, config.Share.RpcRegisterName.User, config.Share.IMAdminUserID)
consumerHandler.offlinePusher = offlinePusher
consumerHandler.onlinePusher = NewOnlinePusher(client, config)
@ -63,8 +64,7 @@ func NewConsumerHandler(config *Config, database controller.CommonMsgDatabase, o
consumerHandler.conversationLocalCache = rpccache.NewConversationLocalCache(consumerHandler.conversationRpcClient, &config.LocalCacheConfig, rdb)
consumerHandler.webhookClient = webhook.NewWebhookClient(config.WebhooksConfig.URL)
consumerHandler.config = config
consumerHandler.msgDatabase = database
//
consumerHandler.pushDatabase = database
consumerHandler.onlineCache = rpccache.NewOnlineCache(userRpcClient, consumerHandler.groupLocalCache, rdb, nil)
return &consumerHandler, nil
}
@ -85,14 +85,6 @@ func (c *ConsumerHandler) handleMs2PsChat(ctx context.Context, msg []byte) {
}
var err error
if len(msgFromMQ.GetUserIDs()) > 0 {
err := c.offlinePushMsg(ctx, msgFromMQ.MsgData, msgFromMQ.UserIDs)
if err != nil {
log.ZWarn(ctx, "offline push failed", err, "msg", msgFromMQ.String())
}
return
}
switch msgFromMQ.MsgData.SessionType {
case constant.ReadGroupChatType:
err = c.Push2Group(ctx, msgFromMQ.MsgData.GroupID, msgFromMQ.MsgData)
@ -111,6 +103,19 @@ func (c *ConsumerHandler) handleMs2PsChat(ctx context.Context, msg []byte) {
}
}
func (c *ConsumerHandler) handleMsg2OfflinePush(ctx context.Context, msg []byte) {
offlinePushMsg := pbpush.PushMsgReq{}
if err := proto.Unmarshal(msg, &offlinePushMsg); err != nil {
log.ZError(ctx, "offline push Unmarshal msg err", err, "msg", string(msg))
return
}
err := c.offlinePushMsg(ctx, offlinePushMsg.MsgData, offlinePushMsg.UserIDs)
if err != nil {
log.ZWarn(ctx, "offline push failed", err, "msg", offlinePushMsg.String())
}
}
func (*ConsumerHandler) Setup(sarama.ConsumerGroupSession) error { return nil }
func (*ConsumerHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil }
@ -118,7 +123,12 @@ func (*ConsumerHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil
func (c *ConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for msg := range claim.Messages() {
ctx := c.pushConsumerGroup.GetContextFromMsg(msg)
c.handleMs2PsChat(ctx, msg.Value)
switch msg.Topic {
case c.config.KafkaConfig.ToPushTopic:
c.handleMs2PsChat(ctx, msg.Value)
case c.config.KafkaConfig.ToOfflinePushTopic:
c.handleMsg2OfflinePush(ctx, msg.Value)
}
sess.MarkMessage(msg, "")
}
return nil
@ -262,18 +272,12 @@ func (c *ConsumerHandler) asyncOfflinePush(ctx context.Context, needOfflinePushU
if len(offlinePushUserIDs) > 0 {
needOfflinePushUserIDs = offlinePushUserIDs
}
if err := c.msgDatabase.MsgToOfflinePushMQ(ctx, conversationutil.GenConversationUniqueKeyForSingle(msg.SendID, msg.RecvID), needOfflinePushUserIDs, msg); err != nil {
if err := c.pushDatabase.MsgToOfflinePushMQ(ctx, conversationutil.GenConversationUniqueKeyForSingle(msg.SendID, msg.RecvID), needOfflinePushUserIDs, msg); err != nil {
prommetrics.SingleChatMsgProcessFailedCounter.Inc()
return
}
}
func (c *ConsumerHandler) handleMsg2OfflinePush(ctx context.Context, needOfflinePushUserIDs []string, msg *sdkws.MsgData) {
if err := c.offlinePushMsg(ctx, msg, needOfflinePushUserIDs); err != nil {
log.ZWarn(ctx, "offlinePushMsg failed", err, "needOfflinePushUserIDs", needOfflinePushUserIDs, "msg", msg)
}
}
func (c *ConsumerHandler) groupMessagesHandler(ctx context.Context, groupID string, pushToUserIDs *[]string, msg *sdkws.MsgData) (err error) {
if len(*pushToUserIDs) == 0 {
*pushToUserIDs, err = c.groupLocalCache.GetGroupMemberIDs(ctx, groupID)

@ -73,19 +73,21 @@ type Mongo struct {
MaxRetry int `mapstructure:"maxRetry"`
}
type Kafka struct {
Username string `mapstructure:"username"`
Password string `mapstructure:"password"`
ProducerAck string `mapstructure:"producerAck"`
CompressType string `mapstructure:"compressType"`
Address []string `mapstructure:"address"`
ToRedisTopic string `mapstructure:"toRedisTopic"`
ToMongoTopic string `mapstructure:"toMongoTopic"`
ToPushTopic string `mapstructure:"toPushTopic"`
ToOfflinePushTopic string `mapstructure:"toOfflinePushTopic"`
ToRedisGroupID string `mapstructure:"toRedisGroupID"`
ToMongoGroupID string `mapstructure:"toMongoGroupID"`
ToPushGroupID string `mapstructure:"toPushGroupID"`
Tls TLSConfig `mapstructure:"tls"`
Username string `mapstructure:"username"`
Password string `mapstructure:"password"`
ProducerAck string `mapstructure:"producerAck"`
CompressType string `mapstructure:"compressType"`
Address []string `mapstructure:"address"`
ToRedisTopic string `mapstructure:"toRedisTopic"`
ToMongoTopic string `mapstructure:"toMongoTopic"`
ToPushTopic string `mapstructure:"toPushTopic"`
ToOfflinePushTopic string `mapstructure:"toOfflinePushTopic"`
ToRedisGroupID string `mapstructure:"toRedisGroupID"`
ToMongoGroupID string `mapstructure:"toMongoGroupID"`
ToPushGroupID string `mapstructure:"toPushGroupID"`
ToOfflineGroupID string `mapstructure:"toOfflinePushGroupID"`
Tls TLSConfig `mapstructure:"tls"`
}
type TLSConfig struct {
EnableTLS bool `mapstructure:"enableTLS"`

@ -30,7 +30,6 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/protocol/constant"
pbmsg "github.com/openimsdk/protocol/msg"
"github.com/openimsdk/protocol/push"
"github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
@ -93,7 +92,6 @@ type CommonMsgDatabase interface {
// to mq
MsgToMQ(ctx context.Context, key string, msg2mq *sdkws.MsgData) error
MsgToPushMQ(ctx context.Context, key, conversationID string, msg2mq *sdkws.MsgData) (int32, int64, error)
MsgToOfflinePushMQ(ctx context.Context, key string, userIDs []string, msg2mq *sdkws.MsgData) error
MsgToMongoMQ(ctx context.Context, key, conversationID string, msgs []*sdkws.MsgData, lastSeq int64) error
RangeUserSendCount(ctx context.Context, start time.Time, end time.Time, group bool, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, users []*model.UserCount, dateCount map[string]int64, err error)
@ -124,32 +122,27 @@ func NewCommonMsgDatabase(msgDocModel database.Msg, msg cache.MsgCache, seqUser
if err != nil {
return nil, err
}
producerToOfflinePush, err := kafka.NewKafkaProducer(conf, kafkaConf.Address, kafkaConf.ToOfflinePushTopic)
if err != nil {
return nil, err
}
return &commonMsgDatabase{
msgDocDatabase: msgDocModel,
msg: msg,
seqUser: seqUser,
seqConversation: seqConversation,
producer: producerToRedis,
producerToMongo: producerToMongo,
producerToPush: producerToPush,
producerToOfflinePush: producerToOfflinePush,
msgDocDatabase: msgDocModel,
msg: msg,
seqUser: seqUser,
seqConversation: seqConversation,
producer: producerToRedis,
producerToMongo: producerToMongo,
producerToPush: producerToPush,
}, nil
}
type commonMsgDatabase struct {
msgDocDatabase database.Msg
msgTable model.MsgDocModel
msg cache.MsgCache
seqConversation cache.SeqConversationCache
seqUser cache.SeqUser
producer *kafka.Producer
producerToMongo *kafka.Producer
producerToPush *kafka.Producer
producerToOfflinePush *kafka.Producer
msgDocDatabase database.Msg
msgTable model.MsgDocModel
msg cache.MsgCache
seqConversation cache.SeqConversationCache
seqUser cache.SeqUser
producer *kafka.Producer
producerToMongo *kafka.Producer
producerToPush *kafka.Producer
}
func (db *commonMsgDatabase) MsgToMQ(ctx context.Context, key string, msg2mq *sdkws.MsgData) error {
@ -166,11 +159,6 @@ func (db *commonMsgDatabase) MsgToPushMQ(ctx context.Context, key, conversationI
return partition, offset, nil
}
func (db *commonMsgDatabase) MsgToOfflinePushMQ(ctx context.Context, key string, userIDs []string, msg2mq *sdkws.MsgData) error {
_, _, err := db.producerToOfflinePush.SendMessage(ctx, key, &push.PushMsgReq{MsgData: msg2mq, UserIDs: userIDs})
return err
}
func (db *commonMsgDatabase) MsgToMongoMQ(ctx context.Context, key, conversationID string, messages []*sdkws.MsgData, lastSeq int64) error {
if len(messages) > 0 {
_, _, err := db.producerToMongo.SendMessage(ctx, key, &pbmsg.MsgDataToMongoByMQ{LastSeq: lastSeq, ConversationID: conversationID, MsgData: messages})

@ -17,21 +17,41 @@ package controller
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/protocol/push"
"github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/tools/mq/kafka"
)
type PushDatabase interface {
DelFcmToken(ctx context.Context, userID string, platformID int) error
MsgToOfflinePushMQ(ctx context.Context, key string, userIDs []string, msg2mq *sdkws.MsgData) error
}
type pushDataBase struct {
cache cache.ThirdCache
cache cache.ThirdCache
producerToOfflinePush *kafka.Producer
}
func NewPushDatabase(cache cache.ThirdCache) PushDatabase {
return &pushDataBase{cache: cache}
func NewPushDatabase(cache cache.ThirdCache, kafkaConf *config.Kafka) PushDatabase {
conf, err := kafka.BuildProducerConfig(*kafkaConf.Build())
if err != nil {
return nil
}
producerToOfflinePush, err := kafka.NewKafkaProducer(conf, kafkaConf.Address, kafkaConf.ToOfflinePushTopic)
if err != nil {
return nil
}
return &pushDataBase{cache: cache,
producerToOfflinePush: producerToOfflinePush}
}
func (p *pushDataBase) DelFcmToken(ctx context.Context, userID string, platformID int) error {
return p.cache.DelFcmToken(ctx, userID, platformID)
}
func (p *pushDataBase) MsgToOfflinePushMQ(ctx context.Context, key string, userIDs []string, msg2mq *sdkws.MsgData) error {
_, _, err := p.producerToOfflinePush.SendMessage(ctx, key, &push.PushMsgReq{MsgData: msg2mq, UserIDs: userIDs})
return err
}

Loading…
Cancel
Save