Merge branch 'vip' of github.com:ZhongFuCheng3y/austin

pull/11/head
3y 2 years ago
commit e127e1d9a6

@ -0,0 +1,43 @@
package com.java3y.austin.handler.receiver.rocketmq;
import com.alibaba.fastjson.JSON;
import com.java3y.austin.common.domain.TaskInfo;
import com.java3y.austin.handler.receiver.service.ConsumeService;
import com.java3y.austin.support.constans.MessageQueuePipeline;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* Description:
*
* @author elpsycongroo
* create date: 2022/7/16
*/
@Component
@ConditionalOnProperty(name = "austin-mq-pipeline", havingValue = MessageQueuePipeline.ROCKET_MQ)
@RocketMQMessageListener(topic = "${austin.business.topic.name}",
consumerGroup = "${austin-rocketmq-biz-consumer-group}",
selectorType = SelectorType.TAG,
selectorExpression = "${austin.business.tagId.value}"
)
public class RocketMqBizReceiver implements RocketMQListener<String> {
@Autowired
private ConsumeService consumeService;
@Override
public void onMessage(String message) {
if (StringUtils.isBlank(message)) {
return;
}
List<TaskInfo> taskInfoLists = JSON.parseArray(message, TaskInfo.class);
consumeService.consume2Send(taskInfoLists);
}
}

@ -0,0 +1,41 @@
package com.java3y.austin.handler.receiver.rocketmq;
import com.alibaba.fastjson.JSON;
import com.java3y.austin.handler.receiver.service.ConsumeService;
import com.java3y.austin.support.constans.MessageQueuePipeline;
import com.java3y.austin.support.domain.MessageTemplate;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
/**
* Description:
*
* @author elpsycongroo
* create date: 2022/7/16
*/
@Component
@ConditionalOnProperty(name = "austin-mq-pipeline", havingValue = MessageQueuePipeline.ROCKET_MQ)
@RocketMQMessageListener(topic = "${austin.business.recall.topic.name}",
consumerGroup = "${austin-rocketmq-recall-consumer-group}",
selectorType = SelectorType.TAG,
selectorExpression = "${austin.business.tagId.value}"
)
public class RocketMqRecallReceiver implements RocketMQListener<String> {
@Autowired
private ConsumeService consumeService;
@Override
public void onMessage(String message) {
if (StringUtils.isBlank(message)) {
return;
}
MessageTemplate messageTemplate = JSON.parseObject(message, MessageTemplate.class);
consumeService.consume2recall(messageTemplate);
}
}

@ -63,6 +63,11 @@
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>

@ -0,0 +1,39 @@
package com.java3y.austin.support.mq.rocketmq;
import com.java3y.austin.support.constans.MessageQueuePipeline;
import com.java3y.austin.support.mq.SendMqService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
/**
* Description:
*
* @author elpsycongroo
* create date: 2022/7/15
*/
@Slf4j
@Service
@ConditionalOnProperty(name = "austin-mq-pipeline", havingValue = MessageQueuePipeline.ROCKET_MQ)
public class RocketMqSendMqServiceImpl implements SendMqService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Override
public void send(String topic, String jsonValue, String tagId) {
if (StringUtils.isNotBlank(tagId)) {
topic = topic + ":" + tagId;
}
send(topic, jsonValue);
}
@Override
public void send(String topic, String jsonValue) {
rocketMQTemplate.send(topic, MessageBuilder.withPayload(jsonValue).build());
}
}

@ -22,6 +22,12 @@ austin-mq-pipeline=eventbus
austin-kafka-ip=
austin-kafka-port=
# todo [rocketmq] 【optional】, if austin-mq-pipeline=rocketMq【must】
austin-rocketmq-nameserver-ip=127.0.0.1
austin-rocketmq-nameserver-port=9876
austin-rocketmq-producer-group=unique-producer-group
austin-rocketmq-biz-consumer-group=unique-biz-consumer-group
austin-rocketmq-recall-consumer-group=unique-recall-consumer-group
# todo [xxl-job] switch/ip/port/【optional】
xxl-job.enabled=false
@ -55,6 +61,10 @@ spring.kafka.consumer.auto.offset.reset=earliest
spring.kafka.consumer.auto-commit-interval=1000
spring.kafka.consumer.enable-auto-commit=true
##################### rocketmq properties #####################
rocketmq.name-server=${austin-rocketmq-nameserver-ip}:${austin-rocketmq-nameserver-port}
rocketmq.producer.group=${austin-rocketmq-producer-group}
##################### Rabbit properties #####################
server.port=8080
spring.application.name=cl
@ -74,6 +84,7 @@ spring.rabbitmq.virtual-host=/
austin.rabbitmq.topic.name=austinRabbit
austin.rabbitmq.exchange.name=austin.point
##################### redis properties #####################
spring.redis.host=${austin-redis-ip}
spring.redis.port=${austin-redis-port}

@ -195,6 +195,12 @@
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.2</version>
</dependency>
</dependencies>
</dependencyManagement>

Loading…
Cancel
Save