From 15e5836fb82d7cbdb95b7be003fac0099c9fcc67 Mon Sep 17 00:00:00 2001 From: Monet Lee Date: Tue, 10 Sep 2024 11:09:17 +0800 Subject: [PATCH] feat: update Handler logic. --- internal/push/offlinepush_handler.go | 115 +++++++++++++++++++++++++++ internal/push/push.go | 13 +++ internal/push/push_handler.go | 22 +---- pkg/common/cmd/push.go | 1 - 4 files changed, 130 insertions(+), 21 deletions(-) create mode 100644 internal/push/offlinepush_handler.go diff --git a/internal/push/offlinepush_handler.go b/internal/push/offlinepush_handler.go new file mode 100644 index 000000000..f2c0deb88 --- /dev/null +++ b/internal/push/offlinepush_handler.go @@ -0,0 +1,115 @@ +package push + +import ( + "context" + + "github.com/IBM/sarama" + "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush" + "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/options" + "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" + "github.com/openimsdk/protocol/constant" + pbpush "github.com/openimsdk/protocol/push" + "github.com/openimsdk/protocol/sdkws" + "github.com/openimsdk/tools/log" + "github.com/openimsdk/tools/mq/kafka" + "github.com/openimsdk/tools/utils/jsonutil" + "google.golang.org/protobuf/proto" +) + +type OfflinePushConsumerHandler struct { + OfflinePushConsumerGroup *kafka.MConsumerGroup + offlinePusher offlinepush.OfflinePusher +} + +func NewOfflinePushConsumerHandler(config *Config) (*OfflinePushConsumerHandler, error) { + var offlinePushConsumerHandler OfflinePushConsumerHandler + var err error + offlinePushConsumerHandler.OfflinePushConsumerGroup, err = kafka.NewMConsumerGroup(config.KafkaConfig.Build(), config.KafkaConfig.ToOfflineGroupID, + []string{config.KafkaConfig.ToOfflinePushTopic}, true) + if err != nil { + return nil, err + } + return &offlinePushConsumerHandler, nil +} + +func (*OfflinePushConsumerHandler) Setup(sarama.ConsumerGroupSession) error { return nil } +func (*OfflinePushConsumerHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil } +func (o *OfflinePushConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { + for msg := range claim.Messages() { + ctx := o.OfflinePushConsumerGroup.GetContextFromMsg(msg) + o.handleMsg2OfflinePush(ctx, msg.Value) + sess.MarkMessage(msg, "") + } + return nil +} + +func (o *OfflinePushConsumerHandler) handleMsg2OfflinePush(ctx context.Context, msg []byte) { + offlinePushMsg := pbpush.PushMsgReq{} + if err := proto.Unmarshal(msg, &offlinePushMsg); err != nil { + log.ZError(ctx, "offline push Unmarshal msg err", err, "msg", string(msg)) + return + } + err := o.offlinePushMsg(ctx, offlinePushMsg.MsgData, offlinePushMsg.UserIDs) + if err != nil { + log.ZWarn(ctx, "offline push failed", err, "msg", offlinePushMsg.String()) + } + +} + +func (c *OfflinePushConsumerHandler) getOfflinePushInfos(msg *sdkws.MsgData) (title, content string, opts *options.Opts, err error) { + type AtTextElem struct { + Text string `json:"text,omitempty"` + AtUserList []string `json:"atUserList,omitempty"` + IsAtSelf bool `json:"isAtSelf"` + } + + opts = &options.Opts{Signal: &options.Signal{}} + if msg.OfflinePushInfo != nil { + opts.IOSBadgeCount = msg.OfflinePushInfo.IOSBadgeCount + opts.IOSPushSound = msg.OfflinePushInfo.IOSPushSound + opts.Ex = msg.OfflinePushInfo.Ex + } + + if msg.OfflinePushInfo != nil { + title = msg.OfflinePushInfo.Title + content = msg.OfflinePushInfo.Desc + } + if title == "" { + switch msg.ContentType { + case constant.Text: + fallthrough + case constant.Picture: + fallthrough + case constant.Voice: + fallthrough + case constant.Video: + fallthrough + case constant.File: + title = constant.ContentType2PushContent[int64(msg.ContentType)] + case constant.AtText: + ac := AtTextElem{} + _ = jsonutil.JsonStringToStruct(string(msg.Content), &ac) + case constant.SignalingNotification: + title = constant.ContentType2PushContent[constant.SignalMsg] + default: + title = constant.ContentType2PushContent[constant.Common] + } + } + if content == "" { + content = title + } + return +} + +func (c *OfflinePushConsumerHandler) offlinePushMsg(ctx context.Context, msg *sdkws.MsgData, offlinePushUserIDs []string) error { + title, content, opts, err := c.getOfflinePushInfos(msg) + if err != nil { + return err + } + err = c.offlinePusher.Push(ctx, offlinePushUserIDs, title, content, opts) + if err != nil { + prommetrics.MsgOfflinePushFailedCounter.Inc() + return err + } + return nil +} diff --git a/internal/push/push.go b/internal/push/push.go index 4513b6fa9..152c4ea96 100644 --- a/internal/push/push.go +++ b/internal/push/push.go @@ -18,6 +18,7 @@ type pushServer struct { disCov discovery.SvcDiscoveryRegistry offlinePusher offlinepush.OfflinePusher pushCh *ConsumerHandler + offlinePushCh *OfflinePushConsumerHandler } type Config struct { @@ -55,18 +56,30 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg if err != nil { return err } + database := controller.NewPushDatabase(cacheModel, &config.KafkaConfig) consumer, err := NewConsumerHandler(config, database, offlinePusher, rdb, client) if err != nil { return err } + + offlinePushConsumer, err := NewOfflinePushConsumerHandler(config) + if err != nil { + return err + } + pbpush.RegisterPushMsgServiceServer(server, &pushServer{ database: database, disCov: client, offlinePusher: offlinePusher, pushCh: consumer, + offlinePushCh: offlinePushConsumer, }) + go consumer.pushConsumerGroup.RegisterHandleAndConsumer(ctx, consumer) + + go offlinePushConsumer.OfflinePushConsumerGroup.RegisterHandleAndConsumer(ctx, offlinePushConsumer) + return nil } diff --git a/internal/push/push_handler.go b/internal/push/push_handler.go index b478185ba..e4634c1d9 100644 --- a/internal/push/push_handler.go +++ b/internal/push/push_handler.go @@ -49,7 +49,7 @@ func NewConsumerHandler(config *Config, database controller.PushDatabase, offlin var consumerHandler ConsumerHandler var err error consumerHandler.pushConsumerGroup, err = kafka.NewMConsumerGroup(config.KafkaConfig.Build(), config.KafkaConfig.ToPushGroupID, - []string{config.KafkaConfig.ToPushTopic, config.KafkaConfig.ToOfflinePushTopic}, true) + []string{config.KafkaConfig.ToPushTopic}, true) if err != nil { return nil, err } @@ -103,19 +103,6 @@ func (c *ConsumerHandler) handleMs2PsChat(ctx context.Context, msg []byte) { } } -func (c *ConsumerHandler) handleMsg2OfflinePush(ctx context.Context, msg []byte) { - offlinePushMsg := pbpush.PushMsgReq{} - if err := proto.Unmarshal(msg, &offlinePushMsg); err != nil { - log.ZError(ctx, "offline push Unmarshal msg err", err, "msg", string(msg)) - return - } - err := c.offlinePushMsg(ctx, offlinePushMsg.MsgData, offlinePushMsg.UserIDs) - if err != nil { - log.ZWarn(ctx, "offline push failed", err, "msg", offlinePushMsg.String()) - } - -} - func (*ConsumerHandler) Setup(sarama.ConsumerGroupSession) error { return nil } func (*ConsumerHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil } @@ -123,12 +110,7 @@ func (*ConsumerHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil func (c *ConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { for msg := range claim.Messages() { ctx := c.pushConsumerGroup.GetContextFromMsg(msg) - switch msg.Topic { - case c.config.KafkaConfig.ToPushTopic: - c.handleMs2PsChat(ctx, msg.Value) - case c.config.KafkaConfig.ToOfflinePushTopic: - c.handleMsg2OfflinePush(ctx, msg.Value) - } + c.handleMs2PsChat(ctx, msg.Value) sess.MarkMessage(msg, "") } return nil diff --git a/pkg/common/cmd/push.go b/pkg/common/cmd/push.go index c9b8b1c24..ca22a697d 100644 --- a/pkg/common/cmd/push.go +++ b/pkg/common/cmd/push.go @@ -37,7 +37,6 @@ func NewPushRpcCmd() *PushRpcCmd { ret.configMap = map[string]any{ OpenIMPushCfgFileName: &pushConfig.RpcConfig, RedisConfigFileName: &pushConfig.RedisConfig, - MongodbConfigFileName: &pushConfig.MongodbConfig, KafkaConfigFileName: &pushConfig.KafkaConfig, ShareFileName: &pushConfig.Share, NotificationFileName: &pushConfig.NotificationConfig,