feat: update Handler logic.

pull/2600/head
Monet Lee 1 year ago
parent 01b8a8726b
commit 15e5836fb8

@ -0,0 +1,115 @@
package push
import (
"context"
"github.com/IBM/sarama"
"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush"
"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/options"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
"github.com/openimsdk/protocol/constant"
pbpush "github.com/openimsdk/protocol/push"
"github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mq/kafka"
"github.com/openimsdk/tools/utils/jsonutil"
"google.golang.org/protobuf/proto"
)
type OfflinePushConsumerHandler struct {
OfflinePushConsumerGroup *kafka.MConsumerGroup
offlinePusher offlinepush.OfflinePusher
}
func NewOfflinePushConsumerHandler(config *Config) (*OfflinePushConsumerHandler, error) {
var offlinePushConsumerHandler OfflinePushConsumerHandler
var err error
offlinePushConsumerHandler.OfflinePushConsumerGroup, err = kafka.NewMConsumerGroup(config.KafkaConfig.Build(), config.KafkaConfig.ToOfflineGroupID,
[]string{config.KafkaConfig.ToOfflinePushTopic}, true)
if err != nil {
return nil, err
}
return &offlinePushConsumerHandler, nil
}
func (*OfflinePushConsumerHandler) Setup(sarama.ConsumerGroupSession) error { return nil }
func (*OfflinePushConsumerHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil }
func (o *OfflinePushConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for msg := range claim.Messages() {
ctx := o.OfflinePushConsumerGroup.GetContextFromMsg(msg)
o.handleMsg2OfflinePush(ctx, msg.Value)
sess.MarkMessage(msg, "")
}
return nil
}
func (o *OfflinePushConsumerHandler) handleMsg2OfflinePush(ctx context.Context, msg []byte) {
offlinePushMsg := pbpush.PushMsgReq{}
if err := proto.Unmarshal(msg, &offlinePushMsg); err != nil {
log.ZError(ctx, "offline push Unmarshal msg err", err, "msg", string(msg))
return
}
err := o.offlinePushMsg(ctx, offlinePushMsg.MsgData, offlinePushMsg.UserIDs)
if err != nil {
log.ZWarn(ctx, "offline push failed", err, "msg", offlinePushMsg.String())
}
}
func (c *OfflinePushConsumerHandler) getOfflinePushInfos(msg *sdkws.MsgData) (title, content string, opts *options.Opts, err error) {
type AtTextElem struct {
Text string `json:"text,omitempty"`
AtUserList []string `json:"atUserList,omitempty"`
IsAtSelf bool `json:"isAtSelf"`
}
opts = &options.Opts{Signal: &options.Signal{}}
if msg.OfflinePushInfo != nil {
opts.IOSBadgeCount = msg.OfflinePushInfo.IOSBadgeCount
opts.IOSPushSound = msg.OfflinePushInfo.IOSPushSound
opts.Ex = msg.OfflinePushInfo.Ex
}
if msg.OfflinePushInfo != nil {
title = msg.OfflinePushInfo.Title
content = msg.OfflinePushInfo.Desc
}
if title == "" {
switch msg.ContentType {
case constant.Text:
fallthrough
case constant.Picture:
fallthrough
case constant.Voice:
fallthrough
case constant.Video:
fallthrough
case constant.File:
title = constant.ContentType2PushContent[int64(msg.ContentType)]
case constant.AtText:
ac := AtTextElem{}
_ = jsonutil.JsonStringToStruct(string(msg.Content), &ac)
case constant.SignalingNotification:
title = constant.ContentType2PushContent[constant.SignalMsg]
default:
title = constant.ContentType2PushContent[constant.Common]
}
}
if content == "" {
content = title
}
return
}
func (c *OfflinePushConsumerHandler) offlinePushMsg(ctx context.Context, msg *sdkws.MsgData, offlinePushUserIDs []string) error {
title, content, opts, err := c.getOfflinePushInfos(msg)
if err != nil {
return err
}
err = c.offlinePusher.Push(ctx, offlinePushUserIDs, title, content, opts)
if err != nil {
prommetrics.MsgOfflinePushFailedCounter.Inc()
return err
}
return nil
}

