diff --git a/hippo4j-common/src/main/java/cn/hippo4j/common/constant/Constants.java b/hippo4j-common/src/main/java/cn/hippo4j/common/constant/Constants.java index eccec923..0b26af76 100644 --- a/hippo4j-common/src/main/java/cn/hippo4j/common/constant/Constants.java +++ b/hippo4j-common/src/main/java/cn/hippo4j/common/constant/Constants.java @@ -20,6 +20,8 @@ public class Constants { public static final String NULL = ""; + public static final String UP = "UP"; + public static final String ENCODE = "UTF-8"; public static final int CONFIG_LONG_POLL_TIMEOUT = 30000; @@ -42,6 +44,8 @@ public class Constants { public static final String MONITOR_PATH = BASE_PATH + "/monitor"; + public static final String HEALTH_CHECK_PATH = BASE_PATH + "/health/check"; + public static final String PROBE_MODIFY_REQUEST = "Listening-Configs"; public static final String LONG_PULLING_TIMEOUT = "Long-Pulling-Timeout"; diff --git a/hippo4j-console/src/main/java/cn/hippo4j/console/controller/HealthCheckController.java b/hippo4j-console/src/main/java/cn/hippo4j/console/controller/HealthCheckController.java new file mode 100644 index 00000000..6aa75fef --- /dev/null +++ b/hippo4j-console/src/main/java/cn/hippo4j/console/controller/HealthCheckController.java @@ -0,0 +1,31 @@ +package cn.hippo4j.console.controller; + +import cn.hippo4j.common.constant.Constants; +import cn.hippo4j.common.web.base.Result; +import cn.hippo4j.common.web.base.Results; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import static cn.hippo4j.common.constant.Constants.UP; + +/** + * Health check controller. + * + * @author chen.ma + * @date 2021/12/8 21:02 + */ +@Slf4j +@RestController +@AllArgsConstructor +@RequestMapping(Constants.BASE_PATH + "/health/check") +public class HealthCheckController { + + @GetMapping + public Result healthCheck() { + return Results.success(UP); + } + +} diff --git a/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/config/DynamicThreadPoolAutoConfiguration.java b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/config/DynamicThreadPoolAutoConfiguration.java index d880bc1b..b23c67ac 100644 --- a/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/config/DynamicThreadPoolAutoConfiguration.java +++ b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/config/DynamicThreadPoolAutoConfiguration.java @@ -13,6 +13,8 @@ import cn.hippo4j.starter.monitor.HttpMvcSender; import cn.hippo4j.starter.monitor.MessageSender; import cn.hippo4j.starter.monitor.ReportingEventExecutor; import cn.hippo4j.starter.remote.HttpAgent; +import cn.hippo4j.starter.remote.HttpScheduledHealthCheck; +import cn.hippo4j.starter.remote.ServerHealthCheck; import cn.hippo4j.starter.toolkit.IdentifyUtil; import cn.hippo4j.starter.toolkit.inet.InetUtils; import cn.hutool.core.util.IdUtil; @@ -58,9 +60,9 @@ public class DynamicThreadPoolAutoConfiguration { @Bean @SuppressWarnings("all") - public ConfigService configService(HttpAgent httpAgent, InetUtils hippo4JInetUtils) { + public ConfigService configService(HttpAgent httpAgent, InetUtils hippo4JInetUtils, ServerHealthCheck serverHealthCheck) { String identify = IdentifyUtil.generate(environment, hippo4JInetUtils); - return new ThreadPoolConfigService(httpAgent, identify); + return new ThreadPoolConfigService(httpAgent, identify, serverHealthCheck); } @Bean @@ -92,8 +94,15 @@ public class DynamicThreadPoolAutoConfiguration { } @Bean - public ReportingEventExecutor reportingEventExecutor(BootstrapProperties properties, MessageSender messageSender) { - return new ReportingEventExecutor(properties, messageSender); + public ReportingEventExecutor reportingEventExecutor(BootstrapProperties properties, MessageSender messageSender, + ServerHealthCheck serverHealthCheck) { + return new ReportingEventExecutor(properties, messageSender, serverHealthCheck); + } + + @Bean + @SuppressWarnings("all") + public ServerHealthCheck httpScheduledHealthCheck(HttpAgent httpAgent) { + return new HttpScheduledHealthCheck(httpAgent); } } diff --git a/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/config/HttpClientConfig.java b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/config/HttpClientConfig.java index fad7e8ff..55390cb0 100644 --- a/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/config/HttpClientConfig.java +++ b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/config/HttpClientConfig.java @@ -37,6 +37,7 @@ public class HttpClientConfig { } @Bean + @SuppressWarnings("all") public HttpAgent httpAgent(BootstrapProperties properties, HttpClientUtil hippo4JHttpClientUtil) { return new ServerHttpAgent(properties, hippo4JHttpClientUtil); } diff --git a/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/core/ClientWorker.java b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/core/ClientWorker.java index 55eab833..17901090 100644 --- a/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/core/ClientWorker.java +++ b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/core/ClientWorker.java @@ -5,6 +5,8 @@ import cn.hippo4j.common.toolkit.ContentUtil; import cn.hippo4j.common.toolkit.GroupKey; import cn.hippo4j.common.web.base.Result; import cn.hippo4j.starter.remote.HttpAgent; +import cn.hippo4j.starter.remote.ServerHealthCheck; +import cn.hippo4j.starter.toolkit.thread.ThreadFactoryBuilder; import cn.hutool.core.util.StrUtil; import com.alibaba.fastjson.JSON; import lombok.SneakyThrows; @@ -18,7 +20,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import static cn.hippo4j.common.constant.Constants.*; @@ -39,21 +40,20 @@ public class ClientWorker implements DisposableBean { private final String identification; + private final ServerHealthCheck serverHealthCheck; + private final ScheduledExecutorService executor; private final ScheduledExecutorService executorService; - private AtomicBoolean isHealthServer = new AtomicBoolean(true); - - private AtomicBoolean isHealthServerTemp = new AtomicBoolean(true); - private final ConcurrentHashMap cacheMap = new ConcurrentHashMap(16); @SuppressWarnings("all") - public ClientWorker(HttpAgent httpAgent, String identification) { + public ClientWorker(HttpAgent httpAgent, String identification, ServerHealthCheck serverHealthCheck) { this.agent = httpAgent; this.identification = identification; this.timeout = CONFIG_LONG_POLL_TIMEOUT; + this.serverHealthCheck = serverHealthCheck; this.executor = Executors.newScheduledThreadPool(1, r -> { Thread t = new Thread(r); @@ -62,13 +62,9 @@ public class ClientWorker implements DisposableBean { return t; }); - int threadSize = Runtime.getRuntime().availableProcessors(); - this.executorService = Executors.newScheduledThreadPool(threadSize, r -> { - Thread t = new Thread(r); - t.setName("client.long.polling.executor"); - t.setDaemon(true); - return t; - }); + this.executorService = Executors.newSingleThreadScheduledExecutor( + ThreadFactoryBuilder.builder().prefix("client-long-polling-executor").daemon(true).build() + ); log.info("Client identity :: {}", identification); @@ -102,25 +98,10 @@ public class ClientWorker implements DisposableBean { class LongPollingRunnable implements Runnable { - @SneakyThrows - private void checkStatus() { - if (Objects.equals(isHealthServerTemp.get(), Boolean.FALSE) - && Objects.equals(isHealthServer.get(), Boolean.TRUE)) { - isHealthServerTemp.set(Boolean.TRUE); - log.info("🚀 The client reconnects to the server successfully."); - } - // 服务端状态不正常睡眠 30s - if (!isHealthServer.get()) { - isHealthServerTemp.set(Boolean.FALSE); - log.error("[Check config] Error. exception message, Thread sleep 30 s."); - Thread.sleep(30000); - } - } - @Override @SneakyThrows public void run() { - checkStatus(); + serverHealthCheck.isHealthStatus(); List cacheDataList = new ArrayList(); List inInitializingCacheList = new ArrayList(); @@ -197,10 +178,7 @@ public class ClientWorker implements DisposableBean { long readTimeoutMs = timeout + (long) Math.round(timeout >> 1); Result result = agent.httpPostByConfig(LISTENER_PATH, headers, params, readTimeoutMs); - // Server 端重启后会进入非健康状态, 不进入 catch 则为健康调用 - isHealthServer.set(true); if (result != null && result.isSuccess()) { - setHealthServer(true); return parseUpdateDataIdResponse(result.getData().toString()); } } catch (Exception ex) { @@ -295,12 +273,8 @@ public class ClientWorker implements DisposableBean { return lastCacheData; } - public boolean isHealthServer() { - return this.isHealthServer.get(); - } - private void setHealthServer(boolean isHealthServer) { - this.isHealthServer.set(isHealthServer); + this.serverHealthCheck.setHealthStatus(isHealthServer); } } diff --git a/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/core/DiscoveryClient.java b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/core/DiscoveryClient.java index c62b256c..d225675a 100644 --- a/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/core/DiscoveryClient.java +++ b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/core/DiscoveryClient.java @@ -54,11 +54,9 @@ public class DiscoveryClient implements DisposableBean { .threadFactory("DiscoveryClient-HeartbeatExecutor", true) .build(); - this.scheduler = Executors.newScheduledThreadPool(2, - ThreadFactoryBuilder.builder() - .daemon(true) - .prefix("DiscoveryClient-Scheduler") - .build() + this.scheduler = new ScheduledThreadPoolExecutor( + new Integer(1), + ThreadFactoryBuilder.builder().daemon(true).prefix("DiscoveryClient-Scheduler").build() ); register(); diff --git a/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/core/ThreadPoolConfigService.java b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/core/ThreadPoolConfigService.java index deee2e8f..c292df12 100644 --- a/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/core/ThreadPoolConfigService.java +++ b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/core/ThreadPoolConfigService.java @@ -1,6 +1,7 @@ package cn.hippo4j.starter.core; import cn.hippo4j.starter.remote.HttpAgent; +import cn.hippo4j.starter.remote.ServerHealthCheck; import java.util.Arrays; @@ -14,8 +15,11 @@ public class ThreadPoolConfigService implements ConfigService { private final ClientWorker clientWorker; - public ThreadPoolConfigService(HttpAgent httpAgent, String identification) { - clientWorker = new ClientWorker(httpAgent, identification); + private final ServerHealthCheck serverHealthCheck; + + public ThreadPoolConfigService(HttpAgent httpAgent, String identification, ServerHealthCheck serverHealthCheck) { + this.serverHealthCheck = serverHealthCheck; + this.clientWorker = new ClientWorker(httpAgent, identification, serverHealthCheck); } @Override @@ -25,7 +29,7 @@ public class ThreadPoolConfigService implements ConfigService { @Override public String getServerStatus() { - if (clientWorker.isHealthServer()) { + if (serverHealthCheck.isHealthStatus()) { return "UP"; } else { return "DOWN"; diff --git a/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/monitor/ReportingEventExecutor.java b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/monitor/ReportingEventExecutor.java index 44a1b992..7013eadb 100644 --- a/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/monitor/ReportingEventExecutor.java +++ b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/monitor/ReportingEventExecutor.java @@ -8,6 +8,7 @@ import cn.hippo4j.common.monitor.RuntimeMessage; import cn.hippo4j.starter.config.BootstrapProperties; import cn.hippo4j.starter.core.GlobalThreadPoolManage; import cn.hippo4j.starter.handler.AbstractThreadPoolRuntime; +import cn.hippo4j.starter.remote.ServerHealthCheck; import cn.hippo4j.starter.toolkit.thread.ThreadFactoryBuilder; import cn.hippo4j.starter.toolkit.thread.ThreadUtil; import cn.hutool.core.bean.BeanUtil; @@ -47,6 +48,9 @@ public class ReportingEventExecutor extends AbstractThreadPoolRuntime implements @NonNull private final MessageSender messageSender; + @NonNull + private final ServerHealthCheck serverHealthCheck; + /** * 数据采集的缓冲容器, 等待 ReportingEventExecutor 上报服务端 */ @@ -57,7 +61,7 @@ public class ReportingEventExecutor extends AbstractThreadPoolRuntime implements */ private final ScheduledThreadPoolExecutor collectVesselExecutor = new ScheduledThreadPoolExecutor( new Integer(1), - ThreadFactoryBuilder.builder().daemon(true).prefix("scheduled-collect-vessel").build() + ThreadFactoryBuilder.builder().daemon(true).prefix("collect-data-scheduled").build() ); @SneakyThrows @@ -91,6 +95,7 @@ public class ReportingEventExecutor extends AbstractThreadPoolRuntime implements * 采集动态线程池数据, 并添加缓冲队列 */ private void runTimeGatherTask() { + serverHealthCheck.isHealthStatus(); Message message = collectMessage(); messageCollectVessel.offer(message); } diff --git a/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/remote/AbstractHealthCheck.java b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/remote/AbstractHealthCheck.java new file mode 100644 index 00000000..ff2b7b0c --- /dev/null +++ b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/remote/AbstractHealthCheck.java @@ -0,0 +1,113 @@ +package cn.hippo4j.starter.remote; + +import cn.hippo4j.starter.toolkit.thread.ThreadFactoryBuilder; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.InitializingBean; + +import java.util.Objects; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +/** + * Abstract health check. + * + * @author chen.ma + * @date 2021/12/8 20:19 + */ +@Slf4j +public abstract class AbstractHealthCheck implements InitializingBean, ServerHealthCheck { + + /** + * Health status + */ + private volatile boolean healthStatus = true; + + /** + * Health main lock + */ + private final ReentrantLock healthMainLock = new ReentrantLock(); + + /** + * Health condition + */ + private final Condition healthCondition = healthMainLock.newCondition(); + + /** + * Health check executor. + */ + private final ScheduledThreadPoolExecutor healthCheckExecutor = new ScheduledThreadPoolExecutor( + new Integer(1), + ThreadFactoryBuilder.builder().daemon(true).prefix("health-check-scheduled").build() + ); + + /** + * Send health check. + * + * @return + */ + protected abstract boolean sendHealthCheck(); + + /** + * Health check. + */ + public void healthCheck() { + boolean healthCheckStatus = sendHealthCheck(); + if (healthCheckStatus) { + if (Objects.equals(healthStatus, false)) { + healthStatus = true; + log.info("🚀 The client reconnects to the server successfully."); + signalAllBizThread(); + } + } else { + healthStatus = false; + } + + } + + @Override + @SneakyThrows + public boolean isHealthStatus() { + while (!healthStatus) { + healthMainLock.lock(); + try { + healthCondition.await(); + } finally { + healthMainLock.unlock(); + } + } + + return healthStatus; + } + + @Override + public void setHealthStatus(boolean healthStatus) { + healthMainLock.lock(); + try { + this.healthStatus = healthStatus; + log.warn("The server health status setting is unavailable."); + } finally { + healthMainLock.unlock(); + } + } + + /** + * Signal all biz thread. + */ + private void signalAllBizThread() { + healthMainLock.lock(); + try { + healthCondition.signalAll(); + } finally { + healthMainLock.unlock(); + } + } + + @Override + public void afterPropertiesSet() throws Exception { + healthCheckExecutor.scheduleWithFixedDelay(() -> healthCheck(), 0, 5, TimeUnit.SECONDS); + } + +} diff --git a/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/remote/HttpAgent.java b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/remote/HttpAgent.java index 7eda4a49..11c9156f 100644 --- a/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/remote/HttpAgent.java +++ b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/remote/HttpAgent.java @@ -31,6 +31,14 @@ public interface HttpAgent { */ String getEncode(); + /** + * Http get simple. + * + * @param path + * @return + */ + Result httpGetSimple(String path); + /** * Http post. * diff --git a/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/remote/HttpScheduledHealthCheck.java b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/remote/HttpScheduledHealthCheck.java new file mode 100644 index 00000000..0d59b3d4 --- /dev/null +++ b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/remote/HttpScheduledHealthCheck.java @@ -0,0 +1,39 @@ +package cn.hippo4j.starter.remote; + +import cn.hippo4j.common.web.base.Result; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import java.util.Objects; + +import static cn.hippo4j.common.constant.Constants.HEALTH_CHECK_PATH; +import static cn.hippo4j.common.constant.Constants.UP; + +/** + * Server health check. + * + * @author chen.ma + * @date 2021/12/8 20:16 + */ +@Slf4j +@AllArgsConstructor +public class HttpScheduledHealthCheck extends AbstractHealthCheck { + + private final HttpAgent httpAgent; + + @Override + protected boolean sendHealthCheck() { + boolean healthStatus = false; + try { + Result healthResult = httpAgent.httpGetSimple(HEALTH_CHECK_PATH); + if (healthResult != null && Objects.equals(healthResult.getData(), UP)) { + healthStatus = true; + } + } catch (Throwable ex) { + log.error("Failed to periodically check the health status of the server.", ex.getMessage()); + } + + return healthStatus; + } + +} diff --git a/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/remote/ServerHealthCheck.java b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/remote/ServerHealthCheck.java new file mode 100644 index 00000000..d6be233d --- /dev/null +++ b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/remote/ServerHealthCheck.java @@ -0,0 +1,25 @@ +package cn.hippo4j.starter.remote; + +/** + * Server health check. + * + * @author chen.ma + * @date 2021/12/8 20:08 + */ +public interface ServerHealthCheck { + + /** + * Is health status. + * + * @return + */ + boolean isHealthStatus(); + + /** + * Set health status. + * + * @param healthStatus + */ + void setHealthStatus(boolean healthStatus); + +} diff --git a/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/remote/ServerHttpAgent.java b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/remote/ServerHttpAgent.java index cff906a3..a358a51f 100644 --- a/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/remote/ServerHttpAgent.java +++ b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/remote/ServerHttpAgent.java @@ -1,5 +1,6 @@ package cn.hippo4j.starter.remote; +import cn.hippo4j.common.config.ApplicationContextHolder; import cn.hippo4j.common.web.base.Result; import cn.hippo4j.starter.config.BootstrapProperties; import cn.hippo4j.starter.toolkit.HttpClientUtil; @@ -20,6 +21,8 @@ public class ServerHttpAgent implements HttpAgent { private final HttpClientUtil httpClientUtil; + private ServerHealthCheck serverHealthCheck; + public ServerHttpAgent(BootstrapProperties properties, HttpClientUtil httpClientUtil) { this.dynamicThreadPoolProperties = properties; this.httpClientUtil = httpClientUtil; @@ -41,23 +44,32 @@ public class ServerHttpAgent implements HttpAgent { return null; } + @Override + public Result httpGetSimple(String path) { + return httpClientUtil.restApiGetHealth(buildUrl(path), Result.class); + } + @Override public Result httpPost(String path, Object body) { + isHealthStatus(); return httpClientUtil.restApiPost(buildUrl(path), body, Result.class); } @Override public Result httpPostByDiscovery(String path, Object body) { + isHealthStatus(); return httpClientUtil.restApiPost(buildUrl(path), body, Result.class); } @Override public Result httpGetByConfig(String path, Map headers, Map paramValues, long readTimeoutMs) { + isHealthStatus(); return httpClientUtil.restApiGetByThreadPool(buildUrl(path), headers, paramValues, readTimeoutMs, Result.class); } @Override public Result httpPostByConfig(String path, Map headers, Map paramValues, long readTimeoutMs) { + isHealthStatus(); return httpClientUtil.restApiPostByThreadPool(buildUrl(path), headers, paramValues, readTimeoutMs, Result.class); } @@ -70,4 +82,12 @@ public class ServerHttpAgent implements HttpAgent { return serverListManager.getCurrentServerAddr() + path; } + private void isHealthStatus() { + if (serverHealthCheck == null) { + serverHealthCheck = ApplicationContextHolder.getBean(ServerHealthCheck.class); + } + + serverHealthCheck.isHealthStatus(); + } + } diff --git a/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/toolkit/HttpClientUtil.java b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/toolkit/HttpClientUtil.java index 765cbb97..31c9d225 100644 --- a/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/toolkit/HttpClientUtil.java +++ b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/toolkit/HttpClientUtil.java @@ -66,6 +66,20 @@ public class HttpClientUtil { return JSON.parseObject(resp, clazz); } + /** + * 调用健康检查 + * + * @param url + * @param clazz + * @param + * @return + */ + @SneakyThrows + public T restApiGetHealth(String url, Class clazz) { + String resp = new String(doGet(url), "utf-8"); + return JSON.parseObject(resp, clazz); + } + /** * Get 请求, 支持查询字符串 *