From d4669d9ab0e5443a068e671414d1db2f9689e2c4 Mon Sep 17 00:00:00 2001 From: Monet Lee Date: Mon, 9 Sep 2024 14:48:45 +0800 Subject: [PATCH] feat: implement kafka producer and consumer. --- config/kafka.yml | 2 ++ internal/push/push.go | 31 ++---------------- internal/push/push_handler.go | 46 +++++++++++++++------------ pkg/common/config/config.go | 28 ++++++++-------- pkg/common/storage/controller/msg.go | 44 ++++++++++--------------- pkg/common/storage/controller/push.go | 26 +++++++++++++-- 6 files changed, 83 insertions(+), 94 deletions(-) diff --git a/config/kafka.yml b/config/kafka.yml index f35df7409..fd06ae2bb 100644 --- a/config/kafka.yml +++ b/config/kafka.yml @@ -22,6 +22,8 @@ toRedisGroupID: redis toMongoGroupID: mongo # Consumer group ID for push notifications topic toPushGroupID: push +# Consumer group ID for offline push notifications topic +toOfflinePushGroupID: offlinePush # TLS (Transport Layer Security) configuration tls: # Enable or disable TLS diff --git a/internal/push/push.go b/internal/push/push.go index dff26f210..4513b6fa9 100644 --- a/internal/push/push.go +++ b/internal/push/push.go @@ -7,9 +7,7 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller" - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo" pbpush "github.com/openimsdk/protocol/push" - "github.com/openimsdk/tools/db/mongoutil" "github.com/openimsdk/tools/db/redisutil" "github.com/openimsdk/tools/discovery" "google.golang.org/grpc" @@ -25,7 +23,6 @@ type pushServer struct { type Config struct { RpcConfig config.Push RedisConfig config.Redis - MongodbConfig config.Mongo KafkaConfig config.Kafka NotificationConfig config.Notification Share config.Share @@ -49,10 +46,6 @@ func (p pushServer) DelUserPushToken(ctx context.Context, } func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error { - mgocli, err := mongoutil.NewMongoDB(ctx, config.MongodbConfig.Build()) - if err != nil { - return err - } rdb, err := redisutil.NewRedisClient(ctx, config.RedisConfig.Build()) if err != nil { return err @@ -62,29 +55,9 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg if err != nil { return err } - database := controller.NewPushDatabase(cacheModel) - msgModel := redis.NewMsgCache(rdb) - msgDocModel, err := mgo.NewMsgMongo(mgocli.GetDB()) - if err != nil { - return err - } - seqConversation, err := mgo.NewSeqConversationMongo(mgocli.GetDB()) - if err != nil { - return err - } - seqConversationCache := redis.NewSeqConversationCacheRedis(rdb, seqConversation) - seqUser, err := mgo.NewSeqUserMongo(mgocli.GetDB()) - if err != nil { - return err - } - seqUserCache := redis.NewSeqUserCacheRedis(rdb, seqUser) - - msgDatabase, err := controller.NewCommonMsgDatabase(msgDocModel, msgModel, seqUserCache, seqConversationCache, &config.KafkaConfig) - if err != nil { - return err - } + database := controller.NewPushDatabase(cacheModel, &config.KafkaConfig) - consumer, err := NewConsumerHandler(config, msgDatabase, offlinePusher, rdb, client) + consumer, err := NewConsumerHandler(config, database, offlinePusher, rdb, client) if err != nil { return err } diff --git a/internal/push/push_handler.go b/internal/push/push_handler.go index 150d115cd..bf0fb649d 100644 --- a/internal/push/push_handler.go +++ b/internal/push/push_handler.go @@ -33,7 +33,7 @@ type ConsumerHandler struct { pushConsumerGroup *kafka.MConsumerGroup offlinePusher offlinepush.OfflinePusher onlinePusher OnlinePusher - msgDatabase controller.CommonMsgDatabase + pushDatabase controller.PushDatabase onlineCache *rpccache.OnlineCache groupLocalCache *rpccache.GroupLocalCache conversationLocalCache *rpccache.ConversationLocalCache @@ -44,15 +44,16 @@ type ConsumerHandler struct { config *Config } -func NewConsumerHandler(config *Config, database controller.CommonMsgDatabase, offlinePusher offlinepush.OfflinePusher, rdb redis.UniversalClient, +func NewConsumerHandler(config *Config, database controller.PushDatabase, offlinePusher offlinepush.OfflinePusher, rdb redis.UniversalClient, client discovery.SvcDiscoveryRegistry) (*ConsumerHandler, error) { var consumerHandler ConsumerHandler var err error consumerHandler.pushConsumerGroup, err = kafka.NewMConsumerGroup(config.KafkaConfig.Build(), config.KafkaConfig.ToPushGroupID, - []string{config.KafkaConfig.ToPushTopic}, true) + []string{config.KafkaConfig.ToPushTopic, config.KafkaConfig.ToOfflinePushTopic}, true) if err != nil { return nil, err } + userRpcClient := rpcclient.NewUserRpcClient(client, config.Share.RpcRegisterName.User, config.Share.IMAdminUserID) consumerHandler.offlinePusher = offlinePusher consumerHandler.onlinePusher = NewOnlinePusher(client, config) @@ -63,8 +64,7 @@ func NewConsumerHandler(config *Config, database controller.CommonMsgDatabase, o consumerHandler.conversationLocalCache = rpccache.NewConversationLocalCache(consumerHandler.conversationRpcClient, &config.LocalCacheConfig, rdb) consumerHandler.webhookClient = webhook.NewWebhookClient(config.WebhooksConfig.URL) consumerHandler.config = config - consumerHandler.msgDatabase = database - // + consumerHandler.pushDatabase = database consumerHandler.onlineCache = rpccache.NewOnlineCache(userRpcClient, consumerHandler.groupLocalCache, rdb, nil) return &consumerHandler, nil } @@ -85,14 +85,6 @@ func (c *ConsumerHandler) handleMs2PsChat(ctx context.Context, msg []byte) { } var err error - if len(msgFromMQ.GetUserIDs()) > 0 { - err := c.offlinePushMsg(ctx, msgFromMQ.MsgData, msgFromMQ.UserIDs) - if err != nil { - log.ZWarn(ctx, "offline push failed", err, "msg", msgFromMQ.String()) - } - return - } - switch msgFromMQ.MsgData.SessionType { case constant.ReadGroupChatType: err = c.Push2Group(ctx, msgFromMQ.MsgData.GroupID, msgFromMQ.MsgData) @@ -111,6 +103,19 @@ func (c *ConsumerHandler) handleMs2PsChat(ctx context.Context, msg []byte) { } } +func (c *ConsumerHandler) handleMsg2OfflinePush(ctx context.Context, msg []byte) { + offlinePushMsg := pbpush.PushMsgReq{} + if err := proto.Unmarshal(msg, &offlinePushMsg); err != nil { + log.ZError(ctx, "offline push Unmarshal msg err", err, "msg", string(msg)) + return + } + err := c.offlinePushMsg(ctx, offlinePushMsg.MsgData, offlinePushMsg.UserIDs) + if err != nil { + log.ZWarn(ctx, "offline push failed", err, "msg", offlinePushMsg.String()) + } + +} + func (*ConsumerHandler) Setup(sarama.ConsumerGroupSession) error { return nil } func (*ConsumerHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil } @@ -118,7 +123,12 @@ func (*ConsumerHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil func (c *ConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { for msg := range claim.Messages() { ctx := c.pushConsumerGroup.GetContextFromMsg(msg) - c.handleMs2PsChat(ctx, msg.Value) + switch msg.Topic { + case c.config.KafkaConfig.ToPushTopic: + c.handleMs2PsChat(ctx, msg.Value) + case c.config.KafkaConfig.ToOfflinePushTopic: + c.handleMsg2OfflinePush(ctx, msg.Value) + } sess.MarkMessage(msg, "") } return nil @@ -262,18 +272,12 @@ func (c *ConsumerHandler) asyncOfflinePush(ctx context.Context, needOfflinePushU if len(offlinePushUserIDs) > 0 { needOfflinePushUserIDs = offlinePushUserIDs } - if err := c.msgDatabase.MsgToOfflinePushMQ(ctx, conversationutil.GenConversationUniqueKeyForSingle(msg.SendID, msg.RecvID), needOfflinePushUserIDs, msg); err != nil { + if err := c.pushDatabase.MsgToOfflinePushMQ(ctx, conversationutil.GenConversationUniqueKeyForSingle(msg.SendID, msg.RecvID), needOfflinePushUserIDs, msg); err != nil { prommetrics.SingleChatMsgProcessFailedCounter.Inc() return } } -func (c *ConsumerHandler) handleMsg2OfflinePush(ctx context.Context, needOfflinePushUserIDs []string, msg *sdkws.MsgData) { - if err := c.offlinePushMsg(ctx, msg, needOfflinePushUserIDs); err != nil { - log.ZWarn(ctx, "offlinePushMsg failed", err, "needOfflinePushUserIDs", needOfflinePushUserIDs, "msg", msg) - } -} - func (c *ConsumerHandler) groupMessagesHandler(ctx context.Context, groupID string, pushToUserIDs *[]string, msg *sdkws.MsgData) (err error) { if len(*pushToUserIDs) == 0 { *pushToUserIDs, err = c.groupLocalCache.GetGroupMemberIDs(ctx, groupID) diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index 75b476841..80736d5cc 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -73,19 +73,21 @@ type Mongo struct { MaxRetry int `mapstructure:"maxRetry"` } type Kafka struct { - Username string `mapstructure:"username"` - Password string `mapstructure:"password"` - ProducerAck string `mapstructure:"producerAck"` - CompressType string `mapstructure:"compressType"` - Address []string `mapstructure:"address"` - ToRedisTopic string `mapstructure:"toRedisTopic"` - ToMongoTopic string `mapstructure:"toMongoTopic"` - ToPushTopic string `mapstructure:"toPushTopic"` - ToOfflinePushTopic string `mapstructure:"toOfflinePushTopic"` - ToRedisGroupID string `mapstructure:"toRedisGroupID"` - ToMongoGroupID string `mapstructure:"toMongoGroupID"` - ToPushGroupID string `mapstructure:"toPushGroupID"` - Tls TLSConfig `mapstructure:"tls"` + Username string `mapstructure:"username"` + Password string `mapstructure:"password"` + ProducerAck string `mapstructure:"producerAck"` + CompressType string `mapstructure:"compressType"` + Address []string `mapstructure:"address"` + ToRedisTopic string `mapstructure:"toRedisTopic"` + ToMongoTopic string `mapstructure:"toMongoTopic"` + ToPushTopic string `mapstructure:"toPushTopic"` + ToOfflinePushTopic string `mapstructure:"toOfflinePushTopic"` + ToRedisGroupID string `mapstructure:"toRedisGroupID"` + ToMongoGroupID string `mapstructure:"toMongoGroupID"` + ToPushGroupID string `mapstructure:"toPushGroupID"` + ToOfflineGroupID string `mapstructure:"toOfflinePushGroupID"` + + Tls TLSConfig `mapstructure:"tls"` } type TLSConfig struct { EnableTLS bool `mapstructure:"enableTLS"` diff --git a/pkg/common/storage/controller/msg.go b/pkg/common/storage/controller/msg.go index 594e5ac54..b08a939cb 100644 --- a/pkg/common/storage/controller/msg.go +++ b/pkg/common/storage/controller/msg.go @@ -30,7 +30,6 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" "github.com/openimsdk/protocol/constant" pbmsg "github.com/openimsdk/protocol/msg" - "github.com/openimsdk/protocol/push" "github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" @@ -93,7 +92,6 @@ type CommonMsgDatabase interface { // to mq MsgToMQ(ctx context.Context, key string, msg2mq *sdkws.MsgData) error MsgToPushMQ(ctx context.Context, key, conversationID string, msg2mq *sdkws.MsgData) (int32, int64, error) - MsgToOfflinePushMQ(ctx context.Context, key string, userIDs []string, msg2mq *sdkws.MsgData) error MsgToMongoMQ(ctx context.Context, key, conversationID string, msgs []*sdkws.MsgData, lastSeq int64) error RangeUserSendCount(ctx context.Context, start time.Time, end time.Time, group bool, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, users []*model.UserCount, dateCount map[string]int64, err error) @@ -124,32 +122,27 @@ func NewCommonMsgDatabase(msgDocModel database.Msg, msg cache.MsgCache, seqUser if err != nil { return nil, err } - producerToOfflinePush, err := kafka.NewKafkaProducer(conf, kafkaConf.Address, kafkaConf.ToOfflinePushTopic) - if err != nil { - return nil, err - } + return &commonMsgDatabase{ - msgDocDatabase: msgDocModel, - msg: msg, - seqUser: seqUser, - seqConversation: seqConversation, - producer: producerToRedis, - producerToMongo: producerToMongo, - producerToPush: producerToPush, - producerToOfflinePush: producerToOfflinePush, + msgDocDatabase: msgDocModel, + msg: msg, + seqUser: seqUser, + seqConversation: seqConversation, + producer: producerToRedis, + producerToMongo: producerToMongo, + producerToPush: producerToPush, }, nil } type commonMsgDatabase struct { - msgDocDatabase database.Msg - msgTable model.MsgDocModel - msg cache.MsgCache - seqConversation cache.SeqConversationCache - seqUser cache.SeqUser - producer *kafka.Producer - producerToMongo *kafka.Producer - producerToPush *kafka.Producer - producerToOfflinePush *kafka.Producer + msgDocDatabase database.Msg + msgTable model.MsgDocModel + msg cache.MsgCache + seqConversation cache.SeqConversationCache + seqUser cache.SeqUser + producer *kafka.Producer + producerToMongo *kafka.Producer + producerToPush *kafka.Producer } func (db *commonMsgDatabase) MsgToMQ(ctx context.Context, key string, msg2mq *sdkws.MsgData) error { @@ -166,11 +159,6 @@ func (db *commonMsgDatabase) MsgToPushMQ(ctx context.Context, key, conversationI return partition, offset, nil } -func (db *commonMsgDatabase) MsgToOfflinePushMQ(ctx context.Context, key string, userIDs []string, msg2mq *sdkws.MsgData) error { - _, _, err := db.producerToOfflinePush.SendMessage(ctx, key, &push.PushMsgReq{MsgData: msg2mq, UserIDs: userIDs}) - return err -} - func (db *commonMsgDatabase) MsgToMongoMQ(ctx context.Context, key, conversationID string, messages []*sdkws.MsgData, lastSeq int64) error { if len(messages) > 0 { _, _, err := db.producerToMongo.SendMessage(ctx, key, &pbmsg.MsgDataToMongoByMQ{LastSeq: lastSeq, ConversationID: conversationID, MsgData: messages}) diff --git a/pkg/common/storage/controller/push.go b/pkg/common/storage/controller/push.go index 199a0ba67..88b39d708 100644 --- a/pkg/common/storage/controller/push.go +++ b/pkg/common/storage/controller/push.go @@ -17,21 +17,41 @@ package controller import ( "context" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" + "github.com/openimsdk/protocol/push" + "github.com/openimsdk/protocol/sdkws" + "github.com/openimsdk/tools/mq/kafka" ) type PushDatabase interface { DelFcmToken(ctx context.Context, userID string, platformID int) error + MsgToOfflinePushMQ(ctx context.Context, key string, userIDs []string, msg2mq *sdkws.MsgData) error } type pushDataBase struct { - cache cache.ThirdCache + cache cache.ThirdCache + producerToOfflinePush *kafka.Producer } -func NewPushDatabase(cache cache.ThirdCache) PushDatabase { - return &pushDataBase{cache: cache} +func NewPushDatabase(cache cache.ThirdCache, kafkaConf *config.Kafka) PushDatabase { + conf, err := kafka.BuildProducerConfig(*kafkaConf.Build()) + if err != nil { + return nil + } + producerToOfflinePush, err := kafka.NewKafkaProducer(conf, kafkaConf.Address, kafkaConf.ToOfflinePushTopic) + if err != nil { + return nil + } + return &pushDataBase{cache: cache, + producerToOfflinePush: producerToOfflinePush} } func (p *pushDataBase) DelFcmToken(ctx context.Context, userID string, platformID int) error { return p.cache.DelFcmToken(ctx, userID, platformID) } + +func (p *pushDataBase) MsgToOfflinePushMQ(ctx context.Context, key string, userIDs []string, msg2mq *sdkws.MsgData) error { + _, _, err := p.producerToOfflinePush.SendMessage(ctx, key, &push.PushMsgReq{MsgData: msg2mq, UserIDs: userIDs}) + return err +}