Change thread pool dynamic registration logic

pull/497/head
chen.ma 2 years ago
parent 5251d0a82c
commit e0824f88a1

@ -1,13 +1,10 @@
package cn.hippo4j.common.model.register;
import com.fasterxml.jackson.annotation.JsonIgnore;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.concurrent.ThreadPoolExecutor;
/**
* Dynamic thread-pool register wrapper.
*/
@ -32,12 +29,6 @@ public class DynamicThreadPoolRegisterWrapper {
*/
private Boolean updateIfExists = Boolean.TRUE;
/**
* Dynamic thread-pool executor
*/
@JsonIgnore
private ThreadPoolExecutor dynamicThreadPoolExecutor;
/**
* Dynamic thread-pool register parameter
*/

@ -2,6 +2,8 @@ package cn.hippo4j.core.executor.support;
import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterWrapper;
import java.util.concurrent.ThreadPoolExecutor;
/**
* Dynamic thread-pool service.
*/
@ -11,6 +13,7 @@ public interface DynamicThreadPoolService {
* Registering dynamic thread pools at runtime.
*
* @param registerWrapper
* @return
*/
void registerDynamicThreadPool(DynamicThreadPoolRegisterWrapper registerWrapper);
ThreadPoolExecutor registerDynamicThreadPool(DynamicThreadPoolRegisterWrapper registerWrapper);
}

@ -1,27 +1,28 @@
package cn.hippo4j.example.server;
package cn.hippo4j.example.core.inittest;
import cn.hippo4j.core.executor.support.ThreadPoolBuilder;
import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterParameter;
import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterWrapper;
import cn.hippo4j.common.toolkit.JSONUtil;
import cn.hippo4j.core.executor.support.DynamicThreadPoolService;
import lombok.AllArgsConstructor;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.concurrent.ThreadPoolExecutor;
/**
* Register dynamic thread-pool test
*/
@Slf4j
@Component
@AllArgsConstructor
public class RegisterDynamicThreadPoolTest implements ApplicationRunner {
public class RegisterDynamicThreadPoolTest {
private final DynamicThreadPoolService dynamicThreadPoolService;
@Override
public void run(ApplicationArguments args) throws Exception {
@PostConstruct
public void registerDynamicThreadPool() {
String threadPoolId = "register-dynamic-thread-pool";
DynamicThreadPoolRegisterParameter parameterInfo = new DynamicThreadPoolRegisterParameter();
parameterInfo.setThreadPoolId(threadPoolId);
@ -35,15 +36,10 @@ public class RegisterDynamicThreadPoolTest implements ApplicationRunner {
parameterInfo.setCapacityAlarm(90);
parameterInfo.setLivenessAlarm(90);
parameterInfo.setAllowCoreThreadTimeOut(0);
ThreadPoolExecutor threadPoolExecutor = ThreadPoolBuilder.builder()
.threadPoolId(threadPoolId)
.threadFactory(threadPoolId)
.dynamicPool()
.build();
DynamicThreadPoolRegisterWrapper registerWrapper = DynamicThreadPoolRegisterWrapper.builder()
.dynamicThreadPoolRegisterParameter(parameterInfo)
.dynamicThreadPoolExecutor(threadPoolExecutor)
.build();
dynamicThreadPoolService.registerDynamicThreadPool(registerWrapper);
ThreadPoolExecutor dynamicThreadPool = dynamicThreadPoolService.registerDynamicThreadPool(registerWrapper);
log.info("Dynamic registration thread pool parameter details: {}", JSONUtil.toJSONString(dynamicThreadPool));
}
}

@ -18,14 +18,18 @@
package cn.hippo4j.springboot.starter.core;
import cn.hippo4j.common.model.ThreadPoolParameterInfo;
import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterParameter;
import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterWrapper;
import cn.hippo4j.common.toolkit.Assert;
import cn.hippo4j.common.toolkit.JSONUtil;
import cn.hippo4j.common.web.base.Result;
import cn.hippo4j.common.web.exception.ServiceException;
import cn.hippo4j.core.executor.DynamicThreadPoolWrapper;
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterParameter;
import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterWrapper;
import cn.hippo4j.core.executor.support.DynamicThreadPoolService;
import cn.hippo4j.core.executor.support.QueueTypeEnum;
import cn.hippo4j.core.executor.support.RejectedTypeEnum;
import cn.hippo4j.core.executor.support.ThreadPoolBuilder;
import cn.hippo4j.springboot.starter.config.BootstrapProperties;
import cn.hippo4j.springboot.starter.event.ApplicationCompleteEvent;
import cn.hippo4j.springboot.starter.remote.HttpAgent;
@ -33,6 +37,8 @@ import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationListener;
import java.util.concurrent.ThreadPoolExecutor;
import static cn.hippo4j.common.constant.Constants.REGISTER_DYNAMIC_THREAD_POOL_PATH;
/**
@ -56,28 +62,44 @@ public class DynamicThreadPoolConfigService implements DynamicThreadPoolService,
}
@Override
public void registerDynamicThreadPool(DynamicThreadPoolRegisterWrapper registerWrapper) {
public ThreadPoolExecutor registerDynamicThreadPool(DynamicThreadPoolRegisterWrapper registerWrapper) {
DynamicThreadPoolRegisterParameter registerParameter = registerWrapper.getDynamicThreadPoolRegisterParameter();
checkThreadPoolParameter(registerParameter);
ThreadPoolParameterInfo parameter = JSONUtil.parseObject(JSONUtil.toJSONString(registerParameter), ThreadPoolParameterInfo.class);
String threadPoolId = registerParameter.getThreadPoolId();
Result registerResult;
try {
failDynamicThreadPoolRegisterWrapper(registerWrapper);
registerResult = httpAgent.httpPost(REGISTER_DYNAMIC_THREAD_POOL_PATH, registerWrapper);
Result registerResult = httpAgent.httpPost(REGISTER_DYNAMIC_THREAD_POOL_PATH, registerWrapper);
if (registerResult == null || !registerResult.isSuccess()) {
throw new ServiceException("Dynamic thread pool registration returns error.");
}
} catch (Throwable ex) {
log.error("Failed to dynamically register thread pool: {}", threadPoolId, ex);
log.error("Dynamic thread pool registration execution error: {}", threadPoolId, ex);
throw ex;
}
if (registerResult.isSuccess()) {
DynamicThreadPoolWrapper dynamicThreadPoolWrapper = DynamicThreadPoolWrapper.builder()
.threadPoolId(threadPoolId)
.executor(registerWrapper.getDynamicThreadPoolExecutor())
.build();
GlobalThreadPoolManage.register(threadPoolId, parameter, dynamicThreadPoolWrapper);
dynamicThreadPoolSubscribeConfig.subscribeConfig(threadPoolId);
clientWorker.addCacheDataIfAbsent(properties.getNamespace(), properties.getItemId(), threadPoolId);
}
ThreadPoolExecutor dynamicThreadPoolExecutor = buildDynamicThreadPoolExecutor(registerParameter);
DynamicThreadPoolWrapper dynamicThreadPoolWrapper = DynamicThreadPoolWrapper.builder()
.threadPoolId(threadPoolId)
.executor(dynamicThreadPoolExecutor)
.build();
GlobalThreadPoolManage.register(threadPoolId, parameter, dynamicThreadPoolWrapper);
dynamicThreadPoolSubscribeConfig.subscribeConfig(threadPoolId);
clientWorker.addCacheDataIfAbsent(properties.getNamespace(), properties.getItemId(), threadPoolId);
return dynamicThreadPoolExecutor;
}
private ThreadPoolExecutor buildDynamicThreadPoolExecutor(DynamicThreadPoolRegisterParameter registerParameter) {
ThreadPoolExecutor dynamicThreadPoolExecutor = ThreadPoolBuilder.builder()
.threadPoolId(registerParameter.getThreadPoolId())
.corePoolSize(registerParameter.getCorePoolSize())
.maxPoolNum(registerParameter.getMaximumPoolSize())
.workQueue(QueueTypeEnum.createBlockingQueue(registerParameter.getQueueType(), registerParameter.getCapacity()))
.threadFactory(registerParameter.getThreadPoolId())
.keepAliveTime(registerParameter.getKeepAliveTime())
.rejected(RejectedTypeEnum.createPolicy(registerParameter.getRejectedType()))
.dynamicPool()
.build();
return dynamicThreadPoolExecutor;
}
private void checkThreadPoolParameter(DynamicThreadPoolRegisterParameter registerParameter) {

Loading…
Cancel
Save