diff --git a/hippo4j-common/src/main/java/cn/hippo4j/common/model/register/DynamicThreadPoolRegisterWrapper.java b/hippo4j-common/src/main/java/cn/hippo4j/common/model/register/DynamicThreadPoolRegisterWrapper.java index 7703d267..018578bd 100644 --- a/hippo4j-common/src/main/java/cn/hippo4j/common/model/register/DynamicThreadPoolRegisterWrapper.java +++ b/hippo4j-common/src/main/java/cn/hippo4j/common/model/register/DynamicThreadPoolRegisterWrapper.java @@ -1,13 +1,10 @@ package cn.hippo4j.common.model.register; -import com.fasterxml.jackson.annotation.JsonIgnore; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; -import java.util.concurrent.ThreadPoolExecutor; - /** * Dynamic thread-pool register wrapper. */ @@ -32,12 +29,6 @@ public class DynamicThreadPoolRegisterWrapper { */ private Boolean updateIfExists = Boolean.TRUE; - /** - * Dynamic thread-pool executor - */ - @JsonIgnore - private ThreadPoolExecutor dynamicThreadPoolExecutor; - /** * Dynamic thread-pool register parameter */ diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/DynamicThreadPoolService.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/DynamicThreadPoolService.java index 38cdbdeb..e0dd1544 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/DynamicThreadPoolService.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/DynamicThreadPoolService.java @@ -2,6 +2,8 @@ package cn.hippo4j.core.executor.support; import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterWrapper; +import java.util.concurrent.ThreadPoolExecutor; + /** * Dynamic thread-pool service. */ @@ -11,6 +13,7 @@ public interface DynamicThreadPoolService { * Registering dynamic thread pools at runtime. * * @param registerWrapper + * @return */ - void registerDynamicThreadPool(DynamicThreadPoolRegisterWrapper registerWrapper); + ThreadPoolExecutor registerDynamicThreadPool(DynamicThreadPoolRegisterWrapper registerWrapper); } diff --git a/hippo4j-example/hippo4j-spring-boot-starter-example/src/main/java/cn/hippo4j/example/server/RegisterDynamicThreadPoolTest.java b/hippo4j-example/hippo4j-example-core/src/main/java/cn/hippo4j/example/core/inittest/RegisterDynamicThreadPoolTest.java similarity index 65% rename from hippo4j-example/hippo4j-spring-boot-starter-example/src/main/java/cn/hippo4j/example/server/RegisterDynamicThreadPoolTest.java rename to hippo4j-example/hippo4j-example-core/src/main/java/cn/hippo4j/example/core/inittest/RegisterDynamicThreadPoolTest.java index 243f9530..1819a605 100644 --- a/hippo4j-example/hippo4j-spring-boot-starter-example/src/main/java/cn/hippo4j/example/server/RegisterDynamicThreadPoolTest.java +++ b/hippo4j-example/hippo4j-example-core/src/main/java/cn/hippo4j/example/core/inittest/RegisterDynamicThreadPoolTest.java @@ -1,27 +1,28 @@ -package cn.hippo4j.example.server; +package cn.hippo4j.example.core.inittest; -import cn.hippo4j.core.executor.support.ThreadPoolBuilder; import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterParameter; import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterWrapper; +import cn.hippo4j.common.toolkit.JSONUtil; import cn.hippo4j.core.executor.support.DynamicThreadPoolService; import lombok.AllArgsConstructor; -import org.springframework.boot.ApplicationArguments; -import org.springframework.boot.ApplicationRunner; +import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; +import javax.annotation.PostConstruct; import java.util.concurrent.ThreadPoolExecutor; /** * Register dynamic thread-pool test */ +@Slf4j @Component @AllArgsConstructor -public class RegisterDynamicThreadPoolTest implements ApplicationRunner { +public class RegisterDynamicThreadPoolTest { private final DynamicThreadPoolService dynamicThreadPoolService; - @Override - public void run(ApplicationArguments args) throws Exception { + @PostConstruct + public void registerDynamicThreadPool() { String threadPoolId = "register-dynamic-thread-pool"; DynamicThreadPoolRegisterParameter parameterInfo = new DynamicThreadPoolRegisterParameter(); parameterInfo.setThreadPoolId(threadPoolId); @@ -35,15 +36,10 @@ public class RegisterDynamicThreadPoolTest implements ApplicationRunner { parameterInfo.setCapacityAlarm(90); parameterInfo.setLivenessAlarm(90); parameterInfo.setAllowCoreThreadTimeOut(0); - ThreadPoolExecutor threadPoolExecutor = ThreadPoolBuilder.builder() - .threadPoolId(threadPoolId) - .threadFactory(threadPoolId) - .dynamicPool() - .build(); DynamicThreadPoolRegisterWrapper registerWrapper = DynamicThreadPoolRegisterWrapper.builder() .dynamicThreadPoolRegisterParameter(parameterInfo) - .dynamicThreadPoolExecutor(threadPoolExecutor) .build(); - dynamicThreadPoolService.registerDynamicThreadPool(registerWrapper); + ThreadPoolExecutor dynamicThreadPool = dynamicThreadPoolService.registerDynamicThreadPool(registerWrapper); + log.info("Dynamic registration thread pool parameter details: {}", JSONUtil.toJSONString(dynamicThreadPool)); } } diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/DynamicThreadPoolConfigService.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/DynamicThreadPoolConfigService.java index d7db55ce..c000520a 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/DynamicThreadPoolConfigService.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/DynamicThreadPoolConfigService.java @@ -18,14 +18,18 @@ package cn.hippo4j.springboot.starter.core; import cn.hippo4j.common.model.ThreadPoolParameterInfo; +import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterParameter; +import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterWrapper; import cn.hippo4j.common.toolkit.Assert; import cn.hippo4j.common.toolkit.JSONUtil; import cn.hippo4j.common.web.base.Result; +import cn.hippo4j.common.web.exception.ServiceException; import cn.hippo4j.core.executor.DynamicThreadPoolWrapper; import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage; -import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterParameter; -import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterWrapper; import cn.hippo4j.core.executor.support.DynamicThreadPoolService; +import cn.hippo4j.core.executor.support.QueueTypeEnum; +import cn.hippo4j.core.executor.support.RejectedTypeEnum; +import cn.hippo4j.core.executor.support.ThreadPoolBuilder; import cn.hippo4j.springboot.starter.config.BootstrapProperties; import cn.hippo4j.springboot.starter.event.ApplicationCompleteEvent; import cn.hippo4j.springboot.starter.remote.HttpAgent; @@ -33,6 +37,8 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.context.ApplicationListener; +import java.util.concurrent.ThreadPoolExecutor; + import static cn.hippo4j.common.constant.Constants.REGISTER_DYNAMIC_THREAD_POOL_PATH; /** @@ -56,28 +62,44 @@ public class DynamicThreadPoolConfigService implements DynamicThreadPoolService, } @Override - public void registerDynamicThreadPool(DynamicThreadPoolRegisterWrapper registerWrapper) { + public ThreadPoolExecutor registerDynamicThreadPool(DynamicThreadPoolRegisterWrapper registerWrapper) { DynamicThreadPoolRegisterParameter registerParameter = registerWrapper.getDynamicThreadPoolRegisterParameter(); checkThreadPoolParameter(registerParameter); ThreadPoolParameterInfo parameter = JSONUtil.parseObject(JSONUtil.toJSONString(registerParameter), ThreadPoolParameterInfo.class); String threadPoolId = registerParameter.getThreadPoolId(); - Result registerResult; try { failDynamicThreadPoolRegisterWrapper(registerWrapper); - registerResult = httpAgent.httpPost(REGISTER_DYNAMIC_THREAD_POOL_PATH, registerWrapper); + Result registerResult = httpAgent.httpPost(REGISTER_DYNAMIC_THREAD_POOL_PATH, registerWrapper); + if (registerResult == null || !registerResult.isSuccess()) { + throw new ServiceException("Dynamic thread pool registration returns error."); + } } catch (Throwable ex) { - log.error("Failed to dynamically register thread pool: {}", threadPoolId, ex); + log.error("Dynamic thread pool registration execution error: {}", threadPoolId, ex); throw ex; } - if (registerResult.isSuccess()) { - DynamicThreadPoolWrapper dynamicThreadPoolWrapper = DynamicThreadPoolWrapper.builder() - .threadPoolId(threadPoolId) - .executor(registerWrapper.getDynamicThreadPoolExecutor()) - .build(); - GlobalThreadPoolManage.register(threadPoolId, parameter, dynamicThreadPoolWrapper); - dynamicThreadPoolSubscribeConfig.subscribeConfig(threadPoolId); - clientWorker.addCacheDataIfAbsent(properties.getNamespace(), properties.getItemId(), threadPoolId); - } + ThreadPoolExecutor dynamicThreadPoolExecutor = buildDynamicThreadPoolExecutor(registerParameter); + DynamicThreadPoolWrapper dynamicThreadPoolWrapper = DynamicThreadPoolWrapper.builder() + .threadPoolId(threadPoolId) + .executor(dynamicThreadPoolExecutor) + .build(); + GlobalThreadPoolManage.register(threadPoolId, parameter, dynamicThreadPoolWrapper); + dynamicThreadPoolSubscribeConfig.subscribeConfig(threadPoolId); + clientWorker.addCacheDataIfAbsent(properties.getNamespace(), properties.getItemId(), threadPoolId); + return dynamicThreadPoolExecutor; + } + + private ThreadPoolExecutor buildDynamicThreadPoolExecutor(DynamicThreadPoolRegisterParameter registerParameter) { + ThreadPoolExecutor dynamicThreadPoolExecutor = ThreadPoolBuilder.builder() + .threadPoolId(registerParameter.getThreadPoolId()) + .corePoolSize(registerParameter.getCorePoolSize()) + .maxPoolNum(registerParameter.getMaximumPoolSize()) + .workQueue(QueueTypeEnum.createBlockingQueue(registerParameter.getQueueType(), registerParameter.getCapacity())) + .threadFactory(registerParameter.getThreadPoolId()) + .keepAliveTime(registerParameter.getKeepAliveTime()) + .rejected(RejectedTypeEnum.createPolicy(registerParameter.getRejectedType())) + .dynamicPool() + .build(); + return dynamicThreadPoolExecutor; } private void checkThreadPoolParameter(DynamicThreadPoolRegisterParameter registerParameter) {