diff --git a/hippo4j-common/src/main/java/cn/hippo4j/common/model/register/DynamicThreadPoolRegisterParameter.java b/hippo4j-common/src/main/java/cn/hippo4j/common/model/register/DynamicThreadPoolRegisterParameter.java index 85d8bd9b..4d10c963 100644 --- a/hippo4j-common/src/main/java/cn/hippo4j/common/model/register/DynamicThreadPoolRegisterParameter.java +++ b/hippo4j-common/src/main/java/cn/hippo4j/common/model/register/DynamicThreadPoolRegisterParameter.java @@ -17,6 +17,7 @@ package cn.hippo4j.common.model.register; +import com.fasterxml.jackson.annotation.JsonAlias; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; @@ -65,7 +66,7 @@ public class DynamicThreadPoolRegisterParameter { /** * Keep alive time */ - private Integer keepAliveTime; + private Long keepAliveTime; /** * Rejected type @@ -83,12 +84,23 @@ public class DynamicThreadPoolRegisterParameter { private Integer capacityAlarm; /** - * Liveness alarm + * Active alarm */ - private Integer livenessAlarm; + @JsonAlias("livenessAlarm") + private Integer activeAlarm; /** * Allow core thread timeout */ - private Integer allowCoreThreadTimeOut; + private Boolean allowCoreThreadTimeOut; + + /** + * Thread name prefix + */ + private String threadNamePrefix; + + /** + * Execute timeout + */ + private Long executeTimeOut; } diff --git a/hippo4j-common/src/main/java/cn/hippo4j/common/model/register/DynamicThreadPoolRegisterWrapper.java b/hippo4j-common/src/main/java/cn/hippo4j/common/model/register/DynamicThreadPoolRegisterWrapper.java index cdbd455f..421302aa 100644 --- a/hippo4j-common/src/main/java/cn/hippo4j/common/model/register/DynamicThreadPoolRegisterWrapper.java +++ b/hippo4j-common/src/main/java/cn/hippo4j/common/model/register/DynamicThreadPoolRegisterWrapper.java @@ -17,6 +17,8 @@ package cn.hippo4j.common.model.register; +import cn.hippo4j.common.model.register.notify.DynamicThreadPoolRegisterCoreNotifyParameter; +import cn.hippo4j.common.model.register.notify.DynamicThreadPoolRegisterServerNotifyParameter; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; @@ -50,4 +52,14 @@ public class DynamicThreadPoolRegisterWrapper { * Dynamic thread-pool register parameter */ private DynamicThreadPoolRegisterParameter dynamicThreadPoolRegisterParameter; + + /** + * Dynamic thread-pool core notify parameter + */ + private DynamicThreadPoolRegisterCoreNotifyParameter dynamicThreadPoolRegisterCoreNotifyParameter; + + /** + * Dynamic thread-pool server notify parameter + */ + private DynamicThreadPoolRegisterServerNotifyParameter dynamicThreadPoolRegisterServerNotifyParameter; } diff --git a/hippo4j-common/src/main/java/cn/hippo4j/common/model/register/notify/DynamicThreadPoolRegisterCoreNotifyParameter.java b/hippo4j-common/src/main/java/cn/hippo4j/common/model/register/notify/DynamicThreadPoolRegisterCoreNotifyParameter.java new file mode 100644 index 00000000..ca595243 --- /dev/null +++ b/hippo4j-common/src/main/java/cn/hippo4j/common/model/register/notify/DynamicThreadPoolRegisterCoreNotifyParameter.java @@ -0,0 +1,41 @@ +package cn.hippo4j.common.model.register.notify; + +import lombok.*; + +/** + * Dynamic thread-pool register core notify parameter. + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class DynamicThreadPoolRegisterCoreNotifyParameter { + + /** + * Whether to enable thread pool running alarm + */ + @NonNull + private Boolean alarm; + + /** + * Active alarm + */ + @NonNull + private Integer activeAlarm; + + /** + * Capacity alarm + */ + @NonNull + private Integer capacityAlarm; + + /** + * Interval + */ + private Integer interval; + + /** + * Receive + */ + private String receives; +} diff --git a/hippo4j-common/src/main/java/cn/hippo4j/common/model/register/notify/DynamicThreadPoolRegisterServerNotifyParameter.java b/hippo4j-common/src/main/java/cn/hippo4j/common/model/register/notify/DynamicThreadPoolRegisterServerNotifyParameter.java new file mode 100644 index 00000000..ce6f72d5 --- /dev/null +++ b/hippo4j-common/src/main/java/cn/hippo4j/common/model/register/notify/DynamicThreadPoolRegisterServerNotifyParameter.java @@ -0,0 +1,46 @@ +package cn.hippo4j.common.model.register.notify; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * Dynamic thread-pool register server notify parameter. + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class DynamicThreadPoolRegisterServerNotifyParameter { + + /** + * Thread-pool id + */ + private String threadPoolId; + + /** + * Platform + */ + private String platform; + + /** + * Config type + */ + private String type; + + /** + * Secret key + */ + private String secretKey; + + /** + * Interval + */ + private Integer interval; + + /** + * Receives + */ + private String receives; +} diff --git a/hippo4j-config/src/main/java/cn/hippo4j/config/service/biz/impl/ConfigServiceImpl.java b/hippo4j-config/src/main/java/cn/hippo4j/config/service/biz/impl/ConfigServiceImpl.java index 636b105e..71476a55 100644 --- a/hippo4j-config/src/main/java/cn/hippo4j/config/service/biz/impl/ConfigServiceImpl.java +++ b/hippo4j-config/src/main/java/cn/hippo4j/config/service/biz/impl/ConfigServiceImpl.java @@ -139,11 +139,7 @@ public class ConfigServiceImpl implements ConfigService { @Override public void register(DynamicThreadPoolRegisterWrapper registerWrapper) { - DynamicThreadPoolRegisterParameter registerParameter = registerWrapper.getDynamicThreadPoolRegisterParameter(); - ConfigAllInfo configAllInfo = JSONUtil.parseObject(JSONUtil.toJSONString(registerParameter), ConfigAllInfo.class); - configAllInfo.setTenantId(registerWrapper.getTenantId()); - configAllInfo.setItemId(registerWrapper.getItemId()); - configAllInfo.setTpId(registerParameter.getThreadPoolId()); + ConfigAllInfo configAllInfo = parseConfigAllInfo(registerWrapper); TenantService tenantService = ApplicationContextHolder.getBean(TenantService.class); ItemService itemService = ApplicationContextHolder.getBean(ItemService.class); Assert.isTrue(tenantService.getTenantByTenantId(registerWrapper.getTenantId()) != null, "Tenant does not exist"); @@ -157,6 +153,16 @@ public class ConfigServiceImpl implements ConfigService { } } + private ConfigAllInfo parseConfigAllInfo(DynamicThreadPoolRegisterWrapper registerWrapper) { + DynamicThreadPoolRegisterParameter registerParameter = registerWrapper.getDynamicThreadPoolRegisterParameter(); + ConfigAllInfo configAllInfo = JSONUtil.parseObject(JSONUtil.toJSONString(registerParameter), ConfigAllInfo.class); + configAllInfo.setTenantId(registerWrapper.getTenantId()); + configAllInfo.setItemId(registerWrapper.getItemId()); + configAllInfo.setTpId(registerParameter.getThreadPoolId()); + configAllInfo.setAllowCoreThreadTimeOut(registerParameter.getAllowCoreThreadTimeOut() ? 1 : 0); + return configAllInfo; + } + private void verification(String identify) { if (StringUtil.isNotBlank(identify)) { Map content = getContent(identify); diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/manage/GlobalThreadPoolManage.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/manage/GlobalThreadPoolManage.java index 17c428b1..1fa8672b 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/manage/GlobalThreadPoolManage.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/manage/GlobalThreadPoolManage.java @@ -21,7 +21,7 @@ import cn.hippo4j.common.config.ApplicationContextHolder; import cn.hippo4j.common.model.ThreadPoolParameter; import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterWrapper; import cn.hippo4j.core.executor.DynamicThreadPoolWrapper; -import cn.hippo4j.core.executor.support.DynamicThreadPoolService; +import cn.hippo4j.core.executor.support.service.DynamicThreadPoolService; import com.google.common.collect.Lists; import java.util.List; diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/service/AbstractDynamicThreadPoolService.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/service/AbstractDynamicThreadPoolService.java new file mode 100644 index 00000000..d624a0a3 --- /dev/null +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/service/AbstractDynamicThreadPoolService.java @@ -0,0 +1,34 @@ +package cn.hippo4j.core.executor.support.service; + +import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterParameter; +import cn.hippo4j.core.executor.support.QueueTypeEnum; +import cn.hippo4j.core.executor.support.RejectedTypeEnum; +import cn.hippo4j.core.executor.support.ThreadPoolBuilder; + +import java.util.concurrent.ThreadPoolExecutor; + +/** + * Abstract dynamic thread-pool service. + */ +public abstract class AbstractDynamicThreadPoolService implements DynamicThreadPoolService { + + /** + * Build dynamic thread-pool executor. + * + * @param registerParameter + * @return + */ + public ThreadPoolExecutor buildDynamicThreadPoolExecutor(DynamicThreadPoolRegisterParameter registerParameter) { + ThreadPoolExecutor dynamicThreadPoolExecutor = ThreadPoolBuilder.builder() + .threadPoolId(registerParameter.getThreadPoolId()) + .corePoolSize(registerParameter.getCorePoolSize()) + .maxPoolNum(registerParameter.getMaximumPoolSize()) + .workQueue(QueueTypeEnum.createBlockingQueue(registerParameter.getQueueType(), registerParameter.getCapacity())) + .threadFactory(registerParameter.getThreadNamePrefix()) + .keepAliveTime(registerParameter.getKeepAliveTime()) + .rejected(RejectedTypeEnum.createPolicy(registerParameter.getRejectedType())) + .dynamicPool() + .build(); + return dynamicThreadPoolExecutor; + } +} diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/DynamicThreadPoolService.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/service/DynamicThreadPoolService.java similarity index 96% rename from hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/DynamicThreadPoolService.java rename to hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/service/DynamicThreadPoolService.java index b967a08c..dbb6fd5d 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/DynamicThreadPoolService.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/service/DynamicThreadPoolService.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package cn.hippo4j.core.executor.support; +package cn.hippo4j.core.executor.support.service; import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterWrapper; diff --git a/hippo4j-message/src/main/java/cn/hippo4j/message/service/ThreadPoolNotifyAlarm.java b/hippo4j-message/src/main/java/cn/hippo4j/message/service/ThreadPoolNotifyAlarm.java index 00ae2c63..d0c2f5c9 100644 --- a/hippo4j-message/src/main/java/cn/hippo4j/message/service/ThreadPoolNotifyAlarm.java +++ b/hippo4j-message/src/main/java/cn/hippo4j/message/service/ThreadPoolNotifyAlarm.java @@ -22,8 +22,6 @@ import lombok.NoArgsConstructor; import lombok.NonNull; import lombok.RequiredArgsConstructor; -import java.util.Map; - /** * Thread pool notify alarm. */ @@ -55,16 +53,8 @@ public class ThreadPoolNotifyAlarm { */ private Integer interval; - /** - * Receive - */ - private String receive; - /** * Receives - * - *

Do not enable this configuration for the time being, it may be useful if you develop mailboxes in the future. */ - @Deprecated - private Map receives; + private String receives; } diff --git a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/config/BootstrapCoreProperties.java b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/config/BootstrapCoreProperties.java index 93696763..12faa712 100644 --- a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/config/BootstrapCoreProperties.java +++ b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/config/BootstrapCoreProperties.java @@ -133,9 +133,9 @@ public class BootstrapCoreProperties implements BootstrapPropertiesInterface { private Integer alarmInterval; /** - * Receive. + * Receives. */ - private String receive; + private String receives; /** * Executors. diff --git a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/config/DynamicThreadPoolCoreAutoConfiguration.java b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/config/DynamicThreadPoolCoreAutoConfiguration.java index ec672a43..0aa3940d 100644 --- a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/config/DynamicThreadPoolCoreAutoConfiguration.java +++ b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/config/DynamicThreadPoolCoreAutoConfiguration.java @@ -28,8 +28,9 @@ import cn.hippo4j.core.springboot.starter.refresher.event.AdapterExecutorsListen import cn.hippo4j.core.springboot.starter.refresher.event.ExecutorsListener; import cn.hippo4j.core.springboot.starter.refresher.event.PlatformsListener; import cn.hippo4j.core.springboot.starter.refresher.event.WebExecutorListener; +import cn.hippo4j.core.springboot.starter.support.DynamicThreadPoolAdapterRegister; +import cn.hippo4j.core.springboot.starter.support.DynamicThreadPoolConfigService; import cn.hippo4j.core.springboot.starter.support.DynamicThreadPoolPostProcessor; -import cn.hippo4j.core.springboot.starter.support.ThreadPoolAdapterRegister; import cn.hippo4j.message.api.NotifyConfigBuilder; import cn.hippo4j.message.config.MessageConfiguration; import cn.hippo4j.message.service.AlarmControlHandler; @@ -115,12 +116,17 @@ public class DynamicThreadPoolCoreAutoConfiguration { } @Bean - public ThreadPoolAdapterRegister threadPoolAdapterRegister() { - return new ThreadPoolAdapterRegister(bootstrapCoreProperties); + public DynamicThreadPoolAdapterRegister threadPoolAdapterRegister() { + return new DynamicThreadPoolAdapterRegister(bootstrapCoreProperties); } @Bean public DynamicThreadPoolBannerHandler threadPoolBannerHandler() { return new DynamicThreadPoolBannerHandler(bootstrapCoreProperties); } + + @Bean + public DynamicThreadPoolConfigService dynamicThreadPoolConfigService() { + return new DynamicThreadPoolConfigService(); + } } diff --git a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/config/ExecutorProperties.java b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/config/ExecutorProperties.java index dbcf4973..60d013fa 100644 --- a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/config/ExecutorProperties.java +++ b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/config/ExecutorProperties.java @@ -18,18 +18,20 @@ package cn.hippo4j.core.springboot.starter.config; import cn.hippo4j.message.service.ThreadPoolNotifyAlarm; -import com.google.common.collect.Maps; +import lombok.AllArgsConstructor; +import lombok.Builder; import lombok.Data; +import lombok.NoArgsConstructor; import lombok.experimental.Accessors; -import java.util.Map; -import java.util.Objects; - /** * Executor properties. */ @Data +@Builder @Accessors(chain = true) +@NoArgsConstructor +@AllArgsConstructor public class ExecutorProperties { /** @@ -86,8 +88,4 @@ public class ExecutorProperties { * Notify */ private ThreadPoolNotifyAlarm notify; - - public Map receives() { - return Objects.isNull(this.notify) || this.notify.getReceives() == null ? Maps.newHashMap() : this.notify.getReceives(); - } } diff --git a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/notify/CoreNotifyConfigBuilder.java b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/notify/CoreNotifyConfigBuilder.java index b829885e..230e8d6c 100644 --- a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/notify/CoreNotifyConfigBuilder.java +++ b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/notify/CoreNotifyConfigBuilder.java @@ -121,20 +121,12 @@ public class CoreNotifyConfigBuilder implements NotifyConfigBuilder { private String buildReceive(ExecutorProperties executor, NotifyPlatformProperties platformProperties) { String receive; if (executor.getNotify() != null) { - receive = executor.getNotify().getReceive(); + receive = executor.getNotify().getReceives(); if (StrUtil.isBlank(receive)) { - receive = bootstrapCoreProperties.getReceive(); - if (StrUtil.isBlank(receive)) { - Map receives = executor.receives(); - receive = receives.get(platformProperties.getPlatform()); - } + receive = bootstrapCoreProperties.getReceives(); } } else { - receive = bootstrapCoreProperties.getReceive(); - if (StrUtil.isBlank(receive)) { - Map receives = executor.receives(); - receive = receives.get(platformProperties.getPlatform()); - } + receive = bootstrapCoreProperties.getReceives(); } return receive; } diff --git a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/refresher/BootstrapCorePropertiesBinderAdapt.java b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/refresher/BootstrapCorePropertiesBinderAdapt.java index f8bc5311..ea655781 100644 --- a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/refresher/BootstrapCorePropertiesBinderAdapt.java +++ b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/refresher/BootstrapCorePropertiesBinderAdapt.java @@ -22,7 +22,6 @@ import cn.hippo4j.common.toolkit.StringUtil; import cn.hippo4j.core.springboot.starter.config.BootstrapCoreProperties; import cn.hippo4j.core.springboot.starter.config.ExecutorProperties; import cn.hippo4j.core.springboot.starter.config.NotifyPlatformProperties; -import cn.hippo4j.message.enums.NotifyPlatformEnum; import cn.hippo4j.message.service.ThreadPoolNotifyAlarm; import cn.hutool.core.bean.BeanUtil; import cn.hutool.core.bean.copier.CopyOptions; @@ -82,8 +81,8 @@ public class BootstrapCorePropertiesBinderAdapt { boolean containFlag = key != null && StringUtil.isNotBlank((String) key) && (((String) key).indexOf(PREFIX + ".executors") != -1 - || ((String) key).indexOf(PREFIX + ".notify-platforms") != -1 - || ((String) key).indexOf(PREFIX + ".notifyPlatforms") != -1); + || ((String) key).indexOf(PREFIX + ".notify-platforms") != -1 + || ((String) key).indexOf(PREFIX + ".notifyPlatforms") != -1); if (containFlag) { String targetKey = key.toString().replace(PREFIX + ".", ""); targetMap.put(targetKey, val); @@ -138,14 +137,7 @@ public class BootstrapCorePropertiesBinderAdapt { if (executorProperties != null) { if (CollectionUtil.isNotEmpty(notifySingleMap)) { ThreadPoolNotifyAlarm alarm = BeanUtil.mapToBean(notifySingleMap, ThreadPoolNotifyAlarm.class, true, CopyOptions.create()); - Map notifyReceivesMap = Maps.newHashMap(); - for (NotifyPlatformEnum value : NotifyPlatformEnum.values()) { - Object receives = targetMap.get("executors[" + i + "].notify.receives." + value.name()); - if (receives != null && StringUtil.isNotBlank((String) receives)) { - notifyReceivesMap.put(value.name(), (String) receives); - } - } - alarm.setReceives(notifyReceivesMap); + alarm.setReceives(alarm.getReceives()); executorProperties.setNotify(alarm); } executorPropertiesList.add(executorProperties); diff --git a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/refresher/ZookeeperRefresherHandler.java b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/refresher/ZookeeperRefresherHandler.java index 1f7cd9ab..25f875e2 100644 --- a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/refresher/ZookeeperRefresherHandler.java +++ b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/refresher/ZookeeperRefresherHandler.java @@ -114,7 +114,7 @@ public class ZookeeperRefresherHandler extends AbstractCoreThreadPoolDynamicRefr executorProperties.getNotify().getCapacityAlarm(), executorProperties.getNotify().getActiveAlarm()); threadPoolNotifyAlarm.setInterval(executorProperties.getNotify().getInterval()); - threadPoolNotifyAlarm.setReceives(executorProperties.receives()); + threadPoolNotifyAlarm.setReceives(executorProperties.getNotify().getReceives()); GlobalNotifyAlarmManage.put(executorProperties.getThreadPoolId(), threadPoolNotifyAlarm); }); } diff --git a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/refresher/event/AdapterExecutorsListener.java b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/refresher/event/AdapterExecutorsListener.java index 2bbcb0f1..217f1288 100644 --- a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/refresher/event/AdapterExecutorsListener.java +++ b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/refresher/event/AdapterExecutorsListener.java @@ -33,7 +33,7 @@ import java.util.Objects; import static cn.hippo4j.common.constant.Constants.IDENTIFY_SLICER_SYMBOL; import static cn.hippo4j.core.springboot.starter.refresher.event.Hippo4jCoreDynamicRefreshEventOrder.ADAPTER_EXECUTORS_LISTENER; -import static cn.hippo4j.core.springboot.starter.support.ThreadPoolAdapterRegister.ADAPTER_EXECUTORS_MAP; +import static cn.hippo4j.core.springboot.starter.support.DynamicThreadPoolAdapterRegister.ADAPTER_EXECUTORS_MAP; /** * Adapter executors listener. diff --git a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/support/ThreadPoolAdapterRegister.java b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/support/DynamicThreadPoolAdapterRegister.java similarity index 94% rename from hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/support/ThreadPoolAdapterRegister.java rename to hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/support/DynamicThreadPoolAdapterRegister.java index f9204fc8..dad7a70f 100644 --- a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/support/ThreadPoolAdapterRegister.java +++ b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/support/DynamicThreadPoolAdapterRegister.java @@ -32,11 +32,11 @@ import java.util.Map; import static cn.hippo4j.common.constant.Constants.IDENTIFY_SLICER_SYMBOL; /** - * Thread-pool adapter register. + * Dynamic thread-pool adapter register. */ @Slf4j @AllArgsConstructor -public class ThreadPoolAdapterRegister implements ApplicationRunner { +public class DynamicThreadPoolAdapterRegister implements ApplicationRunner { private final BootstrapCoreProperties bootstrapCoreProperties; diff --git a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/support/DynamicThreadPoolConfigService.java b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/support/DynamicThreadPoolConfigService.java new file mode 100644 index 00000000..84fce801 --- /dev/null +++ b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/support/DynamicThreadPoolConfigService.java @@ -0,0 +1,60 @@ +package cn.hippo4j.core.springboot.starter.support; + +import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterParameter; +import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterWrapper; +import cn.hippo4j.common.model.register.notify.DynamicThreadPoolRegisterCoreNotifyParameter; +import cn.hippo4j.core.executor.DynamicThreadPoolWrapper; +import cn.hippo4j.core.executor.manage.GlobalNotifyAlarmManage; +import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage; +import cn.hippo4j.core.executor.support.QueueTypeEnum; +import cn.hippo4j.core.executor.support.RejectedTypeEnum; +import cn.hippo4j.core.executor.support.service.AbstractDynamicThreadPoolService; +import cn.hippo4j.core.springboot.starter.config.ExecutorProperties; +import cn.hippo4j.message.service.ThreadPoolNotifyAlarm; + +import java.util.concurrent.ThreadPoolExecutor; + +/** + * Dynamic thread-pool config service. + */ +public class DynamicThreadPoolConfigService extends AbstractDynamicThreadPoolService { + + @Override + public ThreadPoolExecutor registerDynamicThreadPool(DynamicThreadPoolRegisterWrapper registerWrapper) { + DynamicThreadPoolRegisterParameter registerParameter = registerWrapper.getDynamicThreadPoolRegisterParameter(); + String threadPoolId = registerParameter.getThreadPoolId(); + ThreadPoolExecutor dynamicThreadPoolExecutor = buildDynamicThreadPoolExecutor(registerParameter); + DynamicThreadPoolWrapper dynamicThreadPoolWrapper = DynamicThreadPoolWrapper.builder() + .threadPoolId(threadPoolId) + .executor(dynamicThreadPoolExecutor) + .build(); + // Register pool. + GlobalThreadPoolManage.registerPool(threadPoolId, dynamicThreadPoolWrapper); + ExecutorProperties executorProperties = buildExecutorProperties(registerWrapper); + // Register properties. + GlobalCoreThreadPoolManage.register(threadPoolId, executorProperties); + DynamicThreadPoolRegisterCoreNotifyParameter notifyParameter = registerWrapper.getDynamicThreadPoolRegisterCoreNotifyParameter(); + ThreadPoolNotifyAlarm notifyAlarm = new ThreadPoolNotifyAlarm(true, notifyParameter.getActiveAlarm(), notifyParameter.getCapacityAlarm()); + notifyAlarm.setReceives(notifyParameter.getReceives()); + notifyAlarm.setInterval(notifyParameter.getInterval()); + // Register notify. + GlobalNotifyAlarmManage.put(threadPoolId, notifyAlarm); + return dynamicThreadPoolExecutor; + } + + private ExecutorProperties buildExecutorProperties(DynamicThreadPoolRegisterWrapper registerWrapper) { + DynamicThreadPoolRegisterParameter registerParameter = registerWrapper.getDynamicThreadPoolRegisterParameter(); + ExecutorProperties executorProperties = ExecutorProperties.builder() + .corePoolSize(registerParameter.getCorePoolSize()) + .maximumPoolSize(registerParameter.getMaximumPoolSize()) + .allowCoreThreadTimeOut(registerParameter.getAllowCoreThreadTimeOut()) + .keepAliveTime(registerParameter.getKeepAliveTime()) + .blockingQueue(QueueTypeEnum.getBlockingQueueNameByType(registerParameter.getQueueType())) + .threadNamePrefix(registerParameter.getThreadNamePrefix()) + .rejectedHandler(RejectedTypeEnum.getRejectedNameByType(registerParameter.getRejectedType())) + .executeTimeOut(registerParameter.getExecuteTimeOut()) + .threadPoolId(registerParameter.getThreadPoolId()) + .build(); + return executorProperties; + } +} diff --git a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/support/DynamicThreadPoolPostProcessor.java b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/support/DynamicThreadPoolPostProcessor.java index 5a44a5ae..ee9c3be4 100644 --- a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/support/DynamicThreadPoolPostProcessor.java +++ b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/support/DynamicThreadPoolPostProcessor.java @@ -147,10 +147,10 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor { int interval = Optional.ofNullable(notify) .map(each -> each.getInterval()).orElseGet(() -> bootstrapCoreProperties.getAlarmInterval() != null ? bootstrapCoreProperties.getAlarmInterval() : 5); String receive = Optional.ofNullable(notify) - .map(each -> each.getReceive()).orElseGet(() -> bootstrapCoreProperties.getReceive() != null ? bootstrapCoreProperties.getReceive() : null); + .map(each -> each.getReceives()).orElseGet(() -> bootstrapCoreProperties.getReceives() != null ? bootstrapCoreProperties.getReceives() : null); ThreadPoolNotifyAlarm threadPoolNotifyAlarm = new ThreadPoolNotifyAlarm(isAlarm, activeAlarm, capacityAlarm); threadPoolNotifyAlarm.setInterval(interval); - threadPoolNotifyAlarm.setReceive(receive); + threadPoolNotifyAlarm.setReceives(receive); GlobalNotifyAlarmManage.put(threadPoolId, threadPoolNotifyAlarm); TaskDecorator taskDecorator = ((DynamicThreadPoolExecutor) dynamicThreadPoolWrapper.getExecutor()).getTaskDecorator(); ((DynamicThreadPoolExecutor) newDynamicPoolExecutor).setTaskDecorator(taskDecorator); diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/config/DynamicThreadPoolAutoConfiguration.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/config/DynamicThreadPoolAutoConfiguration.java index 24f40176..6d750d47 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/config/DynamicThreadPoolAutoConfiguration.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/config/DynamicThreadPoolAutoConfiguration.java @@ -26,7 +26,7 @@ import cn.hippo4j.core.config.UtilAutoConfiguration; import cn.hippo4j.core.enable.MarkerConfiguration; import cn.hippo4j.core.executor.ThreadPoolNotifyAlarmHandler; import cn.hippo4j.core.executor.state.ThreadPoolRunStateHandler; -import cn.hippo4j.core.executor.support.DynamicThreadPoolService; +import cn.hippo4j.core.executor.support.service.DynamicThreadPoolService; import cn.hippo4j.core.handler.DynamicThreadPoolBannerHandler; import cn.hippo4j.core.toolkit.IdentifyUtil; import cn.hippo4j.core.toolkit.inet.InetUtils; diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/DynamicThreadPoolConfigService.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/DynamicThreadPoolConfigService.java index c000520a..83b96ace 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/DynamicThreadPoolConfigService.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/DynamicThreadPoolConfigService.java @@ -26,10 +26,7 @@ import cn.hippo4j.common.web.base.Result; import cn.hippo4j.common.web.exception.ServiceException; import cn.hippo4j.core.executor.DynamicThreadPoolWrapper; import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage; -import cn.hippo4j.core.executor.support.DynamicThreadPoolService; -import cn.hippo4j.core.executor.support.QueueTypeEnum; -import cn.hippo4j.core.executor.support.RejectedTypeEnum; -import cn.hippo4j.core.executor.support.ThreadPoolBuilder; +import cn.hippo4j.core.executor.support.service.AbstractDynamicThreadPoolService; import cn.hippo4j.springboot.starter.config.BootstrapProperties; import cn.hippo4j.springboot.starter.event.ApplicationCompleteEvent; import cn.hippo4j.springboot.starter.remote.HttpAgent; @@ -46,7 +43,7 @@ import static cn.hippo4j.common.constant.Constants.REGISTER_DYNAMIC_THREAD_POOL_ */ @Slf4j @RequiredArgsConstructor -public class DynamicThreadPoolConfigService implements DynamicThreadPoolService, ApplicationListener { +public class DynamicThreadPoolConfigService extends AbstractDynamicThreadPoolService implements ApplicationListener { private final HttpAgent httpAgent; @@ -56,11 +53,6 @@ public class DynamicThreadPoolConfigService implements DynamicThreadPoolService, private final DynamicThreadPoolSubscribeConfig dynamicThreadPoolSubscribeConfig; - @Override - public void onApplicationEvent(ApplicationCompleteEvent event) { - clientWorker.notifyApplicationComplete(); - } - @Override public ThreadPoolExecutor registerDynamicThreadPool(DynamicThreadPoolRegisterWrapper registerWrapper) { DynamicThreadPoolRegisterParameter registerParameter = registerWrapper.getDynamicThreadPoolRegisterParameter(); @@ -88,18 +80,9 @@ public class DynamicThreadPoolConfigService implements DynamicThreadPoolService, return dynamicThreadPoolExecutor; } - private ThreadPoolExecutor buildDynamicThreadPoolExecutor(DynamicThreadPoolRegisterParameter registerParameter) { - ThreadPoolExecutor dynamicThreadPoolExecutor = ThreadPoolBuilder.builder() - .threadPoolId(registerParameter.getThreadPoolId()) - .corePoolSize(registerParameter.getCorePoolSize()) - .maxPoolNum(registerParameter.getMaximumPoolSize()) - .workQueue(QueueTypeEnum.createBlockingQueue(registerParameter.getQueueType(), registerParameter.getCapacity())) - .threadFactory(registerParameter.getThreadPoolId()) - .keepAliveTime(registerParameter.getKeepAliveTime()) - .rejected(RejectedTypeEnum.createPolicy(registerParameter.getRejectedType())) - .dynamicPool() - .build(); - return dynamicThreadPoolExecutor; + @Override + public void onApplicationEvent(ApplicationCompleteEvent event) { + clientWorker.notifyApplicationComplete(); } private void checkThreadPoolParameter(DynamicThreadPoolRegisterParameter registerParameter) {