Merge pull request #497 from mabaiwan/develop

Change thread pool dynamic registration logic
pull/502/head
小马哥 2 years ago committed by GitHub
commit 6a029ef74c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -1,13 +1,10 @@
package cn.hippo4j.common.model.register; package cn.hippo4j.common.model.register;
import com.fasterxml.jackson.annotation.JsonIgnore;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.Builder; import lombok.Builder;
import lombok.Data; import lombok.Data;
import lombok.NoArgsConstructor; import lombok.NoArgsConstructor;
import java.util.concurrent.ThreadPoolExecutor;
/** /**
* Dynamic thread-pool register wrapper. * Dynamic thread-pool register wrapper.
*/ */
@ -32,12 +29,6 @@ public class DynamicThreadPoolRegisterWrapper {
*/ */
private Boolean updateIfExists = Boolean.TRUE; private Boolean updateIfExists = Boolean.TRUE;
/**
* Dynamic thread-pool executor
*/
@JsonIgnore
private ThreadPoolExecutor dynamicThreadPoolExecutor;
/** /**
* Dynamic thread-pool register parameter * Dynamic thread-pool register parameter
*/ */

@ -2,6 +2,8 @@ package cn.hippo4j.core.executor.support;
import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterWrapper; import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterWrapper;
import java.util.concurrent.ThreadPoolExecutor;
/** /**
* Dynamic thread-pool service. * Dynamic thread-pool service.
*/ */
@ -11,6 +13,7 @@ public interface DynamicThreadPoolService {
* Registering dynamic thread pools at runtime. * Registering dynamic thread pools at runtime.
* *
* @param registerWrapper * @param registerWrapper
* @return
*/ */
void registerDynamicThreadPool(DynamicThreadPoolRegisterWrapper registerWrapper); ThreadPoolExecutor registerDynamicThreadPool(DynamicThreadPoolRegisterWrapper registerWrapper);
} }

