Optimize active parameter as empty task code

pull/1130/head
machen 2 years ago
parent d37323d6ad
commit 516999f73d

@ -19,6 +19,7 @@ package cn.hippo4j.springboot.starter.config;
import cn.hippo4j.common.model.InstanceInfo; import cn.hippo4j.common.model.InstanceInfo;
import cn.hippo4j.core.toolkit.inet.InetUtils; import cn.hippo4j.core.toolkit.inet.InetUtils;
import cn.hippo4j.springboot.starter.core.ClientShutdown;
import cn.hippo4j.springboot.starter.core.DiscoveryClient; import cn.hippo4j.springboot.starter.core.DiscoveryClient;
import cn.hippo4j.springboot.starter.provider.InstanceInfoProviderFactory; import cn.hippo4j.springboot.starter.provider.InstanceInfoProviderFactory;
import cn.hippo4j.springboot.starter.remote.HttpAgent; import cn.hippo4j.springboot.starter.remote.HttpAgent;
@ -44,7 +45,14 @@ public class DiscoveryConfiguration {
} }
@Bean @Bean
public DiscoveryClient hippo4JDiscoveryClient(HttpAgent httpAgent, InstanceInfo instanceInfo) { public ClientShutdown hippo4jClientShutdown() {
return new DiscoveryClient(httpAgent, instanceInfo); return new ClientShutdown();
}
@Bean
public DiscoveryClient hippo4JDiscoveryClient(HttpAgent httpAgent,
InstanceInfo instanceInfo,
ClientShutdown hippo4jClientShutdown) {
return new DiscoveryClient(httpAgent, instanceInfo, hippo4jClientShutdown);
} }
} }

@ -114,9 +114,10 @@ public class DynamicThreadPoolAutoConfiguration {
public ClientWorker hippo4jClientWorker(HttpAgent httpAgent, public ClientWorker hippo4jClientWorker(HttpAgent httpAgent,
InetUtils hippo4JInetUtils, InetUtils hippo4JInetUtils,
ServerHealthCheck serverHealthCheck, ServerHealthCheck serverHealthCheck,
DynamicThreadPoolBannerHandler dynamicThreadPoolBannerHandlers) { DynamicThreadPoolBannerHandler dynamicThreadPoolBannerHandlers,
ClientShutdown hippo4jClientShutdown) {
String identify = IdentifyUtil.generate(environment, hippo4JInetUtils); String identify = IdentifyUtil.generate(environment, hippo4JInetUtils);
return new ClientWorker(httpAgent, identify, serverHealthCheck, dynamicThreadPoolBannerHandlers.getVersion()); return new ClientWorker(httpAgent, identify, serverHealthCheck, dynamicThreadPoolBannerHandlers.getVersion(), hippo4jClientShutdown);
} }
@Bean @Bean
@ -247,9 +248,4 @@ public class DynamicThreadPoolAutoConfiguration {
public ThreadPoolPluginRegisterPostProcessor threadPoolPluginRegisterPostProcessor() { public ThreadPoolPluginRegisterPostProcessor threadPoolPluginRegisterPostProcessor() {
return new ThreadPoolPluginRegisterPostProcessor(); return new ThreadPoolPluginRegisterPostProcessor();
} }
@Bean
public ClientShutdown hippo4jClientShutdown() {
return new ClientShutdown();
}
} }

@ -17,29 +17,48 @@
package cn.hippo4j.springboot.starter.core; package cn.hippo4j.springboot.starter.core;
import lombok.Getter;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import lombok.Getter;
/** /**
* Client Shutdown * Called when the application is closed to avoid data exceptions caused by
* long polling on the client side.
*
* @version 1.5.0
* @see <a href="https://github.com/opengoofy/hippo4j/issues/1121" />
*/ */
public class ClientShutdown { public class ClientShutdown {
@Getter @Getter
private volatile boolean prepareClose = false; private volatile boolean prepareClose = false;
private CountDownLatch countDownLatch = new CountDownLatch(1);
private final static Long TIME_OUT_SECOND = 1L; private final static Long TIME_OUT_SECOND = 1L;
private static final int DEFAULT_COUNT = 1;
private final CountDownLatch countDownLatch = new CountDownLatch(DEFAULT_COUNT);
/**
* Called when the application is closed.
*
* @throws InterruptedException
*/
public void prepareDestroy() throws InterruptedException { public void prepareDestroy() throws InterruptedException {
prepareClose = true; prepareClose = true;
countDownLatch.await(TIME_OUT_SECOND, TimeUnit.SECONDS); countDownLatch.await(TIME_OUT_SECOND, TimeUnit.SECONDS);
} }
/**
* Decrements the count of the latch, releasing all waiting threads if
* the count reaches zero.
*
* <p>If the current count is greater than zero then it is decremented.
* If the new count is zero then all waiting threads are re-enabled for
* thread scheduling purposes.
*
* <p>If the current count equals zero then nothing happens.
*/
public void countDown() { public void countDown() {
countDownLatch.countDown(); countDownLatch.countDown();
} }
} }

