!69 优化代码,程序退出时,主动结束循环,防止程序死循环持续挂起无法正常结束服务

Merge pull request !69 from shanhy/thread-shutdown
master
Java3y 3 months ago committed by Gitee
commit c39c6c741f
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F

@ -8,7 +8,7 @@ import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog; import org.apache.flink.table.catalog.hive.HiveCatalog;
/** /**
* flink * hive
* <p> * <p>
* Kafka hive * Kafka hive
* *

@ -9,9 +9,11 @@ import org.springframework.core.io.Resource;
import org.springframework.core.io.ResourceLoader; import org.springframework.core.io.ResourceLoader;
import org.springframework.core.task.TaskExecutor; import org.springframework.core.task.TaskExecutor;
import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.ObjectUtils; import org.springframework.util.ObjectUtils;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.BufferedReader; import java.io.BufferedReader;
import java.io.IOException; import java.io.IOException;
import java.io.InputStreamReader; import java.io.InputStreamReader;
@ -67,6 +69,11 @@ public class SensitiveWordsConfig {
@Autowired @Autowired
private ResourceLoader resourceLoader; private ResourceLoader resourceLoader;
/**
* 线
*/
private volatile boolean stop = false;
/** /**
* *
*/ */
@ -120,7 +127,7 @@ public class SensitiveWordsConfig {
* *
*/ */
private void startScheduledUpdate() { private void startScheduledUpdate() {
while (true) { while (!stop) {
try { try {
TimeUnit.SECONDS.sleep(UPDATE_TIME_SECONDS); TimeUnit.SECONDS.sleep(UPDATE_TIME_SECONDS);
log.debug("SensitiveWordConfig#startScheduledUpdate start update..."); log.debug("SensitiveWordConfig#startScheduledUpdate start update...");
@ -128,9 +135,22 @@ public class SensitiveWordsConfig {
storeSensWords(); storeSensWords();
} catch (InterruptedException e) { } catch (InterruptedException e) {
log.error("SensitiveWordConfig#startScheduledUpdate interrupted: {}", e.getMessage()); log.error("SensitiveWordConfig#startScheduledUpdate interrupted: {}", e.getMessage());
Thread.currentThread().interrupt();
break; break;
} }
} }
} }
/**
* onDestroy
*/
@PreDestroy
public void onDestroy() {
stop = true;
if (taskExecutor instanceof ThreadPoolTaskExecutor) {
ThreadPoolTaskExecutor threadPoolTaskExecutor = (ThreadPoolTaskExecutor) taskExecutor;
threadPoolTaskExecutor.shutdown();
}
}
} }

@ -9,7 +9,9 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.List; import java.util.List;
import java.util.concurrent.TimeUnit;
/** /**
* *
@ -23,20 +25,38 @@ public class MessageReceipt {
@Autowired @Autowired
private List<ReceiptMessageStater> receiptMessageStaterList; private List<ReceiptMessageStater> receiptMessageStaterList;
/**
* 线
*/
private volatile boolean stop = false;
@PostConstruct @PostConstruct
private void init() { private void init() {
SupportThreadPoolConfig.getPendingSingleThreadPool().execute(() -> { SupportThreadPoolConfig.getPendingSingleThreadPool().execute(() -> {
while (true) { while (!stop) {
try { try {
for (ReceiptMessageStater receiptMessageStater : receiptMessageStaterList) { for (ReceiptMessageStater receiptMessageStater : receiptMessageStaterList) {
//receiptMessageStater.start(); //receiptMessageStater.start();
} }
Thread.sleep(2000); TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException ex) {
log.error("MessageReceipt#init interrupted: {}", ex.getMessage());
Thread.currentThread().interrupt();
break;
} catch (Exception e) { } catch (Exception e) {
log.error("MessageReceipt#init fail:{}", Throwables.getStackTraceAsString(e)); log.error("MessageReceipt#init fail:{}", Throwables.getStackTraceAsString(e));
Thread.currentThread().interrupt();
} }
} }
}); });
} }
/**
*
*/
@PreDestroy
public void onDestroy() {
this.stop = true;
SupportThreadPoolConfig.getPendingSingleThreadPool().shutdown();
}
} }

@ -14,6 +14,7 @@ import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
@ -22,7 +23,6 @@ import java.util.concurrent.TimeUnit;
import java.util.function.Consumer; import java.util.function.Consumer;
/** /**
*
* Redis * Redis
* *
* @author xiaoxiamao * @author xiaoxiamao
@ -43,13 +43,23 @@ public class RedisReceiver implements MessageReceiver {
@Autowired @Autowired
private ConsumeService consumeService; private ConsumeService consumeService;
/**
* 线
*/
private ScheduledExecutorService scheduler;
/**
* 线
*/
private volatile boolean stop = false;
/** /**
* 线 * 线
*/ */
@PostConstruct @PostConstruct
public void init() { public void init() {
// 创建调度线程池 // 创建调度线程池
ScheduledExecutorService scheduler = new ScheduledThreadPoolExecutor(2, this.scheduler = new ScheduledThreadPoolExecutor(2,
r -> new Thread(r, "RedisReceiverThread")); r -> new Thread(r, "RedisReceiverThread"));
// 定时调度 // 定时调度
scheduler.scheduleWithFixedDelay(this::receiveSendMessage, 0, 1, TimeUnit.SECONDS); scheduler.scheduleWithFixedDelay(this::receiveSendMessage, 0, 1, TimeUnit.SECONDS);
@ -80,23 +90,40 @@ public class RedisReceiver implements MessageReceiver {
/** /**
* *
* * <p>
* *
* *
* @param topic * @param topic
* @param consumer * @param consumer
*/ */
private void receiveMessage(String topic, Consumer<String> consumer) { private void receiveMessage(String topic, Consumer<String> consumer) {
while (!stop) {
try { try {
while (true) {
// 阻塞操作减少CPUIO消耗 // 阻塞操作减少CPUIO消耗
Optional<String> message = Optional.ofNullable( Optional<String> message = Optional.ofNullable(
stringRedisTemplate.opsForList().rightPop(topic, 20, TimeUnit.SECONDS)); stringRedisTemplate.opsForList().rightPop(topic, 20, TimeUnit.SECONDS));
message.ifPresent(consumer); message.ifPresent(consumer);
}
} catch (Exception e) { } catch (Exception e) {
log.error("RedisReceiver#receiveMessage Error receiving messages from Redis topic {}: {}", log.error("RedisReceiver#receiveMessage Error receiving messages from Redis topic {}: {}",
topic, e.getMessage()); topic, e.getMessage());
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException ex) {
log.error("RedisReceiver#receiveMessage interrupted: {}", e.getMessage());
Thread.currentThread().interrupt();
break;
}
}
}
} }
/**
*
*/
@PreDestroy
public void onDestroy() {
stop = true;
scheduler.shutdown();
} }
} }

Loading…
Cancel
Save