@ -1,27 +1,28 @@
package cn.hippo4j.example.server; package cn.hippo4j.example.core.inittest;
import cn.hippo4j.core.executor.support.ThreadPoolBuilder;
import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterParameter; import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterParameter;
import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterWrapper; import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterWrapper;
import cn.hippo4j.common.toolkit.JSONUtil;
import cn.hippo4j.core.executor.support.DynamicThreadPoolService; import cn.hippo4j.core.executor.support.DynamicThreadPoolService;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import org.springframework.boot.ApplicationArguments; import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
/** /**
* Register dynamic thread-pool test * Register dynamic thread-pool test
*/ */
@Slf4j
@Component @Component
@AllArgsConstructor @AllArgsConstructor
public class RegisterDynamicThreadPoolTest implements ApplicationRunner { public class RegisterDynamicThreadPoolTest {
private final DynamicThreadPoolService dynamicThreadPoolService; private final DynamicThreadPoolService dynamicThreadPoolService;
@Override @PostConstruct
public void run(ApplicationArguments args) throws Exception { public void registerDynamicThreadPool() {
String threadPoolId = "register-dynamic-thread-pool"; String threadPoolId = "register-dynamic-thread-pool";
DynamicThreadPoolRegisterParameter parameterInfo = new DynamicThreadPoolRegisterParameter(); DynamicThreadPoolRegisterParameter parameterInfo = new DynamicThreadPoolRegisterParameter();
parameterInfo.setThreadPoolId(threadPoolId); parameterInfo.setThreadPoolId(threadPoolId);
@ -35,15 +36,10 @@ public class RegisterDynamicThreadPoolTest implements ApplicationRunner {
parameterInfo.setCapacityAlarm(90); parameterInfo.setCapacityAlarm(90);
parameterInfo.setLivenessAlarm(90); parameterInfo.setLivenessAlarm(90);
parameterInfo.setAllowCoreThreadTimeOut(0); parameterInfo.setAllowCoreThreadTimeOut(0);
ThreadPoolExecutor threadPoolExecutor = ThreadPoolBuilder.builder()
.threadPoolId(threadPoolId)
.threadFactory(threadPoolId)
.dynamicPool()
.build();
DynamicThreadPoolRegisterWrapper registerWrapper = DynamicThreadPoolRegisterWrapper.builder() DynamicThreadPoolRegisterWrapper registerWrapper = DynamicThreadPoolRegisterWrapper.builder()
.dynamicThreadPoolRegisterParameter(parameterInfo) .dynamicThreadPoolRegisterParameter(parameterInfo)
.dynamicThreadPoolExecutor(threadPoolExecutor)
.build(); .build();
dynamicThreadPoolService.registerDynamicThreadPool(registerWrapper); ThreadPoolExecutor dynamicThreadPool = dynamicThreadPoolService.registerDynamicThreadPool(registerWrapper);
log.info("Dynamic registration thread pool parameter details: {}", JSONUtil.toJSONString(dynamicThreadPool));
} }
} }

@ -18,14 +18,18 @@
package cn.hippo4j.springboot.starter.core; package cn.hippo4j.springboot.starter.core;
import cn.hippo4j.common.model.ThreadPoolParameterInfo; import cn.hippo4j.common.model.ThreadPoolParameterInfo;
import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterParameter;
import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterWrapper;
import cn.hippo4j.common.toolkit.Assert; import cn.hippo4j.common.toolkit.Assert;
import cn.hippo4j.common.toolkit.JSONUtil; import cn.hippo4j.common.toolkit.JSONUtil;
import cn.hippo4j.common.web.base.Result; import cn.hippo4j.common.web.base.Result;
import cn.hippo4j.common.web.exception.ServiceException;
import cn.hippo4j.core.executor.DynamicThreadPoolWrapper; import cn.hippo4j.core.executor.DynamicThreadPoolWrapper;
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage; import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterParameter;
import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterWrapper;
import cn.hippo4j.core.executor.support.DynamicThreadPoolService; import cn.hippo4j.core.executor.support.DynamicThreadPoolService;
import cn.hippo4j.core.executor.support.QueueTypeEnum;
import cn.hippo4j.core.executor.support.RejectedTypeEnum;
import cn.hippo4j.core.executor.support.ThreadPoolBuilder;
import cn.hippo4j.springboot.starter.config.BootstrapProperties; import cn.hippo4j.springboot.starter.config.BootstrapProperties;
import cn.hippo4j.springboot.starter.event.ApplicationCompleteEvent; import cn.hippo4j.springboot.starter.event.ApplicationCompleteEvent;
import cn.hippo4j.springboot.starter.remote.HttpAgent; import cn.hippo4j.springboot.starter.remote.HttpAgent;
@ -33,6 +37,8 @@ import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationListener; import org.springframework.context.ApplicationListener;
import java.util.concurrent.ThreadPoolExecutor;
import static cn.hippo4j.common.constant.Constants.REGISTER_DYNAMIC_THREAD_POOL_PATH; import static cn.hippo4j.common.constant.Constants.REGISTER_DYNAMIC_THREAD_POOL_PATH;
/** /**
@ -56,28 +62,44 @@ public class DynamicThreadPoolConfigService implements DynamicThreadPoolService,
} }
@Override @Override
public void registerDynamicThreadPool(DynamicThreadPoolRegisterWrapper registerWrapper) { public ThreadPoolExecutor registerDynamicThreadPool(DynamicThreadPoolRegisterWrapper registerWrapper) {
DynamicThreadPoolRegisterParameter registerParameter = registerWrapper.getDynamicThreadPoolRegisterParameter(); DynamicThreadPoolRegisterParameter registerParameter = registerWrapper.getDynamicThreadPoolRegisterParameter();
checkThreadPoolParameter(registerParameter); checkThreadPoolParameter(registerParameter);
ThreadPoolParameterInfo parameter = JSONUtil.parseObject(JSONUtil.toJSONString(registerParameter), ThreadPoolParameterInfo.class); ThreadPoolParameterInfo parameter = JSONUtil.parseObject(JSONUtil.toJSONString(registerParameter), ThreadPoolParameterInfo.class);
String threadPoolId = registerParameter.getThreadPoolId(); String threadPoolId = registerParameter.getThreadPoolId();
Result registerResult;
try { try {
failDynamicThreadPoolRegisterWrapper(registerWrapper); failDynamicThreadPoolRegisterWrapper(registerWrapper);
registerResult = httpAgent.httpPost(REGISTER_DYNAMIC_THREAD_POOL_PATH, registerWrapper); Result registerResult = httpAgent.httpPost(REGISTER_DYNAMIC_THREAD_POOL_PATH, registerWrapper);
if (registerResult == null || !registerResult.isSuccess()) {
throw new ServiceException("Dynamic thread pool registration returns error.");
}
} catch (Throwable ex) { } catch (Throwable ex) {
log.error("Failed to dynamically register thread pool: {}", threadPoolId, ex); log.error("Dynamic thread pool registration execution error: {}", threadPoolId, ex);
throw ex; throw ex;
} }
if (registerResult.isSuccess()) { ThreadPoolExecutor dynamicThreadPoolExecutor = buildDynamicThreadPoolExecutor(registerParameter);
DynamicThreadPoolWrapper dynamicThreadPoolWrapper = DynamicThreadPoolWrapper.builder() DynamicThreadPoolWrapper dynamicThreadPoolWrapper = DynamicThreadPoolWrapper.builder()
.threadPoolId(threadPoolId) .threadPoolId(threadPoolId)
.executor(registerWrapper.getDynamicThreadPoolExecutor()) .executor(dynamicThreadPoolExecutor)
.build(); .build();
GlobalThreadPoolManage.register(threadPoolId, parameter, dynamicThreadPoolWrapper); GlobalThreadPoolManage.register(threadPoolId, parameter, dynamicThreadPoolWrapper);
dynamicThreadPoolSubscribeConfig.subscribeConfig(threadPoolId); dynamicThreadPoolSubscribeConfig.subscribeConfig(threadPoolId);
clientWorker.addCacheDataIfAbsent(properties.getNamespace(), properties.getItemId(), threadPoolId); clientWorker.addCacheDataIfAbsent(properties.getNamespace(), properties.getItemId(), threadPoolId);
return dynamicThreadPoolExecutor;
} }
private ThreadPoolExecutor buildDynamicThreadPoolExecutor(DynamicThreadPoolRegisterParameter registerParameter) {
ThreadPoolExecutor dynamicThreadPoolExecutor = ThreadPoolBuilder.builder()
.threadPoolId(registerParameter.getThreadPoolId())
.corePoolSize(registerParameter.getCorePoolSize())
.maxPoolNum(registerParameter.getMaximumPoolSize())
.workQueue(QueueTypeEnum.createBlockingQueue(registerParameter.getQueueType(), registerParameter.getCapacity()))
.threadFactory(registerParameter.getThreadPoolId())
.keepAliveTime(registerParameter.getKeepAliveTime())
.rejected(RejectedTypeEnum.createPolicy(registerParameter.getRejectedType()))
.dynamicPool()
.build();
return dynamicThreadPoolExecutor;
} }
private void checkThreadPoolParameter(DynamicThreadPoolRegisterParameter registerParameter) { private void checkThreadPoolParameter(DynamicThreadPoolRegisterParameter registerParameter) {

Loading…
Cancel
Save