From 97fb1deb745397630053d7d1a8c891b268e89d4b Mon Sep 17 00:00:00 2001 From: "chen.ma" Date: Thu, 11 Aug 2022 08:31:48 +0800 Subject: [PATCH 1/6] Migration code package location --- .../starter/config/DynamicThreadPoolAutoConfiguration.java | 1 + .../java/cn/hippo4j/springboot/starter/core/ClientWorker.java | 2 +- .../{core => support}/DynamicThreadPoolConfigService.java | 4 +++- 3 files changed, 5 insertions(+), 2 deletions(-) rename hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/{core => support}/DynamicThreadPoolConfigService.java (96%) diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/config/DynamicThreadPoolAutoConfiguration.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/config/DynamicThreadPoolAutoConfiguration.java index 0213fc88..2cb39f27 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/config/DynamicThreadPoolAutoConfiguration.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/config/DynamicThreadPoolAutoConfiguration.java @@ -47,6 +47,7 @@ import cn.hippo4j.springboot.starter.notify.ServerNotifyConfigBuilder; import cn.hippo4j.springboot.starter.remote.HttpAgent; import cn.hippo4j.springboot.starter.remote.HttpScheduledHealthCheck; import cn.hippo4j.springboot.starter.remote.ServerHealthCheck; +import cn.hippo4j.springboot.starter.support.DynamicThreadPoolConfigService; import cn.hippo4j.springboot.starter.support.DynamicThreadPoolPostProcessor; import lombok.AllArgsConstructor; import org.springframework.boot.autoconfigure.ImportAutoConfiguration; diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/ClientWorker.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/ClientWorker.java index 8e0deebe..2e945da9 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/ClientWorker.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/ClientWorker.java @@ -252,7 +252,7 @@ public class ClientWorker { this.serverHealthCheck.setHealthStatus(isHealthServer); } - protected void notifyApplicationComplete() { + public void notifyApplicationComplete() { awaitApplicationComplete.countDown(); } } diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/DynamicThreadPoolConfigService.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/support/DynamicThreadPoolConfigService.java similarity index 96% rename from hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/DynamicThreadPoolConfigService.java rename to hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/support/DynamicThreadPoolConfigService.java index 5dc69414..22898ab5 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/DynamicThreadPoolConfigService.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/support/DynamicThreadPoolConfigService.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package cn.hippo4j.springboot.starter.core; +package cn.hippo4j.springboot.starter.support; import cn.hippo4j.common.model.ThreadPoolParameterInfo; import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterParameter; @@ -31,6 +31,8 @@ import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage; import cn.hippo4j.core.executor.support.service.AbstractDynamicThreadPoolService; import cn.hippo4j.message.service.ThreadPoolNotifyAlarm; import cn.hippo4j.springboot.starter.config.BootstrapProperties; +import cn.hippo4j.springboot.starter.core.ClientWorker; +import cn.hippo4j.springboot.starter.core.DynamicThreadPoolSubscribeConfig; import cn.hippo4j.springboot.starter.event.ApplicationCompleteEvent; import cn.hippo4j.springboot.starter.remote.HttpAgent; import lombok.RequiredArgsConstructor; From 568fc56fd8037545f0c2ad9afdcf053501ad3e70 Mon Sep 17 00:00:00 2001 From: "chen.ma" Date: Thu, 11 Aug 2022 08:37:28 +0800 Subject: [PATCH 2/6] Add condition to update if notification exists --- .../model/register/DynamicThreadPoolRegisterWrapper.java | 7 ++++++- .../java/cn/hippo4j/config/service/biz/NotifyService.java | 3 ++- .../hippo4j/config/service/biz/impl/ConfigServiceImpl.java | 7 ++----- .../hippo4j/config/service/biz/impl/NotifyServiceImpl.java | 5 ++++- 4 files changed, 14 insertions(+), 8 deletions(-) diff --git a/hippo4j-common/src/main/java/cn/hippo4j/common/model/register/DynamicThreadPoolRegisterWrapper.java b/hippo4j-common/src/main/java/cn/hippo4j/common/model/register/DynamicThreadPoolRegisterWrapper.java index 1df939c0..a543223e 100644 --- a/hippo4j-common/src/main/java/cn/hippo4j/common/model/register/DynamicThreadPoolRegisterWrapper.java +++ b/hippo4j-common/src/main/java/cn/hippo4j/common/model/register/DynamicThreadPoolRegisterWrapper.java @@ -55,7 +55,12 @@ public class DynamicThreadPoolRegisterWrapper { /** * Update if exists */ - private Boolean updateIfExists = Boolean.TRUE; + private Boolean updateIfExists = Boolean.FALSE; + + /** + * Notify update if exists + */ + private Boolean notifyUpdateIfExists = Boolean.FALSE; /** * Dynamic thread-pool server notify parameter diff --git a/hippo4j-config/src/main/java/cn/hippo4j/config/service/biz/NotifyService.java b/hippo4j-config/src/main/java/cn/hippo4j/config/service/biz/NotifyService.java index a0bfc412..fbad36ef 100644 --- a/hippo4j-config/src/main/java/cn/hippo4j/config/service/biz/NotifyService.java +++ b/hippo4j-config/src/main/java/cn/hippo4j/config/service/biz/NotifyService.java @@ -63,9 +63,10 @@ public interface NotifyService { /** * Save or update. * + * @param notifyUpdateIfExists * @param reqDTO */ - void saveOrUpdate(NotifyReqDTO reqDTO); + void saveOrUpdate(boolean notifyUpdateIfExists, NotifyReqDTO reqDTO); /** * Delete. diff --git a/hippo4j-config/src/main/java/cn/hippo4j/config/service/biz/impl/ConfigServiceImpl.java b/hippo4j-config/src/main/java/cn/hippo4j/config/service/biz/impl/ConfigServiceImpl.java index 04034503..ffab5101 100644 --- a/hippo4j-config/src/main/java/cn/hippo4j/config/service/biz/impl/ConfigServiceImpl.java +++ b/hippo4j-config/src/main/java/cn/hippo4j/config/service/biz/impl/ConfigServiceImpl.java @@ -27,10 +27,7 @@ import cn.hippo4j.common.web.exception.ServiceException; import cn.hippo4j.config.event.LocalDataChangeEvent; import cn.hippo4j.config.mapper.ConfigInfoMapper; import cn.hippo4j.config.mapper.ConfigInstanceMapper; -import cn.hippo4j.config.model.ConfigAllInfo; -import cn.hippo4j.config.model.ConfigInfoBase; -import cn.hippo4j.config.model.ConfigInstanceInfo; -import cn.hippo4j.config.model.LogRecordInfo; +import cn.hippo4j.config.model.*; import cn.hippo4j.config.model.biz.notify.NotifyReqDTO; import cn.hippo4j.config.service.ConfigCacheService; import cn.hippo4j.config.service.ConfigChangePublisher; @@ -172,7 +169,7 @@ public class ConfigServiceImpl implements ConfigService { } else { notifyReqDTO.setConfigType(true); } - notifyService.saveOrUpdate(notifyReqDTO); + notifyService.saveOrUpdate(registerWrapper.getNotifyUpdateIfExists(), notifyReqDTO); }); } } diff --git a/hippo4j-config/src/main/java/cn/hippo4j/config/service/biz/impl/NotifyServiceImpl.java b/hippo4j-config/src/main/java/cn/hippo4j/config/service/biz/impl/NotifyServiceImpl.java index 35c886ee..0a597c5d 100644 --- a/hippo4j-config/src/main/java/cn/hippo4j/config/service/biz/impl/NotifyServiceImpl.java +++ b/hippo4j-config/src/main/java/cn/hippo4j/config/service/biz/impl/NotifyServiceImpl.java @@ -120,11 +120,14 @@ public class NotifyServiceImpl implements NotifyService { } @Override - public void saveOrUpdate(NotifyReqDTO reqDTO) { + public void saveOrUpdate(boolean notifyUpdateIfExists, NotifyReqDTO reqDTO) { try { existNotify(reqDTO.getType(), reqDTO); save(reqDTO); } catch (Exception ignored) { + if (!notifyUpdateIfExists) { + return; + } LambdaQueryWrapper queryWrapper = Wrappers.lambdaQuery(NotifyInfo.class) .eq(NotifyInfo::getTenantId, reqDTO.getTenantId()) .eq(NotifyInfo::getItemId, reqDTO.getItemId()) From 524697953bdab90e655715abc0d2c64c46b8b4e1 Mon Sep 17 00:00:00 2001 From: "chen.ma" Date: Thu, 11 Aug 2022 08:54:05 +0800 Subject: [PATCH 3/6] hippo4j alarm notification initialization --- .../config/DynamicThreadPoolAutoConfiguration.java | 5 ++++- .../starter/notify/ServerNotifyConfigBuilder.java | 8 ++++++-- .../support/DynamicThreadPoolConfigService.java | 12 ++++++++++++ 3 files changed, 22 insertions(+), 3 deletions(-) diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/config/DynamicThreadPoolAutoConfiguration.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/config/DynamicThreadPoolAutoConfiguration.java index 2cb39f27..c4caf35c 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/config/DynamicThreadPoolAutoConfiguration.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/config/DynamicThreadPoolAutoConfiguration.java @@ -33,6 +33,7 @@ import cn.hippo4j.core.toolkit.inet.InetUtils; import cn.hippo4j.message.api.NotifyConfigBuilder; import cn.hippo4j.message.config.MessageConfiguration; import cn.hippo4j.message.service.AlarmControlHandler; +import cn.hippo4j.message.service.Hippo4jBaseSendMessageService; import cn.hippo4j.message.service.Hippo4jSendMessageService; import cn.hippo4j.springboot.starter.controller.ThreadPoolAdapterController; import cn.hippo4j.springboot.starter.controller.WebThreadPoolController; @@ -101,8 +102,10 @@ public class DynamicThreadPoolAutoConfiguration { public DynamicThreadPoolService dynamicThreadPoolConfigService(HttpAgent httpAgent, ClientWorker clientWorker, ServerHealthCheck serverHealthCheck, + ServerNotifyConfigBuilder notifyConfigBuilder, + Hippo4jBaseSendMessageService hippo4jBaseSendMessageService, DynamicThreadPoolSubscribeConfig dynamicThreadPoolSubscribeConfig) { - return new DynamicThreadPoolConfigService(httpAgent, clientWorker, properties, dynamicThreadPoolSubscribeConfig); + return new DynamicThreadPoolConfigService(httpAgent, clientWorker, properties, notifyConfigBuilder, hippo4jBaseSendMessageService, dynamicThreadPoolSubscribeConfig); } @Bean diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/notify/ServerNotifyConfigBuilder.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/notify/ServerNotifyConfigBuilder.java index 9b51cc2d..4065b27c 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/notify/ServerNotifyConfigBuilder.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/notify/ServerNotifyConfigBuilder.java @@ -55,12 +55,16 @@ public class ServerNotifyConfigBuilder implements NotifyConfigBuilder { @Override public Map> buildNotify() { - Map> resultMap = Maps.newHashMap(); List threadPoolIds = GlobalThreadPoolManage.listThreadPoolId(); if (CollUtil.isEmpty(threadPoolIds)) { log.warn("The client does not have a dynamic thread pool instance configured."); - return resultMap; + return Maps.newHashMap(); } + return getAndInitNotify(threadPoolIds); + } + + public Map> getAndInitNotify(List threadPoolIds) { + Map> resultMap = Maps.newHashMap(); List groupKeys = Lists.newArrayList(); threadPoolIds.forEach(each -> { String groupKey = GroupKey.getKeyTenant(each, properties.getItemId(), properties.getNamespace()); diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/support/DynamicThreadPoolConfigService.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/support/DynamicThreadPoolConfigService.java index 22898ab5..64e86112 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/support/DynamicThreadPoolConfigService.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/support/DynamicThreadPoolConfigService.java @@ -29,16 +29,22 @@ import cn.hippo4j.core.executor.DynamicThreadPoolWrapper; import cn.hippo4j.core.executor.manage.GlobalNotifyAlarmManage; import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage; import cn.hippo4j.core.executor.support.service.AbstractDynamicThreadPoolService; +import cn.hippo4j.message.dto.NotifyConfigDTO; +import cn.hippo4j.message.service.Hippo4jBaseSendMessageService; import cn.hippo4j.message.service.ThreadPoolNotifyAlarm; import cn.hippo4j.springboot.starter.config.BootstrapProperties; import cn.hippo4j.springboot.starter.core.ClientWorker; import cn.hippo4j.springboot.starter.core.DynamicThreadPoolSubscribeConfig; import cn.hippo4j.springboot.starter.event.ApplicationCompleteEvent; +import cn.hippo4j.springboot.starter.notify.ServerNotifyConfigBuilder; import cn.hippo4j.springboot.starter.remote.HttpAgent; +import com.google.common.collect.Lists; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.context.ApplicationListener; +import java.util.List; +import java.util.Map; import java.util.concurrent.ThreadPoolExecutor; import static cn.hippo4j.common.constant.Constants.REGISTER_DYNAMIC_THREAD_POOL_PATH; @@ -56,6 +62,10 @@ public class DynamicThreadPoolConfigService extends AbstractDynamicThreadPoolSer private final BootstrapProperties properties; + private final ServerNotifyConfigBuilder notifyConfigBuilder; + + private final Hippo4jBaseSendMessageService hippo4jBaseSendMessageService; + private final DynamicThreadPoolSubscribeConfig dynamicThreadPoolSubscribeConfig; @Override @@ -106,6 +116,8 @@ public class DynamicThreadPoolConfigService extends AbstractDynamicThreadPoolSer registerParameter.getActiveAlarm(), registerParameter.getCapacityAlarm()); GlobalNotifyAlarmManage.put(registerParameter.getThreadPoolId(), threadPoolNotifyAlarm); + Map> builderNotify = notifyConfigBuilder.getAndInitNotify(Lists.newArrayList(registerParameter.getThreadPoolId())); + hippo4jBaseSendMessageService.putPlatform(builderNotify); } private void checkThreadPoolParameter(DynamicThreadPoolRegisterParameter registerParameter) { From e8f5c719d811f2b47a6f0ea9413679249019ff68 Mon Sep 17 00:00:00 2001 From: "chen.ma" Date: Thu, 11 Aug 2022 21:03:03 +0800 Subject: [PATCH 4/6] Optimize the dynamic registration thread pool --- .../DynamicThreadPoolRegisterParameter.java | 6 ++- .../AbstractDynamicThreadPoolService.java | 4 +- .../RegisterDynamicThreadPoolTest.java | 40 ++++++++++--------- .../core/inittest/RunStateHandlerTest.java | 7 +++- 4 files changed, 34 insertions(+), 23 deletions(-) diff --git a/hippo4j-common/src/main/java/cn/hippo4j/common/model/register/DynamicThreadPoolRegisterParameter.java b/hippo4j-common/src/main/java/cn/hippo4j/common/model/register/DynamicThreadPoolRegisterParameter.java index 5cbfb3cf..812ba82f 100644 --- a/hippo4j-common/src/main/java/cn/hippo4j/common/model/register/DynamicThreadPoolRegisterParameter.java +++ b/hippo4j-common/src/main/java/cn/hippo4j/common/model/register/DynamicThreadPoolRegisterParameter.java @@ -76,7 +76,7 @@ public class DynamicThreadPoolRegisterParameter { /** * Is alarm */ - private Integer isAlarm; + private Boolean isAlarm; /** * Capacity alarm @@ -104,6 +104,10 @@ public class DynamicThreadPoolRegisterParameter { */ private Long executeTimeOut; + public Integer getIsAlarm() { + return this.isAlarm ? 1 : 0; + } + public Integer getAllowCoreThreadTimeOut() { return this.allowCoreThreadTimeOut ? 1 : 0; } diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/service/AbstractDynamicThreadPoolService.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/service/AbstractDynamicThreadPoolService.java index be89d576..5f54d5a9 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/service/AbstractDynamicThreadPoolService.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/service/AbstractDynamicThreadPoolService.java @@ -23,6 +23,7 @@ import cn.hippo4j.core.executor.support.RejectedTypeEnum; import cn.hippo4j.core.executor.support.ThreadPoolBuilder; import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; /** * Abstract dynamic thread-pool service. @@ -42,7 +43,8 @@ public abstract class AbstractDynamicThreadPoolService implements DynamicThreadP .maxPoolNum(registerParameter.getMaximumPoolSize()) .workQueue(QueueTypeEnum.createBlockingQueue(registerParameter.getQueueType(), registerParameter.getCapacity())) .threadFactory(registerParameter.getThreadNamePrefix()) - .keepAliveTime(registerParameter.getKeepAliveTime()) + .keepAliveTime(registerParameter.getKeepAliveTime(), TimeUnit.SECONDS) + .executeTimeOut(registerParameter.getExecuteTimeOut()) .rejected(RejectedTypeEnum.createPolicy(registerParameter.getRejectedType())) .dynamicPool() .build(); diff --git a/hippo4j-example/hippo4j-example-core/src/main/java/cn/hippo4j/example/core/inittest/RegisterDynamicThreadPoolTest.java b/hippo4j-example/hippo4j-example-core/src/main/java/cn/hippo4j/example/core/inittest/RegisterDynamicThreadPoolTest.java index d66be339..c541a6f2 100644 --- a/hippo4j-example/hippo4j-example-core/src/main/java/cn/hippo4j/example/core/inittest/RegisterDynamicThreadPoolTest.java +++ b/hippo4j-example/hippo4j-example-core/src/main/java/cn/hippo4j/example/core/inittest/RegisterDynamicThreadPoolTest.java @@ -23,36 +23,37 @@ import cn.hippo4j.common.model.register.notify.DynamicThreadPoolRegisterCoreNoti import cn.hippo4j.common.model.register.notify.DynamicThreadPoolRegisterServerNotifyParameter; import cn.hippo4j.common.toolkit.JSONUtil; import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage; +import cn.hippo4j.core.executor.support.QueueTypeEnum; +import cn.hippo4j.core.executor.support.RejectedTypeEnum; import cn.hippo4j.message.enums.NotifyPlatformEnum; import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Component; -import javax.annotation.PostConstruct; import java.util.concurrent.ThreadPoolExecutor; /** * Register dynamic thread-pool test. */ @Slf4j -@Component public class RegisterDynamicThreadPoolTest { - @PostConstruct - public void registerDynamicThreadPool() { - String threadPoolId = "register-dynamic-thread-pool"; - DynamicThreadPoolRegisterParameter parameterInfo = new DynamicThreadPoolRegisterParameter(); - parameterInfo.setThreadPoolId(threadPoolId); - parameterInfo.setThreadNamePrefix(threadPoolId); - parameterInfo.setCorePoolSize(3); - parameterInfo.setMaximumPoolSize(14); - parameterInfo.setQueueType(9); - parameterInfo.setCapacity(110); - parameterInfo.setKeepAliveTime(110L); - parameterInfo.setRejectedType(2); - parameterInfo.setIsAlarm(0); - parameterInfo.setCapacityAlarm(90); - parameterInfo.setActiveAlarm(90); - parameterInfo.setAllowCoreThreadTimeOut(Boolean.TRUE); + public static ThreadPoolExecutor registerDynamicThreadPool(String threadPoolId) { + DynamicThreadPoolRegisterParameter parameterInfo = DynamicThreadPoolRegisterParameter.builder() + .corePoolSize(3) + .maximumPoolSize(10) + .queueType(QueueTypeEnum.RESIZABLE_LINKED_BLOCKING_QUEUE.type) + .capacity(110) + // TimeUnit.SECONDS + .keepAliveTime(100L) + // TimeUnit.MILLISECONDS + .executeTimeOut(800L) + .rejectedType(RejectedTypeEnum.ABORT_POLICY.type) + .isAlarm(true) + .capacityAlarm(80) + .activeAlarm(80) + .threadPoolId(threadPoolId) + .threadNamePrefix(threadPoolId) + .allowCoreThreadTimeOut(true) + .build(); // Core mode and server mode, you can choose one of them. DynamicThreadPoolRegisterCoreNotifyParameter coreNotifyParameter = DynamicThreadPoolRegisterCoreNotifyParameter.builder() .activeAlarm(80) @@ -74,5 +75,6 @@ public class RegisterDynamicThreadPoolTest { .build(); ThreadPoolExecutor dynamicThreadPool = GlobalThreadPoolManage.dynamicRegister(registerWrapper); log.info("Dynamic registration thread pool parameter details: {}", JSONUtil.toJSONString(dynamicThreadPool)); + return dynamicThreadPool; } } diff --git a/hippo4j-example/hippo4j-example-core/src/main/java/cn/hippo4j/example/core/inittest/RunStateHandlerTest.java b/hippo4j-example/hippo4j-example-core/src/main/java/cn/hippo4j/example/core/inittest/RunStateHandlerTest.java index b076da90..c4791508 100644 --- a/hippo4j-example/hippo4j-example-core/src/main/java/cn/hippo4j/example/core/inittest/RunStateHandlerTest.java +++ b/hippo4j-example/hippo4j-example-core/src/main/java/cn/hippo4j/example/core/inittest/RunStateHandlerTest.java @@ -43,8 +43,8 @@ public class RunStateHandlerTest { private ThreadPoolExecutor messageProduceDynamicThreadPool; private final ThreadPoolExecutor runStateHandlerTestExecutor = new ThreadPoolExecutor( - 2, - 2, + 3, + 3, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<>(), @@ -63,6 +63,9 @@ public class RunStateHandlerTest { // Start the dynamic thread pool to simulate running tasks runTask(messageConsumeTtlDynamicThreadPool); runTask(messageProduceDynamicThreadPool); + // Dynamically register thread pool + ThreadPoolExecutor registerDynamicThreadPool = RegisterDynamicThreadPoolTest.registerDynamicThreadPool("register-dynamic-thread-pool"); + runTask(registerDynamicThreadPool); } private void runTask(Executor executor) { From 95f1543d702217e33faed5c770c7a31556de3781 Mon Sep 17 00:00:00 2001 From: "chen.ma" Date: Thu, 11 Aug 2022 22:06:07 +0800 Subject: [PATCH 5/6] Core code refactoring --- .../core/executor/support/QueueTypeEnum.java | 1 - .../executor/support/RejectedPolicies.java | 65 ------------------- .../executor/support/RejectedTypeEnum.java | 4 +- .../support/RunsOldestTaskPolicy.java | 44 +++++++++++++ .../executor/support/SyncPutQueuePolicy.java | 42 ++++++++++++ 5 files changed, 88 insertions(+), 68 deletions(-) delete mode 100644 hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/RejectedPolicies.java create mode 100644 hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/RunsOldestTaskPolicy.java create mode 100644 hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/SyncPutQueuePolicy.java diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/QueueTypeEnum.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/QueueTypeEnum.java index f605d6ea..283d4618 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/QueueTypeEnum.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/QueueTypeEnum.java @@ -105,7 +105,6 @@ public enum QueueTypeEnum { if (capacity == null || capacity <= 0) { temCapacity = 1024; } - return new LinkedBlockingQueue(temCapacity); })); return blockingQueue; diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/RejectedPolicies.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/RejectedPolicies.java deleted file mode 100644 index 931d06da..00000000 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/RejectedPolicies.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cn.hippo4j.core.executor.support; - -import lombok.extern.slf4j.Slf4j; - -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.RejectedExecutionHandler; -import java.util.concurrent.ThreadPoolExecutor; - -/** - * Rejected policies. - */ -@Slf4j -public class RejectedPolicies { - - public static class RunsOldestTaskPolicy implements RejectedExecutionHandler { - - @Override - public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { - if (executor.isShutdown()) { - return; - } - BlockingQueue workQueue = executor.getQueue(); - Runnable firstWork = workQueue.poll(); - boolean newTaskAdd = workQueue.offer(r); - if (firstWork != null) { - firstWork.run(); - } - if (!newTaskAdd) { - executor.execute(r); - } - } - } - - public static class SyncPutQueuePolicy implements RejectedExecutionHandler { - - @Override - public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { - if (executor.isShutdown()) { - return; - } - try { - executor.getQueue().put(r); - } catch (InterruptedException e) { - log.error("Adding Queue task to thread pool failed.", e); - } - } - } -} diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/RejectedTypeEnum.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/RejectedTypeEnum.java index ac8e8e43..5dd8d61c 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/RejectedTypeEnum.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/RejectedTypeEnum.java @@ -40,9 +40,9 @@ public enum RejectedTypeEnum { DISCARD_OLDEST_POLICY(4, "DiscardOldestPolicy", new ThreadPoolExecutor.DiscardOldestPolicy()), - RUNS_OLDEST_TASK_POLICY(5, "RunsOldestTaskPolicy", new RejectedPolicies.RunsOldestTaskPolicy()), + RUNS_OLDEST_TASK_POLICY(5, "RunsOldestTaskPolicy", new RunsOldestTaskPolicy()), - SYNC_PUT_QUEUE_POLICY(6, "SyncPutQueuePolicy", new RejectedPolicies.SyncPutQueuePolicy()); + SYNC_PUT_QUEUE_POLICY(6, "SyncPutQueuePolicy", new SyncPutQueuePolicy()); public Integer type; diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/RunsOldestTaskPolicy.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/RunsOldestTaskPolicy.java new file mode 100644 index 00000000..0e115865 --- /dev/null +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/RunsOldestTaskPolicy.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.core.executor.support; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadPoolExecutor; + +/** + * Run the oldest task policy. + */ +public class RunsOldestTaskPolicy implements RejectedExecutionHandler { + + @Override + public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { + if (executor.isShutdown()) { + return; + } + BlockingQueue workQueue = executor.getQueue(); + Runnable firstWork = workQueue.poll(); + boolean newTaskAdd = workQueue.offer(r); + if (firstWork != null) { + firstWork.run(); + } + if (!newTaskAdd) { + executor.execute(r); + } + } +} diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/SyncPutQueuePolicy.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/SyncPutQueuePolicy.java new file mode 100644 index 00000000..ef803bee --- /dev/null +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/SyncPutQueuePolicy.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.core.executor.support; + +import lombok.extern.slf4j.Slf4j; + +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadPoolExecutor; + +/** + * Synchronous put queue policy. + */ +@Slf4j +public class SyncPutQueuePolicy implements RejectedExecutionHandler { + + @Override + public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { + if (executor.isShutdown()) { + return; + } + try { + executor.getQueue().put(r); + } catch (InterruptedException e) { + log.error("Adding Queue task to thread pool failed.", e); + } + } +} From 8f4dcda73eb7630f6084592ad7e12db1f4d16d3e Mon Sep 17 00:00:00 2001 From: "chen.ma" Date: Thu, 11 Aug 2022 22:09:42 +0800 Subject: [PATCH 6/6] Remove extra spaces --- .../core/executor/ThreadPoolNotifyAlarmHandler.java | 10 ---------- .../cn/hippo4j/core/executor/support/TaskQueue.java | 2 -- .../core/inittest/RegisterDynamicThreadPoolTest.java | 2 +- 3 files changed, 1 insertion(+), 13 deletions(-) diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/ThreadPoolNotifyAlarmHandler.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/ThreadPoolNotifyAlarmHandler.java index 20d9086e..de06d187 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/ThreadPoolNotifyAlarmHandler.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/ThreadPoolNotifyAlarmHandler.java @@ -220,17 +220,11 @@ public class ThreadPoolNotifyAlarmHandler implements Runnable, CommandLineRunner AlarmNotifyRequest request = new AlarmNotifyRequest(); String appName = StrUtil.isBlank(itemId) ? applicationName : itemId; request.setAppName(appName); - // 核心线程数 int corePoolSize = threadPoolExecutor.getCorePoolSize(); - // 最大线程数 int maximumPoolSize = threadPoolExecutor.getMaximumPoolSize(); - // 线程池当前线程数 (有锁) int poolSize = threadPoolExecutor.getPoolSize(); - // 活跃线程数 (有锁) int activeCount = threadPoolExecutor.getActiveCount(); - // 同时进入池中的最大线程数 (有锁) int largestPoolSize = threadPoolExecutor.getLargestPoolSize(); - // 线程池中执行任务总数量 (有锁) long completedTaskCount = threadPoolExecutor.getCompletedTaskCount(); request.setActive(active.toUpperCase()); request.setIdentify(IdentifyUtil.getIdentify()); @@ -241,13 +235,9 @@ public class ThreadPoolNotifyAlarmHandler implements Runnable, CommandLineRunner request.setLargestPoolSize(largestPoolSize); request.setCompletedTaskCount(completedTaskCount); BlockingQueue queue = threadPoolExecutor.getQueue(); - // 队列元素个数 int queueSize = queue.size(); - // 队列类型 String queueType = queue.getClass().getSimpleName(); - // 队列剩余容量 int remainingCapacity = queue.remainingCapacity(); - // 队列容量 int queueCapacity = queueSize + remainingCapacity; request.setQueueName(queueType); request.setCapacity(queueCapacity); diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/TaskQueue.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/TaskQueue.java index 479387c4..3d16c439 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/TaskQueue.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/TaskQueue.java @@ -45,13 +45,11 @@ public class TaskQueue extends LinkedBlockingQueue if (executor.getSubmittedTaskCount() < currentPoolThreadSize) { return super.offer(runnable); } - // The current number of threads in the thread pool is less than the maximum number of threads, and returns false. // According to the thread pool source code, non-core threads will be created. if (currentPoolThreadSize < executor.getMaximumPoolSize()) { return false; } - // If the current thread pool number is greater than the maximum number of threads, the task is added to the blocking queue. return super.offer(runnable); } diff --git a/hippo4j-example/hippo4j-example-core/src/main/java/cn/hippo4j/example/core/inittest/RegisterDynamicThreadPoolTest.java b/hippo4j-example/hippo4j-example-core/src/main/java/cn/hippo4j/example/core/inittest/RegisterDynamicThreadPoolTest.java index c541a6f2..54105fbc 100644 --- a/hippo4j-example/hippo4j-example-core/src/main/java/cn/hippo4j/example/core/inittest/RegisterDynamicThreadPoolTest.java +++ b/hippo4j-example/hippo4j-example-core/src/main/java/cn/hippo4j/example/core/inittest/RegisterDynamicThreadPoolTest.java @@ -42,7 +42,7 @@ public class RegisterDynamicThreadPoolTest { .maximumPoolSize(10) .queueType(QueueTypeEnum.RESIZABLE_LINKED_BLOCKING_QUEUE.type) .capacity(110) - // TimeUnit.SECONDS + // TimeUnit.SECONDS .keepAliveTime(100L) // TimeUnit.MILLISECONDS .executeTimeOut(800L)