插拔式MQ设计

pull/9/head
3y 3 years ago
commit 88e88caec1

@ -63,18 +63,16 @@ austin项目**核心流程**`austin-api`接收到发送消息请求,直接
目前引用的中间件教程的安装姿势均基于`Centos 7.6`austin项目**强依赖**`MySQL`/`Redis`/`Kafka`(**大概需要4G内存**)**弱依赖**`prometheus`/`graylog`/`flink`/`xxl-job`/`apollo`(**完全部署所有的服务大概8G+内存**)。如果缺少相关的组件可戳:[安装相关组件教程](INSTALL.md)。 目前引用的中间件教程的安装姿势均基于`Centos 7.6`austin项目**强依赖**`MySQL`/`Redis`/`Kafka`(**大概需要4G内存**)**弱依赖**`prometheus`/`graylog`/`flink`/`xxl-job`/`apollo`(**完全部署所有的服务大概8G+内存**)。如果缺少相关的组件可戳:[安装相关组件教程](INSTALL.md)。
> 实在想要`clone`项目后不用自己部署环境直接在本地启动`debug`,我这提供了[会员服务](https://mp.weixin.qq.com/s?__biz=MzI4Njg5MDA5NA==&mid=2247505577&idx=1&sn=5114f8f583755899c2946fbea0b22e4b&chksm=ebd497a8dca31ebe8f98344483a00c860863dfc3586e51eed95b25988151427fee8101311f4f&token=735778370&lang=zh_CN#rd)**直连**部署好的服务器 > 实在想要`clone`项目后不用自己部署环境直接在本地启动`debug`,我这提供了[会员服务](https://mp.weixin.qq.com/s?__biz=MzI4Njg5MDA5NA==&mid=2247505577&idx=1&sn=5114f8f583755899c2946fbea0b22e4b&chksm=ebd497a8dca31ebe8f98344483a00c860863dfc3586e51eed95b25988151427fee8101311f4f&token=735778370&lang=zh_CN#rd)**直连**部署好的服务器
**1**、austin使用的MySQL版本**5.7x**。如果目前使用的MySQL版本8.0,注意改变`pom.xml`所依赖的版本 **1**、austin使用的MySQL版本**5.7x**。如果目前使用的MySQL版本8.0,注意改变`pom.xml`所依赖的版本
**2**、填写`application.properties`中`austin-database`对应的`ip/port/username/password`信息 **2**、填写`application.properties`中`austin-database`对应的`ip/port/username/password`信息
**3**、执行`sql`文件夹下的`austin.sql`创建对应的表以及插入测试数据 **3**、执行`sql`文件夹下的`austin.sql`创建对应的表以及插入测试数据
**4**、填写`application.properties`中`austin-kafka`对应的`ip`/`port`信息 **4**、如果配置`austin-mq-pipeline=kafka`,需要填写`application.properties`中`austin-kafka`对应的`ip`/`port`信息
**5**、填写`application.properties`中`austin-redis`对应的`ip`/`port`信息 **5**、填写`application.properties`中`austin-redis`对应的`ip`/`port`信息
@ -100,7 +98,7 @@ curl -XPOST "127.0.0.1:8080/send" -H 'Content-Type: application/json' -d '{"co
![](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/01d028359e6e4229825a7fd8cc22c6c7~tplv-k3u1fbpfcp-zoom-1.image) ![](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/01d028359e6e4229825a7fd8cc22c6c7~tplv-k3u1fbpfcp-zoom-1.image)
**12**、正常使用**数据管理**(查看实时数据链路下发)需要将`austin-stream`的`jar`包上传至`Flink`,根据[部署文档](INSTALL.md)启动Flink。在打`jar`包前需要填写`com.java3y.austin.stream.constants.AustinFlinkConstant`中的`redis`和`kafka`的`ip/port`日志的topic在`application.properties`中的`austin.business.log.topic.name`。如果没有该topic需要提前创建) **12**、正常使用**数据管理**(查看实时数据链路下发)需要将`austin-stream`的`jar`包上传至`Flink`,根据[部署文档](INSTALL.md)启动Flink。在打`jar`包前需要填写`com.java3y.austin.stream.constants.AustinFlinkConstant`中的`redis`和`kafka`的`ip/port`日志的topic在`application.properties`中的`austin.business.log.topic.name`。如果没有该topic需要提前创建并使用Kafka作为消息队列实现)
**13**、正常使用**定时任务**需要部署`xxl-job`,根据[部署文档](INSTALL.md)启动xxl的调度中心并在`application.properteis`中填写 `austin-xxl-job-ip`和`austin-xxl-job-port` **13**、正常使用**定时任务**需要部署`xxl-job`,根据[部署文档](INSTALL.md)启动xxl的调度中心并在`application.properteis`中填写 `austin-xxl-job-ip`和`austin-xxl-job-port`
@ -128,6 +126,25 @@ curl -XPOST "127.0.0.1:8080/send" -H 'Content-Type: application/json' -d '{"co
详情可以看戳:[我开通了付费渠道](https://mp.weixin.qq.com/s?__biz=MzI4Njg5MDA5NA==&mid=2247505577&idx=1&sn=5114f8f583755899c2946fbea0b22e4b&chksm=ebd497a8dca31ebe8f98344483a00c860863dfc3586e51eed95b25988151427fee8101311f4f&token=319992632&lang=zh_CN#rd) 详情可以看戳:[我开通了付费渠道](https://mp.weixin.qq.com/s?__biz=MzI4Njg5MDA5NA==&mid=2247505577&idx=1&sn=5114f8f583755899c2946fbea0b22e4b&chksm=ebd497a8dca31ebe8f98344483a00c860863dfc3586e51eed95b25988151427fee8101311f4f&token=319992632&lang=zh_CN#rd)
## 项目交流
由于austin项目交流群已经超过了两百人添加我的**个人微信**备注:【**项目**】,我空的时候会拉进项目交流群里
<img align="center" src='https://p9-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/60efe6b0f4354b838244b96a15efdf49~tplv-k3u1fbpfcp-watermark.image' width=300px height=300px />
## 如何准备面试?
**对线面试官**公众号持续更新**面试系列**文章对线面试官系列深受各大开发的好评已有不少的同学通过对线面试官系列得到BATTMD等一线大厂的的offer。一个**讲人话的面试系列**,八股文不再是背诵。
![](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/c4a6cae132244355b9da6bd74d38d1ee~tplv-k3u1fbpfcp-zoom-1.image)
想要获取这份电子书,**点击关注**下方公众号,回复「**对线**」得到我的联系方式即可进群获取电子书
<img align="center" src='https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/f87f574e93964921a4d02146bf3ccdac~tplv-k3u1fbpfcp-zoom-1.image' width=300px height=300px />
## 里程碑 ## 里程碑
- [x] Maven+SpringBoot项目搭建 - [x] Maven+SpringBoot项目搭建
@ -159,27 +176,11 @@ curl -XPOST "127.0.0.1:8080/send" -H 'Content-Type: application/json' -d '{"co
- [x] 接入云片短信渠道,并短信支持流量配置,拉取腾讯云短信回执 - [x] 接入云片短信渠道,并短信支持流量配置,拉取腾讯云短信回执
- [x] 完成接入钉钉机器人渠道所有类型的消息 - [x] 完成接入钉钉机器人渠道所有类型的消息
- [x] 完成接入钉钉工作渠道所有类型的消息,包括对文件素材的上传功能 - [x] 完成接入钉钉工作渠道所有类型的消息,包括对文件素材的上传功能
- [x] Kafka消息支持tag过滤
- [x] MQ层支持可插拔默认使用eventbus单机队列Kafka变为弱依赖
- [ ] 总体架构已完成,持续做基础建设和优化代码 - [ ] 总体架构已完成,持续做基础建设和优化代码
**近期更新时间**6月27号 **近期更新时间**7月11号
**近期更新功能**:飞书机器人、企业微信机器人部分消息类型接入
## 项目交流
由于austin项目交流群已经超过了两百人添加我的**个人微信**备注:【**项目**】,我空的时候会拉进项目交流群里
<img align="center" src='https://p9-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/60efe6b0f4354b838244b96a15efdf49~tplv-k3u1fbpfcp-watermark.image' width=300px height=300px /> **近期更新功能**MQ层可插拔重构
## 如何准备面试?
**对线面试官**公众号持续更新**面试系列**文章对线面试官系列深受各大开发的好评已有不少的同学通过对线面试官系列得到BATTMD等一线大厂的的offer。一个**讲人话的面试系列**,八股文不再是背诵。
![](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/c4a6cae132244355b9da6bd74d38d1ee~tplv-k3u1fbpfcp-zoom-1.image)
想要获取这份电子书,**点击关注**下方公众号,回复「**对线**」得到我的联系方式即可进群获取电子书
<img align="center" src='https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/f87f574e93964921a4d02146bf3ccdac~tplv-k3u1fbpfcp-zoom-1.image' width=300px height=300px />

@ -0,0 +1,37 @@
package com.java3y.austin.handler.receiver.eventbus;
import com.google.common.eventbus.Subscribe;
import com.java3y.austin.common.domain.TaskInfo;
import com.java3y.austin.handler.receiver.service.ConsumeService;
import com.java3y.austin.support.constans.MessageQueuePipeline;
import com.java3y.austin.support.domain.MessageTemplate;
import com.java3y.austin.support.mq.eventbus.EventBusListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* @author 3y
*/
@Component
@ConditionalOnProperty(name = "austin-mq-pipeline", havingValue = MessageQueuePipeline.EVENT_BUS)
public class EventBusReceiver implements EventBusListener {
@Autowired
private ConsumeService consumeService;
@Override
@Subscribe
public void consume(List<TaskInfo> lists) {
consumeService.consume2Send(lists);
}
@Override
@Subscribe
public void recall(MessageTemplate messageTemplate) {
consumeService.consume2recall(messageTemplate);
}
}

@ -1,4 +1,4 @@
package com.java3y.austin.handler.receiver; package com.java3y.austin.handler.receiver.kafka;
import cn.hutool.core.collection.CollUtil; import cn.hutool.core.collection.CollUtil;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
@ -9,7 +9,9 @@ import com.java3y.austin.common.enums.AnchorState;
import com.java3y.austin.handler.handler.HandlerHolder; import com.java3y.austin.handler.handler.HandlerHolder;
import com.java3y.austin.handler.pending.Task; import com.java3y.austin.handler.pending.Task;
import com.java3y.austin.handler.pending.TaskPendingHolder; import com.java3y.austin.handler.pending.TaskPendingHolder;
import com.java3y.austin.handler.receiver.service.ConsumeService;
import com.java3y.austin.handler.utils.GroupIdMappingUtils; import com.java3y.austin.handler.utils.GroupIdMappingUtils;
import com.java3y.austin.support.constans.MessageQueuePipeline;
import com.java3y.austin.support.domain.MessageTemplate; import com.java3y.austin.support.domain.MessageTemplate;
import com.java3y.austin.support.utils.LogUtils; import com.java3y.austin.support.utils.LogUtils;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -17,6 +19,7 @@ import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Scope; import org.springframework.context.annotation.Scope;
import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.annotation.KafkaListener;
@ -34,21 +37,10 @@ import java.util.Optional;
@Slf4j @Slf4j
@Component @Component
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
@ConditionalOnProperty(name = "austin-mq-pipeline", havingValue = MessageQueuePipeline.KAFKA)
public class Receiver { public class Receiver {
private static final String LOG_BIZ_TYPE = "Receiver#consumer";
private static final String LOG_BIZ_RECALL_TYPE = "Receiver#recall";
@Autowired @Autowired
private ApplicationContext context; private ConsumeService consumeService;
@Autowired
private TaskPendingHolder taskPendingHolder;
@Autowired
private LogUtils logUtils;
@Autowired
private HandlerHolder handlerHolder;
/** /**
* *
* *
@ -62,16 +54,11 @@ public class Receiver {
List<TaskInfo> taskInfoLists = JSON.parseArray(kafkaMessage.get(), TaskInfo.class); List<TaskInfo> taskInfoLists = JSON.parseArray(kafkaMessage.get(), TaskInfo.class);
String messageGroupId = GroupIdMappingUtils.getGroupIdByTaskInfo(CollUtil.getFirst(taskInfoLists.iterator())); String messageGroupId = GroupIdMappingUtils.getGroupIdByTaskInfo(CollUtil.getFirst(taskInfoLists.iterator()));
/** /**
* *
*/ */
if (topicGroupId.equals(messageGroupId)) { if (topicGroupId.equals(messageGroupId)) {
for (TaskInfo taskInfo : taskInfoLists) { consumeService.consume2Send(taskInfoLists);
logUtils.print(LogParam.builder().bizType(LOG_BIZ_TYPE).object(taskInfo).build(), AnchorInfo.builder().ids(taskInfo.getReceiver()).businessId(taskInfo.getBusinessId()).state(AnchorState.RECEIVE.getCode()).build());
Task task = context.getBean(Task.class).setTaskInfo(taskInfo);
taskPendingHolder.route(topicGroupId).execute(task);
}
} }
} }
} }
@ -85,8 +72,7 @@ public class Receiver {
Optional<String> kafkaMessage = Optional.ofNullable(consumerRecord.value()); Optional<String> kafkaMessage = Optional.ofNullable(consumerRecord.value());
if(kafkaMessage.isPresent()){ if(kafkaMessage.isPresent()){
MessageTemplate messageTemplate = JSON.parseObject(kafkaMessage.get(), MessageTemplate.class); MessageTemplate messageTemplate = JSON.parseObject(kafkaMessage.get(), MessageTemplate.class);
logUtils.print(LogParam.builder().bizType(LOG_BIZ_RECALL_TYPE).object(messageTemplate).build()); consumeService.consume2recall(messageTemplate);
handlerHolder.route(messageTemplate.getSendChannel()).recall(messageTemplate);
} }
} }
} }

