diff --git a/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/core/ClientWorker.java b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/core/ClientWorker.java index 17901090..fbe3a40c 100644 --- a/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/core/ClientWorker.java +++ b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/core/ClientWorker.java @@ -11,7 +11,6 @@ import cn.hutool.core.util.StrUtil; import com.alibaba.fastjson.JSON; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.DisposableBean; import org.springframework.util.StringUtils; import java.net.URLDecoder; @@ -30,7 +29,7 @@ import static cn.hippo4j.common.constant.Constants.*; * @date 2021/6/20 18:34 */ @Slf4j -public class ClientWorker implements DisposableBean { +public class ClientWorker { private double currentLongingTaskCount = 0; @@ -90,12 +89,6 @@ public class ClientWorker implements DisposableBean { } } - @Override - public void destroy() throws Exception { - Optional.ofNullable(executor).ifPresent((each) -> each.shutdown()); - Optional.ofNullable(executorService).ifPresent((each) -> each.shutdown()); - } - class LongPollingRunnable implements Runnable { @Override diff --git a/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/core/DiscoveryClient.java b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/core/DiscoveryClient.java index d225675a..679421a7 100644 --- a/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/core/DiscoveryClient.java +++ b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/core/DiscoveryClient.java @@ -15,7 +15,6 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.DisposableBean; import java.util.Map; -import java.util.Optional; import java.util.concurrent.*; import static cn.hippo4j.common.constant.Constants.GROUP_KEY; @@ -102,6 +101,10 @@ public class DiscoveryClient implements DisposableBean { log.info("{}{} - remove config cache success.", PREFIX, appPathIdentifier); } } catch (Throwable ex) { + if (ex instanceof ShutdownExecuteException) { + return; + } + log.error("{}{} - remove config cache fail.", PREFIX, appPathIdentifier, ex); } @@ -115,8 +118,6 @@ public class DiscoveryClient implements DisposableBean { } catch (Throwable ex) { log.error("{}{} - destroy service fail.", PREFIX, appPathIdentifier, ex); } - - Optional.ofNullable(scheduler).ifPresent((each) -> each.shutdown()); } public class HeartbeatThread implements Runnable { diff --git a/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/core/ShutdownExecuteException.java b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/core/ShutdownExecuteException.java new file mode 100644 index 00000000..bbe7232a --- /dev/null +++ b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/core/ShutdownExecuteException.java @@ -0,0 +1,15 @@ +package cn.hippo4j.starter.core; + +/** + * Shutdown execute exception. + * + * @author chen.ma + * @date 2021/12/9 21:48 + */ +public class ShutdownExecuteException extends Exception { + + public ShutdownExecuteException() { + super("Execute task when stopped."); + } + +} diff --git a/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/monitor/ReportingEventExecutor.java b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/monitor/ReportingEventExecutor.java index 7013eadb..3c0aecbd 100644 --- a/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/monitor/ReportingEventExecutor.java +++ b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/monitor/ReportingEventExecutor.java @@ -95,7 +95,10 @@ public class ReportingEventExecutor extends AbstractThreadPoolRuntime implements * 采集动态线程池数据, 并添加缓冲队列 */ private void runTimeGatherTask() { - serverHealthCheck.isHealthStatus(); + boolean healthStatus = serverHealthCheck.isHealthStatus(); + if (!healthStatus) { + return; + } Message message = collectMessage(); messageCollectVessel.offer(message); } diff --git a/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/remote/AbstractHealthCheck.java b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/remote/AbstractHealthCheck.java index ff2b7b0c..f2e25f03 100644 --- a/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/remote/AbstractHealthCheck.java +++ b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/remote/AbstractHealthCheck.java @@ -1,5 +1,6 @@ package cn.hippo4j.starter.remote; +import cn.hippo4j.starter.core.ShutdownExecuteException; import cn.hippo4j.starter.toolkit.thread.ThreadFactoryBuilder; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; @@ -11,6 +12,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; +import static cn.hippo4j.common.constant.Constants.HEALTH_CHECK_INTERVAL; + /** * Abstract health check. * @@ -18,13 +21,18 @@ import java.util.concurrent.locks.ReentrantLock; * @date 2021/12/8 20:19 */ @Slf4j -public abstract class AbstractHealthCheck implements InitializingBean, ServerHealthCheck { +public abstract class AbstractHealthCheck implements ServerHealthCheck, InitializingBean { /** * Health status */ private volatile boolean healthStatus = true; + /** + * Client shutdown hook + */ + private volatile boolean clientShutdownHook = false; + /** * Health main lock */ @@ -64,13 +72,12 @@ public abstract class AbstractHealthCheck implements InitializingBean, ServerHea } else { healthStatus = false; } - } @Override @SneakyThrows public boolean isHealthStatus() { - while (!healthStatus) { + while (!healthStatus && !clientShutdownHook) { healthMainLock.lock(); try { healthCondition.await(); @@ -79,6 +86,10 @@ public abstract class AbstractHealthCheck implements InitializingBean, ServerHea } } + if (!healthStatus) { + throw new ShutdownExecuteException(); + } + return healthStatus; } @@ -107,7 +118,13 @@ public abstract class AbstractHealthCheck implements InitializingBean, ServerHea @Override public void afterPropertiesSet() throws Exception { - healthCheckExecutor.scheduleWithFixedDelay(() -> healthCheck(), 0, 5, TimeUnit.SECONDS); + // 添加钩子函数, Client 端停止时, 如果 Server 端是非健康状态, Client 销毁函数会暂停运行 + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + clientShutdownHook = true; + signalAllBizThread(); + })); + + healthCheckExecutor.scheduleWithFixedDelay(() -> healthCheck(), 0, HEALTH_CHECK_INTERVAL, TimeUnit.SECONDS); } }