重构客户端健康检查.

pull/28/head
chen.ma 3 years ago
parent f6c832f405
commit 4e2f79d3ee

@ -11,7 +11,6 @@ import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import lombok.SneakyThrows; import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
import java.net.URLDecoder; import java.net.URLDecoder;
@ -30,7 +29,7 @@ import static cn.hippo4j.common.constant.Constants.*;
* @date 2021/6/20 18:34 * @date 2021/6/20 18:34
*/ */
@Slf4j @Slf4j
public class ClientWorker implements DisposableBean { public class ClientWorker {
private double currentLongingTaskCount = 0; private double currentLongingTaskCount = 0;
@ -90,12 +89,6 @@ public class ClientWorker implements DisposableBean {
} }
} }
@Override
public void destroy() throws Exception {
Optional.ofNullable(executor).ifPresent((each) -> each.shutdown());
Optional.ofNullable(executorService).ifPresent((each) -> each.shutdown());
}
class LongPollingRunnable implements Runnable { class LongPollingRunnable implements Runnable {
@Override @Override

@ -15,7 +15,6 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.DisposableBean;
import java.util.Map; import java.util.Map;
import java.util.Optional;
import java.util.concurrent.*; import java.util.concurrent.*;
import static cn.hippo4j.common.constant.Constants.GROUP_KEY; import static cn.hippo4j.common.constant.Constants.GROUP_KEY;
@ -102,6 +101,10 @@ public class DiscoveryClient implements DisposableBean {
log.info("{}{} - remove config cache success.", PREFIX, appPathIdentifier); log.info("{}{} - remove config cache success.", PREFIX, appPathIdentifier);
} }
} catch (Throwable ex) { } catch (Throwable ex) {
if (ex instanceof ShutdownExecuteException) {
return;
}
log.error("{}{} - remove config cache fail.", PREFIX, appPathIdentifier, ex); log.error("{}{} - remove config cache fail.", PREFIX, appPathIdentifier, ex);
} }
@ -115,8 +118,6 @@ public class DiscoveryClient implements DisposableBean {
} catch (Throwable ex) { } catch (Throwable ex) {
log.error("{}{} - destroy service fail.", PREFIX, appPathIdentifier, ex); log.error("{}{} - destroy service fail.", PREFIX, appPathIdentifier, ex);
} }
Optional.ofNullable(scheduler).ifPresent((each) -> each.shutdown());
} }
public class HeartbeatThread implements Runnable { public class HeartbeatThread implements Runnable {

@ -0,0 +1,15 @@
package cn.hippo4j.starter.core;
/**
* Shutdown execute exception.
*
* @author chen.ma
* @date 2021/12/9 21:48
*/
public class ShutdownExecuteException extends Exception {
public ShutdownExecuteException() {
super("Execute task when stopped.");
}
}

@ -95,7 +95,10 @@ public class ReportingEventExecutor extends AbstractThreadPoolRuntime implements
* 线, * 线,
*/ */
private void runTimeGatherTask() { private void runTimeGatherTask() {
serverHealthCheck.isHealthStatus(); boolean healthStatus = serverHealthCheck.isHealthStatus();
if (!healthStatus) {
return;
}
Message message = collectMessage(); Message message = collectMessage();
messageCollectVessel.offer(message); messageCollectVessel.offer(message);
} }

@ -1,5 +1,6 @@
package cn.hippo4j.starter.remote; package cn.hippo4j.starter.remote;
import cn.hippo4j.starter.core.ShutdownExecuteException;
import cn.hippo4j.starter.toolkit.thread.ThreadFactoryBuilder; import cn.hippo4j.starter.toolkit.thread.ThreadFactoryBuilder;
import lombok.SneakyThrows; import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -11,6 +12,8 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import static cn.hippo4j.common.constant.Constants.HEALTH_CHECK_INTERVAL;
/** /**
* Abstract health check. * Abstract health check.
* *
@ -18,13 +21,18 @@ import java.util.concurrent.locks.ReentrantLock;
* @date 2021/12/8 20:19 * @date 2021/12/8 20:19
*/ */
@Slf4j @Slf4j
public abstract class AbstractHealthCheck implements InitializingBean, ServerHealthCheck { public abstract class AbstractHealthCheck implements ServerHealthCheck, InitializingBean {
/** /**
* Health status * Health status
*/ */
private volatile boolean healthStatus = true; private volatile boolean healthStatus = true;
/**
* Client shutdown hook
*/
private volatile boolean clientShutdownHook = false;
/** /**
* Health main lock * Health main lock
*/ */
@ -64,13 +72,12 @@ public abstract class AbstractHealthCheck implements InitializingBean, ServerHea
} else { } else {
healthStatus = false; healthStatus = false;
} }
} }
@Override @Override
@SneakyThrows @SneakyThrows
public boolean isHealthStatus() { public boolean isHealthStatus() {
while (!healthStatus) { while (!healthStatus && !clientShutdownHook) {
healthMainLock.lock(); healthMainLock.lock();
try { try {
healthCondition.await(); healthCondition.await();
@ -79,6 +86,10 @@ public abstract class AbstractHealthCheck implements InitializingBean, ServerHea
} }
} }
if (!healthStatus) {
throw new ShutdownExecuteException();
}
return healthStatus; return healthStatus;
} }
@ -107,7 +118,13 @@ public abstract class AbstractHealthCheck implements InitializingBean, ServerHea
@Override @Override
public void afterPropertiesSet() throws Exception { public void afterPropertiesSet() throws Exception {
healthCheckExecutor.scheduleWithFixedDelay(() -> healthCheck(), 0, 5, TimeUnit.SECONDS); // 添加钩子函数, Client 端停止时, 如果 Server 端是非健康状态, Client 销毁函数会暂停运行
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
clientShutdownHook = true;
signalAllBizThread();
}));
healthCheckExecutor.scheduleWithFixedDelay(() -> healthCheck(), 0, HEALTH_CHECK_INTERVAL, TimeUnit.SECONDS);
} }
} }

Loading…
Cancel
Save