You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
Open-IM-Server/internal/push/push_handler.go

68 lines
2.3 KiB

2 years ago
package push
4 years ago
import (
2 years ago
"context"
2 years ago
2 years ago
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
kfk "github.com/OpenIMSDK/Open-IM-Server/pkg/common/kafka"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
pbChat "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg"
pbPush "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/push"
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
4 years ago
"github.com/Shopify/sarama"
"github.com/golang/protobuf/proto"
)
2 years ago
type ConsumerHandler struct {
4 years ago
pushConsumerGroup *kfk.MConsumerGroup
2 years ago
pusher *Pusher
4 years ago
}
2 years ago
func NewConsumerHandler(pusher *Pusher) *ConsumerHandler {
var consumerHandler ConsumerHandler
consumerHandler.pusher = pusher
consumerHandler.pushConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0,
4 years ago
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ms2pschat.Topic}, config.Config.Kafka.Ms2pschat.Addr,
config.Config.Kafka.ConsumerGroupID.MsgToPush)
2 years ago
return &consumerHandler
4 years ago
}
2 years ago
2 years ago
func (c *ConsumerHandler) handleMs2PsChat(ctx context.Context, msg []byte) {
msgFromMQ := pbChat.PushMsgDataToMQ{}
if err := proto.Unmarshal(msg, &msgFromMQ); err != nil {
2 years ago
log.ZError(ctx, "push Unmarshal msg err", err, "msg", string(msg))
4 years ago
return
}
2 years ago
pbData := &pbPush.PushMsgReq{
2 years ago
MsgData: msgFromMQ.MsgData,
2 years ago
SourceID: msgFromMQ.SourceID,
2 years ago
}
2 years ago
sec := msgFromMQ.MsgData.SendTime / 1000
nowSec := utils.GetCurrentTimestampBySecond()
if nowSec-sec > 10 {
return
}
2 years ago
var err error
2 years ago
switch msgFromMQ.MsgData.SessionType {
case constant.SuperGroupChatType:
2 years ago
err = c.pusher.MsgToSuperGroupUser(ctx, pbData.SourceID, pbData.MsgData)
2 years ago
default:
2 years ago
err = c.pusher.MsgToUser(ctx, pbData.SourceID, pbData.MsgData)
}
if err != nil {
2 years ago
log.ZError(ctx, "push failed", err, "msg", pbData.String())
2 years ago
}
4 years ago
}
2 years ago
func (ConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
func (ConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
func (c *ConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession,
4 years ago
claim sarama.ConsumerGroupClaim) error {
for msg := range claim.Messages() {
2 years ago
ctx := c.pushConsumerGroup.GetContextFromMsg(msg)
2 years ago
c.handleMs2PsChat(ctx, msg.Value)
2 years ago
sess.MarkMessage(msg, "")
4 years ago
}
return nil
}