diff --git a/hippo4j-common/src/main/java/cn/hippo4j/common/toolkit/ContentUtil.java b/hippo4j-common/src/main/java/cn/hippo4j/common/toolkit/ContentUtil.java index 13e3151d..129b011a 100644 --- a/hippo4j-common/src/main/java/cn/hippo4j/common/toolkit/ContentUtil.java +++ b/hippo4j-common/src/main/java/cn/hippo4j/common/toolkit/ContentUtil.java @@ -21,12 +21,16 @@ import cn.hippo4j.common.constant.Constants; import cn.hippo4j.common.model.ThreadPoolParameter; import cn.hippo4j.common.model.ThreadPoolParameterInfo; +import java.util.Objects; + /** * Content util. */ public class ContentUtil { public static String getPoolContent(ThreadPoolParameter parameter) { + if (Objects.isNull(parameter)) + return null; ThreadPoolParameterInfo threadPoolParameterInfo = new ThreadPoolParameterInfo(); threadPoolParameterInfo.setTenantId(parameter.getTenantId()) .setItemId(parameter.getItemId()) diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/config/DynamicThreadPoolAutoConfiguration.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/config/DynamicThreadPoolAutoConfiguration.java index 5d39572b..b950b423 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/config/DynamicThreadPoolAutoConfiguration.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/config/DynamicThreadPoolAutoConfiguration.java @@ -106,12 +106,11 @@ public class DynamicThreadPoolAutoConfiguration { @Bean @SuppressWarnings("all") public DynamicThreadPoolService dynamicThreadPoolConfigService(HttpAgent httpAgent, - ClientWorker clientWorker, ServerHealthCheck serverHealthCheck, ServerNotifyConfigBuilder notifyConfigBuilder, Hippo4jBaseSendMessageService hippo4jBaseSendMessageService, DynamicThreadPoolSubscribeConfig dynamicThreadPoolSubscribeConfig) { - return new DynamicThreadPoolConfigService(httpAgent, clientWorker, properties, notifyConfigBuilder, hippo4jBaseSendMessageService, dynamicThreadPoolSubscribeConfig); + return new DynamicThreadPoolConfigService(httpAgent, properties, notifyConfigBuilder, hippo4jBaseSendMessageService, dynamicThreadPoolSubscribeConfig); } @Bean diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/ClientWorker.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/ClientWorker.java index 0e30e6a2..b083a162 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/ClientWorker.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/ClientWorker.java @@ -72,6 +72,8 @@ public class ClientWorker { private final CountDownLatch awaitApplicationComplete = new CountDownLatch(1); + private final CountDownLatch cacheCondition = new CountDownLatch(1); + private final ConcurrentHashMap cacheMap = new ConcurrentHashMap(16); @SuppressWarnings("all") @@ -92,9 +94,7 @@ public class ClientWorker { this.executor.schedule(() -> { try { awaitApplicationComplete.await(); - if (CollectionUtil.isNotEmpty(cacheMap)) { - executorService.execute(new LongPollingRunnable()); - } + executorService.execute(new LongPollingRunnable(cacheMap.isEmpty(), cacheCondition)); } catch (Throwable ex) { log.error("Sub check rotate check error.", ex); } @@ -103,9 +103,22 @@ public class ClientWorker { class LongPollingRunnable implements Runnable { + private boolean flag; + + private final CountDownLatch cacheCondition; + + public LongPollingRunnable(boolean flag, CountDownLatch cacheCondition) { + this.flag = flag; + this.cacheCondition = cacheCondition; + } + @Override @SneakyThrows public void run() { + if (flag) { + cacheCondition.await(); + flag = false; + } serverHealthCheck.isHealthStatus(); List cacheDataList = new ArrayList(); List inInitializingCacheList = new ArrayList(); @@ -227,6 +240,10 @@ public class ClientWorker { for (Listener listener : listeners) { cacheData.addListener(listener); } + // Lazy loading + if (awaitApplicationComplete.getCount() == 0L) { + cacheCondition.countDown(); + } } public CacheData addCacheDataIfAbsent(String namespace, String itemId, String threadPoolId) { diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/event/ApplicationContentPostProcessor.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/event/ApplicationContentPostProcessor.java index f4e2cc23..3dd1adf2 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/event/ApplicationContentPostProcessor.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/event/ApplicationContentPostProcessor.java @@ -17,29 +17,33 @@ package cn.hippo4j.springboot.starter.event; -import org.springframework.boot.context.event.ApplicationReadyEvent; +import cn.hippo4j.springboot.starter.core.ClientWorker; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationListener; +import org.springframework.context.event.ContextRefreshedEvent; import javax.annotation.Resource; +import java.util.concurrent.atomic.AtomicBoolean; /** * Application content post processor. */ -public class ApplicationContentPostProcessor implements ApplicationListener { +public class ApplicationContentPostProcessor implements ApplicationListener { @Resource private ApplicationContext applicationContext; - private boolean executeOnlyOnce = true; + @Resource + private ClientWorker clientWorker; + + private final AtomicBoolean executeOnlyOnce = new AtomicBoolean(false); @Override - public void onApplicationEvent(ApplicationReadyEvent event) { - synchronized (ApplicationContentPostProcessor.class) { - if (executeOnlyOnce) { - applicationContext.publishEvent(new ApplicationCompleteEvent(this)); - executeOnlyOnce = false; - } + public void onApplicationEvent(ContextRefreshedEvent event) { + if (!executeOnlyOnce.compareAndSet(false, true)) { + return; } + applicationContext.publishEvent(new ApplicationRefreshedEvent(this)); + clientWorker.notifyApplicationComplete(); } } diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/event/ApplicationCompleteEvent.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/event/ApplicationRefreshedEvent.java similarity index 91% rename from hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/event/ApplicationCompleteEvent.java rename to hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/event/ApplicationRefreshedEvent.java index 9393062c..dbac0303 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/event/ApplicationCompleteEvent.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/event/ApplicationRefreshedEvent.java @@ -22,7 +22,7 @@ import org.springframework.context.ApplicationEvent; /** * Execute after the spring application context is successfully started. */ -public class ApplicationCompleteEvent extends ApplicationEvent { +public class ApplicationRefreshedEvent extends ApplicationEvent { /** * Create a new {@code ApplicationEvent}. @@ -30,7 +30,7 @@ public class ApplicationCompleteEvent extends ApplicationEvent { * @param source the object on which the event initially occurred or with * which the event is associated (never {@code null}) */ - public ApplicationCompleteEvent(Object source) { + public ApplicationRefreshedEvent(Object source) { super(source); } } diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/remote/AbstractHealthCheck.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/remote/AbstractHealthCheck.java index b79595b5..1fd78118 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/remote/AbstractHealthCheck.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/remote/AbstractHealthCheck.java @@ -18,7 +18,7 @@ package cn.hippo4j.springboot.starter.remote; import cn.hippo4j.common.toolkit.ThreadUtil; -import cn.hippo4j.springboot.starter.event.ApplicationCompleteEvent; +import cn.hippo4j.springboot.starter.event.ApplicationRefreshedEvent; import cn.hippo4j.springboot.starter.core.ShutdownExecuteException; import cn.hippo4j.common.design.builder.ThreadFactoryBuilder; import lombok.SneakyThrows; @@ -38,7 +38,7 @@ import static cn.hippo4j.common.constant.Constants.HEALTH_CHECK_INTERVAL; * Abstract health check. */ @Slf4j -public abstract class AbstractHealthCheck implements ServerHealthCheck, InitializingBean, ApplicationListener { +public abstract class AbstractHealthCheck implements ServerHealthCheck, InitializingBean, ApplicationListener { /** * Health status @@ -157,11 +157,11 @@ public abstract class AbstractHealthCheck implements ServerHealthCheck, Initiali clientShutdownHook = true; signalAllBizThread(); })); - healthCheckExecutor.scheduleWithFixedDelay(() -> healthCheck(), 0, HEALTH_CHECK_INTERVAL, TimeUnit.SECONDS); + healthCheckExecutor.scheduleWithFixedDelay(this::healthCheck, 0, HEALTH_CHECK_INTERVAL, TimeUnit.SECONDS); } @Override - public void onApplicationEvent(ApplicationCompleteEvent event) { + public void onApplicationEvent(ApplicationRefreshedEvent event) { contextInitComplete = true; } } diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/support/DynamicThreadPoolConfigService.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/support/DynamicThreadPoolConfigService.java index d66765e4..96e8d717 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/support/DynamicThreadPoolConfigService.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/support/DynamicThreadPoolConfigService.java @@ -36,7 +36,7 @@ import cn.hippo4j.message.service.ThreadPoolNotifyAlarm; import cn.hippo4j.springboot.starter.config.BootstrapProperties; import cn.hippo4j.springboot.starter.core.ClientWorker; import cn.hippo4j.springboot.starter.core.DynamicThreadPoolSubscribeConfig; -import cn.hippo4j.springboot.starter.event.ApplicationCompleteEvent; +import cn.hippo4j.springboot.starter.event.ApplicationRefreshedEvent; import cn.hippo4j.springboot.starter.notify.ServerNotifyConfigBuilder; import cn.hippo4j.springboot.starter.remote.HttpAgent; import lombok.RequiredArgsConstructor; @@ -55,12 +55,10 @@ import static cn.hippo4j.common.constant.Constants.REGISTER_DYNAMIC_THREAD_POOL_ */ @Slf4j @RequiredArgsConstructor -public class DynamicThreadPoolConfigService extends AbstractDynamicThreadPoolService implements ApplicationListener { +public class DynamicThreadPoolConfigService extends AbstractDynamicThreadPoolService { private final HttpAgent httpAgent; - private final ClientWorker clientWorker; - private final BootstrapProperties properties; private final ServerNotifyConfigBuilder notifyConfigBuilder; @@ -77,11 +75,6 @@ public class DynamicThreadPoolConfigService extends AbstractDynamicThreadPoolSer return dynamicThreadPoolExecutor; } - @Override - public void onApplicationEvent(ApplicationCompleteEvent event) { - clientWorker.notifyApplicationComplete(); - } - private ThreadPoolExecutor registerExecutor(DynamicThreadPoolRegisterWrapper registerWrapper) { DynamicThreadPoolRegisterParameter registerParameter = registerWrapper.getDynamicThreadPoolRegisterParameter(); checkThreadPoolParameter(registerParameter);