From ec8fa48c3f8279efa27fbd6456b5c0351da5a4fb Mon Sep 17 00:00:00 2001 From: Giorno Date: Mon, 18 Jul 2022 23:52:09 +0800 Subject: [PATCH] feat:rocketmq --- .../common/constant/CommonConstant.java | 18 +++++++ .../receiver/rocket/RocketMqReceiver.java | 36 +++++++++++++ austin-support/pom.xml | 5 ++ .../mq/rocket/RocketSendMqServiceImpl.java | 50 +++++++++++++++++++ .../src/main/resources/application.properties | 7 +++ pom.xml | 7 ++- 6 files changed, 122 insertions(+), 1 deletion(-) create mode 100644 austin-common/src/main/java/com/java3y/austin/common/constant/CommonConstant.java create mode 100644 austin-handler/src/main/java/com/java3y/austin/handler/receiver/rocket/RocketMqReceiver.java create mode 100644 austin-support/src/main/java/com/java3y/austin/support/mq/rocket/RocketSendMqServiceImpl.java diff --git a/austin-common/src/main/java/com/java3y/austin/common/constant/CommonConstant.java b/austin-common/src/main/java/com/java3y/austin/common/constant/CommonConstant.java new file mode 100644 index 0000000..ffc9088 --- /dev/null +++ b/austin-common/src/main/java/com/java3y/austin/common/constant/CommonConstant.java @@ -0,0 +1,18 @@ +package com.java3y.austin.common.constant; + +public class CommonConstant { + public final static String PERIOD = "."; + public final static String COMMA = ","; + public final static String COLON = ":"; + public final static String SEMICOLON = ";"; + public final static String POUND = "#"; + public final static String SLASH = "/"; + public final static String BACKSLASH = "\\"; + public final static String EMPTY_STRING = ""; + // + public final static String ONE = "1"; + public final static String ZERO = "0"; + public final static String MINUS_ONE = "-1"; + public final static String YES = "Y"; + public final static String NO = "N"; +} \ No newline at end of file diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/receiver/rocket/RocketMqReceiver.java b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/rocket/RocketMqReceiver.java new file mode 100644 index 0000000..c8d7625 --- /dev/null +++ b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/rocket/RocketMqReceiver.java @@ -0,0 +1,36 @@ +package com.java3y.austin.handler.receiver.rocket; + +import cn.hutool.core.util.StrUtil; +import com.alibaba.fastjson.JSON; +import com.java3y.austin.common.domain.TaskInfo; +import com.java3y.austin.handler.receiver.service.ConsumeService; +import com.java3y.austin.support.constans.MessageQueuePipeline; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.annotation.SelectorType; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; + +/** + * @program: austin + * @description: + * @author: Giorno + * @create: 2022-07-18 + **/ +@Component +@RocketMQMessageListener(topic = "${austin.business.topic.name}", + consumerGroup = "${austin.rocketmq.consumer.group}", + selectorType = SelectorType.TAG, selectorExpression = "${austin.business.tagId.value}") +@ConditionalOnProperty(name = "austin-mq-pipeline", havingValue = MessageQueuePipeline.ROCKET_MQ) +public class RocketMqReceiver implements RocketMQListener { + @Resource + private ConsumeService consumeService; + + @Override + public void onMessage(String message) { + if (StrUtil.isBlank(message)) return; + consumeService.consume2Send(JSON.parseArray(message, TaskInfo.class)); + } +} diff --git a/austin-support/pom.xml b/austin-support/pom.xml index 96ae2e0..3f9a4ec 100644 --- a/austin-support/pom.xml +++ b/austin-support/pom.xml @@ -63,6 +63,11 @@ spring-kafka + + org.apache.rocketmq + rocketmq-spring-boot-starter + + org.springframework.boot spring-boot-starter-data-redis diff --git a/austin-support/src/main/java/com/java3y/austin/support/mq/rocket/RocketSendMqServiceImpl.java b/austin-support/src/main/java/com/java3y/austin/support/mq/rocket/RocketSendMqServiceImpl.java new file mode 100644 index 0000000..c6709ad --- /dev/null +++ b/austin-support/src/main/java/com/java3y/austin/support/mq/rocket/RocketSendMqServiceImpl.java @@ -0,0 +1,50 @@ +package com.java3y.austin.support.mq.rocket; + +import cn.hutool.core.util.StrUtil; +import com.java3y.austin.support.constans.MessageQueuePipeline; +import com.java3y.austin.support.mq.SendMqService; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.apache.rocketmq.spring.support.RocketMQHeaders; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; + +/** + * @program: austin + * @description: + * @author: Giorno + * @create: 2022-07-16 + **/ +@Slf4j +@Service +@ConditionalOnProperty(name = "austin-mq-pipeline", havingValue = MessageQueuePipeline.ROCKET_MQ) +public class RocketSendMqServiceImpl implements SendMqService { + @Resource + private RocketMQTemplate rocketMQTemplate; + @Value("${austin.business.topic.name}") + private String sendMessageTopic; + + @Override + public void send(String topic, String jsonValue, String tagId) { + if (StrUtil.isBlank(topic) || !sendMessageTopic.equals(topic)) { + log.error("RocketSendMqServiceImpl err:{}", topic); + return; + } + if (StrUtil.isBlank(tagId)) { + rocketMQTemplate.convertAndSend(topic, jsonValue); + return; + } + rocketMQTemplate.send(topic, MessageBuilder.withPayload(jsonValue) + .setHeader(RocketMQHeaders.TAGS, tagId) + .build()); + } + + @Override + public void send(String topic, String jsonValue) { + this.send(topic, jsonValue); + } +} diff --git a/austin-web/src/main/resources/application.properties b/austin-web/src/main/resources/application.properties index ced6182..9c507d1 100644 --- a/austin-web/src/main/resources/application.properties +++ b/austin-web/src/main/resources/application.properties @@ -59,6 +59,13 @@ spring.kafka.consumer.enable-auto-commit=true austin.business.tagId.key=kafka_tag_id austin.business.tagId.value=com.java3y.austin.3y +##################### rocket properties ##################### +rocketmq.name-server= +rocketmq.producer.group= +rocketmq.producer.send-message-timeout=3000 +rocketmq.producer.retry-times-when-send-async-failed=3 +austin.rocketmq.consumer.group= + ##################### redis properties ##################### spring.redis.host=${austin-redis-ip} spring.redis.port=${austin-redis-port} diff --git a/pom.xml b/pom.xml index 49b1152..62b6fe2 100644 --- a/pom.xml +++ b/pom.xml @@ -153,6 +153,12 @@ ${flink.version} + + org.apache.rocketmq + rocketmq-spring-boot-starter + 2.1.0 + + com.github.binarywang @@ -191,5 +197,4 @@ -