From 517779cf8a88d392bc12fbb63ef96df89796cfad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=A9=AC=E7=A7=B0=20Ma=20Chen?= Date: Fri, 4 Nov 2022 22:03:06 +0800 Subject: [PATCH] Spring post processor logic refactoring (#874) (#905) --- .../DynamicThreadPoolPostProcessor.java | 74 ++++++++++++----- .../DynamicThreadPoolPostProcessor.java | 81 +++++++++---------- 2 files changed, 92 insertions(+), 63 deletions(-) diff --git a/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/config/springboot/starter/support/DynamicThreadPoolPostProcessor.java b/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/config/springboot/starter/support/DynamicThreadPoolPostProcessor.java index 2bce698f..5496fc9d 100644 --- a/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/config/springboot/starter/support/DynamicThreadPoolPostProcessor.java +++ b/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/config/springboot/starter/support/DynamicThreadPoolPostProcessor.java @@ -20,6 +20,7 @@ package cn.hippo4j.config.springboot.starter.support; import cn.hippo4j.common.config.ApplicationContextHolder; import cn.hippo4j.common.executor.support.BlockingQueueTypeEnum; import cn.hippo4j.common.executor.support.RejectedPolicyTypeEnum; +import cn.hippo4j.common.toolkit.ReflectUtil; import cn.hippo4j.common.toolkit.StringUtil; import cn.hippo4j.config.springboot.starter.config.BootstrapConfigProperties; import cn.hippo4j.config.springboot.starter.config.DynamicThreadPoolNotifyProperties; @@ -31,14 +32,12 @@ import cn.hippo4j.core.executor.manage.GlobalNotifyAlarmManage; import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage; import cn.hippo4j.core.executor.support.ThreadPoolBuilder; import cn.hippo4j.core.executor.support.adpter.DynamicThreadPoolAdapterChoose; -import cn.hippo4j.core.provider.CommonDynamicThreadPoolProviderFactory; import cn.hippo4j.core.toolkit.DynamicThreadPoolAnnotationUtil; import cn.hippo4j.message.service.ThreadPoolNotifyAlarm; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeansException; import org.springframework.beans.factory.config.BeanPostProcessor; -import org.springframework.core.task.TaskDecorator; import java.util.Objects; import java.util.Optional; @@ -96,11 +95,11 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor { /** * Fill the thread pool and register. * - * @param dynamicThreadPoolWrapper + * @param dynamicThreadPoolWrapper dynamic thread-pool wrapper */ protected ThreadPoolExecutor fillPoolAndRegister(DynamicThreadPoolWrapper dynamicThreadPoolWrapper) { String threadPoolId = dynamicThreadPoolWrapper.getThreadPoolId(); - ThreadPoolExecutor newDynamicPoolExecutor = dynamicThreadPoolWrapper.getExecutor(); + ThreadPoolExecutor executor = dynamicThreadPoolWrapper.getExecutor(); ExecutorProperties executorProperties = null; if (configProperties.getExecutors() != null) { executorProperties = configProperties.getExecutors() @@ -109,39 +108,42 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor { .findFirst() .orElseThrow(() -> new RuntimeException("The thread pool id does not exist in the configuration.")); try { - newDynamicPoolExecutor = buildNewDynamicThreadPool(executorProperties); + threadPoolParamReplace(executor, executorProperties); } catch (Exception ex) { - log.error("Failed to initialize thread pool configuration. error: {}", ex); + log.error("Failed to initialize thread pool configuration.", ex); } finally { - if (Objects.isNull(dynamicThreadPoolWrapper.getExecutor())) { - dynamicThreadPoolWrapper.setExecutor(CommonDynamicThreadPoolProviderFactory.getInstance(threadPoolId)); - } dynamicThreadPoolWrapper.setInitFlag(Boolean.TRUE); } ThreadPoolNotifyAlarm threadPoolNotifyAlarm = buildThreadPoolNotifyAlarm(executorProperties); GlobalNotifyAlarmManage.put(threadPoolId, threadPoolNotifyAlarm); - DynamicThreadPoolExecutor actualDynamicThreadPoolExecutor = (DynamicThreadPoolExecutor) dynamicThreadPoolWrapper.getExecutor(); - TaskDecorator taskDecorator = actualDynamicThreadPoolExecutor.getTaskDecorator(); - ((DynamicThreadPoolExecutor) newDynamicPoolExecutor).setTaskDecorator(taskDecorator); - long awaitTerminationMillis = actualDynamicThreadPoolExecutor.getAwaitTerminationMillis(); - boolean waitForTasksToCompleteOnShutdown = actualDynamicThreadPoolExecutor.isWaitForTasksToCompleteOnShutdown(); - ((DynamicThreadPoolExecutor) newDynamicPoolExecutor).setSupportParam(awaitTerminationMillis, waitForTasksToCompleteOnShutdown); - dynamicThreadPoolWrapper.setExecutor(newDynamicPoolExecutor); } GlobalThreadPoolManage.registerPool(dynamicThreadPoolWrapper.getThreadPoolId(), dynamicThreadPoolWrapper); GlobalCoreThreadPoolManage.register( threadPoolId, executorProperties == null - ? buildExecutorProperties(threadPoolId, newDynamicPoolExecutor) + ? buildDefaultExecutorProperties(threadPoolId, executor) : buildActualExecutorProperties(executorProperties)); - return newDynamicPoolExecutor; + return executor; } + /** + * Build actual executor properties. + * + * @param executorProperties executor properties + * @return executor properties + */ private ExecutorProperties buildActualExecutorProperties(ExecutorProperties executorProperties) { return Optional.ofNullable(configProperties.getDefaultExecutor()).map(each -> buildExecutorProperties(executorProperties)).orElse(executorProperties); } - private ExecutorProperties buildExecutorProperties(String threadPoolId, ThreadPoolExecutor executor) { + /** + * Build default executor properties. + * + * @param threadPoolId thread-pool id + * @param executor dynamic thread-pool executor + * @return executor properties + */ + private ExecutorProperties buildDefaultExecutorProperties(String threadPoolId, ThreadPoolExecutor executor) { ExecutorProperties executorProperties = new ExecutorProperties(); BlockingQueue blockingQueue = executor.getQueue(); int queueSize = blockingQueue.size(); @@ -160,6 +162,12 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor { return executorProperties; } + /** + * Build new dynamic thread-pool. + * + * @param executorProperties executor properties + * @return thread-pool executor + */ private ThreadPoolExecutor buildNewDynamicThreadPool(ExecutorProperties executorProperties) { String threadNamePrefix = executorProperties.getThreadNamePrefix(); ExecutorProperties newExecutorProperties = buildExecutorProperties(executorProperties); @@ -177,6 +185,28 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor { return newDynamicPoolExecutor; } + /** + * Thread-pool param replace. + * + * @param executor dynamic thread-pool executor + * @param executorProperties executor properties + */ + private void threadPoolParamReplace(ThreadPoolExecutor executor, ExecutorProperties executorProperties) { + BlockingQueue workQueue = BlockingQueueTypeEnum.createBlockingQueue(executorProperties.getBlockingQueue(), executorProperties.getQueueCapacity()); + ReflectUtil.setFieldValue(executor, "workQueue", workQueue); + executor.setCorePoolSize(executorProperties.getCorePoolSize()); + executor.setMaximumPoolSize(executorProperties.getMaximumPoolSize()); + executor.setKeepAliveTime(executorProperties.getKeepAliveTime(), TimeUnit.SECONDS); + executor.allowCoreThreadTimeOut(executorProperties.getAllowCoreThreadTimeOut()); + executor.setRejectedExecutionHandler(RejectedPolicyTypeEnum.createPolicy(executorProperties.getRejectedHandler())); + } + + /** + * Build executor properties. + * + * @param executorProperties executor properties + * @return executor properties + */ private ExecutorProperties buildExecutorProperties(ExecutorProperties executorProperties) { ExecutorProperties newExecutorProperties = ExecutorProperties.builder() .corePoolSize(Optional.ofNullable(executorProperties.getCorePoolSize()) @@ -201,6 +231,12 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor { return newExecutorProperties; } + /** + * Build thread-pool notify alarm + * + * @param executorProperties executor properties + * @return thread-pool notify alarm + */ private ThreadPoolNotifyAlarm buildThreadPoolNotifyAlarm(ExecutorProperties executorProperties) { DynamicThreadPoolNotifyProperties notify = Optional.ofNullable(executorProperties).map(ExecutorProperties::getNotify).orElse(null); boolean isAlarm = Optional.ofNullable(executorProperties.getAlarm()) diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/support/DynamicThreadPoolPostProcessor.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/support/DynamicThreadPoolPostProcessor.java index 0d3006d8..b1b6cee9 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/support/DynamicThreadPoolPostProcessor.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/support/DynamicThreadPoolPostProcessor.java @@ -27,15 +27,14 @@ import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterParameter; import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterWrapper; import cn.hippo4j.common.toolkit.BooleanUtil; import cn.hippo4j.common.toolkit.JSONUtil; +import cn.hippo4j.common.toolkit.ReflectUtil; import cn.hippo4j.common.web.base.Result; import cn.hippo4j.core.executor.DynamicThreadPool; import cn.hippo4j.core.executor.DynamicThreadPoolExecutor; import cn.hippo4j.core.executor.DynamicThreadPoolWrapper; import cn.hippo4j.core.executor.manage.GlobalNotifyAlarmManage; import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage; -import cn.hippo4j.core.executor.support.ThreadPoolBuilder; import cn.hippo4j.core.executor.support.adpter.DynamicThreadPoolAdapterChoose; -import cn.hippo4j.core.provider.CommonDynamicThreadPoolProviderFactory; import cn.hippo4j.core.toolkit.DynamicThreadPoolAnnotationUtil; import cn.hippo4j.message.service.ThreadPoolNotifyAlarm; import cn.hippo4j.springboot.starter.config.BootstrapProperties; @@ -45,12 +44,10 @@ import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeansException; import org.springframework.beans.factory.config.BeanPostProcessor; -import org.springframework.core.task.TaskDecorator; import java.util.HashMap; import java.util.Map; import java.util.Objects; -import java.util.Optional; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -114,7 +111,7 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor { /** * Register and subscribe. * - * @param dynamicThreadPoolWrapper + * @param dynamicThreadPoolWrapper dynamic thread-pool wrapper */ protected void registerAndSubscribe(DynamicThreadPoolWrapper dynamicThreadPoolWrapper) { fillPoolAndRegister(dynamicThreadPoolWrapper); @@ -124,7 +121,7 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor { /** * Fill the thread pool and register. * - * @param dynamicThreadPoolWrapper + * @param dynamicThreadPoolWrapper dynamic thread-pool wrapper */ protected ThreadPoolExecutor fillPoolAndRegister(DynamicThreadPoolWrapper dynamicThreadPoolWrapper) { String threadPoolId = dynamicThreadPoolWrapper.getThreadPoolId(); @@ -133,42 +130,14 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor { queryStrMap.put(TP_ID, threadPoolId); queryStrMap.put(ITEM_ID, properties.getItemId()); queryStrMap.put(NAMESPACE, properties.getNamespace()); - ThreadPoolExecutor newDynamicThreadPoolExecutor = null; ThreadPoolParameterInfo threadPoolParameterInfo = new ThreadPoolParameterInfo(); try { Result result = httpAgent.httpGetByConfig(Constants.CONFIG_CONTROLLER_PATH, null, queryStrMap, 5000L); if (result.isSuccess() && result.getData() != null) { String resultJsonStr = JSONUtil.toJSONString(result.getData()); if ((threadPoolParameterInfo = JSONUtil.parseObject(resultJsonStr, ThreadPoolParameterInfo.class)) != null) { - // Create a thread pool with relevant parameters. - BlockingQueue workQueue = BlockingQueueTypeEnum.createBlockingQueue(threadPoolParameterInfo.getQueueType(), threadPoolParameterInfo.getCapacity()); - newDynamicThreadPoolExecutor = ThreadPoolBuilder.builder() - .dynamicPool() - .threadPoolId(threadPoolId) - .workQueue(workQueue) - .threadFactory(executor.getThreadFactory()) - .poolThreadSize(threadPoolParameterInfo.corePoolSizeAdapt(), threadPoolParameterInfo.maximumPoolSizeAdapt()) - .keepAliveTime(threadPoolParameterInfo.getKeepAliveTime(), TimeUnit.SECONDS) - .rejected(RejectedPolicyTypeEnum.createPolicy(threadPoolParameterInfo.getRejectedType())) - .allowCoreThreadTimeOut(EnableEnum.getBool(threadPoolParameterInfo.getAllowCoreThreadTimeOut())) - .build(); - // Set dynamic thread pool enhancement parameters. - if (executor instanceof DynamicThreadPoolExecutor) { - ThreadPoolNotifyAlarm threadPoolNotifyAlarm = new ThreadPoolNotifyAlarm( - BooleanUtil.toBoolean(threadPoolParameterInfo.getIsAlarm().toString()), - threadPoolParameterInfo.getLivenessAlarm(), - threadPoolParameterInfo.getCapacityAlarm()); - GlobalNotifyAlarmManage.put(threadPoolId, threadPoolNotifyAlarm); - TaskDecorator taskDecorator = ((DynamicThreadPoolExecutor) executor).getTaskDecorator(); - ((DynamicThreadPoolExecutor) newDynamicThreadPoolExecutor).setTaskDecorator(taskDecorator); - long awaitTerminationMillis = ((DynamicThreadPoolExecutor) executor).getAwaitTerminationMillis(); - boolean waitForTasksToCompleteOnShutdown = ((DynamicThreadPoolExecutor) executor).isWaitForTasksToCompleteOnShutdown(); - ((DynamicThreadPoolExecutor) newDynamicThreadPoolExecutor).setSupportParam(awaitTerminationMillis, waitForTasksToCompleteOnShutdown); - long executeTimeOut = Optional.ofNullable(threadPoolParameterInfo.getExecuteTimeOut()) - .orElse(((DynamicThreadPoolExecutor) executor).getExecuteTimeOut()); - ((DynamicThreadPoolExecutor) newDynamicThreadPoolExecutor).setExecuteTimeOut(executeTimeOut); - } - dynamicThreadPoolWrapper.setExecutor(newDynamicThreadPoolExecutor); + threadPoolParamReplace(executor, threadPoolParameterInfo); + registerNotifyAlarm(threadPoolParameterInfo); } } else { // DynamicThreadPool configuration undefined in server @@ -191,22 +160,46 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor { GlobalThreadPoolManage.dynamicRegister(registerWrapper); } } catch (Exception ex) { - newDynamicThreadPoolExecutor = executor != null ? executor : CommonDynamicThreadPoolProviderFactory.getInstance(threadPoolId); - dynamicThreadPoolWrapper.setExecutor(newDynamicThreadPoolExecutor); log.error("Failed to initialize thread pool configuration. error message: {}", ex.getMessage()); - } finally { - if (Objects.isNull(executor)) { - dynamicThreadPoolWrapper.setExecutor(CommonDynamicThreadPoolProviderFactory.getInstance(threadPoolId)); - } } GlobalThreadPoolManage.register(dynamicThreadPoolWrapper.getThreadPoolId(), threadPoolParameterInfo, dynamicThreadPoolWrapper); - return newDynamicThreadPoolExecutor; + return executor; + } + + /** + * Thread-pool param replace. + * + * @param executor dynamic thread-pool executor + * @param threadPoolParameterInfo thread-pool parameter info + */ + private void threadPoolParamReplace(ThreadPoolExecutor executor, ThreadPoolParameterInfo threadPoolParameterInfo) { + BlockingQueue workQueue = BlockingQueueTypeEnum.createBlockingQueue(threadPoolParameterInfo.getQueueType(), threadPoolParameterInfo.getCapacity()); + ReflectUtil.setFieldValue(executor, "workQueue", workQueue); + executor.setCorePoolSize(threadPoolParameterInfo.corePoolSizeAdapt()); + executor.setMaximumPoolSize(threadPoolParameterInfo.maximumPoolSizeAdapt()); + executor.setKeepAliveTime(threadPoolParameterInfo.getKeepAliveTime(), TimeUnit.SECONDS); + executor.allowCoreThreadTimeOut(EnableEnum.getBool(threadPoolParameterInfo.getAllowCoreThreadTimeOut())); + executor.setRejectedExecutionHandler(RejectedPolicyTypeEnum.createPolicy(threadPoolParameterInfo.getRejectedType())); + } + + /** + * Register notify alarm. + * + * @param threadPoolParameterInfo thread-pool parameter info + */ + private void registerNotifyAlarm(ThreadPoolParameterInfo threadPoolParameterInfo) { + // Set dynamic thread pool enhancement parameters. + ThreadPoolNotifyAlarm threadPoolNotifyAlarm = new ThreadPoolNotifyAlarm( + BooleanUtil.toBoolean(threadPoolParameterInfo.getIsAlarm().toString()), + threadPoolParameterInfo.getLivenessAlarm(), + threadPoolParameterInfo.getCapacityAlarm()); + GlobalNotifyAlarmManage.put(threadPoolParameterInfo.getTpId(), threadPoolNotifyAlarm); } /** * Client dynamic thread pool subscription server configuration. * - * @param dynamicThreadPoolWrapper + * @param dynamicThreadPoolWrapper dynamic thread-pool wrapper */ protected void subscribeConfig(DynamicThreadPoolWrapper dynamicThreadPoolWrapper) { if (dynamicThreadPoolWrapper.isSubscribeFlag()) {