修复客户端启动后,服务端 30 秒内无法搜索到实例. (#42)

pull/84/head
chen.ma 3 years ago
parent cbaa3a9fe7
commit d02187d552

@ -7,6 +7,7 @@ import cn.hippo4j.starter.core.DynamicThreadPoolPostProcessor;
import cn.hippo4j.starter.core.ThreadPoolConfigService; import cn.hippo4j.starter.core.ThreadPoolConfigService;
import cn.hippo4j.starter.core.ThreadPoolOperation; import cn.hippo4j.starter.core.ThreadPoolOperation;
import cn.hippo4j.starter.enable.MarkerConfiguration; import cn.hippo4j.starter.enable.MarkerConfiguration;
import cn.hippo4j.starter.event.ApplicationContentPostProcessor;
import cn.hippo4j.starter.handler.DynamicThreadPoolBannerHandler; import cn.hippo4j.starter.handler.DynamicThreadPoolBannerHandler;
import cn.hippo4j.starter.handler.ThreadPoolRunStateHandler; import cn.hippo4j.starter.handler.ThreadPoolRunStateHandler;
import cn.hippo4j.starter.monitor.ReportingEventExecutor; import cn.hippo4j.starter.monitor.ReportingEventExecutor;
@ -111,6 +112,11 @@ public class DynamicThreadPoolAutoConfiguration {
return new RunTimeInfoCollector(properties); return new RunTimeInfoCollector(properties);
} }
@Bean
public ApplicationContentPostProcessor applicationContentPostProcessor() {
return new ApplicationContentPostProcessor();
}
} }

@ -15,10 +15,7 @@ import org.springframework.util.StringUtils;
import java.net.URLDecoder; import java.net.URLDecoder;
import java.util.*; import java.util.*;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.*;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import static cn.hippo4j.common.constant.Constants.*; import static cn.hippo4j.common.constant.Constants.*;
@ -31,10 +28,10 @@ import static cn.hippo4j.common.constant.Constants.*;
@Slf4j @Slf4j
public class ClientWorker { public class ClientWorker {
private double currentLongingTaskCount = 0;
private long timeout; private long timeout;
private double currentLongingTaskCount = 0;
private final HttpAgent agent; private final HttpAgent agent;
private final String identification; private final String identification;
@ -45,6 +42,8 @@ public class ClientWorker {
private final ScheduledExecutorService executorService; private final ScheduledExecutorService executorService;
private final CountDownLatch awaitApplicationComplete = new CountDownLatch(1);
private final ConcurrentHashMap<String, CacheData> cacheMap = new ConcurrentHashMap(16); private final ConcurrentHashMap<String, CacheData> cacheMap = new ConcurrentHashMap(16);
@SuppressWarnings("all") @SuppressWarnings("all")
@ -69,11 +68,15 @@ public class ClientWorker {
this.executor.scheduleWithFixedDelay(() -> { this.executor.scheduleWithFixedDelay(() -> {
try { try {
// 等待 spring 容器启动成功
awaitApplicationComplete.await();
// 检查动态线程池配置是否被更新
checkConfigInfo(); checkConfigInfo();
} catch (Throwable e) { } catch (Throwable e) {
log.error("Sub check rotate check error.", e); log.error("Sub check rotate check error.", e);
} }
}, 1L, 10L, TimeUnit.MILLISECONDS); }, 1L, 1024L, TimeUnit.MILLISECONDS);
} }
public void checkConfigInfo() { public void checkConfigInfo() {
@ -246,13 +249,13 @@ public class ClientWorker {
cacheData = new CacheData(namespace, itemId, tpId); cacheData = new CacheData(namespace, itemId, tpId);
CacheData lastCacheData = cacheMap.putIfAbsent(tpId, cacheData); CacheData lastCacheData = cacheMap.putIfAbsent(tpId, cacheData);
if (lastCacheData == null) { if (lastCacheData == null) {
String serverConfig = null; String serverConfig;
try { try {
serverConfig = getServerConfig(namespace, itemId, tpId, 3000L); serverConfig = getServerConfig(namespace, itemId, tpId, 3000L);
PoolParameterInfo poolInfo = JSONUtil.parseObject(serverConfig, PoolParameterInfo.class); PoolParameterInfo poolInfo = JSONUtil.parseObject(serverConfig, PoolParameterInfo.class);
cacheData.setContent(ContentUtil.getPoolContent(poolInfo)); cacheData.setContent(ContentUtil.getPoolContent(poolInfo));
} catch (Exception ex) { } catch (Exception ex) {
log.error("[Cache Data] Error. Service Unavailable :: {}", ex.getMessage()); log.error("Cache Data Error. Service Unavailable :: {}", ex.getMessage());
} }
int taskId = cacheMap.size() / CONFIG_LONG_POLL_TIMEOUT; int taskId = cacheMap.size() / CONFIG_LONG_POLL_TIMEOUT;
@ -268,4 +271,8 @@ public class ClientWorker {
this.serverHealthCheck.setHealthStatus(isHealthServer); this.serverHealthCheck.setHealthStatus(isHealthServer);
} }
protected void notifyApplicationComplete() {
awaitApplicationComplete.countDown();
}
} }

@ -1,7 +1,9 @@
package cn.hippo4j.starter.core; package cn.hippo4j.starter.core;
import cn.hippo4j.starter.event.ApplicationCompleteEvent;
import cn.hippo4j.starter.remote.HttpAgent; import cn.hippo4j.starter.remote.HttpAgent;
import cn.hippo4j.starter.remote.ServerHealthCheck; import cn.hippo4j.starter.remote.ServerHealthCheck;
import org.springframework.context.ApplicationListener;
import java.util.Arrays; import java.util.Arrays;
@ -11,7 +13,7 @@ import java.util.Arrays;
* @author chen.ma * @author chen.ma
* @date 2021/6/21 21:50 * @date 2021/6/21 21:50
*/ */
public class ThreadPoolConfigService implements ConfigService { public class ThreadPoolConfigService implements ConfigService, ApplicationListener<ApplicationCompleteEvent> {
private final ClientWorker clientWorker; private final ClientWorker clientWorker;
@ -36,4 +38,9 @@ public class ThreadPoolConfigService implements ConfigService {
} }
} }
@Override
public void onApplicationEvent(ApplicationCompleteEvent event) {
clientWorker.notifyApplicationComplete();
}
} }

@ -0,0 +1,23 @@
package cn.hippo4j.starter.event;
import org.springframework.context.ApplicationEvent;
/**
* Execute after the spring application context is successfully started.
*
* @author chen.ma
* @date 2021/12/25 21:19
*/
public class ApplicationCompleteEvent extends ApplicationEvent {
/**
* Create a new {@code ApplicationEvent}.
*
* @param source the object on which the event initially occurred or with
* which the event is associated (never {@code null})
*/
public ApplicationCompleteEvent(Object source) {
super(source);
}
}

@ -0,0 +1,31 @@
package cn.hippo4j.starter.event;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import javax.annotation.Resource;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Application content post processor.
*
* @author chen.ma
* @date 2021/12/25 20:21
*/
public class ApplicationContentPostProcessor implements ApplicationListener<ContextRefreshedEvent> {
@Resource
private ApplicationContext applicationContext;
private AtomicBoolean executeOnlyOnce = new AtomicBoolean(Boolean.TRUE);
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
if (event.getApplicationContext().getParent() == null && executeOnlyOnce.get()) {
applicationContext.publishEvent(new ApplicationCompleteEvent(this));
executeOnlyOnce.set(Boolean.FALSE);
}
}
}
Loading…
Cancel
Save