From 41383a4efd6c1181361b8a67f70b79e77fad27aa Mon Sep 17 00:00:00 2001 From: Redick01 <11090829@qq.com> Date: Tue, 15 Mar 2022 20:53:29 +0800 Subject: [PATCH] =?UTF-8?q?1.=E6=94=AF=E6=8C=81zookeeper=E6=B3=A8=E5=86=8C?= =?UTF-8?q?=E4=B8=AD=E5=BF=83=202.zookeeper=E9=85=8D=E7=BD=AE=E4=B8=AD?= =?UTF-8?q?=E5=BF=83=E5=8F=AA=E6=94=AF=E6=8C=81properties=E7=B1=BB?= =?UTF-8?q?=E5=9E=8B=E9=85=8D=E7=BD=AE=203.=E5=A2=9E=E5=8A=A0zookeeper?= =?UTF-8?q?=E9=85=8D=E7=BD=AE=E4=B8=AD=E5=BF=83=E7=9A=84=E4=BD=BF=E7=94=A8?= =?UTF-8?q?example=204.=E6=8F=90=E4=BE=9Bzookeeper=E9=85=8D=E7=BD=AE?= =?UTF-8?q?=E4=B8=AD=E5=BF=83=E9=85=8D=E7=BD=AE=E6=A0=B7=E4=BE=8B=EF=BC=8C?= =?UTF-8?q?=E5=8F=82=E8=80=83zookeeper=20example=E4=B8=AD=E7=9A=84zookeepe?= =?UTF-8?q?r-demo.properties?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ...ippo4jCoreZookeeperExampleApplication.java | 18 +++ .../src/main/resources/application.yml | 19 +++ .../main/resources/zookeeper-demo.properties | 40 +++++++ hippo4j-example/pom.xml | 1 + .../hippo4j-core-spring-boot-starter/pom.xml | 21 ++++ .../config/BootstrapCoreProperties.java | 5 + ...ynamicThreadPoolCoreAutoConfiguration.java | 9 ++ .../notify/CoreNotifyConfigBuilder.java | 7 +- .../AbstractCoreThreadPoolDynamicRefresh.java | 20 ++++ .../refresher/ZookeeperRefresherHandler.java | 95 +++++++++++++++ .../DynamicThreadPoolPostProcessor.java | 111 +++++++++--------- 11 files changed, 288 insertions(+), 58 deletions(-) create mode 100644 hippo4j-example/hippo4j-core-zookeeper-spring-boot-starter-example/src/main/java/cn/hippo4j/example/core/zookeeper/Hippo4jCoreZookeeperExampleApplication.java create mode 100644 hippo4j-example/hippo4j-core-zookeeper-spring-boot-starter-example/src/main/resources/application.yml create mode 100644 hippo4j-example/hippo4j-core-zookeeper-spring-boot-starter-example/src/main/resources/zookeeper-demo.properties create mode 100644 hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/refresher/ZookeeperRefresherHandler.java diff --git a/hippo4j-example/hippo4j-core-zookeeper-spring-boot-starter-example/src/main/java/cn/hippo4j/example/core/zookeeper/Hippo4jCoreZookeeperExampleApplication.java b/hippo4j-example/hippo4j-core-zookeeper-spring-boot-starter-example/src/main/java/cn/hippo4j/example/core/zookeeper/Hippo4jCoreZookeeperExampleApplication.java new file mode 100644 index 00000000..a1c7561b --- /dev/null +++ b/hippo4j-example/hippo4j-core-zookeeper-spring-boot-starter-example/src/main/java/cn/hippo4j/example/core/zookeeper/Hippo4jCoreZookeeperExampleApplication.java @@ -0,0 +1,18 @@ +package cn.hippo4j.example.core.zookeeper; + +import cn.hippo4j.core.enable.EnableDynamicThreadPool; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +/** + * @author Redick01 + * @date 2022/3/14 20:40 + */ +@EnableDynamicThreadPool +@SpringBootApplication(scanBasePackages = "cn.hippo4j.example.core") +public class Hippo4jCoreZookeeperExampleApplication { + + public static void main(String[] args) { + SpringApplication.run(Hippo4jCoreZookeeperExampleApplication.class, args); + } +} diff --git a/hippo4j-example/hippo4j-core-zookeeper-spring-boot-starter-example/src/main/resources/application.yml b/hippo4j-example/hippo4j-core-zookeeper-spring-boot-starter-example/src/main/resources/application.yml new file mode 100644 index 00000000..25c826ed --- /dev/null +++ b/hippo4j-example/hippo4j-core-zookeeper-spring-boot-starter-example/src/main/resources/application.yml @@ -0,0 +1,19 @@ +server: + port: 8888 + +spring: + application: + name: dynamic-threadpool-zookeeper-example + + dynamic: + thread-pool: + config-file-type: properties + zookeeper: + zk-connect-str: 127.0.0.1:2181 + config-version: 1.0.0 + root-node: /configserver/userproject + node: zookeeper-demo + +#logging: +# level: +# root: DEBUG \ No newline at end of file diff --git a/hippo4j-example/hippo4j-core-zookeeper-spring-boot-starter-example/src/main/resources/zookeeper-demo.properties b/hippo4j-example/hippo4j-core-zookeeper-spring-boot-starter-example/src/main/resources/zookeeper-demo.properties new file mode 100644 index 00000000..4fb1c7f8 --- /dev/null +++ b/hippo4j-example/hippo4j-core-zookeeper-spring-boot-starter-example/src/main/resources/zookeeper-demo.properties @@ -0,0 +1,40 @@ +# Export from zookeeper configuration group: [/configserver/userproject] - [1.0.0] - [zookeeper-demo]. + +spring.application.name=dynamic-threadpool-zookeeper-example +spring.dynamic.thread-pool.banner=true +spring.dynamic.thread-pool.check-state-interval=5 +spring.dynamic.thread-pool.collect=true +spring.dynamic.thread-pool.config-file-type=properties +spring.dynamic.thread-pool.enable=true +spring.dynamic.thread-pool.executors[0].allow-core-thread-time-out=true +spring.dynamic.thread-pool.executors[0].blocking-queue=LinkedBlockingQueue +spring.dynamic.thread-pool.executors[0].core-pool-size=2 +spring.dynamic.thread-pool.executors[0].execute-time-out=1000 +spring.dynamic.thread-pool.executors[0].keep-alive-time=6691 +spring.dynamic.thread-pool.executors[0].maximum-pool-size=4 +spring.dynamic.thread-pool.executors[0].notify.active-alarm=80 +spring.dynamic.thread-pool.executors[0].notify.capacity-alarm=80 +spring.dynamic.thread-pool.executors[0].notify.interval=8 +spring.dynamic.thread-pool.executors[0].notify.is-alarm=true +spring.dynamic.thread-pool.executors[0].notify.receives.DING=177****6993 +spring.dynamic.thread-pool.executors[0].queue-capacity=100 +spring.dynamic.thread-pool.executors[0].rejected-handler=AbortPolicy +spring.dynamic.thread-pool.executors[0].thread-name-prefix=message-consume +spring.dynamic.thread-pool.executors[0].thread-pool-id=message-consume +spring.dynamic.thread-pool.executors[1].allow-core-thread-time-out=true +spring.dynamic.thread-pool.executors[1].blocking-queue=LinkedBlockingQueue +spring.dynamic.thread-pool.executors[1].core-pool-size=1 +spring.dynamic.thread-pool.executors[1].execute-time-out=1000 +spring.dynamic.thread-pool.executors[1].keep-alive-time=6691 +spring.dynamic.thread-pool.executors[1].maximum-pool-size=1 +spring.dynamic.thread-pool.executors[1].notify.active-alarm=80 +spring.dynamic.thread-pool.executors[1].notify.capacity-alarm=80 +spring.dynamic.thread-pool.executors[1].notify.interval=8 +spring.dynamic.thread-pool.executors[1].notify.is-alarm=true +spring.dynamic.thread-pool.executors[1].notify.receives.DING=177****6993 +spring.dynamic.thread-pool.executors[1].queue-capacity=1 +spring.dynamic.thread-pool.executors[1].rejected-handler=AbortPolicy +spring.dynamic.thread-pool.executors[1].thread-name-prefix=message-produce +spring.dynamic.thread-pool.executors[1].thread-pool-id=message-produce +spring.dynamic.thread-pool.notify-platforms[0].platform=DING +spring.dynamic.thread-pool.notify-platforms[0].secret-key=aab197577f6d8cc3aa8b52ee38adb6e16a46642a9c4986f5e45ca6946fdcea6f \ No newline at end of file diff --git a/hippo4j-example/pom.xml b/hippo4j-example/pom.xml index b2219b54..f21be879 100644 --- a/hippo4j-example/pom.xml +++ b/hippo4j-example/pom.xml @@ -28,6 +28,7 @@ hippo4j-core-nacos-spring-boot-starter-example hippo4j-core-apollo-spring-boot-starter-example + hippo4j-core-zookeeper-spring-boot-starter-example diff --git a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/pom.xml b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/pom.xml index 8cff4fab..ce97adcd 100644 --- a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/pom.xml +++ b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/pom.xml @@ -65,6 +65,27 @@ true + + org.apache.curator + curator-framework + 5.1.0 + compile + true + + + + com.guicedee.services + commons-lang3 + 1.2.1.1-jre17 + compile + + + + com.google.guava + guava + 16.0.1 + + org.springframework.boot spring-boot-configuration-processor diff --git a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/config/BootstrapCoreProperties.java b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/config/BootstrapCoreProperties.java index 982503a3..39db69f4 100644 --- a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/config/BootstrapCoreProperties.java +++ b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/config/BootstrapCoreProperties.java @@ -57,6 +57,11 @@ public class BootstrapCoreProperties implements BootstrapPropertiesInterface { */ private Map apollo; + /** + * Zookeeper config. + */ + private Map zookeeper; + /** * Tomcat thread pool config. */ diff --git a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/config/DynamicThreadPoolCoreAutoConfiguration.java b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/config/DynamicThreadPoolCoreAutoConfiguration.java index ab563864..d0bc6d23 100644 --- a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/config/DynamicThreadPoolCoreAutoConfiguration.java +++ b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/config/DynamicThreadPoolCoreAutoConfiguration.java @@ -17,6 +17,7 @@ import cn.hippo4j.core.starter.notify.CoreNotifyConfigBuilder; import cn.hippo4j.core.starter.refresher.ApolloRefresherHandler; import cn.hippo4j.core.starter.refresher.NacosCloudRefresherHandler; import cn.hippo4j.core.starter.refresher.NacosRefresherHandler; +import cn.hippo4j.core.starter.refresher.ZookeeperRefresherHandler; import cn.hippo4j.core.starter.support.DynamicThreadPoolPostProcessor; import lombok.AllArgsConstructor; import org.springframework.boot.autoconfigure.ImportAutoConfiguration; @@ -52,6 +53,8 @@ public class DynamicThreadPoolCoreAutoConfiguration { private static final String APOLLO_CONFIG_KEY = "com.ctrip.framework.apollo.ConfigService"; + private static final String ZK_CONFIG_KEY = "org.apache.curator.framework.CuratorFramework"; + @Bean @Order(Ordered.HIGHEST_PRECEDENCE) public ApplicationContextHolder hippo4JApplicationContextHolder() { @@ -121,4 +124,10 @@ public class DynamicThreadPoolCoreAutoConfiguration { return new ApolloRefresherHandler(threadPoolNotifyAlarmHandler, bootstrapCoreProperties); } + @Bean + @ConditionalOnClass(name = ZK_CONFIG_KEY) + public ZookeeperRefresherHandler zookeeperRefresher(ThreadPoolNotifyAlarmHandler threadPoolNotifyAlarmHandler, + BootstrapCoreProperties bootstrapCoreProperties) { + return new ZookeeperRefresherHandler(threadPoolNotifyAlarmHandler, bootstrapCoreProperties); + } } diff --git a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/notify/CoreNotifyConfigBuilder.java b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/notify/CoreNotifyConfigBuilder.java index 7fec509e..e3859775 100644 --- a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/notify/CoreNotifyConfigBuilder.java +++ b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/notify/CoreNotifyConfigBuilder.java @@ -32,10 +32,11 @@ public class CoreNotifyConfigBuilder implements NotifyConfigBuilder { Map> resultMap = Maps.newHashMap(); List executors = bootstrapCoreProperties.getExecutors(); - for (ExecutorProperties executor : executors) { - resultMap.putAll(buildSingleNotifyConfig(executor)); + if (null !=executors) { + for (ExecutorProperties executor : executors) { + resultMap.putAll(buildSingleNotifyConfig(executor)); + } } - return resultMap; } diff --git a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/refresher/AbstractCoreThreadPoolDynamicRefresh.java b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/refresher/AbstractCoreThreadPoolDynamicRefresh.java index 0569e9e2..17192cfd 100644 --- a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/refresher/AbstractCoreThreadPoolDynamicRefresh.java +++ b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/refresher/AbstractCoreThreadPoolDynamicRefresh.java @@ -6,12 +6,14 @@ import cn.hippo4j.common.model.PoolParameter; import cn.hippo4j.common.model.PoolParameterInfo; import cn.hippo4j.common.notify.HippoBaseSendMessageService; import cn.hippo4j.common.notify.NotifyConfigDTO; +import cn.hippo4j.common.notify.ThreadPoolNotifyAlarm; import cn.hippo4j.common.notify.request.ChangeParameterNotifyRequest; import cn.hippo4j.common.web.executor.WebThreadPoolHandlerChoose; import cn.hippo4j.common.web.executor.WebThreadPoolService; import cn.hippo4j.core.executor.DynamicThreadPoolExecutor; import cn.hippo4j.core.executor.DynamicThreadPoolWrapper; import cn.hippo4j.core.executor.ThreadPoolNotifyAlarmHandler; +import cn.hippo4j.core.executor.manage.GlobalNotifyAlarmManage; import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage; import cn.hippo4j.core.executor.support.*; import cn.hippo4j.core.proxy.RejectedProxyUtil; @@ -75,6 +77,24 @@ public abstract class AbstractCoreThreadPoolDynamicRefresh implements ThreadPool refreshExecutors(bindableCoreProperties); } + /** + * register notify alarm manage + */ + public void registerNotifyAlarmManage() { + bootstrapCoreProperties.getExecutors().forEach(executorProperties -> { + ThreadPoolNotifyAlarm threadPoolNotifyAlarm = new ThreadPoolNotifyAlarm( + executorProperties.getNotify().getIsAlarm(), + executorProperties.getNotify().getCapacityAlarm(), + executorProperties.getNotify().getActiveAlarm() + ); + + threadPoolNotifyAlarm.setInterval(executorProperties.getNotify().getInterval()); + threadPoolNotifyAlarm.setReceives(executorProperties.getNotify().getReceives()); + GlobalNotifyAlarmManage.put(executorProperties.getThreadPoolId(), threadPoolNotifyAlarm); + }); + + } + /** * Refresh web executor. * diff --git a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/refresher/ZookeeperRefresherHandler.java b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/refresher/ZookeeperRefresherHandler.java new file mode 100644 index 00000000..5fd336b5 --- /dev/null +++ b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/refresher/ZookeeperRefresherHandler.java @@ -0,0 +1,95 @@ +package cn.hippo4j.core.starter.refresher; + +import cn.hippo4j.core.executor.ThreadPoolNotifyAlarmHandler; +import cn.hippo4j.core.starter.config.BootstrapCoreProperties; +import com.google.common.base.Charsets; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.api.CuratorListener; +import org.apache.curator.framework.api.GetChildrenBuilder; +import org.apache.curator.framework.api.GetDataBuilder; +import org.apache.curator.framework.state.ConnectionState; +import org.apache.curator.framework.state.ConnectionStateListener; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.curator.utils.ZKPaths; +import org.apache.zookeeper.WatchedEvent; + +import java.util.List; +import java.util.Map; + +/** + * @author Redick01 + * @date 2022/3/14 16:03 + */ +@Slf4j +public class ZookeeperRefresherHandler extends AbstractCoreThreadPoolDynamicRefresh { + + private CuratorFramework curatorFramework; + + public ZookeeperRefresherHandler(ThreadPoolNotifyAlarmHandler threadPoolNotifyAlarmHandler, BootstrapCoreProperties bootstrapCoreProperties) { + super(threadPoolNotifyAlarmHandler, bootstrapCoreProperties); + } + + @Override + public void afterPropertiesSet() { + Map zkConfigs = bootstrapCoreProperties.getZookeeper(); + curatorFramework = CuratorFrameworkFactory.newClient(zkConfigs.get("zk-connect-str"), + new ExponentialBackoffRetry(1000, 3)); + String nodePath = ZKPaths.makePath(ZKPaths.makePath(zkConfigs.get("root-node"), + zkConfigs.get("config-version")), zkConfigs.get("node")); + final ConnectionStateListener connectionStateListener = (client, newState) -> { + if (newState == ConnectionState.CONNECTED) { + loadNode(nodePath); + } else if (newState == ConnectionState.RECONNECTED) { + loadNode(nodePath); + }}; + + final CuratorListener curatorListener = (client, curatorEvent) -> { + final WatchedEvent watchedEvent = curatorEvent.getWatchedEvent(); + if (null != watchedEvent) { + switch (watchedEvent.getType()) { + case NodeChildrenChanged: + case NodeDataChanged: + loadNode(nodePath); + break; + default: + break; + } + }}; + curatorFramework.getConnectionStateListenable().addListener(connectionStateListener); + curatorFramework.getCuratorListenable().addListener(curatorListener); + curatorFramework.start(); + } + + /** + * load config info and refresh. + * @param nodePath zk config node path. + */ + public void loadNode(String nodePath) { + try { + final GetChildrenBuilder childrenBuilder = curatorFramework.getChildren(); + final List children = childrenBuilder.watched().forPath(nodePath); + StringBuilder content = new StringBuilder(); + children.forEach(c -> { + String n = ZKPaths.makePath(nodePath, c); + final String nodeName = ZKPaths.getNodeFromPath(n); + final GetDataBuilder data = curatorFramework.getData(); + String value = ""; + try { + value = new String(data.watched().forPath(n), Charsets.UTF_8); + } catch (Exception e) { + e.printStackTrace(); + } + final Pair keyValue = new ImmutablePair<>(nodeName, value); + content.append(keyValue.getKey()).append("=").append(keyValue.getValue()).append("\n"); + }); + dynamicRefresh(content.toString()); + registerNotifyAlarmManage(); + } catch (Exception e) { + log.error("load zk node error, nodePath is {}", nodePath, e); + } + } +} diff --git a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/support/DynamicThreadPoolPostProcessor.java b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/support/DynamicThreadPoolPostProcessor.java index f66ffacc..19eda117 100644 --- a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/support/DynamicThreadPoolPostProcessor.java +++ b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/support/DynamicThreadPoolPostProcessor.java @@ -90,67 +90,68 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor { protected ThreadPoolExecutor fillPoolAndRegister(DynamicThreadPoolWrapper dynamicThreadPoolWrap) { String threadPoolId = dynamicThreadPoolWrap.getTpId(); ThreadPoolExecutor newDynamicPoolExecutor = dynamicThreadPoolWrap.getExecutor(); + ExecutorProperties executorProperties = null; + if (null != bootstrapCoreProperties.getExecutors()) { + executorProperties = bootstrapCoreProperties.getExecutors() + .stream() + .filter(each -> Objects.equals(threadPoolId, each.getThreadPoolId())) + .findFirst() + .orElse(null); + if (executorProperties != null) { + try { + // 使用相关参数创建线程池 + BlockingQueue workQueue = QueueTypeEnum.createBlockingQueue(executorProperties.getBlockingQueue(), executorProperties.getQueueCapacity()); + String threadNamePrefix = executorProperties.getThreadNamePrefix(); + newDynamicPoolExecutor = ThreadPoolBuilder.builder() + .dynamicPool() + .workQueue(workQueue) + .threadFactory(StringUtil.isNotBlank(threadNamePrefix) ? threadNamePrefix : threadPoolId) + .executeTimeOut(Optional.ofNullable(executorProperties.getExecuteTimeOut()).orElse(0L)) + .poolThreadSize(executorProperties.getCorePoolSize(), executorProperties.getMaximumPoolSize()) + .keepAliveTime(executorProperties.getKeepAliveTime(), TimeUnit.SECONDS) + .rejected(RejectedTypeEnum.createPolicy(executorProperties.getRejectedHandler())) + .allowCoreThreadTimeOut(executorProperties.getAllowCoreThreadTimeOut()) + .build(); + } catch (Exception ex) { + log.error("Failed to initialize thread pool configuration. error :: {}", ex); + } finally { + if (Objects.isNull(dynamicThreadPoolWrap.getExecutor())) { + dynamicThreadPoolWrap.setExecutor(CommonDynamicThreadPool.getInstance(threadPoolId)); + } - ExecutorProperties executorProperties = bootstrapCoreProperties.getExecutors() - .stream() - .filter(each -> Objects.equals(threadPoolId, each.getThreadPoolId())) - .findFirst() - .orElse(null); - if (executorProperties != null) { - try { - // 使用相关参数创建线程池 - BlockingQueue workQueue = QueueTypeEnum.createBlockingQueue(executorProperties.getBlockingQueue(), executorProperties.getQueueCapacity()); - String threadNamePrefix = executorProperties.getThreadNamePrefix(); - newDynamicPoolExecutor = ThreadPoolBuilder.builder() - .dynamicPool() - .workQueue(workQueue) - .threadFactory(StringUtil.isNotBlank(threadNamePrefix) ? threadNamePrefix : threadPoolId) - .executeTimeOut(Optional.ofNullable(executorProperties.getExecuteTimeOut()).orElse(0L)) - .poolThreadSize(executorProperties.getCorePoolSize(), executorProperties.getMaximumPoolSize()) - .keepAliveTime(executorProperties.getKeepAliveTime(), TimeUnit.SECONDS) - .rejected(RejectedTypeEnum.createPolicy(executorProperties.getRejectedHandler())) - .allowCoreThreadTimeOut(executorProperties.getAllowCoreThreadTimeOut()) - .build(); - } catch (Exception ex) { - log.error("Failed to initialize thread pool configuration. error :: {}", ex); - } finally { - if (Objects.isNull(dynamicThreadPoolWrap.getExecutor())) { - dynamicThreadPoolWrap.setExecutor(CommonDynamicThreadPool.getInstance(threadPoolId)); + dynamicThreadPoolWrap.setInitFlag(Boolean.TRUE); } + } - dynamicThreadPoolWrap.setInitFlag(Boolean.TRUE); + // 设置动态线程池增强参数 + ThreadPoolNotifyAlarm notify = Optional.ofNullable(executorProperties) + .map(each -> each.getNotify()) + .orElseGet(() -> { + ThreadPoolNotifyAlarm threadPoolNotifyAlarm = new ThreadPoolNotifyAlarm(true, 80, 80); + threadPoolNotifyAlarm.setInterval(2); + return threadPoolNotifyAlarm; + }); + if (dynamicThreadPoolWrap.getExecutor() instanceof AbstractDynamicExecutorSupport) { + ThreadPoolNotifyAlarm threadPoolNotifyAlarm = new ThreadPoolNotifyAlarm( + notify.getIsAlarm(), + notify.getCapacityAlarm(), + notify.getActiveAlarm() + ); + + threadPoolNotifyAlarm.setInterval(notify.getInterval()); + threadPoolNotifyAlarm.setReceives(notify.getReceives()); + GlobalNotifyAlarmManage.put(threadPoolId, threadPoolNotifyAlarm); + + TaskDecorator taskDecorator = ((DynamicThreadPoolExecutor) dynamicThreadPoolWrap.getExecutor()).getTaskDecorator(); + ((DynamicThreadPoolExecutor) newDynamicPoolExecutor).setTaskDecorator(taskDecorator); + + long awaitTerminationMillis = ((DynamicThreadPoolExecutor) dynamicThreadPoolWrap.getExecutor()).awaitTerminationMillis; + boolean waitForTasksToCompleteOnShutdown = ((DynamicThreadPoolExecutor) dynamicThreadPoolWrap.getExecutor()).waitForTasksToCompleteOnShutdown; + ((DynamicThreadPoolExecutor) newDynamicPoolExecutor).setSupportParam(awaitTerminationMillis, waitForTasksToCompleteOnShutdown); } - } - // 设置动态线程池增强参数 - ThreadPoolNotifyAlarm notify = Optional.ofNullable(executorProperties) - .map(each -> each.getNotify()) - .orElseGet(() -> { - ThreadPoolNotifyAlarm threadPoolNotifyAlarm = new ThreadPoolNotifyAlarm(true, 80, 80); - threadPoolNotifyAlarm.setInterval(2); - return threadPoolNotifyAlarm; - }); - if (dynamicThreadPoolWrap.getExecutor() instanceof AbstractDynamicExecutorSupport) { - ThreadPoolNotifyAlarm threadPoolNotifyAlarm = new ThreadPoolNotifyAlarm( - notify.getIsAlarm(), - notify.getCapacityAlarm(), - notify.getActiveAlarm() - ); - - threadPoolNotifyAlarm.setInterval(notify.getInterval()); - threadPoolNotifyAlarm.setReceives(notify.getReceives()); - GlobalNotifyAlarmManage.put(threadPoolId, threadPoolNotifyAlarm); - - TaskDecorator taskDecorator = ((DynamicThreadPoolExecutor) dynamicThreadPoolWrap.getExecutor()).getTaskDecorator(); - ((DynamicThreadPoolExecutor) newDynamicPoolExecutor).setTaskDecorator(taskDecorator); - - long awaitTerminationMillis = ((DynamicThreadPoolExecutor) dynamicThreadPoolWrap.getExecutor()).awaitTerminationMillis; - boolean waitForTasksToCompleteOnShutdown = ((DynamicThreadPoolExecutor) dynamicThreadPoolWrap.getExecutor()).waitForTasksToCompleteOnShutdown; - ((DynamicThreadPoolExecutor) newDynamicPoolExecutor).setSupportParam(awaitTerminationMillis, waitForTasksToCompleteOnShutdown); + dynamicThreadPoolWrap.setExecutor(newDynamicPoolExecutor); } - - dynamicThreadPoolWrap.setExecutor(newDynamicPoolExecutor); - GlobalThreadPoolManage.registerPool(dynamicThreadPoolWrap.getTpId(), dynamicThreadPoolWrap); GlobalCoreThreadPoolManage.register( threadPoolId,