From da6b5a8ad14b4e844b5750f3e30b37f95119bf2e Mon Sep 17 00:00:00 2001 From: xiaoxiamo <82970607@qq.com> Date: Sat, 13 Jul 2024 23:43:11 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=B7=BB=E5=8A=A0Redis=E5=81=9A?= =?UTF-8?q?=E6=B6=88=E6=81=AF=E9=98=9F=E5=88=97=EF=BC=88=E4=B8=89=EF=BC=89?= =?UTF-8?q?=EF=BC=9A=E6=B5=8B=E8=AF=95Redis=E6=B6=88=E6=81=AF=E9=98=9F?= =?UTF-8?q?=E5=88=97=E9=80=9A=E8=BF=87=EF=BC=8C=E5=87=8F=E5=B0=91=E9=9D=9E?= =?UTF-8?q?=E5=BF=85=E8=A6=81=E6=97=A5=E5=BF=97=E6=89=93=E5=8D=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../handler/receiver/redis/RedisReceiver.java | 43 +++++++++++-------- 1 file changed, 26 insertions(+), 17 deletions(-) diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/receiver/redis/RedisReceiver.java b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/redis/RedisReceiver.java index 83f0ef3..01318d9 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/receiver/redis/RedisReceiver.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/redis/RedisReceiver.java @@ -11,11 +11,13 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.data.redis.core.StringRedisTemplate; -import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; +import javax.annotation.PostConstruct; import java.util.List; import java.util.Optional; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; @@ -31,26 +33,35 @@ import java.util.function.Consumer; @ConditionalOnProperty(name = "austin.mq.pipeline", havingValue = MessageQueuePipeline.REDIS) public class RedisReceiver implements MessageReceiver { + @Value("${austin.business.topic.name}") + private String sendTopic; + @Value("${austin.business.recall.topic.name}") + private String recallTopic; + @Autowired private StringRedisTemplate stringRedisTemplate; - @Autowired private ConsumeService consumeService; - @Value("${austin.business.topic.name}") - private String sendTopic; - @Value("${austin.business.recall.topic.name}") - private String recallTopic; + /** + * 初始化调度线程池 + */ + @PostConstruct + public void init() { + // 创建调度线程池 + ScheduledExecutorService scheduler = new ScheduledThreadPoolExecutor(2, + r -> new Thread(r, "RedisReceiverThread")); + // 定时调度 + scheduler.scheduleWithFixedDelay(this::receiveSendMessage, 0, 1, TimeUnit.SECONDS); + scheduler.scheduleWithFixedDelay(this::receiveRecallMessage, 0, 1, TimeUnit.SECONDS); + } /** * 消费发送消息 - * - * @Scheduled() 程序异常退出后拉起 - * */ - @Scheduled(fixedDelay = 1000) public void receiveSendMessage() { receiveMessage(sendTopic, message -> { + log.debug("RedisReceiver#receiveSendMessage message:{}", message); List taskInfoList = JSON.parseArray(message, TaskInfo.class); consumeService.consume2Send(taskInfoList); }); @@ -59,9 +70,9 @@ public class RedisReceiver implements MessageReceiver { /** * 消费撤回消息 */ - @Scheduled(fixedDelay = 1000) public void receiveRecallMessage() { receiveMessage(recallTopic, message -> { + log.debug("RedisReceiver#receiveRecallMessage recallTaskInfo:{}", message); RecallTaskInfo recallTaskInfo = JSON.parseObject(message, RecallTaskInfo.class); consumeService.consume2recall(recallTaskInfo); }); @@ -80,14 +91,12 @@ public class RedisReceiver implements MessageReceiver { while (true) { // 阻塞操作,减少CPU,IO消耗 Optional message = Optional.ofNullable( - stringRedisTemplate.opsForList().rightPop(topic, 0, TimeUnit.SECONDS)); - log.debug("RedisReceiver#receiveMessage Received message from Redis topic {}: {}", topic, message); - if (message.isPresent()) { - consumer.accept(message.get()); - } + stringRedisTemplate.opsForList().rightPop(topic, 20, TimeUnit.SECONDS)); + message.ifPresent(consumer); } } catch (Exception e) { - log.error("RedisReceiver#receiveMessage Error receiving messages from Redis topic {}: {}", topic, e); + log.error("RedisReceiver#receiveMessage Error receiving messages from Redis topic {}: {}", + topic, e.getMessage()); } } }