From bf4de7e3ed270f0ba7a8825c361dc84e4b390a1a Mon Sep 17 00:00:00 2001 From: xiazc2 Date: Fri, 21 Apr 2023 14:51:42 +0800 Subject: [PATCH] feature: add rabbitmq receiver --- .../receiver/rabbit/RabbitMqReceiver.java | 54 +++++++++++++++++++ .../src/main/resources/application.properties | 2 + 2 files changed, 56 insertions(+) create mode 100644 austin-handler/src/main/java/com/java3y/austin/handler/receiver/rabbit/RabbitMqReceiver.java diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/receiver/rabbit/RabbitMqReceiver.java b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/rabbit/RabbitMqReceiver.java new file mode 100644 index 0000000..99dbaff --- /dev/null +++ b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/rabbit/RabbitMqReceiver.java @@ -0,0 +1,54 @@ +package com.java3y.austin.handler.receiver.rabbit; + +import com.alibaba.fastjson.JSON; +import com.java3y.austin.common.domain.TaskInfo; +import com.java3y.austin.handler.receiver.service.ConsumeService; +import com.java3y.austin.support.constans.MessageQueuePipeline; +import com.java3y.austin.support.domain.MessageTemplate; +import org.apache.commons.lang3.StringUtils; +import org.springframework.amqp.core.ExchangeTypes; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.annotation.*; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; + +import java.util.List; + + +/** + * @author xzcawl + * @date 23-04-21 10:53:32 + */ +@Component +@ConditionalOnProperty(name = "austin.mq.pipeline", havingValue = MessageQueuePipeline.RABBIT_MQ) +public class RabbitMqReceiver { + + @Autowired + private ConsumeService consumeService; + + @RabbitListener(bindings = @QueueBinding( + value = @Queue(value = "${spring.rabbitmq.queues}", durable = "true"), + exchange = @Exchange(value = "${austin.rabbitmq.exchange.name}", type = ExchangeTypes.TOPIC), + key = "${austin.rabbitmq.routing.key}" + )) + public void onMessage(Message message) { + String messageType = message.getMessageProperties().getHeader("messageType"); + byte[] body = message.getBody(); + String messageContent = new String(body); + if (StringUtils.isBlank(messageContent)) { + return; + } + if ("send".equals(messageType)) { + // 处理发送消息 + List taskInfoLists = JSON.parseArray(messageContent, TaskInfo.class); + consumeService.consume2Send(taskInfoLists); + } else if ("recall".equals(messageType)) { + // 处理撤回消息 + MessageTemplate messageTemplate = JSON.parseObject(messageContent, MessageTemplate.class); + consumeService.consume2recall(messageTemplate); + } + + } + +} diff --git a/austin-web/src/main/resources/application.properties b/austin-web/src/main/resources/application.properties index 80d7ee6..e5e7695 100644 --- a/austin-web/src/main/resources/application.properties +++ b/austin-web/src/main/resources/application.properties @@ -57,6 +57,8 @@ spring.rabbitmq.publisher-returns=true spring.rabbitmq.virtual-host=/ austin.rabbitmq.topic.name=austinRabbit austin.rabbitmq.exchange.name=austin.point +spring.rabbitmq.queues=austin_queues +austin.rabbitmq.routing.key=austin_KEY ########################################## RabbitMq end ##########################################