From d4018ee6b366a68f274e412a9ed27747f4e39f6a Mon Sep 17 00:00:00 2001 From: weihu Date: Fri, 13 May 2022 10:31:40 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96spring=E4=BA=8B=E4=BB=B6?= =?UTF-8?q?=E8=A7=A3=E8=80=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../starter/event/ExecutorsListener.java | 202 +++++++++++++ .../starter/event/PlatformsListener.java | 64 +++++ .../event/ThreadPoolDynamicRefreshEvent.java | 39 +++ .../ThreadPoolDynamicRefreshEventOrder.java | 33 +++ .../starter/event/WebExecutorListener.java | 92 ++++++ .../AbstractCoreThreadPoolDynamicRefresh.java | 268 ++---------------- 6 files changed, 449 insertions(+), 249 deletions(-) create mode 100644 hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/event/ExecutorsListener.java create mode 100644 hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/event/PlatformsListener.java create mode 100644 hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/event/ThreadPoolDynamicRefreshEvent.java create mode 100644 hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/event/ThreadPoolDynamicRefreshEventOrder.java create mode 100644 hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/event/WebExecutorListener.java diff --git a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/event/ExecutorsListener.java b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/event/ExecutorsListener.java new file mode 100644 index 00000000..47bcee58 --- /dev/null +++ b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/event/ExecutorsListener.java @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.core.springboot.starter.event; + +import cn.hippo4j.common.notify.request.ChangeParameterNotifyRequest; +import cn.hippo4j.core.executor.DynamicThreadPoolExecutor; +import cn.hippo4j.core.executor.ThreadPoolNotifyAlarmHandler; +import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage; +import cn.hippo4j.core.executor.support.AbstractDynamicExecutorSupport; +import cn.hippo4j.core.executor.support.QueueTypeEnum; +import cn.hippo4j.core.executor.support.RejectedTypeEnum; +import cn.hippo4j.core.executor.support.ResizableCapacityLinkedBlockIngQueue; +import cn.hippo4j.core.proxy.RejectedProxyUtil; +import cn.hippo4j.core.springboot.starter.config.BootstrapCoreProperties; +import cn.hippo4j.core.springboot.starter.config.ExecutorProperties; +import cn.hippo4j.core.springboot.starter.support.GlobalCoreThreadPoolManage; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.context.ApplicationListener; +import org.springframework.core.annotation.Order; +import org.springframework.stereotype.Component; + +import java.util.List; +import java.util.Objects; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_DELIMITER; +import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_THREAD_POOL_TEXT; +import static cn.hippo4j.core.springboot.starter.event.ThreadPoolDynamicRefreshEventOrder.EXECUTORS_LISTENER; + +/** + * @author : wh + * @date : 2022/5/13 10:06 + * @description: + */ +@Slf4j +@Component +@RequiredArgsConstructor +@Order(EXECUTORS_LISTENER) +public class ExecutorsListener implements ApplicationListener { + + private final ThreadPoolNotifyAlarmHandler threadPoolNotifyAlarmHandler; + + @Override + public void onApplicationEvent(ThreadPoolDynamicRefreshEvent threadPoolDynamicRefreshEvent) { + BootstrapCoreProperties bindableCoreProperties = threadPoolDynamicRefreshEvent.getBootstrapCoreProperties(); + List executors = bindableCoreProperties.getExecutors(); + for (ExecutorProperties properties : executors) { + String threadPoolId = properties.getThreadPoolId(); + if (!checkConsistency(threadPoolId, properties)) { + continue; + } + // refresh executor pool + dynamicRefreshPool(threadPoolId, properties); + // old properties + ExecutorProperties beforeProperties = GlobalCoreThreadPoolManage.getProperties(properties.getThreadPoolId()); + // refresh executor properties + GlobalCoreThreadPoolManage.refresh(threadPoolId, properties); + log.info(CHANGE_THREAD_POOL_TEXT, + threadPoolId.toUpperCase(), + String.format(CHANGE_DELIMITER, beforeProperties.getCorePoolSize(), properties.getCorePoolSize()), + String.format(CHANGE_DELIMITER, beforeProperties.getMaximumPoolSize(), properties.getMaximumPoolSize()), + String.format(CHANGE_DELIMITER, beforeProperties.getBlockingQueue(), properties.getBlockingQueue()), + String.format(CHANGE_DELIMITER, beforeProperties.getQueueCapacity(), properties.getQueueCapacity()), + String.format(CHANGE_DELIMITER, beforeProperties.getKeepAliveTime(), properties.getKeepAliveTime()), + String.format(CHANGE_DELIMITER, beforeProperties.getExecuteTimeOut(), properties.getExecuteTimeOut()), + String.format(CHANGE_DELIMITER, beforeProperties.getRejectedHandler(), properties.getRejectedHandler()), + String.format(CHANGE_DELIMITER, beforeProperties.getAllowCoreThreadTimeOut(), properties.getAllowCoreThreadTimeOut())); + try { + threadPoolNotifyAlarmHandler.sendPoolConfigChange(newChangeRequest(beforeProperties, properties)); + } catch (Throwable ex) { + log.error("Failed to send changSmartApplicationListenere notice. Message :: {}", ex.getMessage()); + } + } + + } + + /** + * Construct ChangeParameterNotifyRequest instance + * + * @param beforeProperties old properties + * @param properties new properties + * @return instance + */ + private ChangeParameterNotifyRequest newChangeRequest(ExecutorProperties beforeProperties, ExecutorProperties properties) { + ChangeParameterNotifyRequest changeRequest = new ChangeParameterNotifyRequest(); + changeRequest.setBeforeCorePoolSize(beforeProperties.getCorePoolSize()); + changeRequest.setBeforeMaximumPoolSize(beforeProperties.getMaximumPoolSize()); + changeRequest.setBeforeAllowsCoreThreadTimeOut(beforeProperties.getAllowCoreThreadTimeOut()); + changeRequest.setBeforeKeepAliveTime(beforeProperties.getKeepAliveTime()); + changeRequest.setBlockingQueueName(beforeProperties.getBlockingQueue()); + changeRequest.setBeforeQueueCapacity(beforeProperties.getQueueCapacity()); + changeRequest.setBeforeRejectedName(beforeProperties.getRejectedHandler()); + changeRequest.setBeforeExecuteTimeOut(beforeProperties.getExecuteTimeOut()); + changeRequest.setThreadPoolId(beforeProperties.getThreadPoolId()); + changeRequest.setNowCorePoolSize(properties.getCorePoolSize()); + changeRequest.setNowMaximumPoolSize(properties.getMaximumPoolSize()); + changeRequest.setNowAllowsCoreThreadTimeOut(properties.getAllowCoreThreadTimeOut()); + changeRequest.setNowKeepAliveTime(properties.getKeepAliveTime()); + changeRequest.setNowQueueCapacity(properties.getQueueCapacity()); + changeRequest.setNowRejectedName(properties.getRejectedHandler()); + changeRequest.setNowExecuteTimeOut(properties.getExecuteTimeOut()); + return changeRequest; + } + + /** + * Check consistency. + * + * @param threadPoolId + * @param properties + */ + private boolean checkConsistency(String threadPoolId, ExecutorProperties properties) { + ExecutorProperties beforeProperties = GlobalCoreThreadPoolManage.getProperties(properties.getThreadPoolId()); + ThreadPoolExecutor executor = GlobalThreadPoolManage.getExecutorService(threadPoolId).getExecutor(); + boolean result = !Objects.equals(beforeProperties.getCorePoolSize(), properties.getCorePoolSize()) + || !Objects.equals(beforeProperties.getMaximumPoolSize(), properties.getMaximumPoolSize()) + || !Objects.equals(beforeProperties.getAllowCoreThreadTimeOut(), properties.getAllowCoreThreadTimeOut()) + || !Objects.equals(beforeProperties.getExecuteTimeOut(), properties.getExecuteTimeOut()) + || !Objects.equals(beforeProperties.getKeepAliveTime(), properties.getKeepAliveTime()) + || !Objects.equals(beforeProperties.getRejectedHandler(), properties.getRejectedHandler()) + || + (!Objects.equals(beforeProperties.getQueueCapacity(), properties.getQueueCapacity()) + && Objects.equals(QueueTypeEnum.RESIZABLE_LINKED_BLOCKING_QUEUE.name, executor.getQueue().getClass().getSimpleName())); + return result; + } + + /** + * Dynamic refresh pool. + * + * @param threadPoolId + * @param properties + */ + private void dynamicRefreshPool(String threadPoolId, ExecutorProperties properties) { + ExecutorProperties beforeProperties = GlobalCoreThreadPoolManage.getProperties(properties.getThreadPoolId()); + ThreadPoolExecutor executor = GlobalThreadPoolManage.getExecutorService(threadPoolId).getExecutor(); + if (properties.getMaximumPoolSize() != null && properties.getCorePoolSize() != null) { + if (properties.getMaximumPoolSize() < executor.getMaximumPoolSize()) { + executor.setCorePoolSize(properties.getCorePoolSize()); + executor.setMaximumPoolSize(properties.getMaximumPoolSize()); + } else { + executor.setMaximumPoolSize(properties.getMaximumPoolSize()); + executor.setCorePoolSize(properties.getCorePoolSize()); + } + } else { + if (properties.getMaximumPoolSize() != null) { + executor.setMaximumPoolSize(properties.getMaximumPoolSize()); + } + if (properties.getCorePoolSize() != null) { + executor.setCorePoolSize(properties.getCorePoolSize()); + } + } + if (!Objects.equals(beforeProperties.getAllowCoreThreadTimeOut(), properties.getAllowCoreThreadTimeOut())) { + executor.allowCoreThreadTimeOut(properties.getAllowCoreThreadTimeOut()); + } + if (!Objects.equals(beforeProperties.getExecuteTimeOut(), properties.getExecuteTimeOut())) { + if (executor instanceof AbstractDynamicExecutorSupport) { + ((DynamicThreadPoolExecutor) executor).setExecuteTimeOut(properties.getExecuteTimeOut()); + } + } + if (!Objects.equals(beforeProperties.getRejectedHandler(), properties.getRejectedHandler())) { + RejectedExecutionHandler rejectedExecutionHandler = RejectedTypeEnum.createPolicy(properties.getRejectedHandler()); + if (executor instanceof AbstractDynamicExecutorSupport) { + DynamicThreadPoolExecutor dynamicExecutor = (DynamicThreadPoolExecutor) executor; + dynamicExecutor.setRedundancyHandler(rejectedExecutionHandler); + AtomicLong rejectCount = dynamicExecutor.getRejectCount(); + rejectedExecutionHandler = RejectedProxyUtil.createProxy(rejectedExecutionHandler, threadPoolId, rejectCount); + } + executor.setRejectedExecutionHandler(rejectedExecutionHandler); + } + if (!Objects.equals(beforeProperties.getKeepAliveTime(), properties.getKeepAliveTime())) { + executor.setKeepAliveTime(properties.getKeepAliveTime(), TimeUnit.SECONDS); + } + if (!Objects.equals(beforeProperties.getQueueCapacity(), properties.getQueueCapacity()) + && Objects.equals(QueueTypeEnum.RESIZABLE_LINKED_BLOCKING_QUEUE.name, executor.getQueue().getClass().getSimpleName())) { + if (executor.getQueue() instanceof ResizableCapacityLinkedBlockIngQueue) { + ResizableCapacityLinkedBlockIngQueue queue = (ResizableCapacityLinkedBlockIngQueue) executor.getQueue(); + queue.setCapacity(properties.getQueueCapacity()); + } else { + log.warn("The queue length cannot be modified. Queue type mismatch. Current queue type :: {}", executor.getQueue().getClass().getSimpleName()); + } + } + } + +} diff --git a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/event/PlatformsListener.java b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/event/PlatformsListener.java new file mode 100644 index 00000000..e1bdee84 --- /dev/null +++ b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/event/PlatformsListener.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.core.springboot.starter.event; + +import cn.hippo4j.common.config.ApplicationContextHolder; +import cn.hippo4j.common.notify.HippoBaseSendMessageService; +import cn.hippo4j.common.notify.NotifyConfigDTO; +import cn.hippo4j.core.executor.DynamicThreadPoolWrapper; +import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage; +import cn.hippo4j.core.springboot.starter.config.BootstrapCoreProperties; +import cn.hippo4j.core.springboot.starter.config.ExecutorProperties; +import cn.hippo4j.core.springboot.starter.notify.CoreNotifyConfigBuilder; +import org.springframework.context.ApplicationListener; +import org.springframework.core.annotation.Order; +import org.springframework.stereotype.Component; + +import java.util.List; +import java.util.Map; + +import static cn.hippo4j.core.springboot.starter.event.ThreadPoolDynamicRefreshEventOrder.PLATFORMS_LISTENER; + +/** + * @author : wh + * @date : 2022/5/13 10:03 + * @description: + */ +@Component +@Order(PLATFORMS_LISTENER) +public class PlatformsListener implements ApplicationListener { + + + @Override + public void onApplicationEvent(ThreadPoolDynamicRefreshEvent threadPoolDynamicRefreshEvent) { + BootstrapCoreProperties bindableCoreProperties = threadPoolDynamicRefreshEvent.getBootstrapCoreProperties(); + List executors = bindableCoreProperties.getExecutors(); + for (ExecutorProperties executor : executors) { + String threadPoolId = executor.getThreadPoolId(); + DynamicThreadPoolWrapper wrapper = GlobalThreadPoolManage.getExecutorService(threadPoolId); + if (!wrapper.isInitFlag()) { + HippoBaseSendMessageService sendMessageService = ApplicationContextHolder.getBean(HippoBaseSendMessageService.class); + CoreNotifyConfigBuilder configBuilder = ApplicationContextHolder.getBean(CoreNotifyConfigBuilder.class); + Map> notifyConfig = configBuilder.buildSingleNotifyConfig(executor); + sendMessageService.putPlatform(notifyConfig); + wrapper.setInitFlag(Boolean.TRUE); + } + } + + } +} diff --git a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/event/ThreadPoolDynamicRefreshEvent.java b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/event/ThreadPoolDynamicRefreshEvent.java new file mode 100644 index 00000000..539c3a77 --- /dev/null +++ b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/event/ThreadPoolDynamicRefreshEvent.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.core.springboot.starter.event; + +import cn.hippo4j.core.springboot.starter.config.BootstrapCoreProperties; +import lombok.Getter; +import lombok.Setter; +import org.springframework.context.ApplicationEvent; + +/** + * @author : wh + * @date : 2022/5/13 09:49 + * @description: + */ +public class ThreadPoolDynamicRefreshEvent extends ApplicationEvent { + + @Getter + @Setter + private BootstrapCoreProperties bootstrapCoreProperties; + + public ThreadPoolDynamicRefreshEvent(Object source) { + super(source); + } +} diff --git a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/event/ThreadPoolDynamicRefreshEventOrder.java b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/event/ThreadPoolDynamicRefreshEventOrder.java new file mode 100644 index 00000000..a6456ba4 --- /dev/null +++ b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/event/ThreadPoolDynamicRefreshEventOrder.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.core.springboot.starter.event; + +/** + * @author : wh + * @date : 2022/5/13 10:25 + * @description: + */ +public interface ThreadPoolDynamicRefreshEventOrder { + + Integer WEB_EXECUTOR_LISTENER = 0; + + Integer PLATFORMS_LISTENER = 1; + + Integer EXECUTORS_LISTENER = 2; + +} diff --git a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/event/WebExecutorListener.java b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/event/WebExecutorListener.java new file mode 100644 index 00000000..8ea75760 --- /dev/null +++ b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/event/WebExecutorListener.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.core.springboot.starter.event; + +import cn.hippo4j.common.config.ApplicationContextHolder; +import cn.hippo4j.common.model.ThreadPoolParameter; +import cn.hippo4j.common.model.ThreadPoolParameterInfo; +import cn.hippo4j.core.executor.web.WebThreadPoolHandlerChoose; +import cn.hippo4j.core.executor.web.WebThreadPoolService; +import cn.hippo4j.core.springboot.starter.config.BootstrapCoreProperties; +import cn.hippo4j.core.springboot.starter.config.WebThreadPoolProperties; +import lombok.extern.slf4j.Slf4j; +import org.springframework.context.ApplicationListener; +import org.springframework.core.annotation.Order; +import org.springframework.stereotype.Component; + +import java.util.Objects; + +import static cn.hippo4j.core.springboot.starter.event.ThreadPoolDynamicRefreshEventOrder.WEB_EXECUTOR_LISTENER; + +/** + * @author : wh + * @date : 2022/5/13 09:53 + * @description: + */ +@Slf4j +@Component +@Order(WEB_EXECUTOR_LISTENER) +public class WebExecutorListener implements ApplicationListener { + + + @Override + public void onApplicationEvent(ThreadPoolDynamicRefreshEvent threadPoolDynamicRefreshEvent) { + BootstrapCoreProperties bindableCoreProperties = threadPoolDynamicRefreshEvent.getBootstrapCoreProperties(); + boolean isNullFlag = bindableCoreProperties.getJetty() == null + || bindableCoreProperties.getUndertow() == null + || bindableCoreProperties.getTomcat() == null; + if (isNullFlag) { + return; + } + try { + ThreadPoolParameterInfo nowParameter = buildWebPoolParameter(bindableCoreProperties); + if (nowParameter != null) { + WebThreadPoolHandlerChoose webThreadPoolHandlerChoose = ApplicationContextHolder.getBean(WebThreadPoolHandlerChoose.class); + WebThreadPoolService webThreadPoolService = webThreadPoolHandlerChoose.choose(); + ThreadPoolParameter beforeParameter = webThreadPoolService.getWebThreadPoolParameter(); + if (!Objects.equals(beforeParameter.getCoreSize(), nowParameter.getCoreSize()) + || !Objects.equals(beforeParameter.getMaxSize(), nowParameter.getMaxSize()) + || !Objects.equals(beforeParameter.getKeepAliveTime(), nowParameter.getKeepAliveTime())) { + webThreadPoolService.updateWebThreadPool(nowParameter); + } + } + } catch (Exception ex) { + log.error("Failed to modify web thread pool.", ex); + } + + } + + private ThreadPoolParameterInfo buildWebPoolParameter(BootstrapCoreProperties bindableCoreProperties) { + ThreadPoolParameterInfo parameterInfo = null; + WebThreadPoolProperties poolProperties = null; + if (bindableCoreProperties.getTomcat() != null) { + poolProperties = bindableCoreProperties.getTomcat(); + } else if (bindableCoreProperties.getUndertow() != null) { + poolProperties = bindableCoreProperties.getUndertow(); + } else if (bindableCoreProperties.getJetty() != null) { + poolProperties = bindableCoreProperties.getJetty(); + } + if (poolProperties != null) { + parameterInfo = new ThreadPoolParameterInfo(); + parameterInfo.setCoreSize(poolProperties.getCorePoolSize()); + parameterInfo.setMaxSize(poolProperties.getMaximumPoolSize()); + parameterInfo.setKeepAliveTime(poolProperties.getKeepAliveTime()); + } + return parameterInfo; + } +} diff --git a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/refresher/AbstractCoreThreadPoolDynamicRefresh.java b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/refresher/AbstractCoreThreadPoolDynamicRefresh.java index b10ed906..8fff4b89 100644 --- a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/refresher/AbstractCoreThreadPoolDynamicRefresh.java +++ b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/refresher/AbstractCoreThreadPoolDynamicRefresh.java @@ -18,44 +18,23 @@ package cn.hippo4j.core.springboot.starter.refresher; import cn.hippo4j.common.api.ThreadPoolDynamicRefresh; -import cn.hippo4j.common.config.ApplicationContextHolder; -import cn.hippo4j.common.model.ThreadPoolParameter; -import cn.hippo4j.common.model.ThreadPoolParameterInfo; -import cn.hippo4j.common.notify.HippoBaseSendMessageService; -import cn.hippo4j.common.notify.NotifyConfigDTO; import cn.hippo4j.common.notify.ThreadPoolNotifyAlarm; -import cn.hippo4j.common.notify.request.ChangeParameterNotifyRequest; -import cn.hippo4j.core.executor.web.WebThreadPoolHandlerChoose; -import cn.hippo4j.core.executor.web.WebThreadPoolService; -import cn.hippo4j.core.executor.DynamicThreadPoolExecutor; -import cn.hippo4j.core.executor.DynamicThreadPoolWrapper; import cn.hippo4j.core.executor.ThreadPoolNotifyAlarmHandler; import cn.hippo4j.core.executor.manage.GlobalNotifyAlarmManage; -import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage; -import cn.hippo4j.core.executor.support.*; -import cn.hippo4j.core.proxy.RejectedProxyUtil; -import cn.hippo4j.core.springboot.starter.config.WebThreadPoolProperties; -import cn.hippo4j.core.springboot.starter.support.GlobalCoreThreadPoolManage; +import cn.hippo4j.core.executor.support.ThreadPoolBuilder; import cn.hippo4j.core.springboot.starter.config.BootstrapCoreProperties; -import cn.hippo4j.core.springboot.starter.config.ExecutorProperties; -import cn.hippo4j.core.springboot.starter.notify.CoreNotifyConfigBuilder; +import cn.hippo4j.core.springboot.starter.event.ThreadPoolDynamicRefreshEvent; import cn.hippo4j.core.springboot.starter.parser.ConfigParserHandler; -import lombok.AllArgsConstructor; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.BeansException; import org.springframework.beans.factory.InitializingBean; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; import java.io.IOException; -import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.concurrent.ExecutorService; -import java.util.concurrent.RejectedExecutionHandler; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; - -import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_DELIMITER; -import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_THREAD_POOL_TEXT; /** * Abstract core thread-pool dynamic refresh. @@ -64,8 +43,8 @@ import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_THREAD * @date 2022/2/26 12:42 */ @Slf4j -@AllArgsConstructor -public abstract class AbstractCoreThreadPoolDynamicRefresh implements ThreadPoolDynamicRefresh, InitializingBean { +@RequiredArgsConstructor +public abstract class AbstractCoreThreadPoolDynamicRefresh implements ThreadPoolDynamicRefresh, InitializingBean, ApplicationContextAware { private final ThreadPoolNotifyAlarmHandler threadPoolNotifyAlarmHandler; @@ -73,6 +52,8 @@ public abstract class AbstractCoreThreadPoolDynamicRefresh implements ThreadPool protected final ExecutorService dynamicRefreshExecutorService = ThreadPoolBuilder.builder().singlePool("client.dynamic.refresh").build(); + private ApplicationContext applicationContext; + @Override public void dynamicRefresh(String content) { Map configInfo; @@ -83,14 +64,12 @@ public abstract class AbstractCoreThreadPoolDynamicRefresh implements ThreadPool content, bootstrapCoreProperties.getConfigFileType(), e); return; } - + BootstrapCoreProperties bindableCoreProperties = BootstrapCorePropertiesBinderAdapt.bootstrapCorePropertiesBinder(configInfo, bootstrapCoreProperties); // web pool - refreshWebExecutor(bindableCoreProperties); - // platforms - refreshPlatforms(bindableCoreProperties); - // executors - refreshExecutors(bindableCoreProperties); + ThreadPoolDynamicRefreshEvent event = new ThreadPoolDynamicRefreshEvent(this); + event.setBootstrapCoreProperties(bindableCoreProperties); + applicationContext.publishEvent(event); } /** @@ -107,220 +86,11 @@ public abstract class AbstractCoreThreadPoolDynamicRefresh implements ThreadPool GlobalNotifyAlarmManage.put(executorProperties.getThreadPoolId(), threadPoolNotifyAlarm); }); } + - /** - * Refresh web executor. - * - * @param bindableCoreProperties - */ - private void refreshWebExecutor(BootstrapCoreProperties bindableCoreProperties) { - boolean isNullFlag = bindableCoreProperties.getJetty() == null - || bindableCoreProperties.getUndertow() == null - || bindableCoreProperties.getTomcat() == null; - if (isNullFlag) { - return; - } - try { - ThreadPoolParameterInfo nowParameter = buildWebPoolParameter(bindableCoreProperties); - if (nowParameter != null) { - WebThreadPoolHandlerChoose webThreadPoolHandlerChoose = ApplicationContextHolder.getBean(WebThreadPoolHandlerChoose.class); - WebThreadPoolService webThreadPoolService = webThreadPoolHandlerChoose.choose(); - ThreadPoolParameter beforeParameter = webThreadPoolService.getWebThreadPoolParameter(); - if (!Objects.equals(beforeParameter.getCoreSize(), nowParameter.getCoreSize()) - || !Objects.equals(beforeParameter.getMaxSize(), nowParameter.getMaxSize()) - || !Objects.equals(beforeParameter.getKeepAliveTime(), nowParameter.getKeepAliveTime())) { - webThreadPoolService.updateWebThreadPool(nowParameter); - } - } - } catch (Exception ex) { - log.error("Failed to modify web thread pool.", ex); - } - } - - /** - * Refresh platform. - * - * @param bindableCoreProperties - */ - private void refreshPlatforms(BootstrapCoreProperties bindableCoreProperties) { - List executors = bindableCoreProperties.getExecutors(); - for (ExecutorProperties executor : executors) { - String threadPoolId = executor.getThreadPoolId(); - DynamicThreadPoolWrapper wrapper = GlobalThreadPoolManage.getExecutorService(threadPoolId); - if (!wrapper.isInitFlag()) { - HippoBaseSendMessageService sendMessageService = ApplicationContextHolder.getBean(HippoBaseSendMessageService.class); - CoreNotifyConfigBuilder configBuilder = ApplicationContextHolder.getBean(CoreNotifyConfigBuilder.class); - Map> notifyConfig = configBuilder.buildSingleNotifyConfig(executor); - sendMessageService.putPlatform(notifyConfig); - wrapper.setInitFlag(Boolean.TRUE); - } - } - } - - /** - * Refresh executors. - * - * @param bindableCoreProperties - */ - private void refreshExecutors(BootstrapCoreProperties bindableCoreProperties) { - List executors = bindableCoreProperties.getExecutors(); - for (ExecutorProperties properties : executors) { - String threadPoolId = properties.getThreadPoolId(); - if (!checkConsistency(threadPoolId, properties)) { - continue; - } - // refresh executor pool - dynamicRefreshPool(threadPoolId, properties); - // old properties - ExecutorProperties beforeProperties = GlobalCoreThreadPoolManage.getProperties(properties.getThreadPoolId()); - // refresh executor properties - GlobalCoreThreadPoolManage.refresh(threadPoolId, properties); - log.info(CHANGE_THREAD_POOL_TEXT, - threadPoolId.toUpperCase(), - String.format(CHANGE_DELIMITER, beforeProperties.getCorePoolSize(), properties.getCorePoolSize()), - String.format(CHANGE_DELIMITER, beforeProperties.getMaximumPoolSize(), properties.getMaximumPoolSize()), - String.format(CHANGE_DELIMITER, beforeProperties.getBlockingQueue(), properties.getBlockingQueue()), - String.format(CHANGE_DELIMITER, beforeProperties.getQueueCapacity(), properties.getQueueCapacity()), - String.format(CHANGE_DELIMITER, beforeProperties.getKeepAliveTime(), properties.getKeepAliveTime()), - String.format(CHANGE_DELIMITER, beforeProperties.getExecuteTimeOut(), properties.getExecuteTimeOut()), - String.format(CHANGE_DELIMITER, beforeProperties.getRejectedHandler(), properties.getRejectedHandler()), - String.format(CHANGE_DELIMITER, beforeProperties.getAllowCoreThreadTimeOut(), properties.getAllowCoreThreadTimeOut())); - try { - threadPoolNotifyAlarmHandler.sendPoolConfigChange(newChangeRequest(beforeProperties, properties)); - } catch (Throwable ex) { - log.error("Failed to send change notice. Message :: {}", ex.getMessage()); - } - } - } - - /** - * Construct ChangeParameterNotifyRequest instance - * - * @param beforeProperties old properties - * @param properties new properties - * @return instance - */ - private ChangeParameterNotifyRequest newChangeRequest(ExecutorProperties beforeProperties, ExecutorProperties properties) { - ChangeParameterNotifyRequest changeRequest = new ChangeParameterNotifyRequest(); - changeRequest.setBeforeCorePoolSize(beforeProperties.getCorePoolSize()); - changeRequest.setBeforeMaximumPoolSize(beforeProperties.getMaximumPoolSize()); - changeRequest.setBeforeAllowsCoreThreadTimeOut(beforeProperties.getAllowCoreThreadTimeOut()); - changeRequest.setBeforeKeepAliveTime(beforeProperties.getKeepAliveTime()); - changeRequest.setBlockingQueueName(beforeProperties.getBlockingQueue()); - changeRequest.setBeforeQueueCapacity(beforeProperties.getQueueCapacity()); - changeRequest.setBeforeRejectedName(beforeProperties.getRejectedHandler()); - changeRequest.setBeforeExecuteTimeOut(beforeProperties.getExecuteTimeOut()); - changeRequest.setThreadPoolId(beforeProperties.getThreadPoolId()); - changeRequest.setNowCorePoolSize(properties.getCorePoolSize()); - changeRequest.setNowMaximumPoolSize(properties.getMaximumPoolSize()); - changeRequest.setNowAllowsCoreThreadTimeOut(properties.getAllowCoreThreadTimeOut()); - changeRequest.setNowKeepAliveTime(properties.getKeepAliveTime()); - changeRequest.setNowQueueCapacity(properties.getQueueCapacity()); - changeRequest.setNowRejectedName(properties.getRejectedHandler()); - changeRequest.setNowExecuteTimeOut(properties.getExecuteTimeOut()); - return changeRequest; - } - - /** - * Check consistency. - * - * @param threadPoolId - * @param properties - */ - private boolean checkConsistency(String threadPoolId, ExecutorProperties properties) { - ExecutorProperties beforeProperties = GlobalCoreThreadPoolManage.getProperties(properties.getThreadPoolId()); - ThreadPoolExecutor executor = GlobalThreadPoolManage.getExecutorService(threadPoolId).getExecutor(); - boolean result = !Objects.equals(beforeProperties.getCorePoolSize(), properties.getCorePoolSize()) - || !Objects.equals(beforeProperties.getMaximumPoolSize(), properties.getMaximumPoolSize()) - || !Objects.equals(beforeProperties.getAllowCoreThreadTimeOut(), properties.getAllowCoreThreadTimeOut()) - || !Objects.equals(beforeProperties.getExecuteTimeOut(), properties.getExecuteTimeOut()) - || !Objects.equals(beforeProperties.getKeepAliveTime(), properties.getKeepAliveTime()) - || !Objects.equals(beforeProperties.getRejectedHandler(), properties.getRejectedHandler()) - || - (!Objects.equals(beforeProperties.getQueueCapacity(), properties.getQueueCapacity()) - && Objects.equals(QueueTypeEnum.RESIZABLE_LINKED_BLOCKING_QUEUE.name, executor.getQueue().getClass().getSimpleName())); - return result; - } - - /** - * Dynamic refresh pool. - * - * @param threadPoolId - * @param properties - */ - private void dynamicRefreshPool(String threadPoolId, ExecutorProperties properties) { - ExecutorProperties beforeProperties = GlobalCoreThreadPoolManage.getProperties(properties.getThreadPoolId()); - ThreadPoolExecutor executor = GlobalThreadPoolManage.getExecutorService(threadPoolId).getExecutor(); - if (properties.getMaximumPoolSize() != null && properties.getCorePoolSize() != null) { - if (properties.getMaximumPoolSize() < executor.getMaximumPoolSize()) { - executor.setCorePoolSize(properties.getCorePoolSize()); - executor.setMaximumPoolSize(properties.getMaximumPoolSize()); - } else { - executor.setMaximumPoolSize(properties.getMaximumPoolSize()); - executor.setCorePoolSize(properties.getCorePoolSize()); - } - } else { - if (properties.getMaximumPoolSize() != null) { - executor.setMaximumPoolSize(properties.getMaximumPoolSize()); - } - if (properties.getCorePoolSize() != null) { - executor.setCorePoolSize(properties.getCorePoolSize()); - } - } - if (!Objects.equals(beforeProperties.getAllowCoreThreadTimeOut(), properties.getAllowCoreThreadTimeOut())) { - executor.allowCoreThreadTimeOut(properties.getAllowCoreThreadTimeOut()); - } - if (!Objects.equals(beforeProperties.getExecuteTimeOut(), properties.getExecuteTimeOut())) { - if (executor instanceof AbstractDynamicExecutorSupport) { - ((DynamicThreadPoolExecutor) executor).setExecuteTimeOut(properties.getExecuteTimeOut()); - } - } - if (!Objects.equals(beforeProperties.getRejectedHandler(), properties.getRejectedHandler())) { - RejectedExecutionHandler rejectedExecutionHandler = RejectedTypeEnum.createPolicy(properties.getRejectedHandler()); - if (executor instanceof AbstractDynamicExecutorSupport) { - DynamicThreadPoolExecutor dynamicExecutor = (DynamicThreadPoolExecutor) executor; - dynamicExecutor.setRedundancyHandler(rejectedExecutionHandler); - AtomicLong rejectCount = dynamicExecutor.getRejectCount(); - rejectedExecutionHandler = RejectedProxyUtil.createProxy(rejectedExecutionHandler, threadPoolId, rejectCount); - } - executor.setRejectedExecutionHandler(rejectedExecutionHandler); - } - if (!Objects.equals(beforeProperties.getKeepAliveTime(), properties.getKeepAliveTime())) { - executor.setKeepAliveTime(properties.getKeepAliveTime(), TimeUnit.SECONDS); - } - if (!Objects.equals(beforeProperties.getQueueCapacity(), properties.getQueueCapacity()) - && Objects.equals(QueueTypeEnum.RESIZABLE_LINKED_BLOCKING_QUEUE.name, executor.getQueue().getClass().getSimpleName())) { - if (executor.getQueue() instanceof ResizableCapacityLinkedBlockIngQueue) { - ResizableCapacityLinkedBlockIngQueue queue = (ResizableCapacityLinkedBlockIngQueue) executor.getQueue(); - queue.setCapacity(properties.getQueueCapacity()); - } else { - log.warn("The queue length cannot be modified. Queue type mismatch. Current queue type :: {}", executor.getQueue().getClass().getSimpleName()); - } - } - } - - /** - * Build web pool parameter. - * - * @param bindableCoreProperties - * @return - */ - private ThreadPoolParameterInfo buildWebPoolParameter(BootstrapCoreProperties bindableCoreProperties) { - ThreadPoolParameterInfo parameterInfo = null; - WebThreadPoolProperties poolProperties = null; - if (bindableCoreProperties.getTomcat() != null) { - poolProperties = bindableCoreProperties.getTomcat(); - } else if (bindableCoreProperties.getUndertow() != null) { - poolProperties = bindableCoreProperties.getUndertow(); - } else if (bindableCoreProperties.getJetty() != null) { - poolProperties = bindableCoreProperties.getJetty(); - } - if (poolProperties != null) { - parameterInfo = new ThreadPoolParameterInfo(); - parameterInfo.setCoreSize(poolProperties.getCorePoolSize()); - parameterInfo.setMaxSize(poolProperties.getMaximumPoolSize()); - parameterInfo.setKeepAliveTime(poolProperties.getKeepAliveTime()); - } - return parameterInfo; + @Override + public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { + this.applicationContext = applicationContext; } + }