From bee9e24085c68698d5a6f28dd7e5c134f0d3029a Mon Sep 17 00:00:00 2001 From: xzxiaoshan <365384722@qq.com> Date: Fri, 1 Nov 2024 17:28:23 +0800 Subject: [PATCH] =?UTF-8?q?=E8=A7=A3=E5=86=B32=E4=B8=AA=E9=97=AE=E9=A2=98?= =?UTF-8?q?=EF=BC=9A=201=E3=80=81=E6=8D=95=E8=8E=B7=E5=BC=82=E5=B8=B8?= =?UTF-8?q?=E8=B0=83=E6=95=B4=E5=88=B0=E5=BE=AA=E7=8E=AF=E5=86=85=EF=BC=8C?= =?UTF-8?q?=E9=98=B2=E6=AD=A2=E5=9B=A0=E4=B8=BAredis=E4=BB=A3=E7=A0=81?= =?UTF-8?q?=E6=8A=9B=E5=87=BA=E5=BC=82=E5=B8=B8=E7=A8=8B=E5=BA=8F=E4=B8=AD?= =?UTF-8?q?=E6=96=AD=E3=80=82=202=E3=80=81=E6=96=B0=E5=A2=9E=E9=94=80?= =?UTF-8?q?=E6=AF=81=E6=96=B9=E6=B3=95=EF=BC=8C=E5=9C=A8=E7=A8=8B=E5=BA=8F?= =?UTF-8?q?=E4=BC=98=E9=9B=85=E7=BB=93=E6=9D=9F=E8=BF=9B=E7=A8=8B=E5=90=8E?= =?UTF-8?q?=EF=BC=8C=E9=80=80=E5=87=BAwhile=E5=BE=AA=E7=8E=AF=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../handler/receiver/redis/RedisReceiver.java | 39 ++++++++++++++----- 1 file changed, 30 insertions(+), 9 deletions(-) diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/receiver/redis/RedisReceiver.java b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/redis/RedisReceiver.java index 01318d9..0d6a154 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/receiver/redis/RedisReceiver.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/redis/RedisReceiver.java @@ -14,6 +14,7 @@ import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; import java.util.List; import java.util.Optional; import java.util.concurrent.ScheduledExecutorService; @@ -22,7 +23,6 @@ import java.util.concurrent.TimeUnit; import java.util.function.Consumer; /** - * * Redis 消息队列实现类 * * @author xiaoxiamao @@ -43,13 +43,18 @@ public class RedisReceiver implements MessageReceiver { @Autowired private ConsumeService consumeService; + /** + * 调度线程池 + */ + private ScheduledExecutorService scheduler; + /** * 初始化调度线程池 */ @PostConstruct public void init() { // 创建调度线程池 - ScheduledExecutorService scheduler = new ScheduledThreadPoolExecutor(2, + this.scheduler = new ScheduledThreadPoolExecutor(2, r -> new Thread(r, "RedisReceiverThread")); // 定时调度 scheduler.scheduleWithFixedDelay(this::receiveSendMessage, 0, 1, TimeUnit.SECONDS); @@ -80,23 +85,39 @@ public class RedisReceiver implements MessageReceiver { /** * 消息处理方法 - * + *
* 处理责任链有去重处理,此处暂不做
*
- * @param topic 消息主题
+ * @param topic 消息主题
* @param consumer 消费处理逻辑
*/
private void receiveMessage(String topic, Consumer