diff --git a/austin-data-house/src/main/java/com/java3y/austin/datahouse/AustinHiveBootStrap.java b/austin-data-house/src/main/java/com/java3y/austin/datahouse/AustinHiveBootStrap.java index 656e264..b796e69 100644 --- a/austin-data-house/src/main/java/com/java3y/austin/datahouse/AustinHiveBootStrap.java +++ b/austin-data-house/src/main/java/com/java3y/austin/datahouse/AustinHiveBootStrap.java @@ -8,7 +8,7 @@ import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.catalog.hive.HiveCatalog; /** - * flink启动类 + * hive启动类 *

* 接受Kafka的消息 写入hive表中 * diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/config/SensitiveWordsConfig.java b/austin-handler/src/main/java/com/java3y/austin/handler/config/SensitiveWordsConfig.java index d69a8e0..46a757a 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/config/SensitiveWordsConfig.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/config/SensitiveWordsConfig.java @@ -9,9 +9,11 @@ import org.springframework.core.io.Resource; import org.springframework.core.io.ResourceLoader; import org.springframework.core.task.TaskExecutor; import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.util.ObjectUtils; import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; @@ -67,6 +69,11 @@ public class SensitiveWordsConfig { @Autowired private ResourceLoader resourceLoader; + /** + * 是否终止线程 + */ + private volatile boolean stop = false; + /** * 初始化敏感词字典 */ @@ -120,7 +127,7 @@ public class SensitiveWordsConfig { * 实现热更新,修改词典后自动加载 */ private void startScheduledUpdate() { - while (true) { + while (!stop) { try { TimeUnit.SECONDS.sleep(UPDATE_TIME_SECONDS); log.debug("SensitiveWordConfig#startScheduledUpdate start update..."); @@ -128,9 +135,22 @@ public class SensitiveWordsConfig { storeSensWords(); } catch (InterruptedException e) { log.error("SensitiveWordConfig#startScheduledUpdate interrupted: {}", e.getMessage()); + Thread.currentThread().interrupt(); break; } } } + /** + * onDestroy + */ + @PreDestroy + public void onDestroy() { + stop = true; + if (taskExecutor instanceof ThreadPoolTaskExecutor) { + ThreadPoolTaskExecutor threadPoolTaskExecutor = (ThreadPoolTaskExecutor) taskExecutor; + threadPoolTaskExecutor.shutdown(); + } + } + } diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/receipt/MessageReceipt.java b/austin-handler/src/main/java/com/java3y/austin/handler/receipt/MessageReceipt.java index 01f00f7..a5b10b0 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/receipt/MessageReceipt.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/receipt/MessageReceipt.java @@ -9,7 +9,9 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; import java.util.List; +import java.util.concurrent.TimeUnit; /** * 拉取回执信息 入口 @@ -23,20 +25,38 @@ public class MessageReceipt { @Autowired private List receiptMessageStaterList; + /** + * 是否终止线程 + */ + private volatile boolean stop = false; + @PostConstruct private void init() { SupportThreadPoolConfig.getPendingSingleThreadPool().execute(() -> { - while (true) { + while (!stop) { try { for (ReceiptMessageStater receiptMessageStater : receiptMessageStaterList) { //receiptMessageStater.start(); } - Thread.sleep(2000); + TimeUnit.SECONDS.sleep(2); + } catch (InterruptedException ex) { + log.error("MessageReceipt#init interrupted: {}", ex.getMessage()); + Thread.currentThread().interrupt(); + break; } catch (Exception e) { log.error("MessageReceipt#init fail:{}", Throwables.getStackTraceAsString(e)); - Thread.currentThread().interrupt(); } } }); } + + /** + * 销毁调用 + */ + @PreDestroy + public void onDestroy() { + this.stop = true; + SupportThreadPoolConfig.getPendingSingleThreadPool().shutdown(); + } + } diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/receiver/redis/RedisReceiver.java b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/redis/RedisReceiver.java index 01318d9..15cfebc 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/receiver/redis/RedisReceiver.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/redis/RedisReceiver.java @@ -14,6 +14,7 @@ import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; import java.util.List; import java.util.Optional; import java.util.concurrent.ScheduledExecutorService; @@ -22,7 +23,6 @@ import java.util.concurrent.TimeUnit; import java.util.function.Consumer; /** - * * Redis 消息队列实现类 * * @author xiaoxiamao @@ -43,13 +43,23 @@ public class RedisReceiver implements MessageReceiver { @Autowired private ConsumeService consumeService; + /** + * 调度线程池 + */ + private ScheduledExecutorService scheduler; + + /** + * 是否终止线程 + */ + private volatile boolean stop = false; + /** * 初始化调度线程池 */ @PostConstruct public void init() { // 创建调度线程池 - ScheduledExecutorService scheduler = new ScheduledThreadPoolExecutor(2, + this.scheduler = new ScheduledThreadPoolExecutor(2, r -> new Thread(r, "RedisReceiverThread")); // 定时调度 scheduler.scheduleWithFixedDelay(this::receiveSendMessage, 0, 1, TimeUnit.SECONDS); @@ -80,23 +90,40 @@ public class RedisReceiver implements MessageReceiver { /** * 消息处理方法 - * + *

* 处理责任链有去重处理,此处暂不做 * - * @param topic 消息主题 + * @param topic 消息主题 * @param consumer 消费处理逻辑 */ private void receiveMessage(String topic, Consumer consumer) { - try { - while (true) { + while (!stop) { + try { // 阻塞操作,减少CPU,IO消耗 Optional message = Optional.ofNullable( stringRedisTemplate.opsForList().rightPop(topic, 20, TimeUnit.SECONDS)); message.ifPresent(consumer); + } catch (Exception e) { + log.error("RedisReceiver#receiveMessage Error receiving messages from Redis topic {}: {}", + topic, e.getMessage()); + try { + TimeUnit.SECONDS.sleep(10); + } catch (InterruptedException ex) { + log.error("RedisReceiver#receiveMessage interrupted: {}", e.getMessage()); + Thread.currentThread().interrupt(); + break; + } } - } catch (Exception e) { - log.error("RedisReceiver#receiveMessage Error receiving messages from Redis topic {}: {}", - topic, e.getMessage()); } } + + /** + * 销毁调用 + */ + @PreDestroy + public void onDestroy() { + stop = true; + scheduler.shutdown(); + } + }