feat: 修改动态线程池 HttpAgent 注册为 Spring Bean.

pull/161/head
龙台 3 years ago
parent 471744635b
commit 962ee5f7bb

@ -1,13 +1,14 @@
package com.github.dynamic.threadpool.starter.config; package com.github.dynamic.threadpool.starter.config;
import com.github.dynamic.threadpool.starter.core.ThreadPoolBeanPostProcessor;
import com.github.dynamic.threadpool.starter.core.ThreadPoolOperation;
import com.github.dynamic.threadpool.starter.enable.DynamicThreadPoolMarkerConfiguration;
import com.github.dynamic.threadpool.common.config.ApplicationContextHolder; import com.github.dynamic.threadpool.common.config.ApplicationContextHolder;
import com.github.dynamic.threadpool.starter.controller.PoolRunStateController; import com.github.dynamic.threadpool.starter.controller.PoolRunStateController;
import com.github.dynamic.threadpool.starter.core.ConfigService; import com.github.dynamic.threadpool.starter.core.ConfigService;
import com.github.dynamic.threadpool.starter.core.ThreadPoolBeanPostProcessor;
import com.github.dynamic.threadpool.starter.core.ThreadPoolConfigService; import com.github.dynamic.threadpool.starter.core.ThreadPoolConfigService;
import com.github.dynamic.threadpool.starter.core.ThreadPoolOperation;
import com.github.dynamic.threadpool.starter.enable.DynamicThreadPoolMarkerConfiguration;
import com.github.dynamic.threadpool.starter.handler.ThreadPoolBannerHandler; import com.github.dynamic.threadpool.starter.handler.ThreadPoolBannerHandler;
import com.github.dynamic.threadpool.starter.remote.HttpAgent;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.ImportAutoConfiguration; import org.springframework.boot.autoconfigure.ImportAutoConfiguration;
@ -15,7 +16,6 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
/** /**
* 线 * 线
@ -26,7 +26,7 @@ import org.springframework.context.annotation.DependsOn;
@Slf4j @Slf4j
@Configuration @Configuration
@AllArgsConstructor @AllArgsConstructor
@ImportAutoConfiguration(OkHttpClientConfig.class) @ImportAutoConfiguration(HttpClientConfig.class)
@EnableConfigurationProperties(DynamicThreadPoolProperties.class) @EnableConfigurationProperties(DynamicThreadPoolProperties.class)
@ConditionalOnBean(DynamicThreadPoolMarkerConfiguration.Marker.class) @ConditionalOnBean(DynamicThreadPoolMarkerConfiguration.Marker.class)
public class DynamicThreadPoolAutoConfiguration { public class DynamicThreadPoolAutoConfiguration {
@ -44,9 +44,9 @@ public class DynamicThreadPoolAutoConfiguration {
} }
@Bean @Bean
@DependsOn("applicationContextHolder") @SuppressWarnings("all")
public ConfigService configService() { public ConfigService configService(HttpAgent httpAgent) {
return new ThreadPoolConfigService(properties); return new ThreadPoolConfigService(httpAgent);
} }
@Bean @Bean
@ -55,8 +55,9 @@ public class DynamicThreadPoolAutoConfiguration {
} }
@Bean @Bean
public ThreadPoolBeanPostProcessor threadPoolBeanPostProcessor(ThreadPoolOperation threadPoolOperation) { @SuppressWarnings("all")
return new ThreadPoolBeanPostProcessor(properties, threadPoolOperation); public ThreadPoolBeanPostProcessor threadPoolBeanPostProcessor(HttpAgent httpAgent, ThreadPoolOperation threadPoolOperation) {
return new ThreadPoolBeanPostProcessor(properties, httpAgent, threadPoolOperation);
} }
@Bean @Bean

@ -1,5 +1,7 @@
package com.github.dynamic.threadpool.starter.config; package com.github.dynamic.threadpool.starter.config;
import com.github.dynamic.threadpool.starter.remote.HttpAgent;
import com.github.dynamic.threadpool.starter.remote.ServerHttpAgent;
import com.github.dynamic.threadpool.starter.toolkit.HttpClientUtil; import com.github.dynamic.threadpool.starter.toolkit.HttpClientUtil;
import lombok.SneakyThrows; import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -13,13 +15,13 @@ import javax.net.ssl.X509TrustManager;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
/** /**
* OkHttp3 bean * Http Client Config.
* *
* @author chen.ma * @author chen.ma
* @date 2021/6/10 13:28 * @date 2021/6/10 13:28
*/ */
@Slf4j @Slf4j
public class OkHttpClientConfig { public class HttpClientConfig {
/** /**
* OkHttpClient Bean * OkHttpClient Bean
@ -41,6 +43,11 @@ public class OkHttpClientConfig {
return new HttpClientUtil(); return new HttpClientUtil();
} }
@Bean
public HttpAgent httpAgent(DynamicThreadPoolProperties properties, HttpClientUtil httpClientUtil) {
return new ServerHttpAgent(properties, httpClientUtil);
}
/** /**
* Https * Https
* *

@ -1,19 +1,64 @@
package com.github.dynamic.threadpool.starter.core; package com.github.dynamic.threadpool.starter.core;
import com.github.dynamic.threadpool.common.web.base.Result;
import com.github.dynamic.threadpool.starter.remote.HttpAgent;
import com.github.dynamic.threadpool.starter.toolkit.thread.ThreadFactoryBuilder;
import com.github.dynamic.threadpool.starter.toolkit.thread.ThreadPoolBuilder;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.*;
/** /**
* Discovery Client. * Discovery Client.
* *
* @author chen.ma * @author chen.ma
* @date 2021/7/13 21:51 * @date 2021/7/13 21:51
*/ */
@Slf4j
public class DiscoveryClient { public class DiscoveryClient {
private InstanceInfo instanceInfo; private final ThreadPoolExecutor heartbeatExecutor;
private final ScheduledExecutorService scheduler;
private final HttpAgent httpAgent;
private final InstanceInfo instanceInfo;
private volatile long lastSuccessfulHeartbeatTimestamp = -1;
private static final String PREFIX = "DiscoveryClient_";
private String appPathIdentifier;
public DiscoveryClient(HttpAgent httpAgent) {
this.httpAgent = httpAgent;
this.instanceInfo = null;
heartbeatExecutor = ThreadPoolBuilder.builder()
.poolThreadSize(1, 5)
.keepAliveTime(0, TimeUnit.SECONDS)
.workQueue(new SynchronousQueue())
.threadFactory("DiscoveryClient-HeartbeatExecutor", true)
.build();
scheduler = Executors.newScheduledThreadPool(2,
ThreadFactoryBuilder.builder()
.daemon(true)
.prefix("DiscoveryClient-Scheduler")
.build()
);
register();
// init the schedule tasks
initScheduledTasks();
}
/** /**
* *
*/ */
private void initScheduledTasks() { private void initScheduledTasks() {
scheduler.schedule(new HeartbeatThread(), 30, TimeUnit.SECONDS);
} }
@ -23,8 +68,22 @@ public class DiscoveryClient {
* @return * @return
*/ */
boolean register() { boolean register() {
log.info("{}{} :: registering service...", PREFIX, appPathIdentifier);
String urlPath = "/apps/" + appPathIdentifier;
return true; Result registerResult = null;
try {
registerResult = httpAgent.httpPostByDiscovery(urlPath, instanceInfo);
} catch (Exception ex) {
log.warn("{} {} - registration failed :: {}.", PREFIX, appPathIdentifier, ex.getMessage(), ex);
throw ex;
}
if (log.isInfoEnabled()) {
log.info("{} {} - registration status: {}.", PREFIX, appPathIdentifier, registerResult.getCode());
}
return registerResult.isSuccess();
} }
@ -35,7 +94,9 @@ public class DiscoveryClient {
@Override @Override
public void run() { public void run() {
if (renew()) {
lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
}
} }
} }
@ -48,4 +109,5 @@ public class DiscoveryClient {
return true; return true;
} }
} }

@ -7,7 +7,6 @@ import com.github.dynamic.threadpool.common.web.base.Result;
import com.github.dynamic.threadpool.starter.common.CommonThreadPool; import com.github.dynamic.threadpool.starter.common.CommonThreadPool;
import com.github.dynamic.threadpool.starter.config.DynamicThreadPoolProperties; import com.github.dynamic.threadpool.starter.config.DynamicThreadPoolProperties;
import com.github.dynamic.threadpool.starter.remote.HttpAgent; import com.github.dynamic.threadpool.starter.remote.HttpAgent;
import com.github.dynamic.threadpool.starter.remote.ServerHttpAgent;
import com.github.dynamic.threadpool.starter.toolkit.thread.QueueTypeEnum; import com.github.dynamic.threadpool.starter.toolkit.thread.QueueTypeEnum;
import com.github.dynamic.threadpool.starter.toolkit.thread.RejectedTypeEnum; import com.github.dynamic.threadpool.starter.toolkit.thread.RejectedTypeEnum;
import com.github.dynamic.threadpool.starter.toolkit.thread.ThreadPoolBuilder; import com.github.dynamic.threadpool.starter.toolkit.thread.ThreadPoolBuilder;
@ -36,8 +35,12 @@ public final class ThreadPoolBeanPostProcessor implements BeanPostProcessor {
private final ThreadPoolOperation threadPoolOperation; private final ThreadPoolOperation threadPoolOperation;
public ThreadPoolBeanPostProcessor(DynamicThreadPoolProperties properties, ThreadPoolOperation threadPoolOperation) { private final HttpAgent httpAgent;
public ThreadPoolBeanPostProcessor(DynamicThreadPoolProperties properties, HttpAgent httpAgent,
ThreadPoolOperation threadPoolOperation) {
this.properties = properties; this.properties = properties;
this.httpAgent = httpAgent;
this.threadPoolOperation = threadPoolOperation; this.threadPoolOperation = threadPoolOperation;
} }
@ -78,12 +81,11 @@ public final class ThreadPoolBeanPostProcessor implements BeanPostProcessor {
queryStrMap.put("namespace", properties.getNamespace()); queryStrMap.put("namespace", properties.getNamespace());
PoolParameterInfo ppi = new PoolParameterInfo(); PoolParameterInfo ppi = new PoolParameterInfo();
HttpAgent httpAgent = new ServerHttpAgent(properties);
ThreadPoolExecutor poolExecutor = null; ThreadPoolExecutor poolExecutor = null;
Result result = null; Result result = null;
try { try {
result = httpAgent.httpGet(Constants.CONFIG_CONTROLLER_PATH, null, queryStrMap, 3000L); result = httpAgent.httpGetByConfig(Constants.CONFIG_CONTROLLER_PATH, null, queryStrMap, 3000L);
if (result.isSuccess() && result.getData() != null && (ppi = JSON.toJavaObject((JSON) result.getData(), PoolParameterInfo.class)) != null) { if (result.isSuccess() && result.getData() != null && (ppi = JSON.toJavaObject((JSON) result.getData(), PoolParameterInfo.class)) != null) {
// 使用相关参数创建线程池 // 使用相关参数创建线程池
BlockingQueue workQueue = QueueTypeEnum.createBlockingQueue(ppi.getQueueType(), ppi.getCapacity()); BlockingQueue workQueue = QueueTypeEnum.createBlockingQueue(ppi.getQueueType(), ppi.getCapacity());

@ -1,8 +1,6 @@
package com.github.dynamic.threadpool.starter.core; package com.github.dynamic.threadpool.starter.core;
import com.github.dynamic.threadpool.starter.config.DynamicThreadPoolProperties;
import com.github.dynamic.threadpool.starter.remote.HttpAgent; import com.github.dynamic.threadpool.starter.remote.HttpAgent;
import com.github.dynamic.threadpool.starter.remote.ServerHttpAgent;
import java.util.Arrays; import java.util.Arrays;
@ -18,8 +16,8 @@ public class ThreadPoolConfigService implements ConfigService {
private final ClientWorker clientWorker; private final ClientWorker clientWorker;
public ThreadPoolConfigService(DynamicThreadPoolProperties properties) { public ThreadPoolConfigService(HttpAgent httpAgent) {
httpAgent = new ServerHttpAgent(properties); this.httpAgent = httpAgent;
clientWorker = new ClientWorker(httpAgent); clientWorker = new ClientWorker(httpAgent);
} }

@ -32,7 +32,16 @@ public interface HttpAgent {
String getEncode(); String getEncode();
/** /**
* Http Get * Http Get By Discovery
*
* @param url
* @param body
* @return
*/
Result httpPostByDiscovery(String url, Object body);
/**
* Http Get By
* *
* @param path * @param path
* @param headers * @param headers
@ -40,11 +49,11 @@ public interface HttpAgent {
* @param readTimeoutMs * @param readTimeoutMs
* @return * @return
*/ */
Result httpGet(String path, Map<String, String> headers, Map<String, String> paramValues, Result httpGetByConfig(String path, Map<String, String> headers, Map<String, String> paramValues,
long readTimeoutMs); long readTimeoutMs);
/** /**
* Http Post * Http Post By
* *
* @param path * @param path
* @param headers * @param headers
@ -52,11 +61,11 @@ public interface HttpAgent {
* @param readTimeoutMs * @param readTimeoutMs
* @return * @return
*/ */
Result httpPost(String path, Map<String, String> headers, Map<String, String> paramValues, Result httpPostByConfig(String path, Map<String, String> headers, Map<String, String> paramValues,
long readTimeoutMs); long readTimeoutMs);
/** /**
* Http Delete * Http Delete By
* *
* @param path * @param path
* @param headers * @param headers
@ -64,6 +73,6 @@ public interface HttpAgent {
* @param readTimeoutMs * @param readTimeoutMs
* @return * @return
*/ */
Result httpDelete(String path, Map<String, String> headers, Map<String, String> paramValues, Result httpDeleteByConfig(String path, Map<String, String> headers, Map<String, String> paramValues,
long readTimeoutMs); long readTimeoutMs);
} }

@ -1,9 +1,8 @@
package com.github.dynamic.threadpool.starter.remote; package com.github.dynamic.threadpool.starter.remote;
import com.github.dynamic.threadpool.starter.toolkit.HttpClientUtil;
import com.github.dynamic.threadpool.common.config.ApplicationContextHolder;
import com.github.dynamic.threadpool.starter.config.DynamicThreadPoolProperties;
import com.github.dynamic.threadpool.common.web.base.Result; import com.github.dynamic.threadpool.common.web.base.Result;
import com.github.dynamic.threadpool.starter.config.DynamicThreadPoolProperties;
import com.github.dynamic.threadpool.starter.toolkit.HttpClientUtil;
import java.util.Map; import java.util.Map;
@ -19,10 +18,11 @@ public class ServerHttpAgent implements HttpAgent {
private final ServerListManager serverListManager; private final ServerListManager serverListManager;
private HttpClientUtil httpClientUtil = ApplicationContextHolder.getBean(HttpClientUtil.class); private final HttpClientUtil httpClientUtil;
public ServerHttpAgent(DynamicThreadPoolProperties properties) { public ServerHttpAgent(DynamicThreadPoolProperties properties, HttpClientUtil httpClientUtil) {
this.dynamicThreadPoolProperties = properties; this.dynamicThreadPoolProperties = properties;
this.httpClientUtil = httpClientUtil;
this.serverListManager = new ServerListManager(dynamicThreadPoolProperties); this.serverListManager = new ServerListManager(dynamicThreadPoolProperties);
} }
@ -32,31 +32,37 @@ public class ServerHttpAgent implements HttpAgent {
} }
@Override @Override
public Result httpGet(String path, Map<String, String> headers, Map<String, String> paramValues, long readTimeoutMs) { public String getTenantId() {
return httpClientUtil.restApiGetByThreadPool(buildUrl(path), headers, paramValues, readTimeoutMs, Result.class); return dynamicThreadPoolProperties.getNamespace();
} }
@Override @Override
public Result httpPost(String path, Map<String, String> headers, Map<String, String> paramValues, long readTimeoutMs) { public String getEncode() {
return httpClientUtil.restApiPostByThreadPool(buildUrl(path), headers, paramValues, readTimeoutMs, Result.class); return null;
} }
@Override @Override
public Result httpDelete(String path, Map<String, String> headers, Map<String, String> paramValues, long readTimeoutMs) { public Result httpPostByDiscovery(String url, Object body) {
return null; return httpClientUtil.restApiPost(url, body, Result.class);
} }
@Override @Override
public String getTenantId() { public Result httpGetByConfig(String path, Map<String, String> headers, Map<String, String> paramValues, long readTimeoutMs) {
return dynamicThreadPoolProperties.getNamespace(); return httpClientUtil.restApiGetByThreadPool(buildUrl(path), headers, paramValues, readTimeoutMs, Result.class);
} }
@Override @Override
public String getEncode() { public Result httpPostByConfig(String path, Map<String, String> headers, Map<String, String> paramValues, long readTimeoutMs) {
return httpClientUtil.restApiPostByThreadPool(buildUrl(path), headers, paramValues, readTimeoutMs, Result.class);
}
@Override
public Result httpDeleteByConfig(String path, Map<String, String> headers, Map<String, String> paramValues, long readTimeoutMs) {
return null; return null;
} }
private String buildUrl(String path) { private String buildUrl(String path) {
return serverListManager.getCurrentServerAddr() + path; return serverListManager.getCurrentServerAddr() + path;
} }
} }

Loading…
Cancel
Save