From da0c9318c957028b4a253090cace06b9628d1823 Mon Sep 17 00:00:00 2001 From: "chen.ma" Date: Tue, 22 Jun 2021 21:43:11 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=8A=9F=E8=83=BD=E6=8C=81=E7=BB=AD?= =?UTF-8?q?=E6=9B=B4=E6=96=B0.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- common/.gitignore | 33 ++++ common/pom.xml | 22 +++ .../hreadpool/common/toolkit/Md5Util.java | 56 +++++++ .../pom.xml | 5 + .../starter/adapter/ConfigAdapter.java | 22 +++ .../adapter/ThreadPoolConfigAdapter.java | 37 +++++ .../threadpool/starter/common/Constants.java | 10 +- .../starter/config/CommonConfiguration.java | 5 - .../DynamicThreadPoolAutoConfiguration.java | 49 ++++++ .../config/DynamicThreadPoolProperties.java | 32 ++++ .../threadpool/starter/core/CacheData.java | 85 ++++++++++ .../starter/core/ConfigService.java | 21 +++ .../starter/core/GlobalThreadPoolManage.java | 12 +- .../starter/core/ThreadPoolConfigService.java | 26 +++ .../ThreadPoolDynamicRefresh.java} | 17 +- .../starter/core/ThreadPoolRunListener.java | 16 +- .../starter/listener/ClientWorker.java | 152 ++++++++++++++++++ .../threadpool/starter/listener/Listener.java | 26 +++ .../operation/ThreadPoolOperation.java | 37 +++++ .../ThreadPoolSubscribeCallback.java | 18 +++ .../starter/wrap/DynamicThreadPoolWrap.java | 14 +- .../starter/wrap/ManagerListenerWrap.java | 25 +++ example/pom.xml | 4 +- .../example/config/ThreadPoolConfig.java | 4 +- .../src/main/resources/application.properties | 1 - example/src/main/resources/application.yaml | 5 + pom.xml | 9 +- .../server/controller/ConfigController.java | 4 +- .../server/mapper/RowMapperManager.java | 2 +- .../threadpool/server/model/ConfigInfo.java | 2 +- .../server/service/ConfigService.java | 4 +- .../service/impl/ConfigServiceImpl.java | 6 +- 32 files changed, 707 insertions(+), 54 deletions(-) create mode 100644 common/.gitignore create mode 100644 common/pom.xml create mode 100644 common/src/main/java/io/dynamict/hreadpool/common/toolkit/Md5Util.java create mode 100644 dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/adapter/ConfigAdapter.java create mode 100644 dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/adapter/ThreadPoolConfigAdapter.java create mode 100644 dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/config/DynamicThreadPoolAutoConfiguration.java create mode 100644 dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/config/DynamicThreadPoolProperties.java create mode 100644 dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/core/CacheData.java create mode 100644 dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/core/ConfigService.java create mode 100644 dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/core/ThreadPoolConfigService.java rename dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/{monitor/ThreadPoolDynamicMonitor.java => core/ThreadPoolDynamicRefresh.java} (56%) create mode 100644 dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/listener/Listener.java create mode 100644 dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/operation/ThreadPoolOperation.java create mode 100644 dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/operation/ThreadPoolSubscribeCallback.java create mode 100644 dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/wrap/ManagerListenerWrap.java delete mode 100644 example/src/main/resources/application.properties create mode 100644 example/src/main/resources/application.yaml diff --git a/common/.gitignore b/common/.gitignore new file mode 100644 index 00000000..549e00a2 --- /dev/null +++ b/common/.gitignore @@ -0,0 +1,33 @@ +HELP.md +target/ +!.mvn/wrapper/maven-wrapper.jar +!**/src/main/**/target/ +!**/src/test/**/target/ + +### STS ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans +.sts4-cache + +### IntelliJ IDEA ### +.idea +*.iws +*.iml +*.ipr + +### NetBeans ### +/nbproject/private/ +/nbbuild/ +/dist/ +/nbdist/ +/.nb-gradle/ +build/ +!**/src/main/**/build/ +!**/src/test/**/build/ + +### VS Code ### +.vscode/ diff --git a/common/pom.xml b/common/pom.xml new file mode 100644 index 00000000..b6a0b364 --- /dev/null +++ b/common/pom.xml @@ -0,0 +1,22 @@ + + + 4.0.0 + + + io.dynamic-threadpool + parent + ${revision} + + + common + jar + + common + Demo project for Spring Boot + + + 1.8 + + + diff --git a/common/src/main/java/io/dynamict/hreadpool/common/toolkit/Md5Util.java b/common/src/main/java/io/dynamict/hreadpool/common/toolkit/Md5Util.java new file mode 100644 index 00000000..f82b5b17 --- /dev/null +++ b/common/src/main/java/io/dynamict/hreadpool/common/toolkit/Md5Util.java @@ -0,0 +1,56 @@ +package io.dynamict.hreadpool.common.toolkit; + +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; + +/** + * MD5 Util. + * + * @author chen.ma + * @date 2021/6/22 17:55 + */ +public class Md5Util { + + private static final char[] DIGITS_LOWER = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'}; + + private static final ThreadLocal MESSAGE_DIGEST_LOCAL = ThreadLocal.withInitial(() -> { + try { + return MessageDigest.getInstance("MD5"); + } catch (NoSuchAlgorithmException e) { + return null; + } + }); + + public static String md5Hex(byte[] bytes) throws NoSuchAlgorithmException { + try { + MessageDigest messageDigest = MESSAGE_DIGEST_LOCAL.get(); + if (messageDigest != null) { + return encodeHexString(messageDigest.digest(bytes)); + } + throw new NoSuchAlgorithmException("MessageDigest get MD5 instance error"); + } finally { + MESSAGE_DIGEST_LOCAL.remove(); + } + } + + public static String md5Hex(String value, String encode) { + try { + return md5Hex(value.getBytes(encode)); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public static String encodeHexString(byte[] bytes) { + int l = bytes.length; + + char[] out = new char[l << 1]; + + for (int i = 0, j = 0; i < l; i++) { + out[j++] = DIGITS_LOWER[(0xF0 & bytes[i]) >>> 4]; + out[j++] = DIGITS_LOWER[0x0F & bytes[i]]; + } + + return new String(out); + } +} diff --git a/dynamic-threadpool-spring-boot-starter/pom.xml b/dynamic-threadpool-spring-boot-starter/pom.xml index 723056f2..f152c303 100644 --- a/dynamic-threadpool-spring-boot-starter/pom.xml +++ b/dynamic-threadpool-spring-boot-starter/pom.xml @@ -39,5 +39,10 @@ com.alibaba fastjson + + + io.dynamic-threadpool + common + diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/adapter/ConfigAdapter.java b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/adapter/ConfigAdapter.java new file mode 100644 index 00000000..34557b0f --- /dev/null +++ b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/adapter/ConfigAdapter.java @@ -0,0 +1,22 @@ +package io.dynamic.threadpool.starter.adapter; + +import io.dynamic.threadpool.starter.core.ThreadPoolDynamicRefresh; + +/** + * ConfigAdapter. + * + * @author chen.ma + * @date 2021/6/22 21:29 + */ +public class ConfigAdapter { + + /** + * 回调修改线程池配置 + * + * @param tpId + * @param config + */ + public void callbackConfig(String tpId, String config) { + ThreadPoolDynamicRefresh.refreshDynamicPool(tpId, config); + } +} diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/adapter/ThreadPoolConfigAdapter.java b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/adapter/ThreadPoolConfigAdapter.java new file mode 100644 index 00000000..59a0398e --- /dev/null +++ b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/adapter/ThreadPoolConfigAdapter.java @@ -0,0 +1,37 @@ +package io.dynamic.threadpool.starter.adapter; + +import cn.hutool.core.thread.ThreadFactoryBuilder; +import io.dynamic.threadpool.starter.operation.ThreadPoolOperation; +import org.springframework.beans.factory.annotation.Autowired; + +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * 动态线程池配置适配器 + * + * @author chen.ma + * @date 2021/6/22 20:17 + */ +public class ThreadPoolConfigAdapter extends ConfigAdapter { + + @Autowired + private ThreadPoolOperation threadPoolOperation; + + private ExecutorService executorService = new ThreadPoolExecutor( + 2, + 4, + 0, + TimeUnit.MILLISECONDS, + new ArrayBlockingQueue(1), + new ThreadFactoryBuilder().setNamePrefix("threadPool-config").build(), + new ThreadPoolExecutor.DiscardOldestPolicy()); + + public void subscribeConfig(List tpIds) { + tpIds.forEach(each -> threadPoolOperation.subscribeConfig(each, executorService, (tpId, config) -> callbackConfig(tpId, config))); + } + +} diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/common/Constants.java b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/common/Constants.java index 2cca19da..7e66170d 100644 --- a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/common/Constants.java +++ b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/common/Constants.java @@ -8,11 +8,13 @@ package io.dynamic.threadpool.starter.common; */ public class Constants { - public static final String DEFAULT_GROUP = "DEFAULT_GROUP"; + public static final String TP_ID = "tpId"; - public static final String DATA_ID = "dataId"; - - public static final String GROUP_ID = "group"; + public static final String ITEM_ID = "itemId"; public static final String DEFAULT_NAMESPACE_ID = "public"; + + public static final String NULL = ""; + + public static final String ENCODE = "UTF-8"; } diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/config/CommonConfiguration.java b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/config/CommonConfiguration.java index de06e3c8..1c394a81 100644 --- a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/config/CommonConfiguration.java +++ b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/config/CommonConfiguration.java @@ -1,6 +1,5 @@ package io.dynamic.threadpool.starter.config; -import io.dynamic.threadpool.starter.core.ThreadPoolRunListener; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -18,8 +17,4 @@ public class CommonConfiguration { return new ApplicationContextHolder(); } - @Bean - public ThreadPoolRunListener threadPoolRunListener() { - return new ThreadPoolRunListener(); - } } diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/config/DynamicThreadPoolAutoConfiguration.java b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/config/DynamicThreadPoolAutoConfiguration.java new file mode 100644 index 00000000..5d9a461c --- /dev/null +++ b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/config/DynamicThreadPoolAutoConfiguration.java @@ -0,0 +1,49 @@ +package io.dynamic.threadpool.starter.config; + +import io.dynamic.threadpool.starter.adapter.ThreadPoolConfigAdapter; +import io.dynamic.threadpool.starter.core.ConfigService; +import io.dynamic.threadpool.starter.core.ThreadPoolConfigService; +import io.dynamic.threadpool.starter.core.ThreadPoolRunListener; +import io.dynamic.threadpool.starter.operation.ThreadPoolOperation; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * 动态线程池自动装配类 + * + * @author chen.ma + * @date 2021/6/22 09:20 + */ +@Slf4j +@Configuration +@AllArgsConstructor +@EnableConfigurationProperties(DynamicThreadPoolProperties.class) +@ConditionalOnProperty(prefix = DynamicThreadPoolProperties.PREFIX, name = "enabled", havingValue = "true", matchIfMissing = true) +public class DynamicThreadPoolAutoConfiguration { + + private final DynamicThreadPoolProperties properties; + + @Bean + public ConfigService configService() { + return new ThreadPoolConfigService(); + } + + @Bean + public ThreadPoolRunListener threadPoolRunListener() { + return new ThreadPoolRunListener(); + } + + @Bean + public ThreadPoolConfigAdapter threadPoolConfigAdapter() { + return new ThreadPoolConfigAdapter(); + } + + @Bean + public ThreadPoolOperation threadPoolOperation() { + return new ThreadPoolOperation(); + } +} diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/config/DynamicThreadPoolProperties.java b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/config/DynamicThreadPoolProperties.java new file mode 100644 index 00000000..2104f124 --- /dev/null +++ b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/config/DynamicThreadPoolProperties.java @@ -0,0 +1,32 @@ +package io.dynamic.threadpool.starter.config; + +import lombok.Getter; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.context.properties.ConfigurationProperties; + +/** + * 动态线程池配置 + * + * @author chen.ma + * @date 2021/6/22 09:14 + */ +@Slf4j +@Getter +@Setter +@ConfigurationProperties(prefix = DynamicThreadPoolProperties.PREFIX) +public class DynamicThreadPoolProperties { + + public static final String PREFIX = "spring.threadpool.dynamic"; + + /** + * 命名空间 + */ + private String namespace; + + /** + * 项目 Id + */ + private String itemId; + +} diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/core/CacheData.java b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/core/CacheData.java new file mode 100644 index 00000000..50ac839e --- /dev/null +++ b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/core/CacheData.java @@ -0,0 +1,85 @@ +package io.dynamic.threadpool.starter.core; + +import io.dynamic.threadpool.starter.common.Constants; +import io.dynamic.threadpool.starter.listener.Listener; +import io.dynamic.threadpool.starter.wrap.ManagerListenerWrap; +import io.dynamict.hreadpool.common.toolkit.Md5Util; +import lombok.extern.slf4j.Slf4j; + +import java.util.concurrent.CopyOnWriteArrayList; + +/** + * CacheData. + * + * @author chen.ma + * @date 2021/6/22 20:46 + */ +@Slf4j +public class CacheData { + + private volatile String md5; + + private volatile String content; + + public final String tpId; + + private int taskId; + + private volatile long localConfigLastModified; + + private final CopyOnWriteArrayList listeners; + + public CacheData(String tpId) { + this.tpId = tpId; + // TODO:nacos 走的本地文件获取, 这里思考下如何优雅获取 + this.content = null; + this.md5 = getMd5String(content); + this.listeners = new CopyOnWriteArrayList(); + + } + + public void addListener(Listener listener) { + if (null == listener) { + throw new IllegalArgumentException("listener is null"); + } + + ManagerListenerWrap managerListenerWrap = new ManagerListenerWrap(md5, listener); + + if (listeners.addIfAbsent(managerListenerWrap)) { + log.info("[add-listener] ok, tpId :: {}, cnt :: {}", tpId, listeners.size()); + } + } + + public void checkListenerMd5() { + for (ManagerListenerWrap wrap : listeners) { + if (!md5.equals(wrap.getLastCallMd5())) { + safeNotifyListener(content, md5, wrap); + } + } + } + + private void safeNotifyListener(String content, String md5, ManagerListenerWrap wrap) { + Listener listener = wrap.getListener(); + + Runnable runnable = () -> { + wrap.setLastCallMd5(md5); + + listener.receiveConfigInfo(content); + }; + + listener.getExecutor().execute(runnable); + } + + public void setContent(String content) { + this.content = content; + this.md5 = getMd5String(this.content); + } + + public static String getMd5String(String config) { + return (null == config) ? Constants.NULL : Md5Util.md5Hex(config, Constants.ENCODE); + } + + public String getMd5() { + return this.md5; + } +} diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/core/ConfigService.java b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/core/ConfigService.java new file mode 100644 index 00000000..9566aad6 --- /dev/null +++ b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/core/ConfigService.java @@ -0,0 +1,21 @@ +package io.dynamic.threadpool.starter.core; + +import io.dynamic.threadpool.starter.listener.Listener; + +/** + * 配置服务 + * + * @author chen.ma + * @date 2021/6/21 21:49 + */ +public interface ConfigService { + + /** + * 添加监听器, 如果服务端发生变更, 客户端会使用监听器进行回调 + * + * @param tpId + * @param listener + */ + void addListener(String tpId, Listener listener); + +} diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/core/GlobalThreadPoolManage.java b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/core/GlobalThreadPoolManage.java index 9104ec11..7571a5f2 100644 --- a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/core/GlobalThreadPoolManage.java +++ b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/core/GlobalThreadPoolManage.java @@ -15,15 +15,15 @@ public class GlobalThreadPoolManage { private static final Map EXECUTOR_MAP = new ConcurrentHashMap(); - public static DynamicThreadPoolWrap getExecutorService(String name) { - return EXECUTOR_MAP.get(name); + public static DynamicThreadPoolWrap getExecutorService(String tpId) { + return EXECUTOR_MAP.get(tpId); } - public static void register(String name, DynamicThreadPoolWrap executor) { - EXECUTOR_MAP.put(name, executor); + public static void register(String tpId, DynamicThreadPoolWrap executor) { + EXECUTOR_MAP.put(tpId, executor); } - public static void remove(String name) { - EXECUTOR_MAP.remove(name); + public static void remove(String tpId) { + EXECUTOR_MAP.remove(tpId); } } \ No newline at end of file diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/core/ThreadPoolConfigService.java b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/core/ThreadPoolConfigService.java new file mode 100644 index 00000000..5d34c79a --- /dev/null +++ b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/core/ThreadPoolConfigService.java @@ -0,0 +1,26 @@ +package io.dynamic.threadpool.starter.core; + +import io.dynamic.threadpool.starter.listener.ClientWorker; +import io.dynamic.threadpool.starter.listener.Listener; + +import java.util.Arrays; + +/** + * 线程池配置服务 + * + * @author chen.ma + * @date 2021/6/21 21:50 + */ +public class ThreadPoolConfigService implements ConfigService { + + private final ClientWorker clientWorker; + + public ThreadPoolConfigService() { + clientWorker = new ClientWorker(); + } + + @Override + public void addListener(String tpId, Listener listener) { + clientWorker.addTenantListeners(tpId, Arrays.asList(listener)); + } +} diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/monitor/ThreadPoolDynamicMonitor.java b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/core/ThreadPoolDynamicRefresh.java similarity index 56% rename from dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/monitor/ThreadPoolDynamicMonitor.java rename to dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/core/ThreadPoolDynamicRefresh.java index 51f6d7b7..e6c072a2 100644 --- a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/monitor/ThreadPoolDynamicMonitor.java +++ b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/core/ThreadPoolDynamicRefresh.java @@ -1,7 +1,7 @@ -package io.dynamic.threadpool.starter.monitor; +package io.dynamic.threadpool.starter.core; -import io.dynamic.threadpool.starter.core.GlobalThreadPoolManage; -import io.dynamic.threadpool.starter.core.ResizableCapacityLinkedBlockIngQueue; +import com.alibaba.fastjson.JSON; +import io.dynamic.threadpool.starter.model.PoolParameterInfo; import io.dynamic.threadpool.starter.wrap.DynamicThreadPoolWrap; import java.util.concurrent.ThreadPoolExecutor; @@ -13,10 +13,15 @@ import java.util.concurrent.TimeUnit; * @author chen.ma * @date 2021/6/20 15:51 */ -public class ThreadPoolDynamicMonitor { +public class ThreadPoolDynamicRefresh { - public void dynamicPool(String threadPoolName, Integer coreSize, Integer maxSize, Integer capacity, Long keepAliveTime) { - DynamicThreadPoolWrap wrap = GlobalThreadPoolManage.getExecutorService(threadPoolName); + public static void refreshDynamicPool(String tpId, String content) { + PoolParameterInfo parameter = JSON.parseObject(content, PoolParameterInfo.class); + refreshDynamicPool(tpId, parameter.getCoreSize(), parameter.getMaxSize(), parameter.getCapacity(), parameter.getKeepAliveTime()); + } + + public static void refreshDynamicPool(String threadPoolId, Integer coreSize, Integer maxSize, Integer capacity, Integer keepAliveTime) { + DynamicThreadPoolWrap wrap = GlobalThreadPoolManage.getExecutorService(threadPoolId); ThreadPoolExecutor executor = wrap.getPool(); if (coreSize != null) { executor.setCorePoolSize(coreSize); diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/core/ThreadPoolRunListener.java b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/core/ThreadPoolRunListener.java index 79a5e000..ba942fa8 100644 --- a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/core/ThreadPoolRunListener.java +++ b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/core/ThreadPoolRunListener.java @@ -2,6 +2,7 @@ package io.dynamic.threadpool.starter.core; import io.dynamic.threadpool.starter.common.CommonThreadPool; import io.dynamic.threadpool.starter.config.ApplicationContextHolder; +import io.dynamic.threadpool.starter.config.DynamicThreadPoolProperties; import io.dynamic.threadpool.starter.model.PoolParameterInfo; import io.dynamic.threadpool.starter.toolkit.BlockingQueueUtil; import io.dynamic.threadpool.starter.toolkit.HttpClientUtil; @@ -10,6 +11,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; +import javax.annotation.Resource; import java.util.HashMap; import java.util.Map; import java.util.concurrent.BlockingQueue; @@ -27,17 +29,19 @@ public class ThreadPoolRunListener implements ApplicationRunner { @Autowired private HttpClientUtil httpClientUtil; + @Resource + private DynamicThreadPoolProperties dynamicThreadPoolProperties; + @Override public void run(ApplicationArguments args) throws Exception { Map executorMap = ApplicationContextHolder.getBeansOfType(DynamicThreadPoolWrap.class); executorMap.forEach((key, val) -> { - Map queryStrMap = new HashMap(16); queryStrMap.put("tdId", val.getTpId()); - queryStrMap.put("itemId", val.getItemId()); - queryStrMap.put("tenant", val.getTenant()); + queryStrMap.put("itemId", dynamicThreadPoolProperties.getItemId()); + queryStrMap.put("namespace", dynamicThreadPoolProperties.getNamespace()); PoolParameterInfo ppi = httpClientUtil.restApiGet(buildUrl(), queryStrMap, PoolParameterInfo.class); if (ppi != null) { @@ -50,7 +54,7 @@ public class ThreadPoolRunListener implements ApplicationRunner { val.setPool(CommonThreadPool.getInstance(val.getTpId())); } - GlobalThreadPoolManage.register(buildOnlyId(val), val); + GlobalThreadPoolManage.register(val.getTpId(), val); }); } @@ -58,8 +62,4 @@ public class ThreadPoolRunListener implements ApplicationRunner { return "http://127.0.0.1:6691/v1/cs/configs"; } - private String buildOnlyId(DynamicThreadPoolWrap poolWrap) { - return poolWrap.getTenant() + "_" + poolWrap.getItemId() + "_" + poolWrap.getTpId(); - } - } diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/listener/ClientWorker.java b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/listener/ClientWorker.java index c7fe4d8c..6fff2d44 100644 --- a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/listener/ClientWorker.java +++ b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/listener/ClientWorker.java @@ -1,10 +1,162 @@ package io.dynamic.threadpool.starter.listener; +import io.dynamic.threadpool.starter.core.CacheData; +import lombok.extern.slf4j.Slf4j; +import org.springframework.util.CollectionUtils; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + /** * 客户端监听 * * @author chen.ma * @date 2021/6/20 18:34 */ +@Slf4j public class ClientWorker { + + private double currentLongingTaskCount = 0; + + private final ScheduledExecutorService executor; + + private final ScheduledExecutorService executorService; + + private final ConcurrentHashMap cacheMap = new ConcurrentHashMap(16); + + @SuppressWarnings("all") + public ClientWorker() { + this.executor = Executors.newScheduledThreadPool(1, r -> { + Thread t = new Thread(r); + t.setName("io.dynamic.threadPool.client.Worker.executor"); + t.setDaemon(true); + return t; + }); + + int threadSize = Runtime.getRuntime().availableProcessors(); + this.executorService = Executors.newScheduledThreadPool(threadSize, r -> { + Thread t = new Thread(r); + t.setName("io.dynamic.threadPool.client.Worker.longPolling.executor"); + t.setDaemon(true); + return t; + }); + + this.executor.scheduleWithFixedDelay(() -> { + try { + checkConfigInfo(); + } catch (Throwable e) { + log.error("[sub-check] rotate check error", e); + } + }, 1L, 10L, TimeUnit.MILLISECONDS); + } + + /** + * 检查配置信息 + */ + public void checkConfigInfo() { + int listenerSize = cacheMap.size(); + double perTaskConfigSize = 3000D; + int longingTaskCount = (int) Math.ceil(listenerSize / perTaskConfigSize); + + if (longingTaskCount > currentLongingTaskCount) { + for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) { + executorService.execute(new LongPollingRunnable(i)); + } + currentLongingTaskCount = longingTaskCount; + } + } + + /** + * 长轮训任务 + */ + class LongPollingRunnable implements Runnable { + + private final int taskId; + + public LongPollingRunnable(int taskId) { + this.taskId = taskId; + } + + @Override + public void run() { + List cacheDataList = new ArrayList(); + + List changedTpIds = checkUpdateTpIds(cacheDataList); + if (!CollectionUtils.isEmpty(cacheDataList)) { + log.info("[dynamic threadPool] tpIds changed :: {}", changedTpIds); + } + + for (String each : changedTpIds) { + String[] keys = each.split(","); + String namespace = keys[0]; + String itemId = keys[1]; + String tpId = keys[2]; + + try { + String content = getServerConfig(namespace, itemId, tpId, 3000L); + CacheData cacheData = cacheMap.get(tpId); + cacheData.setContent(content); + cacheDataList.add(cacheData); + log.info("[data-received] namespace :: {}, itemId :: {}, tpId :: {}, md5 :: {}, content :: {}", + namespace, itemId, tpId, cacheData.getMd5(), content); + } catch (Exception ex) { + // ignore + } + } + + for (CacheData each : cacheDataList) { + each.checkListenerMd5(); + } + + } + } + + /** + * 检查修改的线程池 ID + * + * @param cacheDataList + * @return + */ + public List checkUpdateTpIds(List cacheDataList) { + return null; + } + + public String getServerConfig(String namespace, String itemId, String tpId, long readTimeout) { + return null; + } + + /** + * CacheData 添加 Listener + * + * @param tpId + * @param listeners + */ + public void addTenantListeners(String tpId, List listeners) { + CacheData cacheData = addCacheDataIfAbsent(tpId); + for (Listener listener : listeners) { + cacheData.addListener(listener); + } + } + + /** + * CacheData 不存在则添加 + * + * @param tpId + * @return + */ + public CacheData addCacheDataIfAbsent(String tpId) { + CacheData cacheData = cacheMap.get(tpId); + if (cacheData != null) { + return cacheData; + } + + cacheData = new CacheData(tpId); + CacheData lastCacheData = cacheMap.putIfAbsent(tpId, cacheData); + + return lastCacheData; + } } diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/listener/Listener.java b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/listener/Listener.java new file mode 100644 index 00000000..eca04a1d --- /dev/null +++ b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/listener/Listener.java @@ -0,0 +1,26 @@ +package io.dynamic.threadpool.starter.listener; + +import java.util.concurrent.Executor; + +/** + * 监听器 + * + * @author chen.ma + * @date 2021/6/22 20:20 + */ +public interface Listener { + + /** + * 获取执行器 + * + * @return + */ + Executor getExecutor(); + + /** + * 接受配置信息 + * + * @param configInfo + */ + void receiveConfigInfo(String configInfo); +} diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/operation/ThreadPoolOperation.java b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/operation/ThreadPoolOperation.java new file mode 100644 index 00000000..33cca1f0 --- /dev/null +++ b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/operation/ThreadPoolOperation.java @@ -0,0 +1,37 @@ +package io.dynamic.threadpool.starter.operation; + +import io.dynamic.threadpool.starter.core.ConfigService; +import io.dynamic.threadpool.starter.listener.Listener; +import org.springframework.beans.factory.annotation.Autowired; + +import java.util.concurrent.Executor; + +/** + * ThreadPoolOperation. + * + * @author chen.ma + * @date 2021/6/22 20:25 + */ +public class ThreadPoolOperation { + + @Autowired + private ConfigService configService; + + public Listener subscribeConfig(String tpId, Executor executor, ThreadPoolSubscribeCallback threadPoolSubscribeCallback) { + Listener configListener = new Listener() { + @Override + public void receiveConfigInfo(String config) { + threadPoolSubscribeCallback.callback(tpId, config); + } + + @Override + public Executor getExecutor() { + return executor; + } + }; + + configService.addListener(tpId, configListener); + + return configListener; + } +} diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/operation/ThreadPoolSubscribeCallback.java b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/operation/ThreadPoolSubscribeCallback.java new file mode 100644 index 00000000..ea4de466 --- /dev/null +++ b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/operation/ThreadPoolSubscribeCallback.java @@ -0,0 +1,18 @@ +package io.dynamic.threadpool.starter.operation; + +/** + * ThreadPoolSubscribeCallback. + * + * @author chen.ma + * @date 2021/6/22 20:26 + */ +public interface ThreadPoolSubscribeCallback { + + /** + * 回调函数 + * + * @param tpId + * @param config + */ + void callback(String tpId, String config); +} diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/wrap/DynamicThreadPoolWrap.java b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/wrap/DynamicThreadPoolWrap.java index ec0d6b69..de5556e5 100644 --- a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/wrap/DynamicThreadPoolWrap.java +++ b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/wrap/DynamicThreadPoolWrap.java @@ -16,7 +16,7 @@ import java.util.concurrent.ThreadPoolExecutor; @Data public class DynamicThreadPoolWrap { - private String tenant; + private String namespace; private String itemId; @@ -27,25 +27,19 @@ public class DynamicThreadPoolWrap { /** * 首选服务端线程池, 为空使用默认线程池 {@link CommonThreadPool#getInstance(String)} * - * @param tenant - * @param itemId * @param threadPoolId */ - public DynamicThreadPoolWrap(String tenant, String itemId, String threadPoolId) { - this(tenant, itemId, threadPoolId, null); + public DynamicThreadPoolWrap(String threadPoolId) { + this(threadPoolId, null); } /** * 首选服务端线程池, 为空使用 threadPoolExecutor * - * @param tenant - * @param itemId * @param threadPoolId * @param threadPoolExecutor */ - public DynamicThreadPoolWrap(String tenant, String itemId, String threadPoolId, ThreadPoolExecutor threadPoolExecutor) { - this.tenant = tenant; - this.itemId = itemId; + public DynamicThreadPoolWrap(String threadPoolId, ThreadPoolExecutor threadPoolExecutor) { this.tpId = threadPoolId; this.pool = threadPoolExecutor; } diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/wrap/ManagerListenerWrap.java b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/wrap/ManagerListenerWrap.java new file mode 100644 index 00000000..27f19505 --- /dev/null +++ b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/wrap/ManagerListenerWrap.java @@ -0,0 +1,25 @@ +package io.dynamic.threadpool.starter.wrap; + +import io.dynamic.threadpool.starter.listener.Listener; +import lombok.Getter; +import lombok.Setter; + +/** + * 监听包装 + * + * @author chen.ma + * @date 2021/6/22 17:47 + */ +@Getter +@Setter +public class ManagerListenerWrap { + + final Listener listener; + + String lastCallMd5; + + public ManagerListenerWrap(String md5, Listener listener) { + this.lastCallMd5 = md5; + this.listener = listener; + } +} diff --git a/example/pom.xml b/example/pom.xml index 3e0172c6..65b239c0 100644 --- a/example/pom.xml +++ b/example/pom.xml @@ -37,8 +37,8 @@ - io.dynamic-thread-pool - dtp-spring-boot-starter + io.dynamic-threadpool + dynamic-threadpool-spring-boot-starter ${revision} diff --git a/example/src/main/java/io/dynamic/threadpool/example/config/ThreadPoolConfig.java b/example/src/main/java/io/dynamic/threadpool/example/config/ThreadPoolConfig.java index 86f5e71e..b7843445 100644 --- a/example/src/main/java/io/dynamic/threadpool/example/config/ThreadPoolConfig.java +++ b/example/src/main/java/io/dynamic/threadpool/example/config/ThreadPoolConfig.java @@ -1,6 +1,6 @@ package io.dynamic.threadpool.example.config; -import io.dtp.starter.wrap.DynamicThreadPoolWrap; +import io.dynamic.threadpool.starter.wrap.DynamicThreadPoolWrap; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -15,7 +15,7 @@ public class ThreadPoolConfig { @Bean public DynamicThreadPoolWrap messageCenterConsumeThreadPool() { - return new DynamicThreadPoolWrap("common", "message", "message-consume"); + return new DynamicThreadPoolWrap("message-consume"); } } diff --git a/example/src/main/resources/application.properties b/example/src/main/resources/application.properties deleted file mode 100644 index 8b137891..00000000 --- a/example/src/main/resources/application.properties +++ /dev/null @@ -1 +0,0 @@ - diff --git a/example/src/main/resources/application.yaml b/example/src/main/resources/application.yaml new file mode 100644 index 00000000..2d9c745c --- /dev/null +++ b/example/src/main/resources/application.yaml @@ -0,0 +1,5 @@ +spring: + threadpool: + dynamic: + namespace: common + itemId: message-center diff --git a/pom.xml b/pom.xml index 5fd4cb78..3a0240b3 100644 --- a/pom.xml +++ b/pom.xml @@ -18,6 +18,7 @@ 3.8.1 5.4.7 1.2.75 + 3.12.0 2.3.2.RELEASE @@ -60,7 +61,13 @@ org.apache.commons commons-lang3 - 3.12.0 + ${commons-lang3.version} + + + + io.dynamic-threadpool + common + ${revision} diff --git a/server/src/main/java/io/dynamic/threadpool/server/controller/ConfigController.java b/server/src/main/java/io/dynamic/threadpool/server/controller/ConfigController.java index 6003e7f6..aadd6e98 100644 --- a/server/src/main/java/io/dynamic/threadpool/server/controller/ConfigController.java +++ b/server/src/main/java/io/dynamic/threadpool/server/controller/ConfigController.java @@ -29,9 +29,9 @@ public class ConfigController { public ConfigInfoBase detailConfigInfo( @RequestParam("tdId") String tdId, @RequestParam("itemId") String itemId, - @RequestParam(value = "tenant", required = false, defaultValue = "") String tenant) { + @RequestParam(value = "namespace", required = false, defaultValue = "") String namespace) { - return configService.findConfigAllInfo(tdId, itemId, tenant); + return configService.findConfigAllInfo(tdId, itemId, namespace); } @SneakyThrows diff --git a/server/src/main/java/io/dynamic/threadpool/server/mapper/RowMapperManager.java b/server/src/main/java/io/dynamic/threadpool/server/mapper/RowMapperManager.java index 9b0f5655..f474ee75 100644 --- a/server/src/main/java/io/dynamic/threadpool/server/mapper/RowMapperManager.java +++ b/server/src/main/java/io/dynamic/threadpool/server/mapper/RowMapperManager.java @@ -22,7 +22,7 @@ public final class RowMapperManager { ConfigAllInfo configAllInfo = new ConfigAllInfo(); configAllInfo.setTpId(rs.getString("tp_id")); configAllInfo.setItemId(rs.getString("item_id")); - configAllInfo.setTenant(rs.getString("tenant_id")); + configAllInfo.setNamespace(rs.getString("namespace")); configAllInfo.setContent(rs.getString("content")); configAllInfo.setCoreSize(rs.getInt("core_size")); configAllInfo.setMaxSize(rs.getInt("max_size")); diff --git a/server/src/main/java/io/dynamic/threadpool/server/model/ConfigInfo.java b/server/src/main/java/io/dynamic/threadpool/server/model/ConfigInfo.java index 6da8f56c..b7dec444 100644 --- a/server/src/main/java/io/dynamic/threadpool/server/model/ConfigInfo.java +++ b/server/src/main/java/io/dynamic/threadpool/server/model/ConfigInfo.java @@ -13,5 +13,5 @@ public class ConfigInfo extends ConfigInfoBase { private static final long serialVersionUID = -3504960832191834675L; - private String tenant; + private String namespace; } diff --git a/server/src/main/java/io/dynamic/threadpool/server/service/ConfigService.java b/server/src/main/java/io/dynamic/threadpool/server/service/ConfigService.java index 4c3f57a6..b50c450c 100644 --- a/server/src/main/java/io/dynamic/threadpool/server/service/ConfigService.java +++ b/server/src/main/java/io/dynamic/threadpool/server/service/ConfigService.java @@ -15,8 +15,8 @@ public interface ConfigService { * * @param tpId tpId * @param itemId itemId - * @param tenant tenant + * @param namespace namespace * @return 全部配置信息 */ - ConfigAllInfo findConfigAllInfo(String tpId, String itemId, String tenant); + ConfigAllInfo findConfigAllInfo(String tpId, String itemId, String namespace); } diff --git a/server/src/main/java/io/dynamic/threadpool/server/service/impl/ConfigServiceImpl.java b/server/src/main/java/io/dynamic/threadpool/server/service/impl/ConfigServiceImpl.java index 317821ad..43ba553b 100644 --- a/server/src/main/java/io/dynamic/threadpool/server/service/impl/ConfigServiceImpl.java +++ b/server/src/main/java/io/dynamic/threadpool/server/service/impl/ConfigServiceImpl.java @@ -20,10 +20,10 @@ public class ConfigServiceImpl implements ConfigService { private JdbcTemplate jdbcTemplate; @Override - public ConfigAllInfo findConfigAllInfo(String tpId, String itemId, String tenant) { + public ConfigAllInfo findConfigAllInfo(String tpId, String itemId, String namespace) { ConfigAllInfo configAllInfo = jdbcTemplate.queryForObject( - "select * from config_info where tp_id = ? and item_id = ? and tenant_id = ?", - new Object[]{tpId, itemId, tenant}, + "select * from config_info where tp_id = ? and item_id = ? and namespace = ?", + new Object[]{tpId, itemId, namespace}, RowMapperManager.CONFIG_ALL_INFO_ROW_MAPPER); return configAllInfo;