@ -1,12 +1,14 @@
package com.java3y.austin.handler.receiver; package com.java3y.austin.handler.receiver.kafka;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.java3y.austin.handler.utils.GroupIdMappingUtils; import com.java3y.austin.handler.utils.GroupIdMappingUtils;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.Header;
import com.java3y.austin.support.constans.MessageQueuePipeline;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor; import org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor;
@ -29,8 +31,10 @@ import java.util.Optional;
* @date 2021/12/4 * @date 2021/12/4
*/ */
@Service @Service
@ConditionalOnProperty(name = "austin-mq-pipeline", havingValue = MessageQueuePipeline.KAFKA)
@Slf4j @Slf4j
public class ReceiverStart { public class ReceiverStart {
@Autowired @Autowired
private ApplicationContext context; private ApplicationContext context;
@Autowired @Autowired

@ -0,0 +1,32 @@
package com.java3y.austin.handler.receiver.service;
import com.java3y.austin.common.domain.TaskInfo;
import com.java3y.austin.support.domain.MessageTemplate;
import java.util.List;
/**
*
*
* @author 3y
*/
public interface ConsumeService {
/**
* MQ
*
* @param taskInfoLists
*/
void consume2Send(List<TaskInfo> taskInfoLists);
/**
* MQ
*
* @param messageTemplate
*/
void consume2recall(MessageTemplate messageTemplate);
}

@ -0,0 +1,56 @@
package com.java3y.austin.handler.receiver.service.impl;
import cn.hutool.core.collection.CollUtil;
import com.alibaba.fastjson.JSON;
import com.java3y.austin.common.domain.AnchorInfo;
import com.java3y.austin.common.domain.LogParam;
import com.java3y.austin.common.domain.TaskInfo;
import com.java3y.austin.common.enums.AnchorState;
import com.java3y.austin.handler.handler.HandlerHolder;
import com.java3y.austin.handler.pending.Task;
import com.java3y.austin.handler.pending.TaskPendingHolder;
import com.java3y.austin.handler.receiver.service.ConsumeService;
import com.java3y.austin.handler.utils.GroupIdMappingUtils;
import com.java3y.austin.support.domain.MessageTemplate;
import com.java3y.austin.support.utils.LogUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Service;
import java.util.List;
/**
* @author 3y
*/
@Service
public class ConsumeServiceImpl implements ConsumeService {
private static final String LOG_BIZ_TYPE = "Receiver#consumer";
private static final String LOG_BIZ_RECALL_TYPE = "Receiver#recall";
@Autowired
private ApplicationContext context;
@Autowired
private TaskPendingHolder taskPendingHolder;
@Autowired
private LogUtils logUtils;
@Autowired
private HandlerHolder handlerHolder;
@Override
public void consume2Send(List<TaskInfo> taskInfoLists) {
String topicGroupId = GroupIdMappingUtils.getGroupIdByTaskInfo(CollUtil.getFirst(taskInfoLists.iterator()));
for (TaskInfo taskInfo : taskInfoLists) {
logUtils.print(LogParam.builder().bizType(LOG_BIZ_TYPE).object(taskInfo).build(), AnchorInfo.builder().ids(taskInfo.getReceiver()).businessId(taskInfo.getBusinessId()).state(AnchorState.RECEIVE.getCode()).build());
Task task = context.getBean(Task.class).setTaskInfo(taskInfo);
taskPendingHolder.route(topicGroupId).execute(task);
}
}
@Override
public void consume2recall(MessageTemplate messageTemplate) {
logUtils.print(LogParam.builder().bizType(LOG_BIZ_RECALL_TYPE).object(messageTemplate).build());
handlerHolder.route(messageTemplate.getSendChannel()).recall(messageTemplate);
}
}

@ -4,10 +4,14 @@ import cn.hutool.core.collection.CollUtil;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature; import com.alibaba.fastjson.serializer.SerializerFeature;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.eventbus.EventBus;
import com.java3y.austin.common.domain.TaskInfo;
import com.java3y.austin.common.enums.RespStatusEnum; import com.java3y.austin.common.enums.RespStatusEnum;
import com.java3y.austin.common.vo.BasicResultVO; import com.java3y.austin.common.vo.BasicResultVO;
import com.java3y.austin.service.api.enums.BusinessCode; import com.java3y.austin.service.api.enums.BusinessCode;
import com.java3y.austin.service.api.impl.domain.SendTaskModel; import com.java3y.austin.service.api.impl.domain.SendTaskModel;
import com.java3y.austin.support.mq.SendMqService;
import com.java3y.austin.support.mq.eventbus.EventBusListener;
import com.java3y.austin.support.pipeline.BusinessProcess; import com.java3y.austin.support.pipeline.BusinessProcess;
import com.java3y.austin.support.pipeline.ProcessContext; import com.java3y.austin.support.pipeline.ProcessContext;
import com.java3y.austin.support.utils.KafkaUtils; import com.java3y.austin.support.utils.KafkaUtils;
@ -16,6 +20,8 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.util.List;
/** /**
* @author 3y * @author 3y
* MQ * MQ
@ -24,28 +30,29 @@ import org.springframework.stereotype.Service;
@Service @Service
public class SendMqAction implements BusinessProcess<SendTaskModel> { public class SendMqAction implements BusinessProcess<SendTaskModel> {
@Autowired @Autowired
private KafkaUtils kafkaUtils; private SendMqService sendMqService;
@Value("${austin.business.topic.name}") @Value("${austin.business.topic.name}")
private String sendMessageTopic; private String sendMessageTopic;
@Value("${austin.business.recall.topic.name}") @Value("${austin.business.recall.topic.name}")
private String austinRecall; private String austinRecall;
@Value("${austin.business.tagId.value}") @Value("${austin.business.tagId.value}")
private String tagId; private String tagId;
@Override @Override
public void process(ProcessContext<SendTaskModel> context) { public void process(ProcessContext<SendTaskModel> context) {
SendTaskModel sendTaskModel = context.getProcessModel(); SendTaskModel sendTaskModel = context.getProcessModel();
try { try {
if (BusinessCode.COMMON_SEND.getCode().equals(context.getCode())) { if (BusinessCode.COMMON_SEND.getCode().equals(context.getCode())) {
String message = JSON.toJSONString(sendTaskModel.getTaskInfo(), new SerializerFeature[]{SerializerFeature.WriteClassName}); String message = JSON.toJSONString(sendTaskModel.getTaskInfo(), new SerializerFeature[]{SerializerFeature.WriteClassName});
kafkaUtils.send(sendMessageTopic, message, tagId); sendMqService.send(sendMessageTopic, message, tagId);
} else if (BusinessCode.RECALL.getCode().equals(context.getCode())) { } else if (BusinessCode.RECALL.getCode().equals(context.getCode())) {
String message = JSON.toJSONString(sendTaskModel.getMessageTemplate(), new SerializerFeature[]{SerializerFeature.WriteClassName}); String message = JSON.toJSONString(sendTaskModel.getMessageTemplate(), new SerializerFeature[]{SerializerFeature.WriteClassName});
kafkaUtils.send(austinRecall, message, tagId); sendMqService.send(austinRecall, message, tagId);
} }
} catch (Exception e) { } catch (Exception e) {
context.setNeedBreak(true).setResponse(BasicResultVO.fail(RespStatusEnum.SERVICE_ERROR)); context.setNeedBreak(true).setResponse(BasicResultVO.fail(RespStatusEnum.SERVICE_ERROR));
@ -53,4 +60,5 @@ public class SendMqAction implements BusinessProcess<SendTaskModel> {
, JSON.toJSONString(CollUtil.getFirst(sendTaskModel.getTaskInfo().listIterator()))); , JSON.toJSONString(CollUtil.getFirst(sendTaskModel.getTaskInfo().listIterator())));
} }
} }
} }

@ -0,0 +1,14 @@
package com.java3y.austin.support.constans;
/**
*
*
* @author 3y
*/
public interface MessageQueuePipeline {
String EVENT_BUS = "eventBus";
String KAFKA = "kafka";
String ROCKET_MQ = "rocketMq";
}

@ -0,0 +1,27 @@
package com.java3y.austin.support.mq;
/**
* @author 3y
*
*/
public interface SendMqService {
/**
*
*
* @param topic
* @param jsonValue
* @param tagId
*/
void send(String topic, String jsonValue, String tagId);
/**
*
*
* @param topic
* @param jsonValue
*/
void send(String topic, String jsonValue);
}

@ -0,0 +1,27 @@
package com.java3y.austin.support.mq.eventbus;
import com.java3y.austin.common.domain.TaskInfo;
import com.java3y.austin.support.domain.MessageTemplate;
import java.util.List;
/**
* @author 3y
*
*/
public interface EventBusListener {
/**
*
* @param lists
*/
void consume(List<TaskInfo> lists);
/**
*
* @param messageTemplate
*/
void recall(MessageTemplate messageTemplate);
}

@ -0,0 +1,52 @@
package com.java3y.austin.support.mq.eventbus;
import com.alibaba.fastjson.JSON;
import com.google.common.eventbus.EventBus;
import com.java3y.austin.common.domain.TaskInfo;
import com.java3y.austin.support.constans.MessageQueuePipeline;
import com.java3y.austin.support.domain.MessageTemplate;
import com.java3y.austin.support.mq.SendMqService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
/**
* @author 3y
* EventBus
*/
@Slf4j
@Service
@ConditionalOnProperty(name = "austin-mq-pipeline", havingValue = MessageQueuePipeline.EVENT_BUS)
public class EventBusSendMqServiceImpl implements SendMqService {
private EventBus eventBus = new EventBus();
@Autowired
private EventBusListener eventBusListener;
@Value("${austin.business.topic.name}")
private String sendTopic;
@Value("${austin.business.recall.topic.name}")
private String recallTopic;
/**
* tagId
* @param topic
* @param jsonValue
* @param tagId
*/
@Override
public void send(String topic, String jsonValue, String tagId) {
eventBus.register(eventBusListener);
if (topic.equals(sendTopic)) {
eventBus.post(JSON.parseArray(jsonValue, TaskInfo.class));
} else if (topic.equals(recallTopic)) {
eventBus.post(JSON.parseObject(jsonValue, MessageTemplate.class));
}
}
@Override
public void send(String topic, String jsonValue) {
send(topic, jsonValue, null);
}
}

@ -0,0 +1,50 @@
package com.java3y.austin.support.mq.kafka;
import cn.hutool.core.util.StrUtil;
import com.java3y.austin.support.constans.MessageQueuePipeline;
import com.java3y.austin.support.mq.SendMqService;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
/**
* @author 3y
* kafka
*/
@Slf4j
@Service
@ConditionalOnProperty(name = "austin-mq-pipeline", havingValue = MessageQueuePipeline.KAFKA)
public class KafkaSendMqServiceImpl implements SendMqService {
@Autowired
private KafkaTemplate kafkaTemplate;
@Value("${austin.business.tagId.key}")
private String tagIdKey;
@Override
public void send(String topic, String jsonValue, String tagId) {
if (StrUtil.isNotBlank(tagId)) {
List<Header> headers = Arrays.asList(new RecordHeader(tagIdKey, tagId.getBytes(StandardCharsets.UTF_8)));
kafkaTemplate.send(new ProducerRecord(topic, null, null, null, jsonValue, headers));
} else {
kafkaTemplate.send(topic, jsonValue);
}
}
@Override
public void send(String topic, String jsonValue) {
send(topic, jsonValue, null);
}
}

@ -8,15 +8,19 @@ austin-database-port=5004
austin-database-username=root austin-database-username=root
austin-database-password=root123_A austin-database-password=root123_A
# todo [kafka] ip/port【must】
austin-kafka-ip=austin.kafka
austin-kafka-port=9092
# todo [redis] ip/port/password【must】 # todo [redis] ip/port/password【must】
austin-redis-ip=austin.redis austin-redis-ip=austin.redis
austin-redis-port=5003 austin-redis-port=5003
austin-redis-password=austin austin-redis-password=austin
# TODO kafka/eventbus
austin-mq-pipeline=eventbus
# todo [kafka] ip/port【optional】, if austin-mq-pipeline=kafka 【must】
austin-kafka-ip=austin.kafka
austin-kafka-port=9092
# todo [xxl-job] switch/ip/port/【optional】 # todo [xxl-job] switch/ip/port/【optional】
xxl-job.enabled=false xxl-job.enabled=false
austin-xxl-job-ip=127.0.0.1 austin-xxl-job-ip=127.0.0.1

Loading…
Cancel
Save