Try to solve the active empty problem (#1127)

* fix:active empty

* fix:active empty

* fix:add log and adjust details

* fix:adjust timeout and close executor
pull/1130/head
zjHe 1 year ago committed by GitHub
parent 69275d2888
commit d37323d6ad
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -43,6 +43,7 @@ import cn.hippo4j.springboot.starter.controller.ThreadPoolAdapterController;
import cn.hippo4j.springboot.starter.controller.WebThreadPoolController;
import cn.hippo4j.springboot.starter.controller.WebThreadPoolRunStateController;
import cn.hippo4j.springboot.starter.core.BaseThreadDetailStateHandler;
import cn.hippo4j.springboot.starter.core.ClientShutdown;
import cn.hippo4j.springboot.starter.core.ClientWorker;
import cn.hippo4j.springboot.starter.core.DynamicThreadPoolSubscribeConfig;
import cn.hippo4j.springboot.starter.core.ServerThreadPoolDynamicRefresh;
@ -247,4 +248,8 @@ public class DynamicThreadPoolAutoConfiguration {
return new ThreadPoolPluginRegisterPostProcessor();
}
@Bean
public ClientShutdown hippo4jClientShutdown() {
return new ClientShutdown();
}
}

@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.springboot.starter.core;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import lombok.Getter;
/**
* Client Shutdown
*/
public class ClientShutdown {
@Getter
private volatile boolean prepareClose = false;
private CountDownLatch countDownLatch = new CountDownLatch(1);
private final static Long TIME_OUT_SECOND = 1L;
public void prepareDestroy() throws InterruptedException {
prepareClose = true;
countDownLatch.await(TIME_OUT_SECOND, TimeUnit.SECONDS);
}
public void countDown() {
countDownLatch.countDown();
}
}

@ -28,6 +28,7 @@ import cn.hippo4j.springboot.starter.remote.HttpAgent;
import cn.hippo4j.springboot.starter.remote.ServerHealthCheck;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.util.StringUtils;
import java.net.URLDecoder;
@ -56,7 +57,7 @@ import static cn.hippo4j.common.constant.Constants.WORD_SEPARATOR;
* Client worker.
*/
@Slf4j
public class ClientWorker {
public class ClientWorker implements DisposableBean {
private final long timeout;
@ -102,6 +103,11 @@ public class ClientWorker {
}, 1L, TimeUnit.MILLISECONDS);
}
@Override
public void destroy() throws Exception {
executorService.shutdownNow();
}
class LongPollingRunnable implements Runnable {
private boolean cacheMapInitEmptyFlag;
@ -116,6 +122,9 @@ public class ClientWorker {
@Override
@SneakyThrows
public void run() {
if (executorService.isShutdown()) {
return;
}
if (cacheMapInitEmptyFlag) {
cacheCondition.await();
cacheMapInitEmptyFlag = false;

@ -92,8 +92,7 @@ public class DiscoveryClient implements DisposableBean {
String clientCloseUrlPath = Constants.BASE_PATH + "/client/close";
Result clientCloseResult;
try {
// close scheduledExecutor
this.scheduler.shutdown();
this.prepareDestroy();
String groupKeyIp = new StringBuilder()
.append(instanceInfo.getGroupKey())
.append(Constants.GROUP_KEY_DELIMITER)
@ -115,6 +114,12 @@ public class DiscoveryClient implements DisposableBean {
}
}
private void prepareDestroy() throws InterruptedException {
this.scheduler.shutdownNow();
// try to make sure the ClientWorker is closed first
ApplicationContextHolder.getBean(ClientShutdown.class).prepareDestroy();
}
public class HeartbeatThread implements Runnable {
@Override

@ -24,6 +24,7 @@ import cn.hippo4j.common.toolkit.http.HttpUtil;
import cn.hippo4j.common.web.base.Result;
import cn.hippo4j.common.design.builder.ThreadFactoryBuilder;
import cn.hippo4j.springboot.starter.config.BootstrapProperties;
import cn.hippo4j.springboot.starter.core.ClientShutdown;
import cn.hippo4j.springboot.starter.security.SecurityProxy;
import java.util.HashMap;
@ -31,10 +32,12 @@ import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
/**
* Server http agent.
*/
@Slf4j
public class ServerHttpAgent implements HttpAgent {
private final BootstrapProperties dynamicThreadPoolProperties;
@ -45,6 +48,8 @@ public class ServerHttpAgent implements HttpAgent {
private ServerHealthCheck serverHealthCheck;
private ClientShutdown clientShutdown;
private ScheduledExecutorService executorService;
private final long securityInfoRefreshIntervalMills = TimeUnit.SECONDS.toMillis(5);
@ -110,6 +115,9 @@ public class ServerHttpAgent implements HttpAgent {
public Result httpPostByConfig(String path, Map<String, String> headers, Map<String, String> paramValues, long readTimeoutMs) {
isHealthStatus();
injectSecurityInfo(paramValues);
if (isPrepareClose()) {
return new Result();
}
return HttpUtil.post(buildUrl(path), headers, paramValues, readTimeoutMs, Result.class);
}
@ -129,6 +137,18 @@ public class ServerHttpAgent implements HttpAgent {
serverHealthCheck.isHealthStatus();
}
private boolean isPrepareClose() {
if (clientShutdown == null) {
clientShutdown = ApplicationContextHolder.getBean(ClientShutdown.class);
}
if (clientShutdown.isPrepareClose()) {
clientShutdown.countDown();
log.info("client prepare shutdown");
return true;
}
return false;
}
private Map injectSecurityInfo(Map<String, String> params) {
if (StringUtil.isNotBlank(securityProxy.getAccessToken())) {
params.put(Constants.ACCESS_TOKEN, securityProxy.getAccessToken());

Loading…
Cancel
Save