diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/receiver/redis/RedisReceiver.java b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/redis/RedisReceiver.java index 01318d9..0d6a154 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/receiver/redis/RedisReceiver.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/redis/RedisReceiver.java @@ -14,6 +14,7 @@ import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; import java.util.List; import java.util.Optional; import java.util.concurrent.ScheduledExecutorService; @@ -22,7 +23,6 @@ import java.util.concurrent.TimeUnit; import java.util.function.Consumer; /** - * * Redis 消息队列实现类 * * @author xiaoxiamao @@ -43,13 +43,18 @@ public class RedisReceiver implements MessageReceiver { @Autowired private ConsumeService consumeService; + /** + * 调度线程池 + */ + private ScheduledExecutorService scheduler; + /** * 初始化调度线程池 */ @PostConstruct public void init() { // 创建调度线程池 - ScheduledExecutorService scheduler = new ScheduledThreadPoolExecutor(2, + this.scheduler = new ScheduledThreadPoolExecutor(2, r -> new Thread(r, "RedisReceiverThread")); // 定时调度 scheduler.scheduleWithFixedDelay(this::receiveSendMessage, 0, 1, TimeUnit.SECONDS); @@ -80,23 +85,39 @@ public class RedisReceiver implements MessageReceiver { /** * 消息处理方法 - * + *

* 处理责任链有去重处理,此处暂不做 * - * @param topic 消息主题 + * @param topic 消息主题 * @param consumer 消费处理逻辑 */ private void receiveMessage(String topic, Consumer consumer) { - try { - while (true) { + while (true) { + try { // 阻塞操作,减少CPU,IO消耗 Optional message = Optional.ofNullable( stringRedisTemplate.opsForList().rightPop(topic, 20, TimeUnit.SECONDS)); message.ifPresent(consumer); + } catch (Exception e) { + log.error("RedisReceiver#receiveMessage Error receiving messages from Redis topic {}: {}", + topic, e.getMessage()); + try { + TimeUnit.SECONDS.sleep(10); + } catch (InterruptedException ex) { + log.error("RedisReceiver#receiveMessage interrupted: {}", e.getMessage()); + Thread.currentThread().interrupt(); + break; + } } - } catch (Exception e) { - log.error("RedisReceiver#receiveMessage Error receiving messages from Redis topic {}: {}", - topic, e.getMessage()); } } + + /** + * 销毁调用 + */ + @PreDestroy + public void onDestroy() { + scheduler.shutdown(); + } + }