Improve the dynamic registration thread pool logic

pull/512/head
chen.ma 2 years ago
parent d27eeba812
commit 840e9bdab4

@ -17,6 +17,7 @@
package cn.hippo4j.common.model.register; package cn.hippo4j.common.model.register;
import com.fasterxml.jackson.annotation.JsonAlias;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.Builder; import lombok.Builder;
import lombok.Data; import lombok.Data;
@ -65,7 +66,7 @@ public class DynamicThreadPoolRegisterParameter {
/** /**
* Keep alive time * Keep alive time
*/ */
private Integer keepAliveTime; private Long keepAliveTime;
/** /**
* Rejected type * Rejected type
@ -83,12 +84,23 @@ public class DynamicThreadPoolRegisterParameter {
private Integer capacityAlarm; private Integer capacityAlarm;
/** /**
* Liveness alarm * Active alarm
*/ */
private Integer livenessAlarm; @JsonAlias("livenessAlarm")
private Integer activeAlarm;
/** /**
* Allow core thread timeout * Allow core thread timeout
*/ */
private Integer allowCoreThreadTimeOut; private Boolean allowCoreThreadTimeOut;
/**
* Thread name prefix
*/
private String threadNamePrefix;
/**
* Execute timeout
*/
private Long executeTimeOut;
} }

@ -17,6 +17,8 @@
package cn.hippo4j.common.model.register; package cn.hippo4j.common.model.register;
import cn.hippo4j.common.model.register.notify.DynamicThreadPoolRegisterCoreNotifyParameter;
import cn.hippo4j.common.model.register.notify.DynamicThreadPoolRegisterServerNotifyParameter;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.Builder; import lombok.Builder;
import lombok.Data; import lombok.Data;
@ -50,4 +52,14 @@ public class DynamicThreadPoolRegisterWrapper {
* Dynamic thread-pool register parameter * Dynamic thread-pool register parameter
*/ */
private DynamicThreadPoolRegisterParameter dynamicThreadPoolRegisterParameter; private DynamicThreadPoolRegisterParameter dynamicThreadPoolRegisterParameter;
/**
* Dynamic thread-pool core notify parameter
*/
private DynamicThreadPoolRegisterCoreNotifyParameter dynamicThreadPoolRegisterCoreNotifyParameter;
/**
* Dynamic thread-pool server notify parameter
*/
private DynamicThreadPoolRegisterServerNotifyParameter dynamicThreadPoolRegisterServerNotifyParameter;
} }

@ -0,0 +1,41 @@
package cn.hippo4j.common.model.register.notify;
import lombok.*;
/**
* Dynamic thread-pool register core notify parameter.
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class DynamicThreadPoolRegisterCoreNotifyParameter {
/**
* Whether to enable thread pool running alarm
*/
@NonNull
private Boolean alarm;
/**
* Active alarm
*/
@NonNull
private Integer activeAlarm;
/**
* Capacity alarm
*/
@NonNull
private Integer capacityAlarm;
/**
* Interval
*/
private Integer interval;
/**
* Receive
*/
private String receives;
}

@ -0,0 +1,46 @@
package cn.hippo4j.common.model.register.notify;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* Dynamic thread-pool register server notify parameter.
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class DynamicThreadPoolRegisterServerNotifyParameter {
/**
* Thread-pool id
*/
private String threadPoolId;
/**
* Platform
*/
private String platform;
/**
* Config type
*/
private String type;
/**
* Secret key
*/
private String secretKey;
/**
* Interval
*/
private Integer interval;
/**
* Receives
*/
private String receives;
}

