From 41383a4efd6c1181361b8a67f70b79e77fad27aa Mon Sep 17 00:00:00 2001
From: Redick01 <11090829@qq.com>
Date: Tue, 15 Mar 2022 20:53:29 +0800
Subject: [PATCH] =?UTF-8?q?1.=E6=94=AF=E6=8C=81zookeeper=E6=B3=A8=E5=86=8C?=
=?UTF-8?q?=E4=B8=AD=E5=BF=83=202.zookeeper=E9=85=8D=E7=BD=AE=E4=B8=AD?=
=?UTF-8?q?=E5=BF=83=E5=8F=AA=E6=94=AF=E6=8C=81properties=E7=B1=BB?=
=?UTF-8?q?=E5=9E=8B=E9=85=8D=E7=BD=AE=203.=E5=A2=9E=E5=8A=A0zookeeper?=
=?UTF-8?q?=E9=85=8D=E7=BD=AE=E4=B8=AD=E5=BF=83=E7=9A=84=E4=BD=BF=E7=94=A8?=
=?UTF-8?q?example=204.=E6=8F=90=E4=BE=9Bzookeeper=E9=85=8D=E7=BD=AE?=
=?UTF-8?q?=E4=B8=AD=E5=BF=83=E9=85=8D=E7=BD=AE=E6=A0=B7=E4=BE=8B=EF=BC=8C?=
=?UTF-8?q?=E5=8F=82=E8=80=83zookeeper=20example=E4=B8=AD=E7=9A=84zookeepe?=
=?UTF-8?q?r-demo.properties?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
...ippo4jCoreZookeeperExampleApplication.java | 18 +++
.../src/main/resources/application.yml | 19 +++
.../main/resources/zookeeper-demo.properties | 40 +++++++
hippo4j-example/pom.xml | 1 +
.../hippo4j-core-spring-boot-starter/pom.xml | 21 ++++
.../config/BootstrapCoreProperties.java | 5 +
...ynamicThreadPoolCoreAutoConfiguration.java | 9 ++
.../notify/CoreNotifyConfigBuilder.java | 7 +-
.../AbstractCoreThreadPoolDynamicRefresh.java | 20 ++++
.../refresher/ZookeeperRefresherHandler.java | 95 +++++++++++++++
.../DynamicThreadPoolPostProcessor.java | 111 +++++++++---------
11 files changed, 288 insertions(+), 58 deletions(-)
create mode 100644 hippo4j-example/hippo4j-core-zookeeper-spring-boot-starter-example/src/main/java/cn/hippo4j/example/core/zookeeper/Hippo4jCoreZookeeperExampleApplication.java
create mode 100644 hippo4j-example/hippo4j-core-zookeeper-spring-boot-starter-example/src/main/resources/application.yml
create mode 100644 hippo4j-example/hippo4j-core-zookeeper-spring-boot-starter-example/src/main/resources/zookeeper-demo.properties
create mode 100644 hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/refresher/ZookeeperRefresherHandler.java
diff --git a/hippo4j-example/hippo4j-core-zookeeper-spring-boot-starter-example/src/main/java/cn/hippo4j/example/core/zookeeper/Hippo4jCoreZookeeperExampleApplication.java b/hippo4j-example/hippo4j-core-zookeeper-spring-boot-starter-example/src/main/java/cn/hippo4j/example/core/zookeeper/Hippo4jCoreZookeeperExampleApplication.java
new file mode 100644
index 00000000..a1c7561b
--- /dev/null
+++ b/hippo4j-example/hippo4j-core-zookeeper-spring-boot-starter-example/src/main/java/cn/hippo4j/example/core/zookeeper/Hippo4jCoreZookeeperExampleApplication.java
@@ -0,0 +1,18 @@
+package cn.hippo4j.example.core.zookeeper;
+
+import cn.hippo4j.core.enable.EnableDynamicThreadPool;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+/**
+ * @author Redick01
+ * @date 2022/3/14 20:40
+ */
+@EnableDynamicThreadPool
+@SpringBootApplication(scanBasePackages = "cn.hippo4j.example.core")
+public class Hippo4jCoreZookeeperExampleApplication {
+
+ public static void main(String[] args) {
+ SpringApplication.run(Hippo4jCoreZookeeperExampleApplication.class, args);
+ }
+}
diff --git a/hippo4j-example/hippo4j-core-zookeeper-spring-boot-starter-example/src/main/resources/application.yml b/hippo4j-example/hippo4j-core-zookeeper-spring-boot-starter-example/src/main/resources/application.yml
new file mode 100644
index 00000000..25c826ed
--- /dev/null
+++ b/hippo4j-example/hippo4j-core-zookeeper-spring-boot-starter-example/src/main/resources/application.yml
@@ -0,0 +1,19 @@
+server:
+ port: 8888
+
+spring:
+ application:
+ name: dynamic-threadpool-zookeeper-example
+
+ dynamic:
+ thread-pool:
+ config-file-type: properties
+ zookeeper:
+ zk-connect-str: 127.0.0.1:2181
+ config-version: 1.0.0
+ root-node: /configserver/userproject
+ node: zookeeper-demo
+
+#logging:
+# level:
+# root: DEBUG
\ No newline at end of file
diff --git a/hippo4j-example/hippo4j-core-zookeeper-spring-boot-starter-example/src/main/resources/zookeeper-demo.properties b/hippo4j-example/hippo4j-core-zookeeper-spring-boot-starter-example/src/main/resources/zookeeper-demo.properties
new file mode 100644
index 00000000..4fb1c7f8
--- /dev/null
+++ b/hippo4j-example/hippo4j-core-zookeeper-spring-boot-starter-example/src/main/resources/zookeeper-demo.properties
@@ -0,0 +1,40 @@
+# Export from zookeeper configuration group: [/configserver/userproject] - [1.0.0] - [zookeeper-demo].
+
+spring.application.name=dynamic-threadpool-zookeeper-example
+spring.dynamic.thread-pool.banner=true
+spring.dynamic.thread-pool.check-state-interval=5
+spring.dynamic.thread-pool.collect=true
+spring.dynamic.thread-pool.config-file-type=properties
+spring.dynamic.thread-pool.enable=true
+spring.dynamic.thread-pool.executors[0].allow-core-thread-time-out=true
+spring.dynamic.thread-pool.executors[0].blocking-queue=LinkedBlockingQueue
+spring.dynamic.thread-pool.executors[0].core-pool-size=2
+spring.dynamic.thread-pool.executors[0].execute-time-out=1000
+spring.dynamic.thread-pool.executors[0].keep-alive-time=6691
+spring.dynamic.thread-pool.executors[0].maximum-pool-size=4
+spring.dynamic.thread-pool.executors[0].notify.active-alarm=80
+spring.dynamic.thread-pool.executors[0].notify.capacity-alarm=80
+spring.dynamic.thread-pool.executors[0].notify.interval=8
+spring.dynamic.thread-pool.executors[0].notify.is-alarm=true
+spring.dynamic.thread-pool.executors[0].notify.receives.DING=177****6993
+spring.dynamic.thread-pool.executors[0].queue-capacity=100
+spring.dynamic.thread-pool.executors[0].rejected-handler=AbortPolicy
+spring.dynamic.thread-pool.executors[0].thread-name-prefix=message-consume
+spring.dynamic.thread-pool.executors[0].thread-pool-id=message-consume
+spring.dynamic.thread-pool.executors[1].allow-core-thread-time-out=true
+spring.dynamic.thread-pool.executors[1].blocking-queue=LinkedBlockingQueue
+spring.dynamic.thread-pool.executors[1].core-pool-size=1
+spring.dynamic.thread-pool.executors[1].execute-time-out=1000
+spring.dynamic.thread-pool.executors[1].keep-alive-time=6691
+spring.dynamic.thread-pool.executors[1].maximum-pool-size=1
+spring.dynamic.thread-pool.executors[1].notify.active-alarm=80
+spring.dynamic.thread-pool.executors[1].notify.capacity-alarm=80
+spring.dynamic.thread-pool.executors[1].notify.interval=8
+spring.dynamic.thread-pool.executors[1].notify.is-alarm=true
+spring.dynamic.thread-pool.executors[1].notify.receives.DING=177****6993
+spring.dynamic.thread-pool.executors[1].queue-capacity=1
+spring.dynamic.thread-pool.executors[1].rejected-handler=AbortPolicy
+spring.dynamic.thread-pool.executors[1].thread-name-prefix=message-produce
+spring.dynamic.thread-pool.executors[1].thread-pool-id=message-produce
+spring.dynamic.thread-pool.notify-platforms[0].platform=DING
+spring.dynamic.thread-pool.notify-platforms[0].secret-key=aab197577f6d8cc3aa8b52ee38adb6e16a46642a9c4986f5e45ca6946fdcea6f
\ No newline at end of file
diff --git a/hippo4j-example/pom.xml b/hippo4j-example/pom.xml
index b2219b54..f21be879 100644
--- a/hippo4j-example/pom.xml
+++ b/hippo4j-example/pom.xml
@@ -28,6 +28,7 @@
hippo4j-core-nacos-spring-boot-starter-example
hippo4j-core-apollo-spring-boot-starter-example
+ hippo4j-core-zookeeper-spring-boot-starter-example
diff --git a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/pom.xml b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/pom.xml
index 8cff4fab..ce97adcd 100644
--- a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/pom.xml
+++ b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/pom.xml
@@ -65,6 +65,27 @@
true
+
+ org.apache.curator
+ curator-framework
+ 5.1.0
+ compile
+ true
+
+
+
+ com.guicedee.services
+ commons-lang3
+ 1.2.1.1-jre17
+ compile
+
+
+
+ com.google.guava
+ guava
+ 16.0.1
+
+
org.springframework.boot
spring-boot-configuration-processor
diff --git a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/config/BootstrapCoreProperties.java b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/config/BootstrapCoreProperties.java
index 982503a3..39db69f4 100644
--- a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/config/BootstrapCoreProperties.java
+++ b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/config/BootstrapCoreProperties.java
@@ -57,6 +57,11 @@ public class BootstrapCoreProperties implements BootstrapPropertiesInterface {
*/
private Map apollo;
+ /**
+ * Zookeeper config.
+ */
+ private Map zookeeper;
+
/**
* Tomcat thread pool config.
*/
diff --git a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/config/DynamicThreadPoolCoreAutoConfiguration.java b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/config/DynamicThreadPoolCoreAutoConfiguration.java
index ab563864..d0bc6d23 100644
--- a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/config/DynamicThreadPoolCoreAutoConfiguration.java
+++ b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/config/DynamicThreadPoolCoreAutoConfiguration.java
@@ -17,6 +17,7 @@ import cn.hippo4j.core.starter.notify.CoreNotifyConfigBuilder;
import cn.hippo4j.core.starter.refresher.ApolloRefresherHandler;
import cn.hippo4j.core.starter.refresher.NacosCloudRefresherHandler;
import cn.hippo4j.core.starter.refresher.NacosRefresherHandler;
+import cn.hippo4j.core.starter.refresher.ZookeeperRefresherHandler;
import cn.hippo4j.core.starter.support.DynamicThreadPoolPostProcessor;
import lombok.AllArgsConstructor;
import org.springframework.boot.autoconfigure.ImportAutoConfiguration;
@@ -52,6 +53,8 @@ public class DynamicThreadPoolCoreAutoConfiguration {
private static final String APOLLO_CONFIG_KEY = "com.ctrip.framework.apollo.ConfigService";
+ private static final String ZK_CONFIG_KEY = "org.apache.curator.framework.CuratorFramework";
+
@Bean
@Order(Ordered.HIGHEST_PRECEDENCE)
public ApplicationContextHolder hippo4JApplicationContextHolder() {
@@ -121,4 +124,10 @@ public class DynamicThreadPoolCoreAutoConfiguration {
return new ApolloRefresherHandler(threadPoolNotifyAlarmHandler, bootstrapCoreProperties);
}
+ @Bean
+ @ConditionalOnClass(name = ZK_CONFIG_KEY)
+ public ZookeeperRefresherHandler zookeeperRefresher(ThreadPoolNotifyAlarmHandler threadPoolNotifyAlarmHandler,
+ BootstrapCoreProperties bootstrapCoreProperties) {
+ return new ZookeeperRefresherHandler(threadPoolNotifyAlarmHandler, bootstrapCoreProperties);
+ }
}
diff --git a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/notify/CoreNotifyConfigBuilder.java b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/notify/CoreNotifyConfigBuilder.java
index 7fec509e..e3859775 100644
--- a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/notify/CoreNotifyConfigBuilder.java
+++ b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/notify/CoreNotifyConfigBuilder.java
@@ -32,10 +32,11 @@ public class CoreNotifyConfigBuilder implements NotifyConfigBuilder {
Map> resultMap = Maps.newHashMap();
List executors = bootstrapCoreProperties.getExecutors();
- for (ExecutorProperties executor : executors) {
- resultMap.putAll(buildSingleNotifyConfig(executor));
+ if (null !=executors) {
+ for (ExecutorProperties executor : executors) {
+ resultMap.putAll(buildSingleNotifyConfig(executor));
+ }
}
-
return resultMap;
}
diff --git a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/refresher/AbstractCoreThreadPoolDynamicRefresh.java b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/refresher/AbstractCoreThreadPoolDynamicRefresh.java
index 0569e9e2..17192cfd 100644
--- a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/refresher/AbstractCoreThreadPoolDynamicRefresh.java
+++ b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/refresher/AbstractCoreThreadPoolDynamicRefresh.java
@@ -6,12 +6,14 @@ import cn.hippo4j.common.model.PoolParameter;
import cn.hippo4j.common.model.PoolParameterInfo;
import cn.hippo4j.common.notify.HippoBaseSendMessageService;
import cn.hippo4j.common.notify.NotifyConfigDTO;
+import cn.hippo4j.common.notify.ThreadPoolNotifyAlarm;
import cn.hippo4j.common.notify.request.ChangeParameterNotifyRequest;
import cn.hippo4j.common.web.executor.WebThreadPoolHandlerChoose;
import cn.hippo4j.common.web.executor.WebThreadPoolService;
import cn.hippo4j.core.executor.DynamicThreadPoolExecutor;
import cn.hippo4j.core.executor.DynamicThreadPoolWrapper;
import cn.hippo4j.core.executor.ThreadPoolNotifyAlarmHandler;
+import cn.hippo4j.core.executor.manage.GlobalNotifyAlarmManage;
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.core.executor.support.*;
import cn.hippo4j.core.proxy.RejectedProxyUtil;
@@ -75,6 +77,24 @@ public abstract class AbstractCoreThreadPoolDynamicRefresh implements ThreadPool
refreshExecutors(bindableCoreProperties);
}
+ /**
+ * register notify alarm manage
+ */
+ public void registerNotifyAlarmManage() {
+ bootstrapCoreProperties.getExecutors().forEach(executorProperties -> {
+ ThreadPoolNotifyAlarm threadPoolNotifyAlarm = new ThreadPoolNotifyAlarm(
+ executorProperties.getNotify().getIsAlarm(),
+ executorProperties.getNotify().getCapacityAlarm(),
+ executorProperties.getNotify().getActiveAlarm()
+ );
+
+ threadPoolNotifyAlarm.setInterval(executorProperties.getNotify().getInterval());
+ threadPoolNotifyAlarm.setReceives(executorProperties.getNotify().getReceives());
+ GlobalNotifyAlarmManage.put(executorProperties.getThreadPoolId(), threadPoolNotifyAlarm);
+ });
+
+ }
+
/**
* Refresh web executor.
*
diff --git a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/refresher/ZookeeperRefresherHandler.java b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/refresher/ZookeeperRefresherHandler.java
new file mode 100644
index 00000000..5fd336b5
--- /dev/null
+++ b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/refresher/ZookeeperRefresherHandler.java
@@ -0,0 +1,95 @@
+package cn.hippo4j.core.starter.refresher;
+
+import cn.hippo4j.core.executor.ThreadPoolNotifyAlarmHandler;
+import cn.hippo4j.core.starter.config.BootstrapCoreProperties;
+import com.google.common.base.Charsets;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.api.CuratorListener;
+import org.apache.curator.framework.api.GetChildrenBuilder;
+import org.apache.curator.framework.api.GetDataBuilder;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.zookeeper.WatchedEvent;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author Redick01
+ * @date 2022/3/14 16:03
+ */
+@Slf4j
+public class ZookeeperRefresherHandler extends AbstractCoreThreadPoolDynamicRefresh {
+
+ private CuratorFramework curatorFramework;
+
+ public ZookeeperRefresherHandler(ThreadPoolNotifyAlarmHandler threadPoolNotifyAlarmHandler, BootstrapCoreProperties bootstrapCoreProperties) {
+ super(threadPoolNotifyAlarmHandler, bootstrapCoreProperties);
+ }
+
+ @Override
+ public void afterPropertiesSet() {
+ Map zkConfigs = bootstrapCoreProperties.getZookeeper();
+ curatorFramework = CuratorFrameworkFactory.newClient(zkConfigs.get("zk-connect-str"),
+ new ExponentialBackoffRetry(1000, 3));
+ String nodePath = ZKPaths.makePath(ZKPaths.makePath(zkConfigs.get("root-node"),
+ zkConfigs.get("config-version")), zkConfigs.get("node"));
+ final ConnectionStateListener connectionStateListener = (client, newState) -> {
+ if (newState == ConnectionState.CONNECTED) {
+ loadNode(nodePath);
+ } else if (newState == ConnectionState.RECONNECTED) {
+ loadNode(nodePath);
+ }};
+
+ final CuratorListener curatorListener = (client, curatorEvent) -> {
+ final WatchedEvent watchedEvent = curatorEvent.getWatchedEvent();
+ if (null != watchedEvent) {
+ switch (watchedEvent.getType()) {
+ case NodeChildrenChanged:
+ case NodeDataChanged:
+ loadNode(nodePath);
+ break;
+ default:
+ break;
+ }
+ }};
+ curatorFramework.getConnectionStateListenable().addListener(connectionStateListener);
+ curatorFramework.getCuratorListenable().addListener(curatorListener);
+ curatorFramework.start();
+ }
+
+ /**
+ * load config info and refresh.
+ * @param nodePath zk config node path.
+ */
+ public void loadNode(String nodePath) {
+ try {
+ final GetChildrenBuilder childrenBuilder = curatorFramework.getChildren();
+ final List children = childrenBuilder.watched().forPath(nodePath);
+ StringBuilder content = new StringBuilder();
+ children.forEach(c -> {
+ String n = ZKPaths.makePath(nodePath, c);
+ final String nodeName = ZKPaths.getNodeFromPath(n);
+ final GetDataBuilder data = curatorFramework.getData();
+ String value = "";
+ try {
+ value = new String(data.watched().forPath(n), Charsets.UTF_8);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ final Pair keyValue = new ImmutablePair<>(nodeName, value);
+ content.append(keyValue.getKey()).append("=").append(keyValue.getValue()).append("\n");
+ });
+ dynamicRefresh(content.toString());
+ registerNotifyAlarmManage();
+ } catch (Exception e) {
+ log.error("load zk node error, nodePath is {}", nodePath, e);
+ }
+ }
+}
diff --git a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/support/DynamicThreadPoolPostProcessor.java b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/support/DynamicThreadPoolPostProcessor.java
index f66ffacc..19eda117 100644
--- a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/support/DynamicThreadPoolPostProcessor.java
+++ b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/support/DynamicThreadPoolPostProcessor.java
@@ -90,67 +90,68 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
protected ThreadPoolExecutor fillPoolAndRegister(DynamicThreadPoolWrapper dynamicThreadPoolWrap) {
String threadPoolId = dynamicThreadPoolWrap.getTpId();
ThreadPoolExecutor newDynamicPoolExecutor = dynamicThreadPoolWrap.getExecutor();
+ ExecutorProperties executorProperties = null;
+ if (null != bootstrapCoreProperties.getExecutors()) {
+ executorProperties = bootstrapCoreProperties.getExecutors()
+ .stream()
+ .filter(each -> Objects.equals(threadPoolId, each.getThreadPoolId()))
+ .findFirst()
+ .orElse(null);
+ if (executorProperties != null) {
+ try {
+ // 使用相关参数创建线程池
+ BlockingQueue workQueue = QueueTypeEnum.createBlockingQueue(executorProperties.getBlockingQueue(), executorProperties.getQueueCapacity());
+ String threadNamePrefix = executorProperties.getThreadNamePrefix();
+ newDynamicPoolExecutor = ThreadPoolBuilder.builder()
+ .dynamicPool()
+ .workQueue(workQueue)
+ .threadFactory(StringUtil.isNotBlank(threadNamePrefix) ? threadNamePrefix : threadPoolId)
+ .executeTimeOut(Optional.ofNullable(executorProperties.getExecuteTimeOut()).orElse(0L))
+ .poolThreadSize(executorProperties.getCorePoolSize(), executorProperties.getMaximumPoolSize())
+ .keepAliveTime(executorProperties.getKeepAliveTime(), TimeUnit.SECONDS)
+ .rejected(RejectedTypeEnum.createPolicy(executorProperties.getRejectedHandler()))
+ .allowCoreThreadTimeOut(executorProperties.getAllowCoreThreadTimeOut())
+ .build();
+ } catch (Exception ex) {
+ log.error("Failed to initialize thread pool configuration. error :: {}", ex);
+ } finally {
+ if (Objects.isNull(dynamicThreadPoolWrap.getExecutor())) {
+ dynamicThreadPoolWrap.setExecutor(CommonDynamicThreadPool.getInstance(threadPoolId));
+ }
- ExecutorProperties executorProperties = bootstrapCoreProperties.getExecutors()
- .stream()
- .filter(each -> Objects.equals(threadPoolId, each.getThreadPoolId()))
- .findFirst()
- .orElse(null);
- if (executorProperties != null) {
- try {
- // 使用相关参数创建线程池
- BlockingQueue workQueue = QueueTypeEnum.createBlockingQueue(executorProperties.getBlockingQueue(), executorProperties.getQueueCapacity());
- String threadNamePrefix = executorProperties.getThreadNamePrefix();
- newDynamicPoolExecutor = ThreadPoolBuilder.builder()
- .dynamicPool()
- .workQueue(workQueue)
- .threadFactory(StringUtil.isNotBlank(threadNamePrefix) ? threadNamePrefix : threadPoolId)
- .executeTimeOut(Optional.ofNullable(executorProperties.getExecuteTimeOut()).orElse(0L))
- .poolThreadSize(executorProperties.getCorePoolSize(), executorProperties.getMaximumPoolSize())
- .keepAliveTime(executorProperties.getKeepAliveTime(), TimeUnit.SECONDS)
- .rejected(RejectedTypeEnum.createPolicy(executorProperties.getRejectedHandler()))
- .allowCoreThreadTimeOut(executorProperties.getAllowCoreThreadTimeOut())
- .build();
- } catch (Exception ex) {
- log.error("Failed to initialize thread pool configuration. error :: {}", ex);
- } finally {
- if (Objects.isNull(dynamicThreadPoolWrap.getExecutor())) {
- dynamicThreadPoolWrap.setExecutor(CommonDynamicThreadPool.getInstance(threadPoolId));
+ dynamicThreadPoolWrap.setInitFlag(Boolean.TRUE);
}
+ }
- dynamicThreadPoolWrap.setInitFlag(Boolean.TRUE);
+ // 设置动态线程池增强参数
+ ThreadPoolNotifyAlarm notify = Optional.ofNullable(executorProperties)
+ .map(each -> each.getNotify())
+ .orElseGet(() -> {
+ ThreadPoolNotifyAlarm threadPoolNotifyAlarm = new ThreadPoolNotifyAlarm(true, 80, 80);
+ threadPoolNotifyAlarm.setInterval(2);
+ return threadPoolNotifyAlarm;
+ });
+ if (dynamicThreadPoolWrap.getExecutor() instanceof AbstractDynamicExecutorSupport) {
+ ThreadPoolNotifyAlarm threadPoolNotifyAlarm = new ThreadPoolNotifyAlarm(
+ notify.getIsAlarm(),
+ notify.getCapacityAlarm(),
+ notify.getActiveAlarm()
+ );
+
+ threadPoolNotifyAlarm.setInterval(notify.getInterval());
+ threadPoolNotifyAlarm.setReceives(notify.getReceives());
+ GlobalNotifyAlarmManage.put(threadPoolId, threadPoolNotifyAlarm);
+
+ TaskDecorator taskDecorator = ((DynamicThreadPoolExecutor) dynamicThreadPoolWrap.getExecutor()).getTaskDecorator();
+ ((DynamicThreadPoolExecutor) newDynamicPoolExecutor).setTaskDecorator(taskDecorator);
+
+ long awaitTerminationMillis = ((DynamicThreadPoolExecutor) dynamicThreadPoolWrap.getExecutor()).awaitTerminationMillis;
+ boolean waitForTasksToCompleteOnShutdown = ((DynamicThreadPoolExecutor) dynamicThreadPoolWrap.getExecutor()).waitForTasksToCompleteOnShutdown;
+ ((DynamicThreadPoolExecutor) newDynamicPoolExecutor).setSupportParam(awaitTerminationMillis, waitForTasksToCompleteOnShutdown);
}
- }
- // 设置动态线程池增强参数
- ThreadPoolNotifyAlarm notify = Optional.ofNullable(executorProperties)
- .map(each -> each.getNotify())
- .orElseGet(() -> {
- ThreadPoolNotifyAlarm threadPoolNotifyAlarm = new ThreadPoolNotifyAlarm(true, 80, 80);
- threadPoolNotifyAlarm.setInterval(2);
- return threadPoolNotifyAlarm;
- });
- if (dynamicThreadPoolWrap.getExecutor() instanceof AbstractDynamicExecutorSupport) {
- ThreadPoolNotifyAlarm threadPoolNotifyAlarm = new ThreadPoolNotifyAlarm(
- notify.getIsAlarm(),
- notify.getCapacityAlarm(),
- notify.getActiveAlarm()
- );
-
- threadPoolNotifyAlarm.setInterval(notify.getInterval());
- threadPoolNotifyAlarm.setReceives(notify.getReceives());
- GlobalNotifyAlarmManage.put(threadPoolId, threadPoolNotifyAlarm);
-
- TaskDecorator taskDecorator = ((DynamicThreadPoolExecutor) dynamicThreadPoolWrap.getExecutor()).getTaskDecorator();
- ((DynamicThreadPoolExecutor) newDynamicPoolExecutor).setTaskDecorator(taskDecorator);
-
- long awaitTerminationMillis = ((DynamicThreadPoolExecutor) dynamicThreadPoolWrap.getExecutor()).awaitTerminationMillis;
- boolean waitForTasksToCompleteOnShutdown = ((DynamicThreadPoolExecutor) dynamicThreadPoolWrap.getExecutor()).waitForTasksToCompleteOnShutdown;
- ((DynamicThreadPoolExecutor) newDynamicPoolExecutor).setSupportParam(awaitTerminationMillis, waitForTasksToCompleteOnShutdown);
+ dynamicThreadPoolWrap.setExecutor(newDynamicPoolExecutor);
}
-
- dynamicThreadPoolWrap.setExecutor(newDynamicPoolExecutor);
-
GlobalThreadPoolManage.registerPool(dynamicThreadPoolWrap.getTpId(), dynamicThreadPoolWrap);
GlobalCoreThreadPoolManage.register(
threadPoolId,