Adapt to delayed loading (#886)

* feat:Adapt to delayed loading

* feat:Field Rename
pull/898/head
WuLang 3 years ago committed by GitHub
parent 024f694734
commit d02e989c54
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -21,6 +21,8 @@ import cn.hippo4j.common.constant.Constants;
import cn.hippo4j.common.model.ThreadPoolParameter; import cn.hippo4j.common.model.ThreadPoolParameter;
import cn.hippo4j.common.model.ThreadPoolParameterInfo; import cn.hippo4j.common.model.ThreadPoolParameterInfo;
import java.util.Objects;
/** /**
* Content util. * Content util.
*/ */

@ -106,12 +106,11 @@ public class DynamicThreadPoolAutoConfiguration {
@Bean @Bean
@SuppressWarnings("all") @SuppressWarnings("all")
public DynamicThreadPoolService dynamicThreadPoolConfigService(HttpAgent httpAgent, public DynamicThreadPoolService dynamicThreadPoolConfigService(HttpAgent httpAgent,
ClientWorker clientWorker,
ServerHealthCheck serverHealthCheck, ServerHealthCheck serverHealthCheck,
ServerNotifyConfigBuilder notifyConfigBuilder, ServerNotifyConfigBuilder notifyConfigBuilder,
Hippo4jBaseSendMessageService hippo4jBaseSendMessageService, Hippo4jBaseSendMessageService hippo4jBaseSendMessageService,
DynamicThreadPoolSubscribeConfig dynamicThreadPoolSubscribeConfig) { DynamicThreadPoolSubscribeConfig dynamicThreadPoolSubscribeConfig) {
return new DynamicThreadPoolConfigService(httpAgent, clientWorker, properties, notifyConfigBuilder, hippo4jBaseSendMessageService, dynamicThreadPoolSubscribeConfig); return new DynamicThreadPoolConfigService(httpAgent, properties, notifyConfigBuilder, hippo4jBaseSendMessageService, dynamicThreadPoolSubscribeConfig);
} }
@Bean @Bean

@ -18,7 +18,6 @@
package cn.hippo4j.springboot.starter.core; package cn.hippo4j.springboot.starter.core;
import cn.hippo4j.common.model.ThreadPoolParameterInfo; import cn.hippo4j.common.model.ThreadPoolParameterInfo;
import cn.hippo4j.common.toolkit.CollectionUtil;
import cn.hippo4j.common.toolkit.ContentUtil; import cn.hippo4j.common.toolkit.ContentUtil;
import cn.hippo4j.common.toolkit.GroupKey; import cn.hippo4j.common.toolkit.GroupKey;
import cn.hippo4j.common.toolkit.IdUtil; import cn.hippo4j.common.toolkit.IdUtil;
@ -72,6 +71,8 @@ public class ClientWorker {
private final CountDownLatch awaitApplicationComplete = new CountDownLatch(1); private final CountDownLatch awaitApplicationComplete = new CountDownLatch(1);
private final CountDownLatch cacheCondition = new CountDownLatch(1);
private final ConcurrentHashMap<String, CacheData> cacheMap = new ConcurrentHashMap(16); private final ConcurrentHashMap<String, CacheData> cacheMap = new ConcurrentHashMap(16);
@SuppressWarnings("all") @SuppressWarnings("all")
@ -92,9 +93,7 @@ public class ClientWorker {
this.executor.schedule(() -> { this.executor.schedule(() -> {
try { try {
awaitApplicationComplete.await(); awaitApplicationComplete.await();
if (CollectionUtil.isNotEmpty(cacheMap)) { executorService.execute(new LongPollingRunnable(cacheMap.isEmpty(), cacheCondition));
executorService.execute(new LongPollingRunnable());
}
} catch (Throwable ex) { } catch (Throwable ex) {
log.error("Sub check rotate check error.", ex); log.error("Sub check rotate check error.", ex);
} }
@ -103,9 +102,22 @@ public class ClientWorker {
class LongPollingRunnable implements Runnable { class LongPollingRunnable implements Runnable {
private boolean cacheMapInitEmptyFlag;
private final CountDownLatch cacheCondition;
public LongPollingRunnable(boolean cacheMapInitEmptyFlag, CountDownLatch cacheCondition) {
this.cacheMapInitEmptyFlag = cacheMapInitEmptyFlag;
this.cacheCondition = cacheCondition;
}
@Override @Override
@SneakyThrows @SneakyThrows
public void run() { public void run() {
if (cacheMapInitEmptyFlag) {
cacheCondition.await();
cacheMapInitEmptyFlag = false;
}
serverHealthCheck.isHealthStatus(); serverHealthCheck.isHealthStatus();
List<CacheData> cacheDataList = new ArrayList(); List<CacheData> cacheDataList = new ArrayList();
List<String> inInitializingCacheList = new ArrayList(); List<String> inInitializingCacheList = new ArrayList();
@ -227,6 +239,10 @@ public class ClientWorker {
for (Listener listener : listeners) { for (Listener listener : listeners) {
cacheData.addListener(listener); cacheData.addListener(listener);
} }
// Lazy loading
if (awaitApplicationComplete.getCount() == 0L) {
cacheCondition.countDown();
}
} }
public CacheData addCacheDataIfAbsent(String namespace, String itemId, String threadPoolId) { public CacheData addCacheDataIfAbsent(String namespace, String itemId, String threadPoolId) {

@ -17,29 +17,33 @@
package cn.hippo4j.springboot.starter.event; package cn.hippo4j.springboot.starter.event;
import org.springframework.boot.context.event.ApplicationReadyEvent; import cn.hippo4j.springboot.starter.core.ClientWorker;
import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationListener; import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.util.concurrent.atomic.AtomicBoolean;
/** /**
* Application content post processor. * Application content post processor.
*/ */
public class ApplicationContentPostProcessor implements ApplicationListener<ApplicationReadyEvent> { public class ApplicationContentPostProcessor implements ApplicationListener<ContextRefreshedEvent> {
@Resource @Resource
private ApplicationContext applicationContext; private ApplicationContext applicationContext;
private boolean executeOnlyOnce = true; @Resource
private ClientWorker clientWorker;
private final AtomicBoolean executeOnlyOnce = new AtomicBoolean(false);
@Override @Override
public void onApplicationEvent(ApplicationReadyEvent event) { public void onApplicationEvent(ContextRefreshedEvent event) {
synchronized (ApplicationContentPostProcessor.class) { if (!executeOnlyOnce.compareAndSet(false, true)) {
if (executeOnlyOnce) { return;
applicationContext.publishEvent(new ApplicationCompleteEvent(this));
executeOnlyOnce = false;
}
} }
applicationContext.publishEvent(new ApplicationRefreshedEvent(this));
clientWorker.notifyApplicationComplete();
} }
} }

@ -22,7 +22,7 @@ import org.springframework.context.ApplicationEvent;
/** /**
* Execute after the spring application context is successfully started. * Execute after the spring application context is successfully started.
*/ */
public class ApplicationCompleteEvent extends ApplicationEvent { public class ApplicationRefreshedEvent extends ApplicationEvent {
/** /**
* Create a new {@code ApplicationEvent}. * Create a new {@code ApplicationEvent}.
@ -30,7 +30,7 @@ public class ApplicationCompleteEvent extends ApplicationEvent {
* @param source the object on which the event initially occurred or with * @param source the object on which the event initially occurred or with
* which the event is associated (never {@code null}) * which the event is associated (never {@code null})
*/ */
public ApplicationCompleteEvent(Object source) { public ApplicationRefreshedEvent(Object source) {
super(source); super(source);
} }
} }

@ -18,7 +18,7 @@
package cn.hippo4j.springboot.starter.remote; package cn.hippo4j.springboot.starter.remote;
import cn.hippo4j.common.toolkit.ThreadUtil; import cn.hippo4j.common.toolkit.ThreadUtil;
import cn.hippo4j.springboot.starter.event.ApplicationCompleteEvent; import cn.hippo4j.springboot.starter.event.ApplicationRefreshedEvent;
import cn.hippo4j.springboot.starter.core.ShutdownExecuteException; import cn.hippo4j.springboot.starter.core.ShutdownExecuteException;
import cn.hippo4j.common.design.builder.ThreadFactoryBuilder; import cn.hippo4j.common.design.builder.ThreadFactoryBuilder;
import lombok.SneakyThrows; import lombok.SneakyThrows;
@ -38,7 +38,7 @@ import static cn.hippo4j.common.constant.Constants.HEALTH_CHECK_INTERVAL;
* Abstract health check. * Abstract health check.
*/ */
@Slf4j @Slf4j
public abstract class AbstractHealthCheck implements ServerHealthCheck, InitializingBean, ApplicationListener<ApplicationCompleteEvent> { public abstract class AbstractHealthCheck implements ServerHealthCheck, InitializingBean, ApplicationListener<ApplicationRefreshedEvent> {
/** /**
* Health status * Health status
@ -157,11 +157,11 @@ public abstract class AbstractHealthCheck implements ServerHealthCheck, Initiali
clientShutdownHook = true; clientShutdownHook = true;
signalAllBizThread(); signalAllBizThread();
})); }));
healthCheckExecutor.scheduleWithFixedDelay(() -> healthCheck(), 0, HEALTH_CHECK_INTERVAL, TimeUnit.SECONDS); healthCheckExecutor.scheduleWithFixedDelay(this::healthCheck, 0, HEALTH_CHECK_INTERVAL, TimeUnit.SECONDS);
} }
@Override @Override
public void onApplicationEvent(ApplicationCompleteEvent event) { public void onApplicationEvent(ApplicationRefreshedEvent event) {
contextInitComplete = true; contextInitComplete = true;
} }
} }

@ -36,7 +36,7 @@ import cn.hippo4j.message.service.ThreadPoolNotifyAlarm;
import cn.hippo4j.springboot.starter.config.BootstrapProperties; import cn.hippo4j.springboot.starter.config.BootstrapProperties;
import cn.hippo4j.springboot.starter.core.ClientWorker; import cn.hippo4j.springboot.starter.core.ClientWorker;
import cn.hippo4j.springboot.starter.core.DynamicThreadPoolSubscribeConfig; import cn.hippo4j.springboot.starter.core.DynamicThreadPoolSubscribeConfig;
import cn.hippo4j.springboot.starter.event.ApplicationCompleteEvent; import cn.hippo4j.springboot.starter.event.ApplicationRefreshedEvent;
import cn.hippo4j.springboot.starter.notify.ServerNotifyConfigBuilder; import cn.hippo4j.springboot.starter.notify.ServerNotifyConfigBuilder;
import cn.hippo4j.springboot.starter.remote.HttpAgent; import cn.hippo4j.springboot.starter.remote.HttpAgent;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
@ -55,12 +55,10 @@ import static cn.hippo4j.common.constant.Constants.REGISTER_DYNAMIC_THREAD_POOL_
*/ */
@Slf4j @Slf4j
@RequiredArgsConstructor @RequiredArgsConstructor
public class DynamicThreadPoolConfigService extends AbstractDynamicThreadPoolService implements ApplicationListener<ApplicationCompleteEvent> { public class DynamicThreadPoolConfigService extends AbstractDynamicThreadPoolService {
private final HttpAgent httpAgent; private final HttpAgent httpAgent;
private final ClientWorker clientWorker;
private final BootstrapProperties properties; private final BootstrapProperties properties;
private final ServerNotifyConfigBuilder notifyConfigBuilder; private final ServerNotifyConfigBuilder notifyConfigBuilder;
@ -77,11 +75,6 @@ public class DynamicThreadPoolConfigService extends AbstractDynamicThreadPoolSer
return dynamicThreadPoolExecutor; return dynamicThreadPoolExecutor;
} }
@Override
public void onApplicationEvent(ApplicationCompleteEvent event) {
clientWorker.notifyApplicationComplete();
}
private ThreadPoolExecutor registerExecutor(DynamicThreadPoolRegisterWrapper registerWrapper) { private ThreadPoolExecutor registerExecutor(DynamicThreadPoolRegisterWrapper registerWrapper) {
DynamicThreadPoolRegisterParameter registerParameter = registerWrapper.getDynamicThreadPoolRegisterParameter(); DynamicThreadPoolRegisterParameter registerParameter = registerWrapper.getDynamicThreadPoolRegisterParameter();
checkThreadPoolParameter(registerParameter); checkThreadPoolParameter(registerParameter);

Loading…
Cancel
Save