Optimize the dynamic registration thread pool (#526)

* Migration code package location

* Add condition to update if notification exists

* hippo4j alarm notification initialization

* Optimize the dynamic registration thread pool
pull/528/head^2
小马哥 2 years ago committed by GitHub
parent 592c524396
commit dd4e534b8c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -76,7 +76,7 @@ public class DynamicThreadPoolRegisterParameter {
/** /**
* Is alarm * Is alarm
*/ */
private Integer isAlarm; private Boolean isAlarm;
/** /**
* Capacity alarm * Capacity alarm
@ -104,6 +104,10 @@ public class DynamicThreadPoolRegisterParameter {
*/ */
private Long executeTimeOut; private Long executeTimeOut;
public Integer getIsAlarm() {
return this.isAlarm ? 1 : 0;
}
public Integer getAllowCoreThreadTimeOut() { public Integer getAllowCoreThreadTimeOut() {
return this.allowCoreThreadTimeOut ? 1 : 0; return this.allowCoreThreadTimeOut ? 1 : 0;
} }

@ -23,6 +23,7 @@ import cn.hippo4j.core.executor.support.RejectedTypeEnum;
import cn.hippo4j.core.executor.support.ThreadPoolBuilder; import cn.hippo4j.core.executor.support.ThreadPoolBuilder;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/** /**
* Abstract dynamic thread-pool service. * Abstract dynamic thread-pool service.
@ -42,7 +43,8 @@ public abstract class AbstractDynamicThreadPoolService implements DynamicThreadP
.maxPoolNum(registerParameter.getMaximumPoolSize()) .maxPoolNum(registerParameter.getMaximumPoolSize())
.workQueue(QueueTypeEnum.createBlockingQueue(registerParameter.getQueueType(), registerParameter.getCapacity())) .workQueue(QueueTypeEnum.createBlockingQueue(registerParameter.getQueueType(), registerParameter.getCapacity()))
.threadFactory(registerParameter.getThreadNamePrefix()) .threadFactory(registerParameter.getThreadNamePrefix())
.keepAliveTime(registerParameter.getKeepAliveTime()) .keepAliveTime(registerParameter.getKeepAliveTime(), TimeUnit.SECONDS)
.executeTimeOut(registerParameter.getExecuteTimeOut())
.rejected(RejectedTypeEnum.createPolicy(registerParameter.getRejectedType())) .rejected(RejectedTypeEnum.createPolicy(registerParameter.getRejectedType()))
.dynamicPool() .dynamicPool()
.build(); .build();

@ -23,36 +23,37 @@ import cn.hippo4j.common.model.register.notify.DynamicThreadPoolRegisterCoreNoti
import cn.hippo4j.common.model.register.notify.DynamicThreadPoolRegisterServerNotifyParameter; import cn.hippo4j.common.model.register.notify.DynamicThreadPoolRegisterServerNotifyParameter;
import cn.hippo4j.common.toolkit.JSONUtil; import cn.hippo4j.common.toolkit.JSONUtil;
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage; import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.core.executor.support.QueueTypeEnum;
import cn.hippo4j.core.executor.support.RejectedTypeEnum;
import cn.hippo4j.message.enums.NotifyPlatformEnum; import cn.hippo4j.message.enums.NotifyPlatformEnum;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
/** /**
* Register dynamic thread-pool test. * Register dynamic thread-pool test.
*/ */
@Slf4j @Slf4j
@Component
public class RegisterDynamicThreadPoolTest { public class RegisterDynamicThreadPoolTest {
@PostConstruct public static ThreadPoolExecutor registerDynamicThreadPool(String threadPoolId) {
public void registerDynamicThreadPool() { DynamicThreadPoolRegisterParameter parameterInfo = DynamicThreadPoolRegisterParameter.builder()
String threadPoolId = "register-dynamic-thread-pool"; .corePoolSize(3)
DynamicThreadPoolRegisterParameter parameterInfo = new DynamicThreadPoolRegisterParameter(); .maximumPoolSize(10)
parameterInfo.setThreadPoolId(threadPoolId); .queueType(QueueTypeEnum.RESIZABLE_LINKED_BLOCKING_QUEUE.type)
parameterInfo.setThreadNamePrefix(threadPoolId); .capacity(110)
parameterInfo.setCorePoolSize(3); // TimeUnit.SECONDS
parameterInfo.setMaximumPoolSize(14); .keepAliveTime(100L)
parameterInfo.setQueueType(9); // TimeUnit.MILLISECONDS
parameterInfo.setCapacity(110); .executeTimeOut(800L)
parameterInfo.setKeepAliveTime(110L); .rejectedType(RejectedTypeEnum.ABORT_POLICY.type)
parameterInfo.setRejectedType(2); .isAlarm(true)
parameterInfo.setIsAlarm(0); .capacityAlarm(80)
parameterInfo.setCapacityAlarm(90); .activeAlarm(80)
parameterInfo.setActiveAlarm(90); .threadPoolId(threadPoolId)
parameterInfo.setAllowCoreThreadTimeOut(Boolean.TRUE); .threadNamePrefix(threadPoolId)
.allowCoreThreadTimeOut(true)
.build();
// Core mode and server mode, you can choose one of them. // Core mode and server mode, you can choose one of them.
DynamicThreadPoolRegisterCoreNotifyParameter coreNotifyParameter = DynamicThreadPoolRegisterCoreNotifyParameter.builder() DynamicThreadPoolRegisterCoreNotifyParameter coreNotifyParameter = DynamicThreadPoolRegisterCoreNotifyParameter.builder()
.activeAlarm(80) .activeAlarm(80)
@ -74,5 +75,6 @@ public class RegisterDynamicThreadPoolTest {
.build(); .build();
ThreadPoolExecutor dynamicThreadPool = GlobalThreadPoolManage.dynamicRegister(registerWrapper); ThreadPoolExecutor dynamicThreadPool = GlobalThreadPoolManage.dynamicRegister(registerWrapper);
log.info("Dynamic registration thread pool parameter details: {}", JSONUtil.toJSONString(dynamicThreadPool)); log.info("Dynamic registration thread pool parameter details: {}", JSONUtil.toJSONString(dynamicThreadPool));
return dynamicThreadPool;
} }
} }

@ -43,8 +43,8 @@ public class RunStateHandlerTest {
private ThreadPoolExecutor messageProduceDynamicThreadPool; private ThreadPoolExecutor messageProduceDynamicThreadPool;
private final ThreadPoolExecutor runStateHandlerTestExecutor = new ThreadPoolExecutor( private final ThreadPoolExecutor runStateHandlerTestExecutor = new ThreadPoolExecutor(
2, 3,
2, 3,
0L, 0L,
TimeUnit.MILLISECONDS, TimeUnit.MILLISECONDS,
new SynchronousQueue<>(), new SynchronousQueue<>(),
@ -63,6 +63,9 @@ public class RunStateHandlerTest {
// Start the dynamic thread pool to simulate running tasks // Start the dynamic thread pool to simulate running tasks
runTask(messageConsumeTtlDynamicThreadPool); runTask(messageConsumeTtlDynamicThreadPool);
runTask(messageProduceDynamicThreadPool); runTask(messageProduceDynamicThreadPool);
// Dynamically register thread pool
ThreadPoolExecutor registerDynamicThreadPool = RegisterDynamicThreadPoolTest.registerDynamicThreadPool("register-dynamic-thread-pool");
runTask(registerDynamicThreadPool);
} }
private void runTask(Executor executor) { private void runTask(Executor executor) {

Loading…
Cancel
Save