From bee9e24085c68698d5a6f28dd7e5c134f0d3029a Mon Sep 17 00:00:00 2001 From: xzxiaoshan <365384722@qq.com> Date: Fri, 1 Nov 2024 17:28:23 +0800 Subject: [PATCH 1/4] =?UTF-8?q?=E8=A7=A3=E5=86=B32=E4=B8=AA=E9=97=AE?= =?UTF-8?q?=E9=A2=98=EF=BC=9A=201=E3=80=81=E6=8D=95=E8=8E=B7=E5=BC=82?= =?UTF-8?q?=E5=B8=B8=E8=B0=83=E6=95=B4=E5=88=B0=E5=BE=AA=E7=8E=AF=E5=86=85?= =?UTF-8?q?=EF=BC=8C=E9=98=B2=E6=AD=A2=E5=9B=A0=E4=B8=BAredis=E4=BB=A3?= =?UTF-8?q?=E7=A0=81=E6=8A=9B=E5=87=BA=E5=BC=82=E5=B8=B8=E7=A8=8B=E5=BA=8F?= =?UTF-8?q?=E4=B8=AD=E6=96=AD=E3=80=82=202=E3=80=81=E6=96=B0=E5=A2=9E?= =?UTF-8?q?=E9=94=80=E6=AF=81=E6=96=B9=E6=B3=95=EF=BC=8C=E5=9C=A8=E7=A8=8B?= =?UTF-8?q?=E5=BA=8F=E4=BC=98=E9=9B=85=E7=BB=93=E6=9D=9F=E8=BF=9B=E7=A8=8B?= =?UTF-8?q?=E5=90=8E=EF=BC=8C=E9=80=80=E5=87=BAwhile=E5=BE=AA=E7=8E=AF?= =?UTF-8?q?=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../handler/receiver/redis/RedisReceiver.java | 39 ++++++++++++++----- 1 file changed, 30 insertions(+), 9 deletions(-) diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/receiver/redis/RedisReceiver.java b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/redis/RedisReceiver.java index 01318d9..0d6a154 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/receiver/redis/RedisReceiver.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/redis/RedisReceiver.java @@ -14,6 +14,7 @@ import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; import java.util.List; import java.util.Optional; import java.util.concurrent.ScheduledExecutorService; @@ -22,7 +23,6 @@ import java.util.concurrent.TimeUnit; import java.util.function.Consumer; /** - * * Redis 消息队列实现类 * * @author xiaoxiamao @@ -43,13 +43,18 @@ public class RedisReceiver implements MessageReceiver { @Autowired private ConsumeService consumeService; + /** + * 调度线程池 + */ + private ScheduledExecutorService scheduler; + /** * 初始化调度线程池 */ @PostConstruct public void init() { // 创建调度线程池 - ScheduledExecutorService scheduler = new ScheduledThreadPoolExecutor(2, + this.scheduler = new ScheduledThreadPoolExecutor(2, r -> new Thread(r, "RedisReceiverThread")); // 定时调度 scheduler.scheduleWithFixedDelay(this::receiveSendMessage, 0, 1, TimeUnit.SECONDS); @@ -80,23 +85,39 @@ public class RedisReceiver implements MessageReceiver { /** * 消息处理方法 - * + *

* 处理责任链有去重处理,此处暂不做 * - * @param topic 消息主题 + * @param topic 消息主题 * @param consumer 消费处理逻辑 */ private void receiveMessage(String topic, Consumer consumer) { - try { - while (true) { + while (true) { + try { // 阻塞操作,减少CPU,IO消耗 Optional message = Optional.ofNullable( stringRedisTemplate.opsForList().rightPop(topic, 20, TimeUnit.SECONDS)); message.ifPresent(consumer); + } catch (Exception e) { + log.error("RedisReceiver#receiveMessage Error receiving messages from Redis topic {}: {}", + topic, e.getMessage()); + try { + TimeUnit.SECONDS.sleep(10); + } catch (InterruptedException ex) { + log.error("RedisReceiver#receiveMessage interrupted: {}", e.getMessage()); + Thread.currentThread().interrupt(); + break; + } } - } catch (Exception e) { - log.error("RedisReceiver#receiveMessage Error receiving messages from Redis topic {}: {}", - topic, e.getMessage()); } } + + /** + * 销毁调用 + */ + @PreDestroy + public void onDestroy() { + scheduler.shutdown(); + } + } From 4fc485966667bbb8324bce12647e782f0bd88c66 Mon Sep 17 00:00:00 2001 From: xzxiaoshan <365384722@qq.com> Date: Fri, 1 Nov 2024 17:32:41 +0800 Subject: [PATCH 2/4] =?UTF-8?q?=E4=BC=98=E5=8C=96=E4=BB=A3=E7=A0=81?= =?UTF-8?q?=EF=BC=9A=E6=96=B0=E5=A2=9E=E9=94=80=E6=AF=81=E6=96=B9=E6=B3=95?= =?UTF-8?q?=EF=BC=8C=E5=9C=A8=E7=A8=8B=E5=BA=8F=E4=BC=98=E9=9B=85=E9=80=80?= =?UTF-8?q?=E5=87=BA=E5=90=8E=E4=B8=BB=E5=8A=A8shutdown=E7=BA=BF=E7=A8=8B?= =?UTF-8?q?=E6=B1=A0=EF=BC=8C=E4=BB=8E=E8=80=8C=E8=A7=A6=E5=8F=91Interrupt?= =?UTF-8?q?edException=E5=BC=82=E5=B8=B8=EF=BC=8Cbreak=E5=87=BA=E5=BE=AA?= =?UTF-8?q?=E7=8E=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../austin/handler/receipt/MessageReceipt.java | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/receipt/MessageReceipt.java b/austin-handler/src/main/java/com/java3y/austin/handler/receipt/MessageReceipt.java index 01f00f7..3e4d816 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/receipt/MessageReceipt.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/receipt/MessageReceipt.java @@ -9,7 +9,9 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; import java.util.List; +import java.util.concurrent.TimeUnit; /** * 拉取回执信息 入口 @@ -31,12 +33,24 @@ public class MessageReceipt { for (ReceiptMessageStater receiptMessageStater : receiptMessageStaterList) { //receiptMessageStater.start(); } - Thread.sleep(2000); + TimeUnit.SECONDS.sleep(2); + } catch (InterruptedException ex) { + log.error("MessageReceipt#init interrupted: {}", ex.getMessage()); + Thread.currentThread().interrupt(); + break; } catch (Exception e) { log.error("MessageReceipt#init fail:{}", Throwables.getStackTraceAsString(e)); - Thread.currentThread().interrupt(); } } }); } + + /** + * 销毁调用 + */ + @PreDestroy + public void onDestroy() { + SupportThreadPoolConfig.getPendingSingleThreadPool().shutdown(); + } + } From 606b1afeb6ebd31b62022d1ec7816d135048d491 Mon Sep 17 00:00:00 2001 From: xzxiaoshan <365384722@qq.com> Date: Fri, 1 Nov 2024 17:43:17 +0800 Subject: [PATCH 3/4] =?UTF-8?q?=E7=BA=A0=E6=AD=A3=E6=B3=A8=E9=87=8A?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/com/java3y/austin/datahouse/AustinHiveBootStrap.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/austin-data-house/src/main/java/com/java3y/austin/datahouse/AustinHiveBootStrap.java b/austin-data-house/src/main/java/com/java3y/austin/datahouse/AustinHiveBootStrap.java index 656e264..b796e69 100644 --- a/austin-data-house/src/main/java/com/java3y/austin/datahouse/AustinHiveBootStrap.java +++ b/austin-data-house/src/main/java/com/java3y/austin/datahouse/AustinHiveBootStrap.java @@ -8,7 +8,7 @@ import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.catalog.hive.HiveCatalog; /** - * flink启动类 + * hive启动类 *

* 接受Kafka的消息 写入hive表中 * From 1908393aca3c0a1f80ef1a6d90a758f3c556c40f Mon Sep 17 00:00:00 2001 From: xzxiaoshan <365384722@qq.com> Date: Fri, 1 Nov 2024 17:44:11 +0800 Subject: [PATCH 4/4] =?UTF-8?q?=E4=BC=98=E5=8C=96=E4=BB=A3=E7=A0=81?= =?UTF-8?q?=EF=BC=9A=E7=A8=8B=E5=BA=8F=E9=80=80=E5=87=BA=E5=90=8E=E4=BC=98?= =?UTF-8?q?=E9=9B=85=E7=BB=93=E6=9D=9F=E7=A8=8B=E5=BA=8F=EF=BC=8C=E9=80=80?= =?UTF-8?q?=E5=87=BA=E5=BE=AA=E7=8E=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../handler/config/SensitiveWordsConfig.java | 22 ++++++++++++++++++- .../handler/receipt/MessageReceipt.java | 8 ++++++- .../handler/receiver/redis/RedisReceiver.java | 8 ++++++- 3 files changed, 35 insertions(+), 3 deletions(-) diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/config/SensitiveWordsConfig.java b/austin-handler/src/main/java/com/java3y/austin/handler/config/SensitiveWordsConfig.java index d69a8e0..46a757a 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/config/SensitiveWordsConfig.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/config/SensitiveWordsConfig.java @@ -9,9 +9,11 @@ import org.springframework.core.io.Resource; import org.springframework.core.io.ResourceLoader; import org.springframework.core.task.TaskExecutor; import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.util.ObjectUtils; import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; @@ -67,6 +69,11 @@ public class SensitiveWordsConfig { @Autowired private ResourceLoader resourceLoader; + /** + * 是否终止线程 + */ + private volatile boolean stop = false; + /** * 初始化敏感词字典 */ @@ -120,7 +127,7 @@ public class SensitiveWordsConfig { * 实现热更新,修改词典后自动加载 */ private void startScheduledUpdate() { - while (true) { + while (!stop) { try { TimeUnit.SECONDS.sleep(UPDATE_TIME_SECONDS); log.debug("SensitiveWordConfig#startScheduledUpdate start update..."); @@ -128,9 +135,22 @@ public class SensitiveWordsConfig { storeSensWords(); } catch (InterruptedException e) { log.error("SensitiveWordConfig#startScheduledUpdate interrupted: {}", e.getMessage()); + Thread.currentThread().interrupt(); break; } } } + /** + * onDestroy + */ + @PreDestroy + public void onDestroy() { + stop = true; + if (taskExecutor instanceof ThreadPoolTaskExecutor) { + ThreadPoolTaskExecutor threadPoolTaskExecutor = (ThreadPoolTaskExecutor) taskExecutor; + threadPoolTaskExecutor.shutdown(); + } + } + } diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/receipt/MessageReceipt.java b/austin-handler/src/main/java/com/java3y/austin/handler/receipt/MessageReceipt.java index 3e4d816..a5b10b0 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/receipt/MessageReceipt.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/receipt/MessageReceipt.java @@ -25,10 +25,15 @@ public class MessageReceipt { @Autowired private List receiptMessageStaterList; + /** + * 是否终止线程 + */ + private volatile boolean stop = false; + @PostConstruct private void init() { SupportThreadPoolConfig.getPendingSingleThreadPool().execute(() -> { - while (true) { + while (!stop) { try { for (ReceiptMessageStater receiptMessageStater : receiptMessageStaterList) { //receiptMessageStater.start(); @@ -50,6 +55,7 @@ public class MessageReceipt { */ @PreDestroy public void onDestroy() { + this.stop = true; SupportThreadPoolConfig.getPendingSingleThreadPool().shutdown(); } diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/receiver/redis/RedisReceiver.java b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/redis/RedisReceiver.java index 0d6a154..15cfebc 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/receiver/redis/RedisReceiver.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/redis/RedisReceiver.java @@ -48,6 +48,11 @@ public class RedisReceiver implements MessageReceiver { */ private ScheduledExecutorService scheduler; + /** + * 是否终止线程 + */ + private volatile boolean stop = false; + /** * 初始化调度线程池 */ @@ -92,7 +97,7 @@ public class RedisReceiver implements MessageReceiver { * @param consumer 消费处理逻辑 */ private void receiveMessage(String topic, Consumer consumer) { - while (true) { + while (!stop) { try { // 阻塞操作,减少CPU,IO消耗 Optional message = Optional.ofNullable( @@ -117,6 +122,7 @@ public class RedisReceiver implements MessageReceiver { */ @PreDestroy public void onDestroy() { + stop = true; scheduler.shutdown(); }