From d02e989c5474392db645c24470591e9fc6952f0f Mon Sep 17 00:00:00 2001 From: WuLang <48200100+wulangcode@users.noreply.github.com> Date: Wed, 2 Nov 2022 21:10:21 +0800 Subject: [PATCH 01/36] Adapt to delayed loading (#886) * feat:Adapt to delayed loading * feat:Field Rename --- .../hippo4j/common/toolkit/ContentUtil.java | 2 ++ .../DynamicThreadPoolAutoConfiguration.java | 3 +-- .../springboot/starter/core/ClientWorker.java | 24 +++++++++++++++---- .../ApplicationContentPostProcessor.java | 22 ++++++++++------- ...nt.java => ApplicationRefreshedEvent.java} | 4 ++-- .../starter/remote/AbstractHealthCheck.java | 8 +++---- .../DynamicThreadPoolConfigService.java | 11 ++------- 7 files changed, 44 insertions(+), 30 deletions(-) rename hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/event/{ApplicationCompleteEvent.java => ApplicationRefreshedEvent.java} (91%) diff --git a/hippo4j-common/src/main/java/cn/hippo4j/common/toolkit/ContentUtil.java b/hippo4j-common/src/main/java/cn/hippo4j/common/toolkit/ContentUtil.java index 13e3151d..68e19378 100644 --- a/hippo4j-common/src/main/java/cn/hippo4j/common/toolkit/ContentUtil.java +++ b/hippo4j-common/src/main/java/cn/hippo4j/common/toolkit/ContentUtil.java @@ -21,6 +21,8 @@ import cn.hippo4j.common.constant.Constants; import cn.hippo4j.common.model.ThreadPoolParameter; import cn.hippo4j.common.model.ThreadPoolParameterInfo; +import java.util.Objects; + /** * Content util. */ diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/config/DynamicThreadPoolAutoConfiguration.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/config/DynamicThreadPoolAutoConfiguration.java index 5d39572b..b950b423 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/config/DynamicThreadPoolAutoConfiguration.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/config/DynamicThreadPoolAutoConfiguration.java @@ -106,12 +106,11 @@ public class DynamicThreadPoolAutoConfiguration { @Bean @SuppressWarnings("all") public DynamicThreadPoolService dynamicThreadPoolConfigService(HttpAgent httpAgent, - ClientWorker clientWorker, ServerHealthCheck serverHealthCheck, ServerNotifyConfigBuilder notifyConfigBuilder, Hippo4jBaseSendMessageService hippo4jBaseSendMessageService, DynamicThreadPoolSubscribeConfig dynamicThreadPoolSubscribeConfig) { - return new DynamicThreadPoolConfigService(httpAgent, clientWorker, properties, notifyConfigBuilder, hippo4jBaseSendMessageService, dynamicThreadPoolSubscribeConfig); + return new DynamicThreadPoolConfigService(httpAgent, properties, notifyConfigBuilder, hippo4jBaseSendMessageService, dynamicThreadPoolSubscribeConfig); } @Bean diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/ClientWorker.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/ClientWorker.java index 0e30e6a2..60506c44 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/ClientWorker.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/ClientWorker.java @@ -18,7 +18,6 @@ package cn.hippo4j.springboot.starter.core; import cn.hippo4j.common.model.ThreadPoolParameterInfo; -import cn.hippo4j.common.toolkit.CollectionUtil; import cn.hippo4j.common.toolkit.ContentUtil; import cn.hippo4j.common.toolkit.GroupKey; import cn.hippo4j.common.toolkit.IdUtil; @@ -72,6 +71,8 @@ public class ClientWorker { private final CountDownLatch awaitApplicationComplete = new CountDownLatch(1); + private final CountDownLatch cacheCondition = new CountDownLatch(1); + private final ConcurrentHashMap cacheMap = new ConcurrentHashMap(16); @SuppressWarnings("all") @@ -92,9 +93,7 @@ public class ClientWorker { this.executor.schedule(() -> { try { awaitApplicationComplete.await(); - if (CollectionUtil.isNotEmpty(cacheMap)) { - executorService.execute(new LongPollingRunnable()); - } + executorService.execute(new LongPollingRunnable(cacheMap.isEmpty(), cacheCondition)); } catch (Throwable ex) { log.error("Sub check rotate check error.", ex); } @@ -103,9 +102,22 @@ public class ClientWorker { class LongPollingRunnable implements Runnable { + private boolean cacheMapInitEmptyFlag; + + private final CountDownLatch cacheCondition; + + public LongPollingRunnable(boolean cacheMapInitEmptyFlag, CountDownLatch cacheCondition) { + this.cacheMapInitEmptyFlag = cacheMapInitEmptyFlag; + this.cacheCondition = cacheCondition; + } + @Override @SneakyThrows public void run() { + if (cacheMapInitEmptyFlag) { + cacheCondition.await(); + cacheMapInitEmptyFlag = false; + } serverHealthCheck.isHealthStatus(); List cacheDataList = new ArrayList(); List inInitializingCacheList = new ArrayList(); @@ -227,6 +239,10 @@ public class ClientWorker { for (Listener listener : listeners) { cacheData.addListener(listener); } + // Lazy loading + if (awaitApplicationComplete.getCount() == 0L) { + cacheCondition.countDown(); + } } public CacheData addCacheDataIfAbsent(String namespace, String itemId, String threadPoolId) { diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/event/ApplicationContentPostProcessor.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/event/ApplicationContentPostProcessor.java index f4e2cc23..3dd1adf2 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/event/ApplicationContentPostProcessor.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/event/ApplicationContentPostProcessor.java @@ -17,29 +17,33 @@ package cn.hippo4j.springboot.starter.event; -import org.springframework.boot.context.event.ApplicationReadyEvent; +import cn.hippo4j.springboot.starter.core.ClientWorker; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationListener; +import org.springframework.context.event.ContextRefreshedEvent; import javax.annotation.Resource; +import java.util.concurrent.atomic.AtomicBoolean; /** * Application content post processor. */ -public class ApplicationContentPostProcessor implements ApplicationListener { +public class ApplicationContentPostProcessor implements ApplicationListener { @Resource private ApplicationContext applicationContext; - private boolean executeOnlyOnce = true; + @Resource + private ClientWorker clientWorker; + + private final AtomicBoolean executeOnlyOnce = new AtomicBoolean(false); @Override - public void onApplicationEvent(ApplicationReadyEvent event) { - synchronized (ApplicationContentPostProcessor.class) { - if (executeOnlyOnce) { - applicationContext.publishEvent(new ApplicationCompleteEvent(this)); - executeOnlyOnce = false; - } + public void onApplicationEvent(ContextRefreshedEvent event) { + if (!executeOnlyOnce.compareAndSet(false, true)) { + return; } + applicationContext.publishEvent(new ApplicationRefreshedEvent(this)); + clientWorker.notifyApplicationComplete(); } } diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/event/ApplicationCompleteEvent.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/event/ApplicationRefreshedEvent.java similarity index 91% rename from hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/event/ApplicationCompleteEvent.java rename to hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/event/ApplicationRefreshedEvent.java index 9393062c..dbac0303 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/event/ApplicationCompleteEvent.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/event/ApplicationRefreshedEvent.java @@ -22,7 +22,7 @@ import org.springframework.context.ApplicationEvent; /** * Execute after the spring application context is successfully started. */ -public class ApplicationCompleteEvent extends ApplicationEvent { +public class ApplicationRefreshedEvent extends ApplicationEvent { /** * Create a new {@code ApplicationEvent}. @@ -30,7 +30,7 @@ public class ApplicationCompleteEvent extends ApplicationEvent { * @param source the object on which the event initially occurred or with * which the event is associated (never {@code null}) */ - public ApplicationCompleteEvent(Object source) { + public ApplicationRefreshedEvent(Object source) { super(source); } } diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/remote/AbstractHealthCheck.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/remote/AbstractHealthCheck.java index b79595b5..1fd78118 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/remote/AbstractHealthCheck.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/remote/AbstractHealthCheck.java @@ -18,7 +18,7 @@ package cn.hippo4j.springboot.starter.remote; import cn.hippo4j.common.toolkit.ThreadUtil; -import cn.hippo4j.springboot.starter.event.ApplicationCompleteEvent; +import cn.hippo4j.springboot.starter.event.ApplicationRefreshedEvent; import cn.hippo4j.springboot.starter.core.ShutdownExecuteException; import cn.hippo4j.common.design.builder.ThreadFactoryBuilder; import lombok.SneakyThrows; @@ -38,7 +38,7 @@ import static cn.hippo4j.common.constant.Constants.HEALTH_CHECK_INTERVAL; * Abstract health check. */ @Slf4j -public abstract class AbstractHealthCheck implements ServerHealthCheck, InitializingBean, ApplicationListener { +public abstract class AbstractHealthCheck implements ServerHealthCheck, InitializingBean, ApplicationListener { /** * Health status @@ -157,11 +157,11 @@ public abstract class AbstractHealthCheck implements ServerHealthCheck, Initiali clientShutdownHook = true; signalAllBizThread(); })); - healthCheckExecutor.scheduleWithFixedDelay(() -> healthCheck(), 0, HEALTH_CHECK_INTERVAL, TimeUnit.SECONDS); + healthCheckExecutor.scheduleWithFixedDelay(this::healthCheck, 0, HEALTH_CHECK_INTERVAL, TimeUnit.SECONDS); } @Override - public void onApplicationEvent(ApplicationCompleteEvent event) { + public void onApplicationEvent(ApplicationRefreshedEvent event) { contextInitComplete = true; } } diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/support/DynamicThreadPoolConfigService.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/support/DynamicThreadPoolConfigService.java index d66765e4..96e8d717 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/support/DynamicThreadPoolConfigService.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/support/DynamicThreadPoolConfigService.java @@ -36,7 +36,7 @@ import cn.hippo4j.message.service.ThreadPoolNotifyAlarm; import cn.hippo4j.springboot.starter.config.BootstrapProperties; import cn.hippo4j.springboot.starter.core.ClientWorker; import cn.hippo4j.springboot.starter.core.DynamicThreadPoolSubscribeConfig; -import cn.hippo4j.springboot.starter.event.ApplicationCompleteEvent; +import cn.hippo4j.springboot.starter.event.ApplicationRefreshedEvent; import cn.hippo4j.springboot.starter.notify.ServerNotifyConfigBuilder; import cn.hippo4j.springboot.starter.remote.HttpAgent; import lombok.RequiredArgsConstructor; @@ -55,12 +55,10 @@ import static cn.hippo4j.common.constant.Constants.REGISTER_DYNAMIC_THREAD_POOL_ */ @Slf4j @RequiredArgsConstructor -public class DynamicThreadPoolConfigService extends AbstractDynamicThreadPoolService implements ApplicationListener { +public class DynamicThreadPoolConfigService extends AbstractDynamicThreadPoolService { private final HttpAgent httpAgent; - private final ClientWorker clientWorker; - private final BootstrapProperties properties; private final ServerNotifyConfigBuilder notifyConfigBuilder; @@ -77,11 +75,6 @@ public class DynamicThreadPoolConfigService extends AbstractDynamicThreadPoolSer return dynamicThreadPoolExecutor; } - @Override - public void onApplicationEvent(ApplicationCompleteEvent event) { - clientWorker.notifyApplicationComplete(); - } - private ThreadPoolExecutor registerExecutor(DynamicThreadPoolRegisterWrapper registerWrapper) { DynamicThreadPoolRegisterParameter registerParameter = registerWrapper.getDynamicThreadPoolRegisterParameter(); checkThreadPoolParameter(registerParameter); From 4831cd6ecbf0f3a3582c4c6e7498724646e1816c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=A9=AC=E7=A7=B0=20Ma=20Chen?= Date: Thu, 3 Nov 2022 09:55:46 +0800 Subject: [PATCH 02/36] Supplemental code comments (#898) --- .../core/executor/DynamicThreadPool.java | 25 +- .../support/FastThreadPoolExecutor.java | 8 + .../core/executor/support/TaskQueue.java | 22 +- .../executor/support/ThreadPoolBuilder.java | 226 ++++++++++++++++-- .../support/ThreadPoolExecutorTemplate.java | 28 ++- 5 files changed, 276 insertions(+), 33 deletions(-) diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPool.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPool.java index b9a5e58f..eb6632ff 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPool.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPool.java @@ -24,7 +24,30 @@ import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; /** - * Dynamic thread pool. + * An annotation that enhances the functionality of the jdk acoustic thread pool, + * with the following list of enhancements. + * + *