diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/receiver/rocketmq/RocketMqBizReceiver.java b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/rocketmq/RocketMqBizReceiver.java new file mode 100644 index 0000000..92e4efb --- /dev/null +++ b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/rocketmq/RocketMqBizReceiver.java @@ -0,0 +1,43 @@ +package com.java3y.austin.handler.receiver.rocketmq; + +import com.alibaba.fastjson.JSON; +import com.java3y.austin.common.domain.TaskInfo; +import com.java3y.austin.handler.receiver.service.ConsumeService; +import com.java3y.austin.support.constans.MessageQueuePipeline; +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.annotation.SelectorType; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; + +import java.util.List; + +/** + * Description: + * + * @author elpsycongroo + * create date: 2022/7/16 + */ +@Component +@ConditionalOnProperty(name = "austin-mq-pipeline", havingValue = MessageQueuePipeline.ROCKET_MQ) +@RocketMQMessageListener(topic = "${austin.business.topic.name}", + consumerGroup = "${austin-rocketmq-biz-consumer-group}", + selectorType = SelectorType.TAG, + selectorExpression = "${austin.business.tagId.value}" +) +public class RocketMqBizReceiver implements RocketMQListener { + + @Autowired + private ConsumeService consumeService; + + @Override + public void onMessage(String message) { + if (StringUtils.isBlank(message)) { + return; + } + List taskInfoLists = JSON.parseArray(message, TaskInfo.class); + consumeService.consume2Send(taskInfoLists); + } +} diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/receiver/rocketmq/RocketMqRecallReceiver.java b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/rocketmq/RocketMqRecallReceiver.java new file mode 100644 index 0000000..aebacd0 --- /dev/null +++ b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/rocketmq/RocketMqRecallReceiver.java @@ -0,0 +1,41 @@ +package com.java3y.austin.handler.receiver.rocketmq; + +import com.alibaba.fastjson.JSON; +import com.java3y.austin.handler.receiver.service.ConsumeService; +import com.java3y.austin.support.constans.MessageQueuePipeline; +import com.java3y.austin.support.domain.MessageTemplate; +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.annotation.SelectorType; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; + +/** + * Description: + * + * @author elpsycongroo + * create date: 2022/7/16 + */ +@Component +@ConditionalOnProperty(name = "austin-mq-pipeline", havingValue = MessageQueuePipeline.ROCKET_MQ) +@RocketMQMessageListener(topic = "${austin.business.recall.topic.name}", + consumerGroup = "${austin-rocketmq-recall-consumer-group}", + selectorType = SelectorType.TAG, + selectorExpression = "${austin.business.tagId.value}" +) +public class RocketMqRecallReceiver implements RocketMQListener { + + @Autowired + private ConsumeService consumeService; + + @Override + public void onMessage(String message) { + if (StringUtils.isBlank(message)) { + return; + } + MessageTemplate messageTemplate = JSON.parseObject(message, MessageTemplate.class); + consumeService.consume2recall(messageTemplate); + } +} diff --git a/austin-support/pom.xml b/austin-support/pom.xml index afc9ab8..29a3619 100644 --- a/austin-support/pom.xml +++ b/austin-support/pom.xml @@ -63,6 +63,11 @@ spring-kafka + + org.apache.rocketmq + rocketmq-spring-boot-starter + + org.springframework.boot spring-boot-starter-data-redis diff --git a/austin-support/src/main/java/com/java3y/austin/support/mq/rocketmq/RocketMqSendMqServiceImpl.java b/austin-support/src/main/java/com/java3y/austin/support/mq/rocketmq/RocketMqSendMqServiceImpl.java new file mode 100644 index 0000000..3749ee9 --- /dev/null +++ b/austin-support/src/main/java/com/java3y/austin/support/mq/rocketmq/RocketMqSendMqServiceImpl.java @@ -0,0 +1,39 @@ +package com.java3y.austin.support.mq.rocketmq; + +import com.java3y.austin.support.constans.MessageQueuePipeline; +import com.java3y.austin.support.mq.SendMqService; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.stereotype.Service; + +/** + * Description: + * + * @author elpsycongroo + * create date: 2022/7/15 + */ +@Slf4j +@Service +@ConditionalOnProperty(name = "austin-mq-pipeline", havingValue = MessageQueuePipeline.ROCKET_MQ) +public class RocketMqSendMqServiceImpl implements SendMqService { + + @Autowired + private RocketMQTemplate rocketMQTemplate; + + @Override + public void send(String topic, String jsonValue, String tagId) { + if (StringUtils.isNotBlank(tagId)) { + topic = topic + ":" + tagId; + } + send(topic, jsonValue); + } + + @Override + public void send(String topic, String jsonValue) { + rocketMQTemplate.send(topic, MessageBuilder.withPayload(jsonValue).build()); + } +} diff --git a/austin-web/src/main/resources/application.properties b/austin-web/src/main/resources/application.properties index f966799..6284a79 100644 --- a/austin-web/src/main/resources/application.properties +++ b/austin-web/src/main/resources/application.properties @@ -22,6 +22,12 @@ austin-mq-pipeline=eventbus austin-kafka-ip= austin-kafka-port= +# todo [rocketmq] 【optional】, if austin-mq-pipeline=rocketMq【must】 +austin-rocketmq-nameserver-ip=127.0.0.1 +austin-rocketmq-nameserver-port=9876 +austin-rocketmq-producer-group=unique-producer-group +austin-rocketmq-biz-consumer-group=unique-biz-consumer-group +austin-rocketmq-recall-consumer-group=unique-recall-consumer-group # todo [xxl-job] switch/ip/port/【optional】 xxl-job.enabled=false @@ -55,6 +61,10 @@ spring.kafka.consumer.auto.offset.reset=earliest spring.kafka.consumer.auto-commit-interval=1000 spring.kafka.consumer.enable-auto-commit=true +##################### rocketmq properties ##################### +rocketmq.name-server=${austin-rocketmq-nameserver-ip}:${austin-rocketmq-nameserver-port} +rocketmq.producer.group=${austin-rocketmq-producer-group} + ##################### Rabbit properties ##################### server.port=8080 spring.application.name=cl @@ -74,6 +84,7 @@ spring.rabbitmq.virtual-host=/ austin.rabbitmq.topic.name=austinRabbit austin.rabbitmq.exchange.name=austin.point + ##################### redis properties ##################### spring.redis.host=${austin-redis-ip} spring.redis.port=${austin-redis-port} diff --git a/pom.xml b/pom.xml index ea836cb..e69a5da 100644 --- a/pom.xml +++ b/pom.xml @@ -195,6 +195,12 @@ 2.0.0 + + org.apache.rocketmq + rocketmq-spring-boot-starter + 2.2.2 + +