diff --git a/README.md b/README.md index 1563b3b..5dd3540 100644 --- a/README.md +++ b/README.md @@ -63,18 +63,16 @@ austin项目**核心流程**:`austin-api`接收到发送消息请求,直接 目前引用的中间件教程的安装姿势均基于`Centos 7.6`,austin项目**强依赖**`MySQL`/`Redis`/`Kafka`(**大概需要4G内存**),**弱依赖**`prometheus`/`graylog`/`flink`/`xxl-job`/`apollo`(**完全部署所有的服务,大概8G+内存**)。如果缺少相关的组件可戳:[安装相关组件教程](INSTALL.md)。 - > 实在想要`clone`项目后不用自己部署环境直接在本地启动`debug`,我这提供了[会员服务](https://mp.weixin.qq.com/s?__biz=MzI4Njg5MDA5NA==&mid=2247505577&idx=1&sn=5114f8f583755899c2946fbea0b22e4b&chksm=ebd497a8dca31ebe8f98344483a00c860863dfc3586e51eed95b25988151427fee8101311f4f&token=735778370&lang=zh_CN#rd),**直连**部署好的服务器 - **1**、austin使用的MySQL版本**5.7x**。如果目前使用的MySQL版本8.0,注意改变`pom.xml`所依赖的版本 **2**、填写`application.properties`中`austin-database`对应的`ip/port/username/password`信息 **3**、执行`sql`文件夹下的`austin.sql`创建对应的表以及插入测试数据 -**4**、填写`application.properties`中`austin-kafka`对应的`ip`/`port`信息 +**4**、如果配置`austin-mq-pipeline=kafka`,需要填写`application.properties`中`austin-kafka`对应的`ip`/`port`信息 **5**、填写`application.properties`中`austin-redis`对应的`ip`/`port`信息 @@ -100,7 +98,7 @@ curl -XPOST "127.0.0.1:8080/send" -H 'Content-Type: application/json' -d '{"co ![](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/01d028359e6e4229825a7fd8cc22c6c7~tplv-k3u1fbpfcp-zoom-1.image) -**12**、正常使用**数据管理**(查看实时数据链路下发)需要将`austin-stream`的`jar`包上传至`Flink`,根据[部署文档](INSTALL.md)启动Flink。在打`jar`包前需要填写`com.java3y.austin.stream.constants.AustinFlinkConstant`中的`redis`和`kafka`的`ip/port`(注:日志的topic在`application.properties`中的`austin.business.log.topic.name`。如果没有该topic,需要提前创建) +**12**、正常使用**数据管理**(查看实时数据链路下发)需要将`austin-stream`的`jar`包上传至`Flink`,根据[部署文档](INSTALL.md)启动Flink。在打`jar`包前需要填写`com.java3y.austin.stream.constants.AustinFlinkConstant`中的`redis`和`kafka`的`ip/port`(注:日志的topic在`application.properties`中的`austin.business.log.topic.name`。如果没有该topic,需要提前创建,并使用Kafka作为消息队列实现) **13**、正常使用**定时任务**需要部署`xxl-job`,根据[部署文档](INSTALL.md)启动xxl的调度中心,并在`application.properteis`中填写 `austin-xxl-job-ip`和`austin-xxl-job-port` @@ -128,6 +126,25 @@ curl -XPOST "127.0.0.1:8080/send" -H 'Content-Type: application/json' -d '{"co 详情可以看戳:[我开通了付费渠道](https://mp.weixin.qq.com/s?__biz=MzI4Njg5MDA5NA==&mid=2247505577&idx=1&sn=5114f8f583755899c2946fbea0b22e4b&chksm=ebd497a8dca31ebe8f98344483a00c860863dfc3586e51eed95b25988151427fee8101311f4f&token=319992632&lang=zh_CN#rd) + +## 项目交流 + +由于austin项目交流群已经超过了两百人,添加我的**个人微信**备注:【**项目**】,我空的时候会拉进项目交流群里 + + + + +## 如何准备面试? + +**对线面试官**公众号持续更新**面试系列**文章(对线面试官系列),深受各大开发的好评,已有不少的同学通过对线面试官系列得到BATTMD等一线大厂的的offer。一个**讲人话的面试系列**,八股文不再是背诵。 + + +![](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/c4a6cae132244355b9da6bd74d38d1ee~tplv-k3u1fbpfcp-zoom-1.image) + +想要获取这份电子书,**点击关注**下方公众号,回复「**对线**」得到我的联系方式即可进群获取电子书 + + + ## 里程碑 - [x] Maven+SpringBoot项目搭建 @@ -159,27 +176,11 @@ curl -XPOST "127.0.0.1:8080/send" -H 'Content-Type: application/json' -d '{"co - [x] 接入云片短信渠道,并短信支持流量配置,拉取腾讯云短信回执 - [x] 完成接入钉钉机器人渠道所有类型的消息 - [x] 完成接入钉钉工作渠道所有类型的消息,包括对文件素材的上传功能 +- [x] Kafka消息支持tag过滤 +- [x] MQ层支持可插拔,默认使用eventbus单机队列,Kafka变为弱依赖 - [ ] 总体架构已完成,持续做基础建设和优化代码 -**近期更新时间**:6月27号 - -**近期更新功能**:飞书机器人、企业微信机器人部分消息类型接入 - -## 项目交流 - -由于austin项目交流群已经超过了两百人,添加我的**个人微信**备注:【**项目**】,我空的时候会拉进项目交流群里 - - - - -## 如何准备面试? - -**对线面试官**公众号持续更新**面试系列**文章(对线面试官系列),深受各大开发的好评,已有不少的同学通过对线面试官系列得到BATTMD等一线大厂的的offer。一个**讲人话的面试系列**,八股文不再是背诵。 - - -![](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/c4a6cae132244355b9da6bd74d38d1ee~tplv-k3u1fbpfcp-zoom-1.image) - -想要获取这份电子书,**点击关注**下方公众号,回复「**对线**」得到我的联系方式即可进群获取电子书 +**近期更新时间**:7月11号 - \ No newline at end of file +**近期更新功能**:MQ层可插拔重构 \ No newline at end of file diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/receiver/eventbus/EventBusReceiver.java b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/eventbus/EventBusReceiver.java new file mode 100644 index 0000000..fdde8e7 --- /dev/null +++ b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/eventbus/EventBusReceiver.java @@ -0,0 +1,37 @@ +package com.java3y.austin.handler.receiver.eventbus; + +import com.google.common.eventbus.Subscribe; +import com.java3y.austin.common.domain.TaskInfo; +import com.java3y.austin.handler.receiver.service.ConsumeService; +import com.java3y.austin.support.constans.MessageQueuePipeline; +import com.java3y.austin.support.domain.MessageTemplate; +import com.java3y.austin.support.mq.eventbus.EventBusListener; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; + +import java.util.List; + +/** + * @author 3y + */ +@Component +@ConditionalOnProperty(name = "austin-mq-pipeline", havingValue = MessageQueuePipeline.EVENT_BUS) +public class EventBusReceiver implements EventBusListener { + + @Autowired + private ConsumeService consumeService; + + @Override + @Subscribe + public void consume(List lists) { + consumeService.consume2Send(lists); + + } + + @Override + @Subscribe + public void recall(MessageTemplate messageTemplate) { + consumeService.consume2recall(messageTemplate); + } +} diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/receiver/Receiver.java b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/kafka/Receiver.java similarity index 72% rename from austin-handler/src/main/java/com/java3y/austin/handler/receiver/Receiver.java rename to austin-handler/src/main/java/com/java3y/austin/handler/receiver/kafka/Receiver.java index 425f8c3..7826773 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/receiver/Receiver.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/kafka/Receiver.java @@ -1,4 +1,4 @@ -package com.java3y.austin.handler.receiver; +package com.java3y.austin.handler.receiver.kafka; import cn.hutool.core.collection.CollUtil; import com.alibaba.fastjson.JSON; @@ -9,7 +9,9 @@ import com.java3y.austin.common.enums.AnchorState; import com.java3y.austin.handler.handler.HandlerHolder; import com.java3y.austin.handler.pending.Task; import com.java3y.austin.handler.pending.TaskPendingHolder; +import com.java3y.austin.handler.receiver.service.ConsumeService; import com.java3y.austin.handler.utils.GroupIdMappingUtils; +import com.java3y.austin.support.constans.MessageQueuePipeline; import com.java3y.austin.support.domain.MessageTemplate; import com.java3y.austin.support.utils.LogUtils; import lombok.extern.slf4j.Slf4j; @@ -17,6 +19,7 @@ import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.config.ConfigurableBeanFactory; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.Scope; import org.springframework.kafka.annotation.KafkaListener; @@ -34,21 +37,10 @@ import java.util.Optional; @Slf4j @Component @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) +@ConditionalOnProperty(name = "austin-mq-pipeline", havingValue = MessageQueuePipeline.KAFKA) public class Receiver { - private static final String LOG_BIZ_TYPE = "Receiver#consumer"; - private static final String LOG_BIZ_RECALL_TYPE = "Receiver#recall"; @Autowired - private ApplicationContext context; - - @Autowired - private TaskPendingHolder taskPendingHolder; - - @Autowired - private LogUtils logUtils; - - @Autowired - private HandlerHolder handlerHolder; - + private ConsumeService consumeService; /** * 发送消息 * @@ -62,16 +54,11 @@ public class Receiver { List taskInfoLists = JSON.parseArray(kafkaMessage.get(), TaskInfo.class); String messageGroupId = GroupIdMappingUtils.getGroupIdByTaskInfo(CollUtil.getFirst(taskInfoLists.iterator())); - /** * 每个消费者组 只消费 他们自身关心的消息 */ if (topicGroupId.equals(messageGroupId)) { - for (TaskInfo taskInfo : taskInfoLists) { - logUtils.print(LogParam.builder().bizType(LOG_BIZ_TYPE).object(taskInfo).build(), AnchorInfo.builder().ids(taskInfo.getReceiver()).businessId(taskInfo.getBusinessId()).state(AnchorState.RECEIVE.getCode()).build()); - Task task = context.getBean(Task.class).setTaskInfo(taskInfo); - taskPendingHolder.route(topicGroupId).execute(task); - } + consumeService.consume2Send(taskInfoLists); } } } @@ -85,8 +72,7 @@ public class Receiver { Optional kafkaMessage = Optional.ofNullable(consumerRecord.value()); if(kafkaMessage.isPresent()){ MessageTemplate messageTemplate = JSON.parseObject(kafkaMessage.get(), MessageTemplate.class); - logUtils.print(LogParam.builder().bizType(LOG_BIZ_RECALL_TYPE).object(messageTemplate).build()); - handlerHolder.route(messageTemplate.getSendChannel()).recall(messageTemplate); + consumeService.consume2recall(messageTemplate); } } } diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/receiver/ReceiverStart.java b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/kafka/ReceiverStart.java similarity index 92% rename from austin-handler/src/main/java/com/java3y/austin/handler/receiver/ReceiverStart.java rename to austin-handler/src/main/java/com/java3y/austin/handler/receiver/kafka/ReceiverStart.java index 754fb43..01887ae 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/receiver/ReceiverStart.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/kafka/ReceiverStart.java @@ -1,12 +1,14 @@ -package com.java3y.austin.handler.receiver; +package com.java3y.austin.handler.receiver.kafka; import com.alibaba.fastjson.JSON; import com.java3y.austin.handler.utils.GroupIdMappingUtils; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.header.Header; +import com.java3y.austin.support.constans.MessageQueuePipeline; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor; @@ -29,8 +31,10 @@ import java.util.Optional; * @date 2021/12/4 */ @Service +@ConditionalOnProperty(name = "austin-mq-pipeline", havingValue = MessageQueuePipeline.KAFKA) @Slf4j public class ReceiverStart { + @Autowired private ApplicationContext context; @Autowired diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/receiver/service/ConsumeService.java b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/service/ConsumeService.java new file mode 100644 index 0000000..b3ed789 --- /dev/null +++ b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/service/ConsumeService.java @@ -0,0 +1,32 @@ +package com.java3y.austin.handler.receiver.service; + + +import com.java3y.austin.common.domain.TaskInfo; +import com.java3y.austin.support.domain.MessageTemplate; + +import java.util.List; + +/** + * 消费消息服务 + * + * @author 3y + */ +public interface ConsumeService { + + /** + * 从MQ拉到消息进行消费,发送消息 + * + * @param taskInfoLists + */ + void consume2Send(List taskInfoLists); + + + /** + * 从MQ拉到消息进行消费,撤回消息 + * + * @param messageTemplate + */ + void consume2recall(MessageTemplate messageTemplate); + + +} diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/receiver/service/impl/ConsumeServiceImpl.java b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/service/impl/ConsumeServiceImpl.java new file mode 100644 index 0000000..1983869 --- /dev/null +++ b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/service/impl/ConsumeServiceImpl.java @@ -0,0 +1,56 @@ +package com.java3y.austin.handler.receiver.service.impl; + +import cn.hutool.core.collection.CollUtil; +import com.alibaba.fastjson.JSON; +import com.java3y.austin.common.domain.AnchorInfo; +import com.java3y.austin.common.domain.LogParam; +import com.java3y.austin.common.domain.TaskInfo; +import com.java3y.austin.common.enums.AnchorState; +import com.java3y.austin.handler.handler.HandlerHolder; +import com.java3y.austin.handler.pending.Task; +import com.java3y.austin.handler.pending.TaskPendingHolder; +import com.java3y.austin.handler.receiver.service.ConsumeService; +import com.java3y.austin.handler.utils.GroupIdMappingUtils; +import com.java3y.austin.support.domain.MessageTemplate; +import com.java3y.austin.support.utils.LogUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationContext; +import org.springframework.stereotype.Service; + +import java.util.List; + +/** + * @author 3y + */ +@Service +public class ConsumeServiceImpl implements ConsumeService { + private static final String LOG_BIZ_TYPE = "Receiver#consumer"; + private static final String LOG_BIZ_RECALL_TYPE = "Receiver#recall"; + @Autowired + private ApplicationContext context; + + @Autowired + private TaskPendingHolder taskPendingHolder; + + @Autowired + private LogUtils logUtils; + + @Autowired + private HandlerHolder handlerHolder; + + @Override + public void consume2Send(List taskInfoLists) { + String topicGroupId = GroupIdMappingUtils.getGroupIdByTaskInfo(CollUtil.getFirst(taskInfoLists.iterator())); + for (TaskInfo taskInfo : taskInfoLists) { + logUtils.print(LogParam.builder().bizType(LOG_BIZ_TYPE).object(taskInfo).build(), AnchorInfo.builder().ids(taskInfo.getReceiver()).businessId(taskInfo.getBusinessId()).state(AnchorState.RECEIVE.getCode()).build()); + Task task = context.getBean(Task.class).setTaskInfo(taskInfo); + taskPendingHolder.route(topicGroupId).execute(task); + } + } + + @Override + public void consume2recall(MessageTemplate messageTemplate) { + logUtils.print(LogParam.builder().bizType(LOG_BIZ_RECALL_TYPE).object(messageTemplate).build()); + handlerHolder.route(messageTemplate.getSendChannel()).recall(messageTemplate); + } +} diff --git a/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/SendMqAction.java b/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/SendMqAction.java index d82d37c..c67b5df 100644 --- a/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/SendMqAction.java +++ b/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/SendMqAction.java @@ -4,10 +4,14 @@ import cn.hutool.core.collection.CollUtil; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.serializer.SerializerFeature; import com.google.common.base.Throwables; +import com.google.common.eventbus.EventBus; +import com.java3y.austin.common.domain.TaskInfo; import com.java3y.austin.common.enums.RespStatusEnum; import com.java3y.austin.common.vo.BasicResultVO; import com.java3y.austin.service.api.enums.BusinessCode; import com.java3y.austin.service.api.impl.domain.SendTaskModel; +import com.java3y.austin.support.mq.SendMqService; +import com.java3y.austin.support.mq.eventbus.EventBusListener; import com.java3y.austin.support.pipeline.BusinessProcess; import com.java3y.austin.support.pipeline.ProcessContext; import com.java3y.austin.support.utils.KafkaUtils; @@ -16,6 +20,8 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; +import java.util.List; + /** * @author 3y * 将消息发送到MQ @@ -24,28 +30,29 @@ import org.springframework.stereotype.Service; @Service public class SendMqAction implements BusinessProcess { + @Autowired - private KafkaUtils kafkaUtils; + private SendMqService sendMqService; @Value("${austin.business.topic.name}") private String sendMessageTopic; @Value("${austin.business.recall.topic.name}") private String austinRecall; - @Value("${austin.business.tagId.value}") private String tagId; + @Override public void process(ProcessContext context) { SendTaskModel sendTaskModel = context.getProcessModel(); try { if (BusinessCode.COMMON_SEND.getCode().equals(context.getCode())) { String message = JSON.toJSONString(sendTaskModel.getTaskInfo(), new SerializerFeature[]{SerializerFeature.WriteClassName}); - kafkaUtils.send(sendMessageTopic, message, tagId); + sendMqService.send(sendMessageTopic, message, tagId); } else if (BusinessCode.RECALL.getCode().equals(context.getCode())) { String message = JSON.toJSONString(sendTaskModel.getMessageTemplate(), new SerializerFeature[]{SerializerFeature.WriteClassName}); - kafkaUtils.send(austinRecall, message, tagId); + sendMqService.send(austinRecall, message, tagId); } } catch (Exception e) { context.setNeedBreak(true).setResponse(BasicResultVO.fail(RespStatusEnum.SERVICE_ERROR)); @@ -53,4 +60,5 @@ public class SendMqAction implements BusinessProcess { , JSON.toJSONString(CollUtil.getFirst(sendTaskModel.getTaskInfo().listIterator()))); } } + } diff --git a/austin-support/src/main/java/com/java3y/austin/support/constans/MessageQueuePipeline.java b/austin-support/src/main/java/com/java3y/austin/support/constans/MessageQueuePipeline.java new file mode 100644 index 0000000..3cf93fd --- /dev/null +++ b/austin-support/src/main/java/com/java3y/austin/support/constans/MessageQueuePipeline.java @@ -0,0 +1,14 @@ +package com.java3y.austin.support.constans; + + +/** + * 消息队列常量 + * + * @author 3y + */ +public interface MessageQueuePipeline { + String EVENT_BUS = "eventBus"; + String KAFKA = "kafka"; + String ROCKET_MQ = "rocketMq"; + +} diff --git a/austin-support/src/main/java/com/java3y/austin/support/mq/SendMqService.java b/austin-support/src/main/java/com/java3y/austin/support/mq/SendMqService.java new file mode 100644 index 0000000..8fc8e29 --- /dev/null +++ b/austin-support/src/main/java/com/java3y/austin/support/mq/SendMqService.java @@ -0,0 +1,27 @@ +package com.java3y.austin.support.mq; + + +/** + * @author 3y + * 发送数据至消息队列 + */ +public interface SendMqService { + /** + * 发送消息 + * + * @param topic + * @param jsonValue + * @param tagId + */ + void send(String topic, String jsonValue, String tagId); + + + /** + * 发送消息 + * + * @param topic + * @param jsonValue + */ + void send(String topic, String jsonValue); + +} diff --git a/austin-support/src/main/java/com/java3y/austin/support/mq/eventbus/EventBusListener.java b/austin-support/src/main/java/com/java3y/austin/support/mq/eventbus/EventBusListener.java new file mode 100644 index 0000000..5f771fe --- /dev/null +++ b/austin-support/src/main/java/com/java3y/austin/support/mq/eventbus/EventBusListener.java @@ -0,0 +1,27 @@ +package com.java3y.austin.support.mq.eventbus; + + +import com.java3y.austin.common.domain.TaskInfo; +import com.java3y.austin.support.domain.MessageTemplate; + +import java.util.List; + +/** + * @author 3y + * 监听器 + */ +public interface EventBusListener { + + + /** + * 消费消息 + * @param lists + */ + void consume(List lists); + + /** + * 撤回消息 + * @param messageTemplate + */ + void recall(MessageTemplate messageTemplate); +} diff --git a/austin-support/src/main/java/com/java3y/austin/support/mq/eventbus/EventBusSendMqServiceImpl.java b/austin-support/src/main/java/com/java3y/austin/support/mq/eventbus/EventBusSendMqServiceImpl.java new file mode 100644 index 0000000..38d5152 --- /dev/null +++ b/austin-support/src/main/java/com/java3y/austin/support/mq/eventbus/EventBusSendMqServiceImpl.java @@ -0,0 +1,52 @@ +package com.java3y.austin.support.mq.eventbus; + +import com.alibaba.fastjson.JSON; +import com.google.common.eventbus.EventBus; +import com.java3y.austin.common.domain.TaskInfo; +import com.java3y.austin.support.constans.MessageQueuePipeline; +import com.java3y.austin.support.domain.MessageTemplate; +import com.java3y.austin.support.mq.SendMqService; +import lombok.extern.slf4j.Slf4j; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Service; + + +/** + * @author 3y + * EventBus 发送实现类 + */ +@Slf4j +@Service +@ConditionalOnProperty(name = "austin-mq-pipeline", havingValue = MessageQueuePipeline.EVENT_BUS) +public class EventBusSendMqServiceImpl implements SendMqService { + private EventBus eventBus = new EventBus(); + + @Autowired + private EventBusListener eventBusListener; + @Value("${austin.business.topic.name}") + private String sendTopic; + @Value("${austin.business.recall.topic.name}") + private String recallTopic; + /** + * 单机 队列默认不支持 tagId过滤(单机无必要) + * @param topic + * @param jsonValue + * @param tagId + */ + @Override + public void send(String topic, String jsonValue, String tagId) { + eventBus.register(eventBusListener); + if (topic.equals(sendTopic)) { + eventBus.post(JSON.parseArray(jsonValue, TaskInfo.class)); + } else if (topic.equals(recallTopic)) { + eventBus.post(JSON.parseObject(jsonValue, MessageTemplate.class)); + } + } + @Override + public void send(String topic, String jsonValue) { + send(topic, jsonValue, null); + } +} diff --git a/austin-support/src/main/java/com/java3y/austin/support/mq/kafka/KafkaSendMqServiceImpl.java b/austin-support/src/main/java/com/java3y/austin/support/mq/kafka/KafkaSendMqServiceImpl.java new file mode 100644 index 0000000..8f0398b --- /dev/null +++ b/austin-support/src/main/java/com/java3y/austin/support/mq/kafka/KafkaSendMqServiceImpl.java @@ -0,0 +1,50 @@ +package com.java3y.austin.support.mq.kafka; + +import cn.hutool.core.util.StrUtil; +import com.java3y.austin.support.constans.MessageQueuePipeline; +import com.java3y.austin.support.mq.SendMqService; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.stereotype.Service; + +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.List; + + +/** + * @author 3y + * kafka 发送实现类 + */ +@Slf4j +@Service +@ConditionalOnProperty(name = "austin-mq-pipeline", havingValue = MessageQueuePipeline.KAFKA) +public class KafkaSendMqServiceImpl implements SendMqService { + + @Autowired + private KafkaTemplate kafkaTemplate; + + @Value("${austin.business.tagId.key}") + private String tagIdKey; + + @Override + public void send(String topic, String jsonValue, String tagId) { + if (StrUtil.isNotBlank(tagId)) { + List
headers = Arrays.asList(new RecordHeader(tagIdKey, tagId.getBytes(StandardCharsets.UTF_8))); + kafkaTemplate.send(new ProducerRecord(topic, null, null, null, jsonValue, headers)); + } else { + kafkaTemplate.send(topic, jsonValue); + } + } + + @Override + public void send(String topic, String jsonValue) { + send(topic, jsonValue, null); + } +} diff --git a/austin-web/src/main/resources/application.properties b/austin-web/src/main/resources/application.properties index ecf9a99..f9950da 100644 --- a/austin-web/src/main/resources/application.properties +++ b/austin-web/src/main/resources/application.properties @@ -8,15 +8,19 @@ austin-database-port=5004 austin-database-username=root austin-database-password=root123_A -# todo [kafka] ip/port【must】 -austin-kafka-ip=austin.kafka -austin-kafka-port=9092 - # todo [redis] ip/port/password【must】 austin-redis-ip=austin.redis austin-redis-port=5003 austin-redis-password=austin +# TODO kafka/eventbus +austin-mq-pipeline=eventbus + +# todo [kafka] ip/port【optional】, if austin-mq-pipeline=kafka 【must】 +austin-kafka-ip=austin.kafka +austin-kafka-port=9092 + + # todo [xxl-job] switch/ip/port/【optional】 xxl-job.enabled=false austin-xxl-job-ip=127.0.0.1