From 962ee5f7bb144553ed74f8946999bc9c19c4f3c0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=BE=99=E5=8F=B0?= Date: Thu, 5 Aug 2021 22:30:10 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E4=BF=AE=E6=94=B9=E5=8A=A8=E6=80=81?= =?UTF-8?q?=E7=BA=BF=E7=A8=8B=E6=B1=A0=20HttpAgent=20=E6=B3=A8=E5=86=8C?= =?UTF-8?q?=E4=B8=BA=20Spring=20Bean.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../DynamicThreadPoolAutoConfiguration.java | 21 +++--- ...lientConfig.java => HttpClientConfig.java} | 11 ++- .../starter/core/DiscoveryClient.java | 68 ++++++++++++++++++- .../core/ThreadPoolBeanPostProcessor.java | 10 +-- .../starter/core/ThreadPoolConfigService.java | 6 +- .../threadpool/starter/remote/HttpAgent.java | 27 +++++--- .../starter/remote/ServerHttpAgent.java | 34 ++++++---- 7 files changed, 131 insertions(+), 46 deletions(-) rename dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/config/{OkHttpClientConfig.java => HttpClientConfig.java} (84%) diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/config/DynamicThreadPoolAutoConfiguration.java b/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/config/DynamicThreadPoolAutoConfiguration.java index 9e03a877..9ffb9fd8 100644 --- a/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/config/DynamicThreadPoolAutoConfiguration.java +++ b/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/config/DynamicThreadPoolAutoConfiguration.java @@ -1,13 +1,14 @@ package com.github.dynamic.threadpool.starter.config; -import com.github.dynamic.threadpool.starter.core.ThreadPoolBeanPostProcessor; -import com.github.dynamic.threadpool.starter.core.ThreadPoolOperation; -import com.github.dynamic.threadpool.starter.enable.DynamicThreadPoolMarkerConfiguration; import com.github.dynamic.threadpool.common.config.ApplicationContextHolder; import com.github.dynamic.threadpool.starter.controller.PoolRunStateController; import com.github.dynamic.threadpool.starter.core.ConfigService; +import com.github.dynamic.threadpool.starter.core.ThreadPoolBeanPostProcessor; import com.github.dynamic.threadpool.starter.core.ThreadPoolConfigService; +import com.github.dynamic.threadpool.starter.core.ThreadPoolOperation; +import com.github.dynamic.threadpool.starter.enable.DynamicThreadPoolMarkerConfiguration; import com.github.dynamic.threadpool.starter.handler.ThreadPoolBannerHandler; +import com.github.dynamic.threadpool.starter.remote.HttpAgent; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.autoconfigure.ImportAutoConfiguration; @@ -15,7 +16,6 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.context.annotation.DependsOn; /** * 动态线程池自动装配类 @@ -26,7 +26,7 @@ import org.springframework.context.annotation.DependsOn; @Slf4j @Configuration @AllArgsConstructor -@ImportAutoConfiguration(OkHttpClientConfig.class) +@ImportAutoConfiguration(HttpClientConfig.class) @EnableConfigurationProperties(DynamicThreadPoolProperties.class) @ConditionalOnBean(DynamicThreadPoolMarkerConfiguration.Marker.class) public class DynamicThreadPoolAutoConfiguration { @@ -44,9 +44,9 @@ public class DynamicThreadPoolAutoConfiguration { } @Bean - @DependsOn("applicationContextHolder") - public ConfigService configService() { - return new ThreadPoolConfigService(properties); + @SuppressWarnings("all") + public ConfigService configService(HttpAgent httpAgent) { + return new ThreadPoolConfigService(httpAgent); } @Bean @@ -55,8 +55,9 @@ public class DynamicThreadPoolAutoConfiguration { } @Bean - public ThreadPoolBeanPostProcessor threadPoolBeanPostProcessor(ThreadPoolOperation threadPoolOperation) { - return new ThreadPoolBeanPostProcessor(properties, threadPoolOperation); + @SuppressWarnings("all") + public ThreadPoolBeanPostProcessor threadPoolBeanPostProcessor(HttpAgent httpAgent, ThreadPoolOperation threadPoolOperation) { + return new ThreadPoolBeanPostProcessor(properties, httpAgent, threadPoolOperation); } @Bean diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/config/OkHttpClientConfig.java b/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/config/HttpClientConfig.java similarity index 84% rename from dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/config/OkHttpClientConfig.java rename to dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/config/HttpClientConfig.java index 6f90b9a3..b6925168 100644 --- a/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/config/OkHttpClientConfig.java +++ b/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/config/HttpClientConfig.java @@ -1,5 +1,7 @@ package com.github.dynamic.threadpool.starter.config; +import com.github.dynamic.threadpool.starter.remote.HttpAgent; +import com.github.dynamic.threadpool.starter.remote.ServerHttpAgent; import com.github.dynamic.threadpool.starter.toolkit.HttpClientUtil; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; @@ -13,13 +15,13 @@ import javax.net.ssl.X509TrustManager; import java.util.concurrent.TimeUnit; /** - * OkHttp3 bean + * Http Client Config. * * @author chen.ma * @date 2021/6/10 13:28 */ @Slf4j -public class OkHttpClientConfig { +public class HttpClientConfig { /** * 配置 OkHttpClient Bean @@ -41,6 +43,11 @@ public class OkHttpClientConfig { return new HttpClientUtil(); } + @Bean + public HttpAgent httpAgent(DynamicThreadPoolProperties properties, HttpClientUtil httpClientUtil) { + return new ServerHttpAgent(properties, httpClientUtil); + } + /** * 支持 Https * diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/core/DiscoveryClient.java b/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/core/DiscoveryClient.java index 06b98ccd..b69d061a 100644 --- a/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/core/DiscoveryClient.java +++ b/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/core/DiscoveryClient.java @@ -1,19 +1,64 @@ package com.github.dynamic.threadpool.starter.core; +import com.github.dynamic.threadpool.common.web.base.Result; +import com.github.dynamic.threadpool.starter.remote.HttpAgent; +import com.github.dynamic.threadpool.starter.toolkit.thread.ThreadFactoryBuilder; +import com.github.dynamic.threadpool.starter.toolkit.thread.ThreadPoolBuilder; +import lombok.extern.slf4j.Slf4j; + +import java.util.concurrent.*; + /** * Discovery Client. * * @author chen.ma * @date 2021/7/13 21:51 */ +@Slf4j public class DiscoveryClient { - private InstanceInfo instanceInfo; + private final ThreadPoolExecutor heartbeatExecutor; + + private final ScheduledExecutorService scheduler; + + private final HttpAgent httpAgent; + + private final InstanceInfo instanceInfo; + + private volatile long lastSuccessfulHeartbeatTimestamp = -1; + + private static final String PREFIX = "DiscoveryClient_"; + + private String appPathIdentifier; + + public DiscoveryClient(HttpAgent httpAgent) { + this.httpAgent = httpAgent; + this.instanceInfo = null; + heartbeatExecutor = ThreadPoolBuilder.builder() + .poolThreadSize(1, 5) + .keepAliveTime(0, TimeUnit.SECONDS) + .workQueue(new SynchronousQueue()) + .threadFactory("DiscoveryClient-HeartbeatExecutor", true) + .build(); + + scheduler = Executors.newScheduledThreadPool(2, + ThreadFactoryBuilder.builder() + .daemon(true) + .prefix("DiscoveryClient-Scheduler") + .build() + ); + + register(); + + // init the schedule tasks + initScheduledTasks(); + } /** * 初始化所有计划任务 */ private void initScheduledTasks() { + scheduler.schedule(new HeartbeatThread(), 30, TimeUnit.SECONDS); } @@ -23,8 +68,22 @@ public class DiscoveryClient { * @return */ boolean register() { + log.info("{}{} :: registering service...", PREFIX, appPathIdentifier); + String urlPath = "/apps/" + appPathIdentifier; - return true; + Result registerResult = null; + try { + registerResult = httpAgent.httpPostByDiscovery(urlPath, instanceInfo); + } catch (Exception ex) { + log.warn("{} {} - registration failed :: {}.", PREFIX, appPathIdentifier, ex.getMessage(), ex); + throw ex; + } + + if (log.isInfoEnabled()) { + log.info("{} {} - registration status: {}.", PREFIX, appPathIdentifier, registerResult.getCode()); + } + + return registerResult.isSuccess(); } @@ -35,7 +94,9 @@ public class DiscoveryClient { @Override public void run() { - + if (renew()) { + lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis(); + } } } @@ -48,4 +109,5 @@ public class DiscoveryClient { return true; } + } diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/core/ThreadPoolBeanPostProcessor.java b/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/core/ThreadPoolBeanPostProcessor.java index a6ad6459..11a32d62 100644 --- a/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/core/ThreadPoolBeanPostProcessor.java +++ b/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/core/ThreadPoolBeanPostProcessor.java @@ -7,7 +7,6 @@ import com.github.dynamic.threadpool.common.web.base.Result; import com.github.dynamic.threadpool.starter.common.CommonThreadPool; import com.github.dynamic.threadpool.starter.config.DynamicThreadPoolProperties; import com.github.dynamic.threadpool.starter.remote.HttpAgent; -import com.github.dynamic.threadpool.starter.remote.ServerHttpAgent; import com.github.dynamic.threadpool.starter.toolkit.thread.QueueTypeEnum; import com.github.dynamic.threadpool.starter.toolkit.thread.RejectedTypeEnum; import com.github.dynamic.threadpool.starter.toolkit.thread.ThreadPoolBuilder; @@ -36,8 +35,12 @@ public final class ThreadPoolBeanPostProcessor implements BeanPostProcessor { private final ThreadPoolOperation threadPoolOperation; - public ThreadPoolBeanPostProcessor(DynamicThreadPoolProperties properties, ThreadPoolOperation threadPoolOperation) { + private final HttpAgent httpAgent; + + public ThreadPoolBeanPostProcessor(DynamicThreadPoolProperties properties, HttpAgent httpAgent, + ThreadPoolOperation threadPoolOperation) { this.properties = properties; + this.httpAgent = httpAgent; this.threadPoolOperation = threadPoolOperation; } @@ -78,12 +81,11 @@ public final class ThreadPoolBeanPostProcessor implements BeanPostProcessor { queryStrMap.put("namespace", properties.getNamespace()); PoolParameterInfo ppi = new PoolParameterInfo(); - HttpAgent httpAgent = new ServerHttpAgent(properties); ThreadPoolExecutor poolExecutor = null; Result result = null; try { - result = httpAgent.httpGet(Constants.CONFIG_CONTROLLER_PATH, null, queryStrMap, 3000L); + result = httpAgent.httpGetByConfig(Constants.CONFIG_CONTROLLER_PATH, null, queryStrMap, 3000L); if (result.isSuccess() && result.getData() != null && (ppi = JSON.toJavaObject((JSON) result.getData(), PoolParameterInfo.class)) != null) { // 使用相关参数创建线程池 BlockingQueue workQueue = QueueTypeEnum.createBlockingQueue(ppi.getQueueType(), ppi.getCapacity()); diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/core/ThreadPoolConfigService.java b/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/core/ThreadPoolConfigService.java index 4897cdaa..df6f191a 100644 --- a/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/core/ThreadPoolConfigService.java +++ b/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/core/ThreadPoolConfigService.java @@ -1,8 +1,6 @@ package com.github.dynamic.threadpool.starter.core; -import com.github.dynamic.threadpool.starter.config.DynamicThreadPoolProperties; import com.github.dynamic.threadpool.starter.remote.HttpAgent; -import com.github.dynamic.threadpool.starter.remote.ServerHttpAgent; import java.util.Arrays; @@ -18,8 +16,8 @@ public class ThreadPoolConfigService implements ConfigService { private final ClientWorker clientWorker; - public ThreadPoolConfigService(DynamicThreadPoolProperties properties) { - httpAgent = new ServerHttpAgent(properties); + public ThreadPoolConfigService(HttpAgent httpAgent) { + this.httpAgent = httpAgent; clientWorker = new ClientWorker(httpAgent); } diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/remote/HttpAgent.java b/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/remote/HttpAgent.java index 048e733b..d9a53575 100644 --- a/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/remote/HttpAgent.java +++ b/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/remote/HttpAgent.java @@ -32,7 +32,16 @@ public interface HttpAgent { String getEncode(); /** - * 发起 Http Get 请求 + * 发起 Http Get 请求 By Discovery + * + * @param url + * @param body + * @return + */ + Result httpPostByDiscovery(String url, Object body); + + /** + * 发起 Http Get 请求 By 动态配置 * * @param path * @param headers @@ -40,11 +49,11 @@ public interface HttpAgent { * @param readTimeoutMs * @return */ - Result httpGet(String path, Map headers, Map paramValues, - long readTimeoutMs); + Result httpGetByConfig(String path, Map headers, Map paramValues, + long readTimeoutMs); /** - * 发起 Http Post 请求 + * 发起 Http Post 请求 By 动态配置 * * @param path * @param headers @@ -52,11 +61,11 @@ public interface HttpAgent { * @param readTimeoutMs * @return */ - Result httpPost(String path, Map headers, Map paramValues, - long readTimeoutMs); + Result httpPostByConfig(String path, Map headers, Map paramValues, + long readTimeoutMs); /** - * 发起 Http Delete 请求 + * 发起 Http Delete 请求 By 动态配置 * * @param path * @param headers @@ -64,6 +73,6 @@ public interface HttpAgent { * @param readTimeoutMs * @return */ - Result httpDelete(String path, Map headers, Map paramValues, - long readTimeoutMs); + Result httpDeleteByConfig(String path, Map headers, Map paramValues, + long readTimeoutMs); } diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/remote/ServerHttpAgent.java b/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/remote/ServerHttpAgent.java index 818452cc..9ddb72f4 100644 --- a/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/remote/ServerHttpAgent.java +++ b/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/remote/ServerHttpAgent.java @@ -1,9 +1,8 @@ package com.github.dynamic.threadpool.starter.remote; -import com.github.dynamic.threadpool.starter.toolkit.HttpClientUtil; -import com.github.dynamic.threadpool.common.config.ApplicationContextHolder; -import com.github.dynamic.threadpool.starter.config.DynamicThreadPoolProperties; import com.github.dynamic.threadpool.common.web.base.Result; +import com.github.dynamic.threadpool.starter.config.DynamicThreadPoolProperties; +import com.github.dynamic.threadpool.starter.toolkit.HttpClientUtil; import java.util.Map; @@ -19,10 +18,11 @@ public class ServerHttpAgent implements HttpAgent { private final ServerListManager serverListManager; - private HttpClientUtil httpClientUtil = ApplicationContextHolder.getBean(HttpClientUtil.class); + private final HttpClientUtil httpClientUtil; - public ServerHttpAgent(DynamicThreadPoolProperties properties) { + public ServerHttpAgent(DynamicThreadPoolProperties properties, HttpClientUtil httpClientUtil) { this.dynamicThreadPoolProperties = properties; + this.httpClientUtil = httpClientUtil; this.serverListManager = new ServerListManager(dynamicThreadPoolProperties); } @@ -32,31 +32,37 @@ public class ServerHttpAgent implements HttpAgent { } @Override - public Result httpGet(String path, Map headers, Map paramValues, long readTimeoutMs) { - return httpClientUtil.restApiGetByThreadPool(buildUrl(path), headers, paramValues, readTimeoutMs, Result.class); + public String getTenantId() { + return dynamicThreadPoolProperties.getNamespace(); } @Override - public Result httpPost(String path, Map headers, Map paramValues, long readTimeoutMs) { - return httpClientUtil.restApiPostByThreadPool(buildUrl(path), headers, paramValues, readTimeoutMs, Result.class); + public String getEncode() { + return null; } @Override - public Result httpDelete(String path, Map headers, Map paramValues, long readTimeoutMs) { - return null; + public Result httpPostByDiscovery(String url, Object body) { + return httpClientUtil.restApiPost(url, body, Result.class); } @Override - public String getTenantId() { - return dynamicThreadPoolProperties.getNamespace(); + public Result httpGetByConfig(String path, Map headers, Map paramValues, long readTimeoutMs) { + return httpClientUtil.restApiGetByThreadPool(buildUrl(path), headers, paramValues, readTimeoutMs, Result.class); } @Override - public String getEncode() { + public Result httpPostByConfig(String path, Map headers, Map paramValues, long readTimeoutMs) { + return httpClientUtil.restApiPostByThreadPool(buildUrl(path), headers, paramValues, readTimeoutMs, Result.class); + } + + @Override + public Result httpDeleteByConfig(String path, Map headers, Map paramValues, long readTimeoutMs) { return null; } private String buildUrl(String path) { return serverListManager.getCurrentServerAddr() + path; } + }