@ -18,6 +18,7 @@ type pushServer struct {
disCov discovery.SvcDiscoveryRegistry disCov discovery.SvcDiscoveryRegistry
offlinePusher offlinepush.OfflinePusher offlinePusher offlinepush.OfflinePusher
pushCh *ConsumerHandler pushCh *ConsumerHandler
offlinePushCh *OfflinePushConsumerHandler
} }
type Config struct { type Config struct {
@ -55,18 +56,30 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
if err != nil { if err != nil {
return err return err
} }
database := controller.NewPushDatabase(cacheModel, &config.KafkaConfig) database := controller.NewPushDatabase(cacheModel, &config.KafkaConfig)
consumer, err := NewConsumerHandler(config, database, offlinePusher, rdb, client) consumer, err := NewConsumerHandler(config, database, offlinePusher, rdb, client)
if err != nil { if err != nil {
return err return err
} }
offlinePushConsumer, err := NewOfflinePushConsumerHandler(config)
if err != nil {
return err
}
pbpush.RegisterPushMsgServiceServer(server, &pushServer{ pbpush.RegisterPushMsgServiceServer(server, &pushServer{
database: database, database: database,
disCov: client, disCov: client,
offlinePusher: offlinePusher, offlinePusher: offlinePusher,
pushCh: consumer, pushCh: consumer,
offlinePushCh: offlinePushConsumer,
}) })
go consumer.pushConsumerGroup.RegisterHandleAndConsumer(ctx, consumer) go consumer.pushConsumerGroup.RegisterHandleAndConsumer(ctx, consumer)
go offlinePushConsumer.OfflinePushConsumerGroup.RegisterHandleAndConsumer(ctx, offlinePushConsumer)
return nil return nil
} }

@ -49,7 +49,7 @@ func NewConsumerHandler(config *Config, database controller.PushDatabase, offlin
var consumerHandler ConsumerHandler var consumerHandler ConsumerHandler
var err error var err error
consumerHandler.pushConsumerGroup, err = kafka.NewMConsumerGroup(config.KafkaConfig.Build(), config.KafkaConfig.ToPushGroupID, consumerHandler.pushConsumerGroup, err = kafka.NewMConsumerGroup(config.KafkaConfig.Build(), config.KafkaConfig.ToPushGroupID,
[]string{config.KafkaConfig.ToPushTopic, config.KafkaConfig.ToOfflinePushTopic}, true) []string{config.KafkaConfig.ToPushTopic}, true)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -103,19 +103,6 @@ func (c *ConsumerHandler) handleMs2PsChat(ctx context.Context, msg []byte) {
} }
} }
func (c *ConsumerHandler) handleMsg2OfflinePush(ctx context.Context, msg []byte) {
offlinePushMsg := pbpush.PushMsgReq{}
if err := proto.Unmarshal(msg, &offlinePushMsg); err != nil {
log.ZError(ctx, "offline push Unmarshal msg err", err, "msg", string(msg))
return
}
err := c.offlinePushMsg(ctx, offlinePushMsg.MsgData, offlinePushMsg.UserIDs)
if err != nil {
log.ZWarn(ctx, "offline push failed", err, "msg", offlinePushMsg.String())
}
}
func (*ConsumerHandler) Setup(sarama.ConsumerGroupSession) error { return nil } func (*ConsumerHandler) Setup(sarama.ConsumerGroupSession) error { return nil }
func (*ConsumerHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil } func (*ConsumerHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil }
@ -123,12 +110,7 @@ func (*ConsumerHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil
func (c *ConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { func (c *ConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for msg := range claim.Messages() { for msg := range claim.Messages() {
ctx := c.pushConsumerGroup.GetContextFromMsg(msg) ctx := c.pushConsumerGroup.GetContextFromMsg(msg)
switch msg.Topic { c.handleMs2PsChat(ctx, msg.Value)
case c.config.KafkaConfig.ToPushTopic:
c.handleMs2PsChat(ctx, msg.Value)
case c.config.KafkaConfig.ToOfflinePushTopic:
c.handleMsg2OfflinePush(ctx, msg.Value)
}
sess.MarkMessage(msg, "") sess.MarkMessage(msg, "")
} }
return nil return nil

@ -37,7 +37,6 @@ func NewPushRpcCmd() *PushRpcCmd {
ret.configMap = map[string]any{ ret.configMap = map[string]any{
OpenIMPushCfgFileName: &pushConfig.RpcConfig, OpenIMPushCfgFileName: &pushConfig.RpcConfig,
RedisConfigFileName: &pushConfig.RedisConfig, RedisConfigFileName: &pushConfig.RedisConfig,
MongodbConfigFileName: &pushConfig.MongodbConfig,
KafkaConfigFileName: &pushConfig.KafkaConfig, KafkaConfigFileName: &pushConfig.KafkaConfig,
ShareFileName: &pushConfig.Share, ShareFileName: &pushConfig.Share,
NotificationFileName: &pushConfig.NotificationConfig, NotificationFileName: &pushConfig.NotificationConfig,

Loading…
Cancel
Save