diff --git a/common/.gitignore b/common/.gitignore
new file mode 100644
index 00000000..549e00a2
--- /dev/null
+++ b/common/.gitignore
@@ -0,0 +1,33 @@
+HELP.md
+target/
+!.mvn/wrapper/maven-wrapper.jar
+!**/src/main/**/target/
+!**/src/test/**/target/
+
+### STS ###
+.apt_generated
+.classpath
+.factorypath
+.project
+.settings
+.springBeans
+.sts4-cache
+
+### IntelliJ IDEA ###
+.idea
+*.iws
+*.iml
+*.ipr
+
+### NetBeans ###
+/nbproject/private/
+/nbbuild/
+/dist/
+/nbdist/
+/.nb-gradle/
+build/
+!**/src/main/**/build/
+!**/src/test/**/build/
+
+### VS Code ###
+.vscode/
diff --git a/common/pom.xml b/common/pom.xml
new file mode 100644
index 00000000..b6a0b364
--- /dev/null
+++ b/common/pom.xml
@@ -0,0 +1,22 @@
+
+
+ 4.0.0
+
+
+ io.dynamic-threadpool
+ parent
+ ${revision}
+
+
+ common
+ jar
+
+ common
+ Demo project for Spring Boot
+
+
+ 1.8
+
+
+
diff --git a/common/src/main/java/io/dynamict/hreadpool/common/toolkit/Md5Util.java b/common/src/main/java/io/dynamict/hreadpool/common/toolkit/Md5Util.java
new file mode 100644
index 00000000..f82b5b17
--- /dev/null
+++ b/common/src/main/java/io/dynamict/hreadpool/common/toolkit/Md5Util.java
@@ -0,0 +1,56 @@
+package io.dynamict.hreadpool.common.toolkit;
+
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+
+/**
+ * MD5 Util.
+ *
+ * @author chen.ma
+ * @date 2021/6/22 17:55
+ */
+public class Md5Util {
+
+ private static final char[] DIGITS_LOWER = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'};
+
+ private static final ThreadLocal MESSAGE_DIGEST_LOCAL = ThreadLocal.withInitial(() -> {
+ try {
+ return MessageDigest.getInstance("MD5");
+ } catch (NoSuchAlgorithmException e) {
+ return null;
+ }
+ });
+
+ public static String md5Hex(byte[] bytes) throws NoSuchAlgorithmException {
+ try {
+ MessageDigest messageDigest = MESSAGE_DIGEST_LOCAL.get();
+ if (messageDigest != null) {
+ return encodeHexString(messageDigest.digest(bytes));
+ }
+ throw new NoSuchAlgorithmException("MessageDigest get MD5 instance error");
+ } finally {
+ MESSAGE_DIGEST_LOCAL.remove();
+ }
+ }
+
+ public static String md5Hex(String value, String encode) {
+ try {
+ return md5Hex(value.getBytes(encode));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public static String encodeHexString(byte[] bytes) {
+ int l = bytes.length;
+
+ char[] out = new char[l << 1];
+
+ for (int i = 0, j = 0; i < l; i++) {
+ out[j++] = DIGITS_LOWER[(0xF0 & bytes[i]) >>> 4];
+ out[j++] = DIGITS_LOWER[0x0F & bytes[i]];
+ }
+
+ return new String(out);
+ }
+}
diff --git a/dynamic-threadpool-spring-boot-starter/pom.xml b/dynamic-threadpool-spring-boot-starter/pom.xml
index 723056f2..f152c303 100644
--- a/dynamic-threadpool-spring-boot-starter/pom.xml
+++ b/dynamic-threadpool-spring-boot-starter/pom.xml
@@ -39,5 +39,10 @@
com.alibaba
fastjson
+
+
+ io.dynamic-threadpool
+ common
+
diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/adapter/ConfigAdapter.java b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/adapter/ConfigAdapter.java
new file mode 100644
index 00000000..34557b0f
--- /dev/null
+++ b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/adapter/ConfigAdapter.java
@@ -0,0 +1,22 @@
+package io.dynamic.threadpool.starter.adapter;
+
+import io.dynamic.threadpool.starter.core.ThreadPoolDynamicRefresh;
+
+/**
+ * ConfigAdapter.
+ *
+ * @author chen.ma
+ * @date 2021/6/22 21:29
+ */
+public class ConfigAdapter {
+
+ /**
+ * 回调修改线程池配置
+ *
+ * @param tpId
+ * @param config
+ */
+ public void callbackConfig(String tpId, String config) {
+ ThreadPoolDynamicRefresh.refreshDynamicPool(tpId, config);
+ }
+}
diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/adapter/ThreadPoolConfigAdapter.java b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/adapter/ThreadPoolConfigAdapter.java
new file mode 100644
index 00000000..59a0398e
--- /dev/null
+++ b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/adapter/ThreadPoolConfigAdapter.java
@@ -0,0 +1,37 @@
+package io.dynamic.threadpool.starter.adapter;
+
+import cn.hutool.core.thread.ThreadFactoryBuilder;
+import io.dynamic.threadpool.starter.operation.ThreadPoolOperation;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * 动态线程池配置适配器
+ *
+ * @author chen.ma
+ * @date 2021/6/22 20:17
+ */
+public class ThreadPoolConfigAdapter extends ConfigAdapter {
+
+ @Autowired
+ private ThreadPoolOperation threadPoolOperation;
+
+ private ExecutorService executorService = new ThreadPoolExecutor(
+ 2,
+ 4,
+ 0,
+ TimeUnit.MILLISECONDS,
+ new ArrayBlockingQueue(1),
+ new ThreadFactoryBuilder().setNamePrefix("threadPool-config").build(),
+ new ThreadPoolExecutor.DiscardOldestPolicy());
+
+ public void subscribeConfig(List tpIds) {
+ tpIds.forEach(each -> threadPoolOperation.subscribeConfig(each, executorService, (tpId, config) -> callbackConfig(tpId, config)));
+ }
+
+}
diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/common/Constants.java b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/common/Constants.java
index 2cca19da..7e66170d 100644
--- a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/common/Constants.java
+++ b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/common/Constants.java
@@ -8,11 +8,13 @@ package io.dynamic.threadpool.starter.common;
*/
public class Constants {
- public static final String DEFAULT_GROUP = "DEFAULT_GROUP";
+ public static final String TP_ID = "tpId";
- public static final String DATA_ID = "dataId";
-
- public static final String GROUP_ID = "group";
+ public static final String ITEM_ID = "itemId";
public static final String DEFAULT_NAMESPACE_ID = "public";
+
+ public static final String NULL = "";
+
+ public static final String ENCODE = "UTF-8";
}
diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/config/CommonConfiguration.java b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/config/CommonConfiguration.java
index de06e3c8..1c394a81 100644
--- a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/config/CommonConfiguration.java
+++ b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/config/CommonConfiguration.java
@@ -1,6 +1,5 @@
package io.dynamic.threadpool.starter.config;
-import io.dynamic.threadpool.starter.core.ThreadPoolRunListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@@ -18,8 +17,4 @@ public class CommonConfiguration {
return new ApplicationContextHolder();
}
- @Bean
- public ThreadPoolRunListener threadPoolRunListener() {
- return new ThreadPoolRunListener();
- }
}
diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/config/DynamicThreadPoolAutoConfiguration.java b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/config/DynamicThreadPoolAutoConfiguration.java
new file mode 100644
index 00000000..5d9a461c
--- /dev/null
+++ b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/config/DynamicThreadPoolAutoConfiguration.java
@@ -0,0 +1,49 @@
+package io.dynamic.threadpool.starter.config;
+
+import io.dynamic.threadpool.starter.adapter.ThreadPoolConfigAdapter;
+import io.dynamic.threadpool.starter.core.ConfigService;
+import io.dynamic.threadpool.starter.core.ThreadPoolConfigService;
+import io.dynamic.threadpool.starter.core.ThreadPoolRunListener;
+import io.dynamic.threadpool.starter.operation.ThreadPoolOperation;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * 动态线程池自动装配类
+ *
+ * @author chen.ma
+ * @date 2021/6/22 09:20
+ */
+@Slf4j
+@Configuration
+@AllArgsConstructor
+@EnableConfigurationProperties(DynamicThreadPoolProperties.class)
+@ConditionalOnProperty(prefix = DynamicThreadPoolProperties.PREFIX, name = "enabled", havingValue = "true", matchIfMissing = true)
+public class DynamicThreadPoolAutoConfiguration {
+
+ private final DynamicThreadPoolProperties properties;
+
+ @Bean
+ public ConfigService configService() {
+ return new ThreadPoolConfigService();
+ }
+
+ @Bean
+ public ThreadPoolRunListener threadPoolRunListener() {
+ return new ThreadPoolRunListener();
+ }
+
+ @Bean
+ public ThreadPoolConfigAdapter threadPoolConfigAdapter() {
+ return new ThreadPoolConfigAdapter();
+ }
+
+ @Bean
+ public ThreadPoolOperation threadPoolOperation() {
+ return new ThreadPoolOperation();
+ }
+}
diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/config/DynamicThreadPoolProperties.java b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/config/DynamicThreadPoolProperties.java
new file mode 100644
index 00000000..2104f124
--- /dev/null
+++ b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/config/DynamicThreadPoolProperties.java
@@ -0,0 +1,32 @@
+package io.dynamic.threadpool.starter.config;
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+/**
+ * 动态线程池配置
+ *
+ * @author chen.ma
+ * @date 2021/6/22 09:14
+ */
+@Slf4j
+@Getter
+@Setter
+@ConfigurationProperties(prefix = DynamicThreadPoolProperties.PREFIX)
+public class DynamicThreadPoolProperties {
+
+ public static final String PREFIX = "spring.threadpool.dynamic";
+
+ /**
+ * 命名空间
+ */
+ private String namespace;
+
+ /**
+ * 项目 Id
+ */
+ private String itemId;
+
+}
diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/core/CacheData.java b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/core/CacheData.java
new file mode 100644
index 00000000..50ac839e
--- /dev/null
+++ b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/core/CacheData.java
@@ -0,0 +1,85 @@
+package io.dynamic.threadpool.starter.core;
+
+import io.dynamic.threadpool.starter.common.Constants;
+import io.dynamic.threadpool.starter.listener.Listener;
+import io.dynamic.threadpool.starter.wrap.ManagerListenerWrap;
+import io.dynamict.hreadpool.common.toolkit.Md5Util;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.concurrent.CopyOnWriteArrayList;
+
+/**
+ * CacheData.
+ *
+ * @author chen.ma
+ * @date 2021/6/22 20:46
+ */
+@Slf4j
+public class CacheData {
+
+ private volatile String md5;
+
+ private volatile String content;
+
+ public final String tpId;
+
+ private int taskId;
+
+ private volatile long localConfigLastModified;
+
+ private final CopyOnWriteArrayList listeners;
+
+ public CacheData(String tpId) {
+ this.tpId = tpId;
+ // TODO:nacos 走的本地文件获取, 这里思考下如何优雅获取
+ this.content = null;
+ this.md5 = getMd5String(content);
+ this.listeners = new CopyOnWriteArrayList();
+
+ }
+
+ public void addListener(Listener listener) {
+ if (null == listener) {
+ throw new IllegalArgumentException("listener is null");
+ }
+
+ ManagerListenerWrap managerListenerWrap = new ManagerListenerWrap(md5, listener);
+
+ if (listeners.addIfAbsent(managerListenerWrap)) {
+ log.info("[add-listener] ok, tpId :: {}, cnt :: {}", tpId, listeners.size());
+ }
+ }
+
+ public void checkListenerMd5() {
+ for (ManagerListenerWrap wrap : listeners) {
+ if (!md5.equals(wrap.getLastCallMd5())) {
+ safeNotifyListener(content, md5, wrap);
+ }
+ }
+ }
+
+ private void safeNotifyListener(String content, String md5, ManagerListenerWrap wrap) {
+ Listener listener = wrap.getListener();
+
+ Runnable runnable = () -> {
+ wrap.setLastCallMd5(md5);
+
+ listener.receiveConfigInfo(content);
+ };
+
+ listener.getExecutor().execute(runnable);
+ }
+
+ public void setContent(String content) {
+ this.content = content;
+ this.md5 = getMd5String(this.content);
+ }
+
+ public static String getMd5String(String config) {
+ return (null == config) ? Constants.NULL : Md5Util.md5Hex(config, Constants.ENCODE);
+ }
+
+ public String getMd5() {
+ return this.md5;
+ }
+}
diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/core/ConfigService.java b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/core/ConfigService.java
new file mode 100644
index 00000000..9566aad6
--- /dev/null
+++ b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/core/ConfigService.java
@@ -0,0 +1,21 @@
+package io.dynamic.threadpool.starter.core;
+
+import io.dynamic.threadpool.starter.listener.Listener;
+
+/**
+ * 配置服务
+ *
+ * @author chen.ma
+ * @date 2021/6/21 21:49
+ */
+public interface ConfigService {
+
+ /**
+ * 添加监听器, 如果服务端发生变更, 客户端会使用监听器进行回调
+ *
+ * @param tpId
+ * @param listener
+ */
+ void addListener(String tpId, Listener listener);
+
+}
diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/core/GlobalThreadPoolManage.java b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/core/GlobalThreadPoolManage.java
index 9104ec11..7571a5f2 100644
--- a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/core/GlobalThreadPoolManage.java
+++ b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/core/GlobalThreadPoolManage.java
@@ -15,15 +15,15 @@ public class GlobalThreadPoolManage {
private static final Map EXECUTOR_MAP = new ConcurrentHashMap();
- public static DynamicThreadPoolWrap getExecutorService(String name) {
- return EXECUTOR_MAP.get(name);
+ public static DynamicThreadPoolWrap getExecutorService(String tpId) {
+ return EXECUTOR_MAP.get(tpId);
}
- public static void register(String name, DynamicThreadPoolWrap executor) {
- EXECUTOR_MAP.put(name, executor);
+ public static void register(String tpId, DynamicThreadPoolWrap executor) {
+ EXECUTOR_MAP.put(tpId, executor);
}
- public static void remove(String name) {
- EXECUTOR_MAP.remove(name);
+ public static void remove(String tpId) {
+ EXECUTOR_MAP.remove(tpId);
}
}
\ No newline at end of file
diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/core/ThreadPoolConfigService.java b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/core/ThreadPoolConfigService.java
new file mode 100644
index 00000000..5d34c79a
--- /dev/null
+++ b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/core/ThreadPoolConfigService.java
@@ -0,0 +1,26 @@
+package io.dynamic.threadpool.starter.core;
+
+import io.dynamic.threadpool.starter.listener.ClientWorker;
+import io.dynamic.threadpool.starter.listener.Listener;
+
+import java.util.Arrays;
+
+/**
+ * 线程池配置服务
+ *
+ * @author chen.ma
+ * @date 2021/6/21 21:50
+ */
+public class ThreadPoolConfigService implements ConfigService {
+
+ private final ClientWorker clientWorker;
+
+ public ThreadPoolConfigService() {
+ clientWorker = new ClientWorker();
+ }
+
+ @Override
+ public void addListener(String tpId, Listener listener) {
+ clientWorker.addTenantListeners(tpId, Arrays.asList(listener));
+ }
+}
diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/monitor/ThreadPoolDynamicMonitor.java b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/core/ThreadPoolDynamicRefresh.java
similarity index 56%
rename from dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/monitor/ThreadPoolDynamicMonitor.java
rename to dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/core/ThreadPoolDynamicRefresh.java
index 51f6d7b7..e6c072a2 100644
--- a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/monitor/ThreadPoolDynamicMonitor.java
+++ b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/core/ThreadPoolDynamicRefresh.java
@@ -1,7 +1,7 @@
-package io.dynamic.threadpool.starter.monitor;
+package io.dynamic.threadpool.starter.core;
-import io.dynamic.threadpool.starter.core.GlobalThreadPoolManage;
-import io.dynamic.threadpool.starter.core.ResizableCapacityLinkedBlockIngQueue;
+import com.alibaba.fastjson.JSON;
+import io.dynamic.threadpool.starter.model.PoolParameterInfo;
import io.dynamic.threadpool.starter.wrap.DynamicThreadPoolWrap;
import java.util.concurrent.ThreadPoolExecutor;
@@ -13,10 +13,15 @@ import java.util.concurrent.TimeUnit;
* @author chen.ma
* @date 2021/6/20 15:51
*/
-public class ThreadPoolDynamicMonitor {
+public class ThreadPoolDynamicRefresh {
- public void dynamicPool(String threadPoolName, Integer coreSize, Integer maxSize, Integer capacity, Long keepAliveTime) {
- DynamicThreadPoolWrap wrap = GlobalThreadPoolManage.getExecutorService(threadPoolName);
+ public static void refreshDynamicPool(String tpId, String content) {
+ PoolParameterInfo parameter = JSON.parseObject(content, PoolParameterInfo.class);
+ refreshDynamicPool(tpId, parameter.getCoreSize(), parameter.getMaxSize(), parameter.getCapacity(), parameter.getKeepAliveTime());
+ }
+
+ public static void refreshDynamicPool(String threadPoolId, Integer coreSize, Integer maxSize, Integer capacity, Integer keepAliveTime) {
+ DynamicThreadPoolWrap wrap = GlobalThreadPoolManage.getExecutorService(threadPoolId);
ThreadPoolExecutor executor = wrap.getPool();
if (coreSize != null) {
executor.setCorePoolSize(coreSize);
diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/core/ThreadPoolRunListener.java b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/core/ThreadPoolRunListener.java
index 79a5e000..ba942fa8 100644
--- a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/core/ThreadPoolRunListener.java
+++ b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/core/ThreadPoolRunListener.java
@@ -2,6 +2,7 @@ package io.dynamic.threadpool.starter.core;
import io.dynamic.threadpool.starter.common.CommonThreadPool;
import io.dynamic.threadpool.starter.config.ApplicationContextHolder;
+import io.dynamic.threadpool.starter.config.DynamicThreadPoolProperties;
import io.dynamic.threadpool.starter.model.PoolParameterInfo;
import io.dynamic.threadpool.starter.toolkit.BlockingQueueUtil;
import io.dynamic.threadpool.starter.toolkit.HttpClientUtil;
@@ -10,6 +11,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
+import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
@@ -27,17 +29,19 @@ public class ThreadPoolRunListener implements ApplicationRunner {
@Autowired
private HttpClientUtil httpClientUtil;
+ @Resource
+ private DynamicThreadPoolProperties dynamicThreadPoolProperties;
+
@Override
public void run(ApplicationArguments args) throws Exception {
Map executorMap =
ApplicationContextHolder.getBeansOfType(DynamicThreadPoolWrap.class);
executorMap.forEach((key, val) -> {
-
Map queryStrMap = new HashMap(16);
queryStrMap.put("tdId", val.getTpId());
- queryStrMap.put("itemId", val.getItemId());
- queryStrMap.put("tenant", val.getTenant());
+ queryStrMap.put("itemId", dynamicThreadPoolProperties.getItemId());
+ queryStrMap.put("namespace", dynamicThreadPoolProperties.getNamespace());
PoolParameterInfo ppi = httpClientUtil.restApiGet(buildUrl(), queryStrMap, PoolParameterInfo.class);
if (ppi != null) {
@@ -50,7 +54,7 @@ public class ThreadPoolRunListener implements ApplicationRunner {
val.setPool(CommonThreadPool.getInstance(val.getTpId()));
}
- GlobalThreadPoolManage.register(buildOnlyId(val), val);
+ GlobalThreadPoolManage.register(val.getTpId(), val);
});
}
@@ -58,8 +62,4 @@ public class ThreadPoolRunListener implements ApplicationRunner {
return "http://127.0.0.1:6691/v1/cs/configs";
}
- private String buildOnlyId(DynamicThreadPoolWrap poolWrap) {
- return poolWrap.getTenant() + "_" + poolWrap.getItemId() + "_" + poolWrap.getTpId();
- }
-
}
diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/listener/ClientWorker.java b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/listener/ClientWorker.java
index c7fe4d8c..6fff2d44 100644
--- a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/listener/ClientWorker.java
+++ b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/listener/ClientWorker.java
@@ -1,10 +1,162 @@
package io.dynamic.threadpool.starter.listener;
+import io.dynamic.threadpool.starter.core.CacheData;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.util.CollectionUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
/**
* 客户端监听
*
* @author chen.ma
* @date 2021/6/20 18:34
*/
+@Slf4j
public class ClientWorker {
+
+ private double currentLongingTaskCount = 0;
+
+ private final ScheduledExecutorService executor;
+
+ private final ScheduledExecutorService executorService;
+
+ private final ConcurrentHashMap cacheMap = new ConcurrentHashMap(16);
+
+ @SuppressWarnings("all")
+ public ClientWorker() {
+ this.executor = Executors.newScheduledThreadPool(1, r -> {
+ Thread t = new Thread(r);
+ t.setName("io.dynamic.threadPool.client.Worker.executor");
+ t.setDaemon(true);
+ return t;
+ });
+
+ int threadSize = Runtime.getRuntime().availableProcessors();
+ this.executorService = Executors.newScheduledThreadPool(threadSize, r -> {
+ Thread t = new Thread(r);
+ t.setName("io.dynamic.threadPool.client.Worker.longPolling.executor");
+ t.setDaemon(true);
+ return t;
+ });
+
+ this.executor.scheduleWithFixedDelay(() -> {
+ try {
+ checkConfigInfo();
+ } catch (Throwable e) {
+ log.error("[sub-check] rotate check error", e);
+ }
+ }, 1L, 10L, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * 检查配置信息
+ */
+ public void checkConfigInfo() {
+ int listenerSize = cacheMap.size();
+ double perTaskConfigSize = 3000D;
+ int longingTaskCount = (int) Math.ceil(listenerSize / perTaskConfigSize);
+
+ if (longingTaskCount > currentLongingTaskCount) {
+ for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
+ executorService.execute(new LongPollingRunnable(i));
+ }
+ currentLongingTaskCount = longingTaskCount;
+ }
+ }
+
+ /**
+ * 长轮训任务
+ */
+ class LongPollingRunnable implements Runnable {
+
+ private final int taskId;
+
+ public LongPollingRunnable(int taskId) {
+ this.taskId = taskId;
+ }
+
+ @Override
+ public void run() {
+ List cacheDataList = new ArrayList();
+
+ List changedTpIds = checkUpdateTpIds(cacheDataList);
+ if (!CollectionUtils.isEmpty(cacheDataList)) {
+ log.info("[dynamic threadPool] tpIds changed :: {}", changedTpIds);
+ }
+
+ for (String each : changedTpIds) {
+ String[] keys = each.split(",");
+ String namespace = keys[0];
+ String itemId = keys[1];
+ String tpId = keys[2];
+
+ try {
+ String content = getServerConfig(namespace, itemId, tpId, 3000L);
+ CacheData cacheData = cacheMap.get(tpId);
+ cacheData.setContent(content);
+ cacheDataList.add(cacheData);
+ log.info("[data-received] namespace :: {}, itemId :: {}, tpId :: {}, md5 :: {}, content :: {}",
+ namespace, itemId, tpId, cacheData.getMd5(), content);
+ } catch (Exception ex) {
+ // ignore
+ }
+ }
+
+ for (CacheData each : cacheDataList) {
+ each.checkListenerMd5();
+ }
+
+ }
+ }
+
+ /**
+ * 检查修改的线程池 ID
+ *
+ * @param cacheDataList
+ * @return
+ */
+ public List checkUpdateTpIds(List cacheDataList) {
+ return null;
+ }
+
+ public String getServerConfig(String namespace, String itemId, String tpId, long readTimeout) {
+ return null;
+ }
+
+ /**
+ * CacheData 添加 Listener
+ *
+ * @param tpId
+ * @param listeners
+ */
+ public void addTenantListeners(String tpId, List extends Listener> listeners) {
+ CacheData cacheData = addCacheDataIfAbsent(tpId);
+ for (Listener listener : listeners) {
+ cacheData.addListener(listener);
+ }
+ }
+
+ /**
+ * CacheData 不存在则添加
+ *
+ * @param tpId
+ * @return
+ */
+ public CacheData addCacheDataIfAbsent(String tpId) {
+ CacheData cacheData = cacheMap.get(tpId);
+ if (cacheData != null) {
+ return cacheData;
+ }
+
+ cacheData = new CacheData(tpId);
+ CacheData lastCacheData = cacheMap.putIfAbsent(tpId, cacheData);
+
+ return lastCacheData;
+ }
}
diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/listener/Listener.java b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/listener/Listener.java
new file mode 100644
index 00000000..eca04a1d
--- /dev/null
+++ b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/listener/Listener.java
@@ -0,0 +1,26 @@
+package io.dynamic.threadpool.starter.listener;
+
+import java.util.concurrent.Executor;
+
+/**
+ * 监听器
+ *
+ * @author chen.ma
+ * @date 2021/6/22 20:20
+ */
+public interface Listener {
+
+ /**
+ * 获取执行器
+ *
+ * @return
+ */
+ Executor getExecutor();
+
+ /**
+ * 接受配置信息
+ *
+ * @param configInfo
+ */
+ void receiveConfigInfo(String configInfo);
+}
diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/operation/ThreadPoolOperation.java b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/operation/ThreadPoolOperation.java
new file mode 100644
index 00000000..33cca1f0
--- /dev/null
+++ b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/operation/ThreadPoolOperation.java
@@ -0,0 +1,37 @@
+package io.dynamic.threadpool.starter.operation;
+
+import io.dynamic.threadpool.starter.core.ConfigService;
+import io.dynamic.threadpool.starter.listener.Listener;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import java.util.concurrent.Executor;
+
+/**
+ * ThreadPoolOperation.
+ *
+ * @author chen.ma
+ * @date 2021/6/22 20:25
+ */
+public class ThreadPoolOperation {
+
+ @Autowired
+ private ConfigService configService;
+
+ public Listener subscribeConfig(String tpId, Executor executor, ThreadPoolSubscribeCallback threadPoolSubscribeCallback) {
+ Listener configListener = new Listener() {
+ @Override
+ public void receiveConfigInfo(String config) {
+ threadPoolSubscribeCallback.callback(tpId, config);
+ }
+
+ @Override
+ public Executor getExecutor() {
+ return executor;
+ }
+ };
+
+ configService.addListener(tpId, configListener);
+
+ return configListener;
+ }
+}
diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/operation/ThreadPoolSubscribeCallback.java b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/operation/ThreadPoolSubscribeCallback.java
new file mode 100644
index 00000000..ea4de466
--- /dev/null
+++ b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/operation/ThreadPoolSubscribeCallback.java
@@ -0,0 +1,18 @@
+package io.dynamic.threadpool.starter.operation;
+
+/**
+ * ThreadPoolSubscribeCallback.
+ *
+ * @author chen.ma
+ * @date 2021/6/22 20:26
+ */
+public interface ThreadPoolSubscribeCallback {
+
+ /**
+ * 回调函数
+ *
+ * @param tpId
+ * @param config
+ */
+ void callback(String tpId, String config);
+}
diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/wrap/DynamicThreadPoolWrap.java b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/wrap/DynamicThreadPoolWrap.java
index ec0d6b69..de5556e5 100644
--- a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/wrap/DynamicThreadPoolWrap.java
+++ b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/wrap/DynamicThreadPoolWrap.java
@@ -16,7 +16,7 @@ import java.util.concurrent.ThreadPoolExecutor;
@Data
public class DynamicThreadPoolWrap {
- private String tenant;
+ private String namespace;
private String itemId;
@@ -27,25 +27,19 @@ public class DynamicThreadPoolWrap {
/**
* 首选服务端线程池, 为空使用默认线程池 {@link CommonThreadPool#getInstance(String)}
*
- * @param tenant
- * @param itemId
* @param threadPoolId
*/
- public DynamicThreadPoolWrap(String tenant, String itemId, String threadPoolId) {
- this(tenant, itemId, threadPoolId, null);
+ public DynamicThreadPoolWrap(String threadPoolId) {
+ this(threadPoolId, null);
}
/**
* 首选服务端线程池, 为空使用 threadPoolExecutor
*
- * @param tenant
- * @param itemId
* @param threadPoolId
* @param threadPoolExecutor
*/
- public DynamicThreadPoolWrap(String tenant, String itemId, String threadPoolId, ThreadPoolExecutor threadPoolExecutor) {
- this.tenant = tenant;
- this.itemId = itemId;
+ public DynamicThreadPoolWrap(String threadPoolId, ThreadPoolExecutor threadPoolExecutor) {
this.tpId = threadPoolId;
this.pool = threadPoolExecutor;
}
diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/wrap/ManagerListenerWrap.java b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/wrap/ManagerListenerWrap.java
new file mode 100644
index 00000000..27f19505
--- /dev/null
+++ b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/wrap/ManagerListenerWrap.java
@@ -0,0 +1,25 @@
+package io.dynamic.threadpool.starter.wrap;
+
+import io.dynamic.threadpool.starter.listener.Listener;
+import lombok.Getter;
+import lombok.Setter;
+
+/**
+ * 监听包装
+ *
+ * @author chen.ma
+ * @date 2021/6/22 17:47
+ */
+@Getter
+@Setter
+public class ManagerListenerWrap {
+
+ final Listener listener;
+
+ String lastCallMd5;
+
+ public ManagerListenerWrap(String md5, Listener listener) {
+ this.lastCallMd5 = md5;
+ this.listener = listener;
+ }
+}
diff --git a/example/pom.xml b/example/pom.xml
index 3e0172c6..65b239c0 100644
--- a/example/pom.xml
+++ b/example/pom.xml
@@ -37,8 +37,8 @@
- io.dynamic-thread-pool
- dtp-spring-boot-starter
+ io.dynamic-threadpool
+ dynamic-threadpool-spring-boot-starter
${revision}
diff --git a/example/src/main/java/io/dynamic/threadpool/example/config/ThreadPoolConfig.java b/example/src/main/java/io/dynamic/threadpool/example/config/ThreadPoolConfig.java
index 86f5e71e..b7843445 100644
--- a/example/src/main/java/io/dynamic/threadpool/example/config/ThreadPoolConfig.java
+++ b/example/src/main/java/io/dynamic/threadpool/example/config/ThreadPoolConfig.java
@@ -1,6 +1,6 @@
package io.dynamic.threadpool.example.config;
-import io.dtp.starter.wrap.DynamicThreadPoolWrap;
+import io.dynamic.threadpool.starter.wrap.DynamicThreadPoolWrap;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@@ -15,7 +15,7 @@ public class ThreadPoolConfig {
@Bean
public DynamicThreadPoolWrap messageCenterConsumeThreadPool() {
- return new DynamicThreadPoolWrap("common", "message", "message-consume");
+ return new DynamicThreadPoolWrap("message-consume");
}
}
diff --git a/example/src/main/resources/application.properties b/example/src/main/resources/application.properties
deleted file mode 100644
index 8b137891..00000000
--- a/example/src/main/resources/application.properties
+++ /dev/null
@@ -1 +0,0 @@
-
diff --git a/example/src/main/resources/application.yaml b/example/src/main/resources/application.yaml
new file mode 100644
index 00000000..2d9c745c
--- /dev/null
+++ b/example/src/main/resources/application.yaml
@@ -0,0 +1,5 @@
+spring:
+ threadpool:
+ dynamic:
+ namespace: common
+ itemId: message-center
diff --git a/pom.xml b/pom.xml
index 5fd4cb78..3a0240b3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -18,6 +18,7 @@
3.8.1
5.4.7
1.2.75
+ 3.12.0
2.3.2.RELEASE
@@ -60,7 +61,13 @@
org.apache.commons
commons-lang3
- 3.12.0
+ ${commons-lang3.version}
+
+
+
+ io.dynamic-threadpool
+ common
+ ${revision}
diff --git a/server/src/main/java/io/dynamic/threadpool/server/controller/ConfigController.java b/server/src/main/java/io/dynamic/threadpool/server/controller/ConfigController.java
index 6003e7f6..aadd6e98 100644
--- a/server/src/main/java/io/dynamic/threadpool/server/controller/ConfigController.java
+++ b/server/src/main/java/io/dynamic/threadpool/server/controller/ConfigController.java
@@ -29,9 +29,9 @@ public class ConfigController {
public ConfigInfoBase detailConfigInfo(
@RequestParam("tdId") String tdId,
@RequestParam("itemId") String itemId,
- @RequestParam(value = "tenant", required = false, defaultValue = "") String tenant) {
+ @RequestParam(value = "namespace", required = false, defaultValue = "") String namespace) {
- return configService.findConfigAllInfo(tdId, itemId, tenant);
+ return configService.findConfigAllInfo(tdId, itemId, namespace);
}
@SneakyThrows
diff --git a/server/src/main/java/io/dynamic/threadpool/server/mapper/RowMapperManager.java b/server/src/main/java/io/dynamic/threadpool/server/mapper/RowMapperManager.java
index 9b0f5655..f474ee75 100644
--- a/server/src/main/java/io/dynamic/threadpool/server/mapper/RowMapperManager.java
+++ b/server/src/main/java/io/dynamic/threadpool/server/mapper/RowMapperManager.java
@@ -22,7 +22,7 @@ public final class RowMapperManager {
ConfigAllInfo configAllInfo = new ConfigAllInfo();
configAllInfo.setTpId(rs.getString("tp_id"));
configAllInfo.setItemId(rs.getString("item_id"));
- configAllInfo.setTenant(rs.getString("tenant_id"));
+ configAllInfo.setNamespace(rs.getString("namespace"));
configAllInfo.setContent(rs.getString("content"));
configAllInfo.setCoreSize(rs.getInt("core_size"));
configAllInfo.setMaxSize(rs.getInt("max_size"));
diff --git a/server/src/main/java/io/dynamic/threadpool/server/model/ConfigInfo.java b/server/src/main/java/io/dynamic/threadpool/server/model/ConfigInfo.java
index 6da8f56c..b7dec444 100644
--- a/server/src/main/java/io/dynamic/threadpool/server/model/ConfigInfo.java
+++ b/server/src/main/java/io/dynamic/threadpool/server/model/ConfigInfo.java
@@ -13,5 +13,5 @@ public class ConfigInfo extends ConfigInfoBase {
private static final long serialVersionUID = -3504960832191834675L;
- private String tenant;
+ private String namespace;
}
diff --git a/server/src/main/java/io/dynamic/threadpool/server/service/ConfigService.java b/server/src/main/java/io/dynamic/threadpool/server/service/ConfigService.java
index 4c3f57a6..b50c450c 100644
--- a/server/src/main/java/io/dynamic/threadpool/server/service/ConfigService.java
+++ b/server/src/main/java/io/dynamic/threadpool/server/service/ConfigService.java
@@ -15,8 +15,8 @@ public interface ConfigService {
*
* @param tpId tpId
* @param itemId itemId
- * @param tenant tenant
+ * @param namespace namespace
* @return 全部配置信息
*/
- ConfigAllInfo findConfigAllInfo(String tpId, String itemId, String tenant);
+ ConfigAllInfo findConfigAllInfo(String tpId, String itemId, String namespace);
}
diff --git a/server/src/main/java/io/dynamic/threadpool/server/service/impl/ConfigServiceImpl.java b/server/src/main/java/io/dynamic/threadpool/server/service/impl/ConfigServiceImpl.java
index 317821ad..43ba553b 100644
--- a/server/src/main/java/io/dynamic/threadpool/server/service/impl/ConfigServiceImpl.java
+++ b/server/src/main/java/io/dynamic/threadpool/server/service/impl/ConfigServiceImpl.java
@@ -20,10 +20,10 @@ public class ConfigServiceImpl implements ConfigService {
private JdbcTemplate jdbcTemplate;
@Override
- public ConfigAllInfo findConfigAllInfo(String tpId, String itemId, String tenant) {
+ public ConfigAllInfo findConfigAllInfo(String tpId, String itemId, String namespace) {
ConfigAllInfo configAllInfo = jdbcTemplate.queryForObject(
- "select * from config_info where tp_id = ? and item_id = ? and tenant_id = ?",
- new Object[]{tpId, itemId, tenant},
+ "select * from config_info where tp_id = ? and item_id = ? and namespace = ?",
+ new Object[]{tpId, itemId, namespace},
RowMapperManager.CONFIG_ALL_INFO_ROW_MAPPER);
return configAllInfo;