hippo-4j add execution time timeout dynamic change (#527)

pull/531/head
chen.ma 3 years ago
parent 4fb9c1b790
commit 127b5b326d

@ -14,7 +14,6 @@
<spring-web.version>5.2.15.RELEASE</spring-web.version> <spring-web.version>5.2.15.RELEASE</spring-web.version>
<spring-beans.version>5.2.21.RELEASE</spring-beans.version> <spring-beans.version>5.2.21.RELEASE</spring-beans.version>
<spring-core.version>5.2.22.RELEASE</spring-core.version> <spring-core.version>5.2.22.RELEASE</spring-core.version>
<jackson-databind.version>2.12.6.1</jackson-databind.version>
</properties> </properties>
<dependencies> <dependencies>
@ -45,7 +44,6 @@
<dependency> <dependency>
<groupId>com.fasterxml.jackson.core</groupId> <groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId> <artifactId>jackson-databind</artifactId>
<version>${jackson-databind.version}</version>
</dependency> </dependency>
<dependency> <dependency>

@ -23,91 +23,98 @@ package cn.hippo4j.common.model;
public interface ThreadPoolParameter { public interface ThreadPoolParameter {
/** /**
* tenantId * Get tenant id
* *
* @return * @return
*/ */
String getTenantId(); String getTenantId();
/** /**
* itemId * Get item id
* *
* @return * @return
*/ */
String getItemId(); String getItemId();
/** /**
* tpId * Get thread-pool id
* *
* @return * @return
*/ */
String getTpId(); String getTpId();
/** /**
* coreSize * Get core size
* *
* @return * @return
*/ */
Integer getCoreSize(); Integer getCoreSize();
/** /**
* maxSize * Get max size
* *
* @return * @return
*/ */
Integer getMaxSize(); Integer getMaxSize();
/** /**
* queueType * Get queue type
* *
* @return * @return
*/ */
Integer getQueueType(); Integer getQueueType();
/** /**
* capacity * Get capacity
* *
* @return * @return
*/ */
Integer getCapacity(); Integer getCapacity();
/** /**
* keepAliveTime * Get keep alive time
* *
* @return * @return
*/ */
Integer getKeepAliveTime(); Integer getKeepAliveTime();
/** /**
* rejectedType * Get execute time out
*
* @return
*/
Long getExecuteTimeOut();
/**
* Get rejected type
* *
* @return * @return
*/ */
Integer getRejectedType(); Integer getRejectedType();
/** /**
* isAlarm * Get is alarm
* *
* @return * @return
*/ */
Integer getIsAlarm(); Integer getIsAlarm();
/** /**
* capacityAlarm * Get capacity alarm
* *
* @return * @return
*/ */
Integer getCapacityAlarm(); Integer getCapacityAlarm();
/** /**
* livenessAlarm * Get liveness alarm
* *
* @return * @return
*/ */
Integer getLivenessAlarm(); Integer getLivenessAlarm();
/** /**
* allowCoreThreadTimeOut * Get allow core thread timeOut
* *
* @return * @return
*/ */

@ -96,6 +96,11 @@ public class ThreadPoolParameterInfo implements ThreadPoolParameter, Serializabl
*/ */
private Integer keepAliveTime; private Integer keepAliveTime;
/**
* Execute time out
*/
private Long executeTimeOut;
/** /**
* Rejected type * Rejected type
*/ */

@ -36,6 +36,7 @@ public class ContentUtil {
.setQueueType(parameter.getQueueType()) .setQueueType(parameter.getQueueType())
.setCapacity(parameter.getCapacity()) .setCapacity(parameter.getCapacity())
.setKeepAliveTime(parameter.getKeepAliveTime()) .setKeepAliveTime(parameter.getKeepAliveTime())
.setExecuteTimeOut(parameter.getExecuteTimeOut())
.setIsAlarm(parameter.getIsAlarm()) .setIsAlarm(parameter.getIsAlarm())
.setCapacityAlarm(parameter.getCapacityAlarm()) .setCapacityAlarm(parameter.getCapacityAlarm())
.setLivenessAlarm(parameter.getLivenessAlarm()) .setLivenessAlarm(parameter.getLivenessAlarm())

@ -81,6 +81,11 @@ public class ConfigInfoBase implements Serializable {
*/ */
private Integer keepAliveTime; private Integer keepAliveTime;
/**
* executeTimeOut
*/
private Long executeTimeOut;
/** /**
* rejectedType * rejectedType
*/ */

@ -78,6 +78,11 @@ public class ThreadPoolRespDTO {
*/ */
private Integer keepAliveTime; private Integer keepAliveTime;
/**
* Execute time out
*/
private Long executeTimeOut;
/** /**
* Is alarm * Is alarm
*/ */

@ -70,6 +70,11 @@ public class ThreadPoolSaveOrUpdateReqDTO {
*/ */
private Integer keepAliveTime; private Integer keepAliveTime;
/**
* Execute time out
*/
private Long executeTimeOut;
/** /**
* Is alarm * Is alarm
*/ */

@ -82,7 +82,10 @@ public class ThreadPoolServiceImpl implements ThreadPoolService {
@Override @Override
public void saveOrUpdateThreadPoolConfig(String identify, ThreadPoolSaveOrUpdateReqDTO reqDTO) { public void saveOrUpdateThreadPoolConfig(String identify, ThreadPoolSaveOrUpdateReqDTO reqDTO) {
configService.insertOrUpdate(identify, false, BeanUtil.convert(reqDTO, ConfigAllInfo.class)); ConfigAllInfo configAllInfo = BeanUtil.convert(reqDTO, ConfigAllInfo.class);
Long executeTimeOut = configAllInfo.getExecuteTimeOut() == 0 ? null : configAllInfo.getExecuteTimeOut();
configAllInfo.setExecuteTimeOut(executeTimeOut);
configService.insertOrUpdate(identify, false, configAllInfo);
} }
@Override @Override

@ -0,0 +1,3 @@
ALTER TABLE config Modify COLUMN keep_alive_time int(11) COMMENT '线程存活时间(秒)';
ALTER TABLE config Add execute_time_out int(11) COMMENT '执行超时时间(毫秒)' AFTER keep_alive_time;

@ -35,6 +35,7 @@ import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import java.util.Objects; import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -44,7 +45,7 @@ import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_DELIMI
import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_THREAD_POOL_TEXT; import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_THREAD_POOL_TEXT;
/** /**
* Thread pool dynamic refresh. * Server thread-pool dynamic refresh.
*/ */
@Slf4j @Slf4j
@AllArgsConstructor @AllArgsConstructor
@ -94,6 +95,7 @@ public class ServerThreadPoolDynamicRefresh implements ThreadPoolDynamicRefresh
request.setBeforeExecuteTimeOut(originalExecuteTimeOut); request.setBeforeExecuteTimeOut(originalExecuteTimeOut);
request.setThreadPoolId(threadPoolId); request.setThreadPoolId(threadPoolId);
changePoolInfo(executor, parameter); changePoolInfo(executor, parameter);
Long executeTimeOut = Optional.ofNullable(parameter.getExecuteTimeOut()).orElse(0L);
ThreadPoolExecutor afterExecutor = GlobalThreadPoolManage.getExecutorService(threadPoolId).getExecutor(); ThreadPoolExecutor afterExecutor = GlobalThreadPoolManage.getExecutorService(threadPoolId).getExecutor();
request.setNowCorePoolSize(afterExecutor.getCorePoolSize()); request.setNowCorePoolSize(afterExecutor.getCorePoolSize());
request.setNowMaximumPoolSize(afterExecutor.getMaximumPoolSize()); request.setNowMaximumPoolSize(afterExecutor.getMaximumPoolSize());
@ -101,7 +103,7 @@ public class ServerThreadPoolDynamicRefresh implements ThreadPoolDynamicRefresh
request.setNowKeepAliveTime(afterExecutor.getKeepAliveTime(TimeUnit.SECONDS)); request.setNowKeepAliveTime(afterExecutor.getKeepAliveTime(TimeUnit.SECONDS));
request.setNowQueueCapacity((afterExecutor.getQueue().remainingCapacity() + afterExecutor.getQueue().size())); request.setNowQueueCapacity((afterExecutor.getQueue().remainingCapacity() + afterExecutor.getQueue().size()));
request.setNowRejectedName(RejectedTypeEnum.getRejectedNameByType(parameter.getRejectedType())); request.setNowRejectedName(RejectedTypeEnum.getRejectedNameByType(parameter.getRejectedType()));
request.setNowExecuteTimeOut(originalExecuteTimeOut); request.setNowExecuteTimeOut(executeTimeOut);
threadPoolNotifyAlarmHandler.sendPoolConfigChange(request); threadPoolNotifyAlarmHandler.sendPoolConfigChange(request);
log.info(CHANGE_THREAD_POOL_TEXT, log.info(CHANGE_THREAD_POOL_TEXT,
threadPoolId, threadPoolId,
@ -109,7 +111,7 @@ public class ServerThreadPoolDynamicRefresh implements ThreadPoolDynamicRefresh
String.format(CHANGE_DELIMITER, originalMaximumPoolSize, afterExecutor.getMaximumPoolSize()), String.format(CHANGE_DELIMITER, originalMaximumPoolSize, afterExecutor.getMaximumPoolSize()),
String.format(CHANGE_DELIMITER, originalCapacity, (afterExecutor.getQueue().remainingCapacity() + afterExecutor.getQueue().size())), String.format(CHANGE_DELIMITER, originalCapacity, (afterExecutor.getQueue().remainingCapacity() + afterExecutor.getQueue().size())),
String.format(CHANGE_DELIMITER, originalKeepAliveTime, afterExecutor.getKeepAliveTime(TimeUnit.SECONDS)), String.format(CHANGE_DELIMITER, originalKeepAliveTime, afterExecutor.getKeepAliveTime(TimeUnit.SECONDS)),
String.format(CHANGE_DELIMITER, originalExecuteTimeOut, originalExecuteTimeOut), String.format(CHANGE_DELIMITER, originalExecuteTimeOut, executeTimeOut),
String.format(CHANGE_DELIMITER, originalRejected, RejectedTypeEnum.getRejectedNameByType(parameter.getRejectedType())), String.format(CHANGE_DELIMITER, originalRejected, RejectedTypeEnum.getRejectedNameByType(parameter.getRejectedType())),
String.format(CHANGE_DELIMITER, originalAllowCoreThreadTimeOut, EnableEnum.getBool(parameter.getAllowCoreThreadTimeOut()))); String.format(CHANGE_DELIMITER, originalAllowCoreThreadTimeOut, EnableEnum.getBool(parameter.getAllowCoreThreadTimeOut())));
} }
@ -149,6 +151,10 @@ public class ServerThreadPoolDynamicRefresh implements ThreadPoolDynamicRefresh
if (parameter.getKeepAliveTime() != null) { if (parameter.getKeepAliveTime() != null) {
executor.setKeepAliveTime(parameter.getKeepAliveTime(), TimeUnit.SECONDS); executor.setKeepAliveTime(parameter.getKeepAliveTime(), TimeUnit.SECONDS);
} }
Long executeTimeOut = Optional.ofNullable(parameter.getExecuteTimeOut()).orElse(0L);
if (executeTimeOut != null && executor instanceof AbstractDynamicExecutorSupport) {
((DynamicThreadPoolExecutor) executor).setExecuteTimeOut(executeTimeOut);
}
if (parameter.getRejectedType() != null) { if (parameter.getRejectedType() != null) {
RejectedExecutionHandler rejectedExecutionHandler = RejectedTypeEnum.createPolicy(parameter.getRejectedType()); RejectedExecutionHandler rejectedExecutionHandler = RejectedTypeEnum.createPolicy(parameter.getRejectedType());
if (executor instanceof AbstractDynamicExecutorSupport) { if (executor instanceof AbstractDynamicExecutorSupport) {

Loading…
Cancel
Save