feat:rocketmq

pull/13/head
Giorno 2 years ago
parent 07276acc24
commit ec8fa48c3f

@ -0,0 +1,18 @@
package com.java3y.austin.common.constant;
public class CommonConstant {
public final static String PERIOD = ".";
public final static String COMMA = ",";
public final static String COLON = ":";
public final static String SEMICOLON = ";";
public final static String POUND = "#";
public final static String SLASH = "/";
public final static String BACKSLASH = "\\";
public final static String EMPTY_STRING = "";
//
public final static String ONE = "1";
public final static String ZERO = "0";
public final static String MINUS_ONE = "-1";
public final static String YES = "Y";
public final static String NO = "N";
}

@ -0,0 +1,36 @@
package com.java3y.austin.handler.receiver.rocket;
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSON;
import com.java3y.austin.common.domain.TaskInfo;
import com.java3y.austin.handler.receiver.service.ConsumeService;
import com.java3y.austin.support.constans.MessageQueuePipeline;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* @program: austin
* @description:
* @author: Giorno
* @create: 2022-07-18
**/
@Component
@RocketMQMessageListener(topic = "${austin.business.topic.name}",
consumerGroup = "${austin.rocketmq.consumer.group}",
selectorType = SelectorType.TAG, selectorExpression = "${austin.business.tagId.value}")
@ConditionalOnProperty(name = "austin-mq-pipeline", havingValue = MessageQueuePipeline.ROCKET_MQ)
public class RocketMqReceiver implements RocketMQListener<String> {
@Resource
private ConsumeService consumeService;
@Override
public void onMessage(String message) {
if (StrUtil.isBlank(message)) return;
consumeService.consume2Send(JSON.parseArray(message, TaskInfo.class));
}
}

@ -63,6 +63,11 @@
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>

@ -0,0 +1,50 @@
package com.java3y.austin.support.mq.rocket;
import cn.hutool.core.util.StrUtil;
import com.java3y.austin.support.constans.MessageQueuePipeline;
import com.java3y.austin.support.mq.SendMqService;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
/**
* @program: austin
* @description:
* @author: Giorno
* @create: 2022-07-16
**/
@Slf4j
@Service
@ConditionalOnProperty(name = "austin-mq-pipeline", havingValue = MessageQueuePipeline.ROCKET_MQ)
public class RocketSendMqServiceImpl implements SendMqService {
@Resource
private RocketMQTemplate rocketMQTemplate;
@Value("${austin.business.topic.name}")
private String sendMessageTopic;
@Override
public void send(String topic, String jsonValue, String tagId) {
if (StrUtil.isBlank(topic) || !sendMessageTopic.equals(topic)) {
log.error("RocketSendMqServiceImpl err:{}", topic);
return;
}
if (StrUtil.isBlank(tagId)) {
rocketMQTemplate.convertAndSend(topic, jsonValue);
return;
}
rocketMQTemplate.send(topic, MessageBuilder.withPayload(jsonValue)
.setHeader(RocketMQHeaders.TAGS, tagId)
.build());
}
@Override
public void send(String topic, String jsonValue) {
this.send(topic, jsonValue);
}
}

@ -59,6 +59,13 @@ spring.kafka.consumer.enable-auto-commit=true
austin.business.tagId.key=kafka_tag_id
austin.business.tagId.value=com.java3y.austin.3y
##################### rocket properties #####################
rocketmq.name-server=
rocketmq.producer.group=
rocketmq.producer.send-message-timeout=3000
rocketmq.producer.retry-times-when-send-async-failed=3
austin.rocketmq.consumer.group=
##################### redis properties #####################
spring.redis.host=${austin-redis-ip}
spring.redis.port=${austin-redis-port}

@ -153,6 +153,12 @@
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.0</version>
</dependency>
<!--微信服务号第三方SDK-->
<dependency>
<groupId>com.github.binarywang</groupId>
@ -191,5 +197,4 @@
</dependencyManagement>
</project>

Loading…
Cancel
Save