diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/receiver/redis/RedisReceiver.java b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/redis/RedisReceiver.java index 83f0ef3..01318d9 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/receiver/redis/RedisReceiver.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/redis/RedisReceiver.java @@ -11,11 +11,13 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.data.redis.core.StringRedisTemplate; -import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; +import javax.annotation.PostConstruct; import java.util.List; import java.util.Optional; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; @@ -31,26 +33,35 @@ import java.util.function.Consumer; @ConditionalOnProperty(name = "austin.mq.pipeline", havingValue = MessageQueuePipeline.REDIS) public class RedisReceiver implements MessageReceiver { + @Value("${austin.business.topic.name}") + private String sendTopic; + @Value("${austin.business.recall.topic.name}") + private String recallTopic; + @Autowired private StringRedisTemplate stringRedisTemplate; - @Autowired private ConsumeService consumeService; - @Value("${austin.business.topic.name}") - private String sendTopic; - @Value("${austin.business.recall.topic.name}") - private String recallTopic; + /** + * 初始化调度线程池 + */ + @PostConstruct + public void init() { + // 创建调度线程池 + ScheduledExecutorService scheduler = new ScheduledThreadPoolExecutor(2, + r -> new Thread(r, "RedisReceiverThread")); + // 定时调度 + scheduler.scheduleWithFixedDelay(this::receiveSendMessage, 0, 1, TimeUnit.SECONDS); + scheduler.scheduleWithFixedDelay(this::receiveRecallMessage, 0, 1, TimeUnit.SECONDS); + } /** * 消费发送消息 - * - * @Scheduled() 程序异常退出后拉起 - * */ - @Scheduled(fixedDelay = 1000) public void receiveSendMessage() { receiveMessage(sendTopic, message -> { + log.debug("RedisReceiver#receiveSendMessage message:{}", message); List taskInfoList = JSON.parseArray(message, TaskInfo.class); consumeService.consume2Send(taskInfoList); }); @@ -59,9 +70,9 @@ public class RedisReceiver implements MessageReceiver { /** * 消费撤回消息 */ - @Scheduled(fixedDelay = 1000) public void receiveRecallMessage() { receiveMessage(recallTopic, message -> { + log.debug("RedisReceiver#receiveRecallMessage recallTaskInfo:{}", message); RecallTaskInfo recallTaskInfo = JSON.parseObject(message, RecallTaskInfo.class); consumeService.consume2recall(recallTaskInfo); }); @@ -80,14 +91,12 @@ public class RedisReceiver implements MessageReceiver { while (true) { // 阻塞操作,减少CPU,IO消耗 Optional message = Optional.ofNullable( - stringRedisTemplate.opsForList().rightPop(topic, 0, TimeUnit.SECONDS)); - log.debug("RedisReceiver#receiveMessage Received message from Redis topic {}: {}", topic, message); - if (message.isPresent()) { - consumer.accept(message.get()); - } + stringRedisTemplate.opsForList().rightPop(topic, 20, TimeUnit.SECONDS)); + message.ifPresent(consumer); } } catch (Exception e) { - log.error("RedisReceiver#receiveMessage Error receiving messages from Redis topic {}: {}", topic, e); + log.error("RedisReceiver#receiveMessage Error receiving messages from Redis topic {}: {}", + topic, e.getMessage()); } } }