@ -139,11 +139,7 @@ public class ConfigServiceImpl implements ConfigService {
@Override @Override
public void register(DynamicThreadPoolRegisterWrapper registerWrapper) { public void register(DynamicThreadPoolRegisterWrapper registerWrapper) {
DynamicThreadPoolRegisterParameter registerParameter = registerWrapper.getDynamicThreadPoolRegisterParameter(); ConfigAllInfo configAllInfo = parseConfigAllInfo(registerWrapper);
ConfigAllInfo configAllInfo = JSONUtil.parseObject(JSONUtil.toJSONString(registerParameter), ConfigAllInfo.class);
configAllInfo.setTenantId(registerWrapper.getTenantId());
configAllInfo.setItemId(registerWrapper.getItemId());
configAllInfo.setTpId(registerParameter.getThreadPoolId());
TenantService tenantService = ApplicationContextHolder.getBean(TenantService.class); TenantService tenantService = ApplicationContextHolder.getBean(TenantService.class);
ItemService itemService = ApplicationContextHolder.getBean(ItemService.class); ItemService itemService = ApplicationContextHolder.getBean(ItemService.class);
Assert.isTrue(tenantService.getTenantByTenantId(registerWrapper.getTenantId()) != null, "Tenant does not exist"); Assert.isTrue(tenantService.getTenantByTenantId(registerWrapper.getTenantId()) != null, "Tenant does not exist");
@ -157,6 +153,16 @@ public class ConfigServiceImpl implements ConfigService {
} }
} }
private ConfigAllInfo parseConfigAllInfo(DynamicThreadPoolRegisterWrapper registerWrapper) {
DynamicThreadPoolRegisterParameter registerParameter = registerWrapper.getDynamicThreadPoolRegisterParameter();
ConfigAllInfo configAllInfo = JSONUtil.parseObject(JSONUtil.toJSONString(registerParameter), ConfigAllInfo.class);
configAllInfo.setTenantId(registerWrapper.getTenantId());
configAllInfo.setItemId(registerWrapper.getItemId());
configAllInfo.setTpId(registerParameter.getThreadPoolId());
configAllInfo.setAllowCoreThreadTimeOut(registerParameter.getAllowCoreThreadTimeOut() ? 1 : 0);
return configAllInfo;
}
private void verification(String identify) { private void verification(String identify) {
if (StringUtil.isNotBlank(identify)) { if (StringUtil.isNotBlank(identify)) {
Map content = getContent(identify); Map content = getContent(identify);

@ -21,7 +21,7 @@ import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.common.model.ThreadPoolParameter; import cn.hippo4j.common.model.ThreadPoolParameter;
import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterWrapper; import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterWrapper;
import cn.hippo4j.core.executor.DynamicThreadPoolWrapper; import cn.hippo4j.core.executor.DynamicThreadPoolWrapper;
import cn.hippo4j.core.executor.support.DynamicThreadPoolService; import cn.hippo4j.core.executor.support.service.DynamicThreadPoolService;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import java.util.List; import java.util.List;

@ -0,0 +1,34 @@
package cn.hippo4j.core.executor.support.service;
import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterParameter;
import cn.hippo4j.core.executor.support.QueueTypeEnum;
import cn.hippo4j.core.executor.support.RejectedTypeEnum;
import cn.hippo4j.core.executor.support.ThreadPoolBuilder;
import java.util.concurrent.ThreadPoolExecutor;
/**
* Abstract dynamic thread-pool service.
*/
public abstract class AbstractDynamicThreadPoolService implements DynamicThreadPoolService {
/**
* Build dynamic thread-pool executor.
*
* @param registerParameter
* @return
*/
public ThreadPoolExecutor buildDynamicThreadPoolExecutor(DynamicThreadPoolRegisterParameter registerParameter) {
ThreadPoolExecutor dynamicThreadPoolExecutor = ThreadPoolBuilder.builder()
.threadPoolId(registerParameter.getThreadPoolId())
.corePoolSize(registerParameter.getCorePoolSize())
.maxPoolNum(registerParameter.getMaximumPoolSize())
.workQueue(QueueTypeEnum.createBlockingQueue(registerParameter.getQueueType(), registerParameter.getCapacity()))
.threadFactory(registerParameter.getThreadNamePrefix())
.keepAliveTime(registerParameter.getKeepAliveTime())
.rejected(RejectedTypeEnum.createPolicy(registerParameter.getRejectedType()))
.dynamicPool()
.build();
return dynamicThreadPoolExecutor;
}
}

@ -15,7 +15,7 @@
* limitations under the License. * limitations under the License.
*/ */
package cn.hippo4j.core.executor.support; package cn.hippo4j.core.executor.support.service;
import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterWrapper; import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterWrapper;

@ -22,8 +22,6 @@ import lombok.NoArgsConstructor;
import lombok.NonNull; import lombok.NonNull;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import java.util.Map;
/** /**
* Thread pool notify alarm. * Thread pool notify alarm.
*/ */
@ -55,16 +53,8 @@ public class ThreadPoolNotifyAlarm {
*/ */
private Integer interval; private Integer interval;
/**
* Receive
*/
private String receive;
/** /**
* Receives * Receives
*
* <p> Do not enable this configuration for the time being, it may be useful if you develop mailboxes in the future.
*/ */
@Deprecated private String receives;
private Map<String, String> receives;
} }

@ -133,9 +133,9 @@ public class BootstrapCoreProperties implements BootstrapPropertiesInterface {
private Integer alarmInterval; private Integer alarmInterval;
/** /**
* Receive. * Receives.
*/ */
private String receive; private String receives;
/** /**
* Executors. * Executors.

@ -28,8 +28,9 @@ import cn.hippo4j.core.springboot.starter.refresher.event.AdapterExecutorsListen
import cn.hippo4j.core.springboot.starter.refresher.event.ExecutorsListener; import cn.hippo4j.core.springboot.starter.refresher.event.ExecutorsListener;
import cn.hippo4j.core.springboot.starter.refresher.event.PlatformsListener; import cn.hippo4j.core.springboot.starter.refresher.event.PlatformsListener;
import cn.hippo4j.core.springboot.starter.refresher.event.WebExecutorListener; import cn.hippo4j.core.springboot.starter.refresher.event.WebExecutorListener;
import cn.hippo4j.core.springboot.starter.support.DynamicThreadPoolAdapterRegister;
import cn.hippo4j.core.springboot.starter.support.DynamicThreadPoolConfigService;
import cn.hippo4j.core.springboot.starter.support.DynamicThreadPoolPostProcessor; import cn.hippo4j.core.springboot.starter.support.DynamicThreadPoolPostProcessor;
import cn.hippo4j.core.springboot.starter.support.ThreadPoolAdapterRegister;
import cn.hippo4j.message.api.NotifyConfigBuilder; import cn.hippo4j.message.api.NotifyConfigBuilder;
import cn.hippo4j.message.config.MessageConfiguration; import cn.hippo4j.message.config.MessageConfiguration;
import cn.hippo4j.message.service.AlarmControlHandler; import cn.hippo4j.message.service.AlarmControlHandler;
@ -115,12 +116,17 @@ public class DynamicThreadPoolCoreAutoConfiguration {
} }
@Bean @Bean
public ThreadPoolAdapterRegister threadPoolAdapterRegister() { public DynamicThreadPoolAdapterRegister threadPoolAdapterRegister() {
return new ThreadPoolAdapterRegister(bootstrapCoreProperties); return new DynamicThreadPoolAdapterRegister(bootstrapCoreProperties);
} }
@Bean @Bean
public DynamicThreadPoolBannerHandler threadPoolBannerHandler() { public DynamicThreadPoolBannerHandler threadPoolBannerHandler() {
return new DynamicThreadPoolBannerHandler(bootstrapCoreProperties); return new DynamicThreadPoolBannerHandler(bootstrapCoreProperties);
} }
@Bean
public DynamicThreadPoolConfigService dynamicThreadPoolConfigService() {
return new DynamicThreadPoolConfigService();
}
} }

@ -18,18 +18,20 @@
package cn.hippo4j.core.springboot.starter.config; package cn.hippo4j.core.springboot.starter.config;
import cn.hippo4j.message.service.ThreadPoolNotifyAlarm; import cn.hippo4j.message.service.ThreadPoolNotifyAlarm;
import com.google.common.collect.Maps; import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data; import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.Accessors; import lombok.experimental.Accessors;
import java.util.Map;
import java.util.Objects;
/** /**
* Executor properties. * Executor properties.
*/ */
@Data @Data
@Builder
@Accessors(chain = true) @Accessors(chain = true)
@NoArgsConstructor
@AllArgsConstructor
public class ExecutorProperties { public class ExecutorProperties {
/** /**
@ -86,8 +88,4 @@ public class ExecutorProperties {
* Notify * Notify
*/ */
private ThreadPoolNotifyAlarm notify; private ThreadPoolNotifyAlarm notify;
public Map<String, String> receives() {
return Objects.isNull(this.notify) || this.notify.getReceives() == null ? Maps.newHashMap() : this.notify.getReceives();
}
} }

@ -121,20 +121,12 @@ public class CoreNotifyConfigBuilder implements NotifyConfigBuilder {
private String buildReceive(ExecutorProperties executor, NotifyPlatformProperties platformProperties) { private String buildReceive(ExecutorProperties executor, NotifyPlatformProperties platformProperties) {
String receive; String receive;
if (executor.getNotify() != null) { if (executor.getNotify() != null) {
receive = executor.getNotify().getReceive(); receive = executor.getNotify().getReceives();
if (StrUtil.isBlank(receive)) { if (StrUtil.isBlank(receive)) {
receive = bootstrapCoreProperties.getReceive(); receive = bootstrapCoreProperties.getReceives();
if (StrUtil.isBlank(receive)) {
Map<String, String> receives = executor.receives();
receive = receives.get(platformProperties.getPlatform());
}
} }
} else { } else {
receive = bootstrapCoreProperties.getReceive(); receive = bootstrapCoreProperties.getReceives();
if (StrUtil.isBlank(receive)) {
Map<String, String> receives = executor.receives();
receive = receives.get(platformProperties.getPlatform());
}
} }
return receive; return receive;
} }

@ -22,7 +22,6 @@ import cn.hippo4j.common.toolkit.StringUtil;
import cn.hippo4j.core.springboot.starter.config.BootstrapCoreProperties; import cn.hippo4j.core.springboot.starter.config.BootstrapCoreProperties;
import cn.hippo4j.core.springboot.starter.config.ExecutorProperties; import cn.hippo4j.core.springboot.starter.config.ExecutorProperties;
import cn.hippo4j.core.springboot.starter.config.NotifyPlatformProperties; import cn.hippo4j.core.springboot.starter.config.NotifyPlatformProperties;
import cn.hippo4j.message.enums.NotifyPlatformEnum;
import cn.hippo4j.message.service.ThreadPoolNotifyAlarm; import cn.hippo4j.message.service.ThreadPoolNotifyAlarm;
import cn.hutool.core.bean.BeanUtil; import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.bean.copier.CopyOptions; import cn.hutool.core.bean.copier.CopyOptions;
@ -82,8 +81,8 @@ public class BootstrapCorePropertiesBinderAdapt {
boolean containFlag = key != null boolean containFlag = key != null
&& StringUtil.isNotBlank((String) key) && StringUtil.isNotBlank((String) key)
&& (((String) key).indexOf(PREFIX + ".executors") != -1 && (((String) key).indexOf(PREFIX + ".executors") != -1
|| ((String) key).indexOf(PREFIX + ".notify-platforms") != -1 || ((String) key).indexOf(PREFIX + ".notify-platforms") != -1
|| ((String) key).indexOf(PREFIX + ".notifyPlatforms") != -1); || ((String) key).indexOf(PREFIX + ".notifyPlatforms") != -1);
if (containFlag) { if (containFlag) {
String targetKey = key.toString().replace(PREFIX + ".", ""); String targetKey = key.toString().replace(PREFIX + ".", "");
targetMap.put(targetKey, val); targetMap.put(targetKey, val);
@ -138,14 +137,7 @@ public class BootstrapCorePropertiesBinderAdapt {
if (executorProperties != null) { if (executorProperties != null) {
if (CollectionUtil.isNotEmpty(notifySingleMap)) { if (CollectionUtil.isNotEmpty(notifySingleMap)) {
ThreadPoolNotifyAlarm alarm = BeanUtil.mapToBean(notifySingleMap, ThreadPoolNotifyAlarm.class, true, CopyOptions.create()); ThreadPoolNotifyAlarm alarm = BeanUtil.mapToBean(notifySingleMap, ThreadPoolNotifyAlarm.class, true, CopyOptions.create());
Map<String, String> notifyReceivesMap = Maps.newHashMap(); alarm.setReceives(alarm.getReceives());
for (NotifyPlatformEnum value : NotifyPlatformEnum.values()) {
Object receives = targetMap.get("executors[" + i + "].notify.receives." + value.name());
if (receives != null && StringUtil.isNotBlank((String) receives)) {
notifyReceivesMap.put(value.name(), (String) receives);
}
}
alarm.setReceives(notifyReceivesMap);
executorProperties.setNotify(alarm); executorProperties.setNotify(alarm);
} }
executorPropertiesList.add(executorProperties); executorPropertiesList.add(executorProperties);

@ -114,7 +114,7 @@ public class ZookeeperRefresherHandler extends AbstractCoreThreadPoolDynamicRefr
executorProperties.getNotify().getCapacityAlarm(), executorProperties.getNotify().getCapacityAlarm(),
executorProperties.getNotify().getActiveAlarm()); executorProperties.getNotify().getActiveAlarm());
threadPoolNotifyAlarm.setInterval(executorProperties.getNotify().getInterval()); threadPoolNotifyAlarm.setInterval(executorProperties.getNotify().getInterval());
threadPoolNotifyAlarm.setReceives(executorProperties.receives()); threadPoolNotifyAlarm.setReceives(executorProperties.getNotify().getReceives());
GlobalNotifyAlarmManage.put(executorProperties.getThreadPoolId(), threadPoolNotifyAlarm); GlobalNotifyAlarmManage.put(executorProperties.getThreadPoolId(), threadPoolNotifyAlarm);
}); });
} }

@ -33,7 +33,7 @@ import java.util.Objects;
import static cn.hippo4j.common.constant.Constants.IDENTIFY_SLICER_SYMBOL; import static cn.hippo4j.common.constant.Constants.IDENTIFY_SLICER_SYMBOL;
import static cn.hippo4j.core.springboot.starter.refresher.event.Hippo4jCoreDynamicRefreshEventOrder.ADAPTER_EXECUTORS_LISTENER; import static cn.hippo4j.core.springboot.starter.refresher.event.Hippo4jCoreDynamicRefreshEventOrder.ADAPTER_EXECUTORS_LISTENER;
import static cn.hippo4j.core.springboot.starter.support.ThreadPoolAdapterRegister.ADAPTER_EXECUTORS_MAP; import static cn.hippo4j.core.springboot.starter.support.DynamicThreadPoolAdapterRegister.ADAPTER_EXECUTORS_MAP;
/** /**
* Adapter executors listener. * Adapter executors listener.

@ -32,11 +32,11 @@ import java.util.Map;
import static cn.hippo4j.common.constant.Constants.IDENTIFY_SLICER_SYMBOL; import static cn.hippo4j.common.constant.Constants.IDENTIFY_SLICER_SYMBOL;
/** /**
* Thread-pool adapter register. * Dynamic thread-pool adapter register.
*/ */
@Slf4j @Slf4j
@AllArgsConstructor @AllArgsConstructor
public class ThreadPoolAdapterRegister implements ApplicationRunner { public class DynamicThreadPoolAdapterRegister implements ApplicationRunner {
private final BootstrapCoreProperties bootstrapCoreProperties; private final BootstrapCoreProperties bootstrapCoreProperties;

@ -0,0 +1,60 @@
package cn.hippo4j.core.springboot.starter.support;
import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterParameter;
import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterWrapper;
import cn.hippo4j.common.model.register.notify.DynamicThreadPoolRegisterCoreNotifyParameter;
import cn.hippo4j.core.executor.DynamicThreadPoolWrapper;
import cn.hippo4j.core.executor.manage.GlobalNotifyAlarmManage;
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.core.executor.support.QueueTypeEnum;
import cn.hippo4j.core.executor.support.RejectedTypeEnum;
import cn.hippo4j.core.executor.support.service.AbstractDynamicThreadPoolService;
import cn.hippo4j.core.springboot.starter.config.ExecutorProperties;
import cn.hippo4j.message.service.ThreadPoolNotifyAlarm;
import java.util.concurrent.ThreadPoolExecutor;
/**
* Dynamic thread-pool config service.
*/
public class DynamicThreadPoolConfigService extends AbstractDynamicThreadPoolService {
@Override
public ThreadPoolExecutor registerDynamicThreadPool(DynamicThreadPoolRegisterWrapper registerWrapper) {
DynamicThreadPoolRegisterParameter registerParameter = registerWrapper.getDynamicThreadPoolRegisterParameter();
String threadPoolId = registerParameter.getThreadPoolId();
ThreadPoolExecutor dynamicThreadPoolExecutor = buildDynamicThreadPoolExecutor(registerParameter);
DynamicThreadPoolWrapper dynamicThreadPoolWrapper = DynamicThreadPoolWrapper.builder()
.threadPoolId(threadPoolId)
.executor(dynamicThreadPoolExecutor)
.build();
// Register pool.
GlobalThreadPoolManage.registerPool(threadPoolId, dynamicThreadPoolWrapper);
ExecutorProperties executorProperties = buildExecutorProperties(registerWrapper);
// Register properties.
GlobalCoreThreadPoolManage.register(threadPoolId, executorProperties);
DynamicThreadPoolRegisterCoreNotifyParameter notifyParameter = registerWrapper.getDynamicThreadPoolRegisterCoreNotifyParameter();
ThreadPoolNotifyAlarm notifyAlarm = new ThreadPoolNotifyAlarm(true, notifyParameter.getActiveAlarm(), notifyParameter.getCapacityAlarm());
notifyAlarm.setReceives(notifyParameter.getReceives());
notifyAlarm.setInterval(notifyParameter.getInterval());
// Register notify.
GlobalNotifyAlarmManage.put(threadPoolId, notifyAlarm);
return dynamicThreadPoolExecutor;
}
private ExecutorProperties buildExecutorProperties(DynamicThreadPoolRegisterWrapper registerWrapper) {
DynamicThreadPoolRegisterParameter registerParameter = registerWrapper.getDynamicThreadPoolRegisterParameter();
ExecutorProperties executorProperties = ExecutorProperties.builder()
.corePoolSize(registerParameter.getCorePoolSize())
.maximumPoolSize(registerParameter.getMaximumPoolSize())
.allowCoreThreadTimeOut(registerParameter.getAllowCoreThreadTimeOut())
.keepAliveTime(registerParameter.getKeepAliveTime())
.blockingQueue(QueueTypeEnum.getBlockingQueueNameByType(registerParameter.getQueueType()))
.threadNamePrefix(registerParameter.getThreadNamePrefix())
.rejectedHandler(RejectedTypeEnum.getRejectedNameByType(registerParameter.getRejectedType()))
.executeTimeOut(registerParameter.getExecuteTimeOut())
.threadPoolId(registerParameter.getThreadPoolId())
.build();
return executorProperties;
}
}

@ -147,10 +147,10 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
int interval = Optional.ofNullable(notify) int interval = Optional.ofNullable(notify)
.map(each -> each.getInterval()).orElseGet(() -> bootstrapCoreProperties.getAlarmInterval() != null ? bootstrapCoreProperties.getAlarmInterval() : 5); .map(each -> each.getInterval()).orElseGet(() -> bootstrapCoreProperties.getAlarmInterval() != null ? bootstrapCoreProperties.getAlarmInterval() : 5);
String receive = Optional.ofNullable(notify) String receive = Optional.ofNullable(notify)
.map(each -> each.getReceive()).orElseGet(() -> bootstrapCoreProperties.getReceive() != null ? bootstrapCoreProperties.getReceive() : null); .map(each -> each.getReceives()).orElseGet(() -> bootstrapCoreProperties.getReceives() != null ? bootstrapCoreProperties.getReceives() : null);
ThreadPoolNotifyAlarm threadPoolNotifyAlarm = new ThreadPoolNotifyAlarm(isAlarm, activeAlarm, capacityAlarm); ThreadPoolNotifyAlarm threadPoolNotifyAlarm = new ThreadPoolNotifyAlarm(isAlarm, activeAlarm, capacityAlarm);
threadPoolNotifyAlarm.setInterval(interval); threadPoolNotifyAlarm.setInterval(interval);
threadPoolNotifyAlarm.setReceive(receive); threadPoolNotifyAlarm.setReceives(receive);
GlobalNotifyAlarmManage.put(threadPoolId, threadPoolNotifyAlarm); GlobalNotifyAlarmManage.put(threadPoolId, threadPoolNotifyAlarm);
TaskDecorator taskDecorator = ((DynamicThreadPoolExecutor) dynamicThreadPoolWrapper.getExecutor()).getTaskDecorator(); TaskDecorator taskDecorator = ((DynamicThreadPoolExecutor) dynamicThreadPoolWrapper.getExecutor()).getTaskDecorator();
((DynamicThreadPoolExecutor) newDynamicPoolExecutor).setTaskDecorator(taskDecorator); ((DynamicThreadPoolExecutor) newDynamicPoolExecutor).setTaskDecorator(taskDecorator);

@ -26,7 +26,7 @@ import cn.hippo4j.core.config.UtilAutoConfiguration;
import cn.hippo4j.core.enable.MarkerConfiguration; import cn.hippo4j.core.enable.MarkerConfiguration;
import cn.hippo4j.core.executor.ThreadPoolNotifyAlarmHandler; import cn.hippo4j.core.executor.ThreadPoolNotifyAlarmHandler;
import cn.hippo4j.core.executor.state.ThreadPoolRunStateHandler; import cn.hippo4j.core.executor.state.ThreadPoolRunStateHandler;
import cn.hippo4j.core.executor.support.DynamicThreadPoolService; import cn.hippo4j.core.executor.support.service.DynamicThreadPoolService;
import cn.hippo4j.core.handler.DynamicThreadPoolBannerHandler; import cn.hippo4j.core.handler.DynamicThreadPoolBannerHandler;
import cn.hippo4j.core.toolkit.IdentifyUtil; import cn.hippo4j.core.toolkit.IdentifyUtil;
import cn.hippo4j.core.toolkit.inet.InetUtils; import cn.hippo4j.core.toolkit.inet.InetUtils;

@ -26,10 +26,7 @@ import cn.hippo4j.common.web.base.Result;
import cn.hippo4j.common.web.exception.ServiceException; import cn.hippo4j.common.web.exception.ServiceException;
import cn.hippo4j.core.executor.DynamicThreadPoolWrapper; import cn.hippo4j.core.executor.DynamicThreadPoolWrapper;
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage; import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.core.executor.support.DynamicThreadPoolService; import cn.hippo4j.core.executor.support.service.AbstractDynamicThreadPoolService;
import cn.hippo4j.core.executor.support.QueueTypeEnum;
import cn.hippo4j.core.executor.support.RejectedTypeEnum;
import cn.hippo4j.core.executor.support.ThreadPoolBuilder;
import cn.hippo4j.springboot.starter.config.BootstrapProperties; import cn.hippo4j.springboot.starter.config.BootstrapProperties;
import cn.hippo4j.springboot.starter.event.ApplicationCompleteEvent; import cn.hippo4j.springboot.starter.event.ApplicationCompleteEvent;
import cn.hippo4j.springboot.starter.remote.HttpAgent; import cn.hippo4j.springboot.starter.remote.HttpAgent;
@ -46,7 +43,7 @@ import static cn.hippo4j.common.constant.Constants.REGISTER_DYNAMIC_THREAD_POOL_
*/ */
@Slf4j @Slf4j
@RequiredArgsConstructor @RequiredArgsConstructor
public class DynamicThreadPoolConfigService implements DynamicThreadPoolService, ApplicationListener<ApplicationCompleteEvent> { public class DynamicThreadPoolConfigService extends AbstractDynamicThreadPoolService implements ApplicationListener<ApplicationCompleteEvent> {
private final HttpAgent httpAgent; private final HttpAgent httpAgent;
@ -56,11 +53,6 @@ public class DynamicThreadPoolConfigService implements DynamicThreadPoolService,
private final DynamicThreadPoolSubscribeConfig dynamicThreadPoolSubscribeConfig; private final DynamicThreadPoolSubscribeConfig dynamicThreadPoolSubscribeConfig;
@Override
public void onApplicationEvent(ApplicationCompleteEvent event) {
clientWorker.notifyApplicationComplete();
}
@Override @Override
public ThreadPoolExecutor registerDynamicThreadPool(DynamicThreadPoolRegisterWrapper registerWrapper) { public ThreadPoolExecutor registerDynamicThreadPool(DynamicThreadPoolRegisterWrapper registerWrapper) {
DynamicThreadPoolRegisterParameter registerParameter = registerWrapper.getDynamicThreadPoolRegisterParameter(); DynamicThreadPoolRegisterParameter registerParameter = registerWrapper.getDynamicThreadPoolRegisterParameter();
@ -88,18 +80,9 @@ public class DynamicThreadPoolConfigService implements DynamicThreadPoolService,
return dynamicThreadPoolExecutor; return dynamicThreadPoolExecutor;
} }
private ThreadPoolExecutor buildDynamicThreadPoolExecutor(DynamicThreadPoolRegisterParameter registerParameter) { @Override
ThreadPoolExecutor dynamicThreadPoolExecutor = ThreadPoolBuilder.builder() public void onApplicationEvent(ApplicationCompleteEvent event) {
.threadPoolId(registerParameter.getThreadPoolId()) clientWorker.notifyApplicationComplete();
.corePoolSize(registerParameter.getCorePoolSize())
.maxPoolNum(registerParameter.getMaximumPoolSize())
.workQueue(QueueTypeEnum.createBlockingQueue(registerParameter.getQueueType(), registerParameter.getCapacity()))
.threadFactory(registerParameter.getThreadPoolId())
.keepAliveTime(registerParameter.getKeepAliveTime())
.rejected(RejectedTypeEnum.createPolicy(registerParameter.getRejectedType()))
.dynamicPool()
.build();
return dynamicThreadPoolExecutor;
} }
private void checkThreadPoolParameter(DynamicThreadPoolRegisterParameter registerParameter) { private void checkThreadPoolParameter(DynamicThreadPoolRegisterParameter registerParameter) {

Loading…
Cancel
Save