@ -60,30 +60,30 @@ import static cn.hippo4j.common.constant.Constants.WORD_SEPARATOR;
public class ClientWorker implements DisposableBean { public class ClientWorker implements DisposableBean {
private final long timeout; private final long timeout;
private final HttpAgent agent;
private final String identify; private final String identify;
private final String version; private final String version;
private final HttpAgent agent;
private final ServerHealthCheck serverHealthCheck; private final ServerHealthCheck serverHealthCheck;
private final ScheduledExecutorService executorService; private final ScheduledExecutorService executorService;
private final ClientShutdown hippo4jClientShutdown;
private final CountDownLatch awaitApplicationComplete = new CountDownLatch(1); private final CountDownLatch awaitApplicationComplete = new CountDownLatch(1);
private final CountDownLatch cacheCondition = new CountDownLatch(1); private final CountDownLatch cacheCondition = new CountDownLatch(1);
private final ConcurrentHashMap<String, CacheData> cacheMap = new ConcurrentHashMap<>(16); private final ConcurrentHashMap<String, CacheData> cacheMap = new ConcurrentHashMap<>(16);
@SuppressWarnings("all") @SuppressWarnings("all")
public ClientWorker(HttpAgent httpAgent, String identify, ServerHealthCheck serverHealthCheck, String version) { public ClientWorker(HttpAgent httpAgent,
String identify,
ServerHealthCheck serverHealthCheck,
String version,
ClientShutdown hippo4jClientShutdown) {
this.agent = httpAgent; this.agent = httpAgent;
this.identify = identify; this.identify = identify;
this.timeout = CONFIG_LONG_POLL_TIMEOUT; this.timeout = CONFIG_LONG_POLL_TIMEOUT;
this.version = version; this.version = version;
this.serverHealthCheck = serverHealthCheck; this.serverHealthCheck = serverHealthCheck;
this.hippo4jClientShutdown = hippo4jClientShutdown;
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1, runnable -> { ScheduledExecutorService executor = Executors.newScheduledThreadPool(1, runnable -> {
Thread thread = new Thread(runnable); Thread thread = new Thread(runnable);
thread.setName("client.worker.executor"); thread.setName("client.worker.executor");
@ -122,7 +122,9 @@ public class ClientWorker implements DisposableBean {
@Override @Override
@SneakyThrows @SneakyThrows
public void run() { public void run() {
if (executorService.isShutdown()) { if (executorService.isShutdown() || hippo4jClientShutdown.isPrepareClose()) {
hippo4jClientShutdown.countDown();
log.info("The task of monitoring dynamic thread pool changes has stopped.");
return; return;
} }
if (cacheMapInitEmptyFlag) { if (cacheMapInitEmptyFlag) {

@ -43,20 +43,18 @@ import static cn.hippo4j.common.constant.Constants.BASE_PATH;
public class DiscoveryClient implements DisposableBean { public class DiscoveryClient implements DisposableBean {
private final ScheduledExecutorService scheduler; private final ScheduledExecutorService scheduler;
private final HttpAgent httpAgent; private final HttpAgent httpAgent;
private final InstanceInfo instanceInfo; private final InstanceInfo instanceInfo;
private final ClientShutdown hippo4jClientShutdown;
private volatile long lastSuccessfulHeartbeatTimestamp = -1; private volatile long lastSuccessfulHeartbeatTimestamp = -1;
private static final String PREFIX = "DiscoveryClient_"; private static final String PREFIX = "DiscoveryClient_";
private final String appPathIdentifier; private final String appPathIdentifier;
public DiscoveryClient(HttpAgent httpAgent, InstanceInfo instanceInfo) { public DiscoveryClient(HttpAgent httpAgent, InstanceInfo instanceInfo, ClientShutdown hippo4jClientShutdown) {
this.httpAgent = httpAgent; this.httpAgent = httpAgent;
this.instanceInfo = instanceInfo; this.instanceInfo = instanceInfo;
this.hippo4jClientShutdown = hippo4jClientShutdown;
this.appPathIdentifier = instanceInfo.getAppName().toUpperCase() + "/" + instanceInfo.getInstanceId(); this.appPathIdentifier = instanceInfo.getAppName().toUpperCase() + "/" + instanceInfo.getInstanceId();
this.scheduler = new ScheduledThreadPoolExecutor( this.scheduler = new ScheduledThreadPoolExecutor(
new Integer(1), new Integer(1),
@ -104,7 +102,7 @@ public class DiscoveryClient implements DisposableBean {
.setGroupKey(groupKeyIp); .setGroupKey(groupKeyIp);
clientCloseResult = httpAgent.httpPostByDiscovery(clientCloseUrlPath, clientCloseHookReq); clientCloseResult = httpAgent.httpPostByDiscovery(clientCloseUrlPath, clientCloseHookReq);
if (clientCloseResult.isSuccess()) { if (clientCloseResult.isSuccess()) {
log.info("{}{} -client close hook success.", PREFIX, appPathIdentifier); log.info("{}{} - client close hook success.", PREFIX, appPathIdentifier);
} }
} catch (Throwable ex) { } catch (Throwable ex) {
if (ex instanceof ShutdownExecuteException) { if (ex instanceof ShutdownExecuteException) {
@ -115,9 +113,9 @@ public class DiscoveryClient implements DisposableBean {
} }
private void prepareDestroy() throws InterruptedException { private void prepareDestroy() throws InterruptedException {
this.scheduler.shutdownNow(); scheduler.shutdownNow();
// try to make sure the ClientWorker is closed first // Try to make sure the ClientWorker is closed first.
ApplicationContextHolder.getBean(ClientShutdown.class).prepareDestroy(); hippo4jClientShutdown.prepareDestroy();
} }
public class HeartbeatThread implements Runnable { public class HeartbeatThread implements Runnable {
@ -133,7 +131,7 @@ public class DiscoveryClient implements DisposableBean {
private boolean renew() { private boolean renew() {
Result renewResult; Result renewResult;
try { try {
if (this.scheduler.isShutdown()) { if (scheduler.isShutdown()) {
return false; return false;
} }
InstanceInfo.InstanceRenew instanceRenew = new InstanceInfo.InstanceRenew() InstanceInfo.InstanceRenew instanceRenew = new InstanceInfo.InstanceRenew()

@ -19,12 +19,11 @@ package cn.hippo4j.springboot.starter.remote;
import cn.hippo4j.common.config.ApplicationContextHolder; import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.common.constant.Constants; import cn.hippo4j.common.constant.Constants;
import cn.hippo4j.common.design.builder.ThreadFactoryBuilder;
import cn.hippo4j.common.toolkit.StringUtil; import cn.hippo4j.common.toolkit.StringUtil;
import cn.hippo4j.common.toolkit.http.HttpUtil; import cn.hippo4j.common.toolkit.http.HttpUtil;
import cn.hippo4j.common.web.base.Result; import cn.hippo4j.common.web.base.Result;
import cn.hippo4j.common.design.builder.ThreadFactoryBuilder;
import cn.hippo4j.springboot.starter.config.BootstrapProperties; import cn.hippo4j.springboot.starter.config.BootstrapProperties;
import cn.hippo4j.springboot.starter.core.ClientShutdown;
import cn.hippo4j.springboot.starter.security.SecurityProxy; import cn.hippo4j.springboot.starter.security.SecurityProxy;
import java.util.HashMap; import java.util.HashMap;
@ -32,12 +31,10 @@ import java.util.Map;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
/** /**
* Server http agent. * Server http agent.
*/ */
@Slf4j
public class ServerHttpAgent implements HttpAgent { public class ServerHttpAgent implements HttpAgent {
private final BootstrapProperties dynamicThreadPoolProperties; private final BootstrapProperties dynamicThreadPoolProperties;
@ -48,8 +45,6 @@ public class ServerHttpAgent implements HttpAgent {
private ServerHealthCheck serverHealthCheck; private ServerHealthCheck serverHealthCheck;
private ClientShutdown clientShutdown;
private ScheduledExecutorService executorService; private ScheduledExecutorService executorService;
private final long securityInfoRefreshIntervalMills = TimeUnit.SECONDS.toMillis(5); private final long securityInfoRefreshIntervalMills = TimeUnit.SECONDS.toMillis(5);
@ -115,9 +110,6 @@ public class ServerHttpAgent implements HttpAgent {
public Result httpPostByConfig(String path, Map<String, String> headers, Map<String, String> paramValues, long readTimeoutMs) { public Result httpPostByConfig(String path, Map<String, String> headers, Map<String, String> paramValues, long readTimeoutMs) {
isHealthStatus(); isHealthStatus();
injectSecurityInfo(paramValues); injectSecurityInfo(paramValues);
if (isPrepareClose()) {
return new Result();
}
return HttpUtil.post(buildUrl(path), headers, paramValues, readTimeoutMs, Result.class); return HttpUtil.post(buildUrl(path), headers, paramValues, readTimeoutMs, Result.class);
} }
@ -137,18 +129,6 @@ public class ServerHttpAgent implements HttpAgent {
serverHealthCheck.isHealthStatus(); serverHealthCheck.isHealthStatus();
} }
private boolean isPrepareClose() {
if (clientShutdown == null) {
clientShutdown = ApplicationContextHolder.getBean(ClientShutdown.class);
}
if (clientShutdown.isPrepareClose()) {
clientShutdown.countDown();
log.info("client prepare shutdown");
return true;
}
return false;
}
private Map injectSecurityInfo(Map<String, String> params) { private Map injectSecurityInfo(Map<String, String> params) {
if (StringUtil.isNotBlank(securityProxy.getAccessToken())) { if (StringUtil.isNotBlank(securityProxy.getAccessToken())) {
params.put(Constants.ACCESS_TOKEN, securityProxy.getAccessToken()); params.put(Constants.ACCESS_TOKEN, securityProxy.getAccessToken());

Loading…
Cancel
Save