diff --git a/hippo4j-adapter/hippo4j-adapter-alibaba-dubbo/src/main/java/cn/hippo4j/adapter/alibaba/dubbo/AlibabaDubboThreadPoolAdapter.java b/hippo4j-adapter/hippo4j-adapter-alibaba-dubbo/src/main/java/cn/hippo4j/adapter/alibaba/dubbo/AlibabaDubboThreadPoolAdapter.java index 2eb12e3d..2209089e 100644 --- a/hippo4j-adapter/hippo4j-adapter-alibaba-dubbo/src/main/java/cn/hippo4j/adapter/alibaba/dubbo/AlibabaDubboThreadPoolAdapter.java +++ b/hippo4j-adapter/hippo4j-adapter-alibaba-dubbo/src/main/java/cn/hippo4j/adapter/alibaba/dubbo/AlibabaDubboThreadPoolAdapter.java @@ -42,7 +42,7 @@ import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_DELIMI @Slf4j public class AlibabaDubboThreadPoolAdapter implements ThreadPoolAdapter, ApplicationListener { - private final Map DUBBO_PROTOCOL_EXECUTOR = new HashMap<>(); + private final Map dubboProtocolExecutor = new HashMap<>(); @Override public String mark() { @@ -52,7 +52,7 @@ public class AlibabaDubboThreadPoolAdapter implements ThreadPoolAdapter, Applica @Override public ThreadPoolAdapterState getThreadPoolState(String identify) { ThreadPoolAdapterState threadPoolAdapterState = new ThreadPoolAdapterState(); - ThreadPoolExecutor executor = DUBBO_PROTOCOL_EXECUTOR.get(identify); + ThreadPoolExecutor executor = dubboProtocolExecutor.get(identify); if (executor == null) { log.warn("[{}] Alibaba Dubbo consuming thread pool not found.", identify); return threadPoolAdapterState; @@ -66,14 +66,14 @@ public class AlibabaDubboThreadPoolAdapter implements ThreadPoolAdapter, Applica @Override public List getThreadPoolStates() { List threadPoolAdapterStates = new ArrayList<>(); - DUBBO_PROTOCOL_EXECUTOR.forEach((key, val) -> threadPoolAdapterStates.add(getThreadPoolState(String.valueOf(key)))); + dubboProtocolExecutor.forEach((key, val) -> threadPoolAdapterStates.add(getThreadPoolState(String.valueOf(key)))); return threadPoolAdapterStates; } @Override public boolean updateThreadPool(ThreadPoolAdapterParameter threadPoolAdapterParameter) { String threadPoolKey = threadPoolAdapterParameter.getThreadPoolKey(); - ThreadPoolExecutor executor = DUBBO_PROTOCOL_EXECUTOR.get(threadPoolAdapterParameter.getThreadPoolKey()); + ThreadPoolExecutor executor = dubboProtocolExecutor.get(threadPoolAdapterParameter.getThreadPoolKey()); if (executor == null) { log.warn("[{}] Alibaba Dubbo consuming thread pool not found.", threadPoolKey); return false; @@ -94,7 +94,7 @@ public class AlibabaDubboThreadPoolAdapter implements ThreadPoolAdapter, Applica try { DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension(); Map executors = dataStore.get(poolKey); - executors.forEach((key, value) -> DUBBO_PROTOCOL_EXECUTOR.put(key, (ThreadPoolExecutor) value)); + executors.forEach((key, value) -> dubboProtocolExecutor.put(key, (ThreadPoolExecutor) value)); } catch (Exception ex) { log.error("Failed to get Alibaba Dubbo protocol thread pool", ex); } diff --git a/hippo4j-adapter/hippo4j-adapter-dubbo/src/main/java/cn/hippo4j/adapter/dubbo/DubboThreadPoolAdapter.java b/hippo4j-adapter/hippo4j-adapter-dubbo/src/main/java/cn/hippo4j/adapter/dubbo/DubboThreadPoolAdapter.java index a3257f19..ffc61831 100644 --- a/hippo4j-adapter/hippo4j-adapter-dubbo/src/main/java/cn/hippo4j/adapter/dubbo/DubboThreadPoolAdapter.java +++ b/hippo4j-adapter/hippo4j-adapter-dubbo/src/main/java/cn/hippo4j/adapter/dubbo/DubboThreadPoolAdapter.java @@ -46,7 +46,7 @@ import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_DELIMI @Slf4j public class DubboThreadPoolAdapter implements ThreadPoolAdapter, ApplicationListener { - private final Map DUBBO_PROTOCOL_EXECUTOR = new HashMap<>(); + private final Map dubboProtocolExecutor = new HashMap<>(); @Override public String mark() { @@ -56,7 +56,7 @@ public class DubboThreadPoolAdapter implements ThreadPoolAdapter, ApplicationLis @Override public ThreadPoolAdapterState getThreadPoolState(String identify) { ThreadPoolAdapterState threadPoolAdapterState = new ThreadPoolAdapterState(); - ThreadPoolExecutor executor = DUBBO_PROTOCOL_EXECUTOR.get(identify); + ThreadPoolExecutor executor = dubboProtocolExecutor.get(identify); if (executor == null) { log.warn("[{}] Dubbo consuming thread pool not found.", identify); return threadPoolAdapterState; @@ -70,14 +70,14 @@ public class DubboThreadPoolAdapter implements ThreadPoolAdapter, ApplicationLis @Override public List getThreadPoolStates() { List threadPoolAdapterStates = new ArrayList<>(); - DUBBO_PROTOCOL_EXECUTOR.forEach((key, val) -> threadPoolAdapterStates.add(getThreadPoolState(String.valueOf(key)))); + dubboProtocolExecutor.forEach((key, val) -> threadPoolAdapterStates.add(getThreadPoolState(String.valueOf(key)))); return threadPoolAdapterStates; } @Override public boolean updateThreadPool(ThreadPoolAdapterParameter threadPoolAdapterParameter) { String threadPoolKey = threadPoolAdapterParameter.getThreadPoolKey(); - ThreadPoolExecutor executor = DUBBO_PROTOCOL_EXECUTOR.get(threadPoolAdapterParameter.getThreadPoolKey()); + ThreadPoolExecutor executor = dubboProtocolExecutor.get(threadPoolAdapterParameter.getThreadPoolKey()); if (executor == null) { log.warn("[{}] Dubbo consuming thread pool not found.", threadPoolKey); return false; @@ -105,14 +105,14 @@ public class DubboThreadPoolAdapter implements ThreadPoolAdapter, ApplicationLis if (isLegacyVersion) { DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension(); Map executors = dataStore.get(poolKey); - executors.forEach((key, value) -> DUBBO_PROTOCOL_EXECUTOR.put(key, (ThreadPoolExecutor) value)); + executors.forEach((key, value) -> dubboProtocolExecutor.put(key, (ThreadPoolExecutor) value)); return; } ExecutorRepository executorRepository = ExtensionLoader.getExtensionLoader(ExecutorRepository.class).getDefaultExtension(); ConcurrentMap> data = (ConcurrentMap>) ReflectUtil.getFieldValue(executorRepository, "data"); ConcurrentMap executorServiceMap = data.get(poolKey); - executorServiceMap.forEach((key, value) -> DUBBO_PROTOCOL_EXECUTOR.put(String.valueOf(key), (ThreadPoolExecutor) value)); + executorServiceMap.forEach((key, value) -> dubboProtocolExecutor.put(String.valueOf(key), (ThreadPoolExecutor) value)); } catch (Exception ex) { log.error("Failed to get Dubbo {} protocol thread pool", Version.getVersion(), ex); } diff --git a/hippo4j-adapter/hippo4j-adapter-hystrix/src/main/java/cn/hippo4j/adapter/hystrix/AbstractHystrixThreadPoolAdapter.java b/hippo4j-adapter/hippo4j-adapter-hystrix/src/main/java/cn/hippo4j/adapter/hystrix/AbstractHystrixThreadPoolAdapter.java index 441a1efb..36771408 100644 --- a/hippo4j-adapter/hippo4j-adapter-hystrix/src/main/java/cn/hippo4j/adapter/hystrix/AbstractHystrixThreadPoolAdapter.java +++ b/hippo4j-adapter/hippo4j-adapter-hystrix/src/main/java/cn/hippo4j/adapter/hystrix/AbstractHystrixThreadPoolAdapter.java @@ -113,6 +113,9 @@ public abstract class AbstractHystrixThreadPoolAdapter implements ThreadPoolAdap scheduler.schedule(hystrixThreadPoolRefreshTask, taskIntervalSeconds, TimeUnit.SECONDS); } + /** + * hystrix thread-pool refresh task + */ class HystrixThreadPoolRefreshTask implements Runnable { private final ScheduledExecutorService scheduler; diff --git a/hippo4j-common/src/main/java/cn/hippo4j/common/constant/Constants.java b/hippo4j-common/src/main/java/cn/hippo4j/common/constant/Constants.java index 2b651ffc..80e7951d 100644 --- a/hippo4j-common/src/main/java/cn/hippo4j/common/constant/Constants.java +++ b/hippo4j-common/src/main/java/cn/hippo4j/common/constant/Constants.java @@ -102,6 +102,22 @@ public class Constants { public static final int HEALTH_CHECK_INTERVAL = 5; + public static final int MAX_CHECK_FAILURE_COUNT = 4; + + public static final int INITIAL_CAPACITY = 3; + + public static final int DATA_GROUP_TENANT_SIZE = 3; + + public static final int ACTIVE_ALARM = 80; + + public static final int CAPACITY_ALARM = 80; + + public static final long EXECUTE_TIME_OUT = 10000L; + + public static final int SECONDS_IN_MILLISECONDS = 1000; + + public static final long FAILURE_SLEEP_INTERVAL = 25000L; + public static final int AVAILABLE_PROCESSORS = Runtime.getRuntime().availableProcessors(); public static final String DEFAULT_GROUP = "default group"; @@ -110,7 +126,7 @@ public class Constants { public static final String EXECUTE_TIMEOUT_TRACE = "executeTimeoutTrace"; - public static final int HTTP_EXECUTE_TIMEOUT = 5000; + public static final long HTTP_EXECUTE_TIMEOUT = 5000L; public static final String CLIENT_VERSION = "Client-Version"; diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/BaseThreadDetailStateHandler.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/BaseThreadDetailStateHandler.java index 2e3f1051..72772f8b 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/BaseThreadDetailStateHandler.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/BaseThreadDetailStateHandler.java @@ -38,9 +38,9 @@ import java.util.concurrent.ThreadPoolExecutor; @Slf4j public class BaseThreadDetailStateHandler implements ThreadDetailState { - private final String WORKERS = "workers"; + private final String workersName = "workers"; - private final String THREAD = "thread"; + private final String threadName = "thread"; @Override public List getThreadDetailStateInfo(String threadPoolId) { @@ -53,14 +53,14 @@ public class BaseThreadDetailStateHandler implements ThreadDetailState { public List getThreadDetailStateInfo(ThreadPoolExecutor threadPoolExecutor) { List resultThreadStates = new ArrayList(); try { - HashSet workers = (HashSet) ReflectUtil.getFieldValue(threadPoolExecutor, WORKERS); + HashSet workers = (HashSet) ReflectUtil.getFieldValue(threadPoolExecutor, workersName); if (CollectionUtil.isEmpty(workers)) { return resultThreadStates; } for (Object worker : workers) { Thread thread; try { - thread = (Thread) ReflectUtil.getFieldValue(worker, THREAD); + thread = (Thread) ReflectUtil.getFieldValue(worker, threadName); if (thread == null) { log.warn("Reflection get worker thread is null. Worker: {}", worker); continue; diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/CacheData.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/CacheData.java index acd88ef2..19813c49 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/CacheData.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/CacheData.java @@ -35,15 +35,18 @@ import java.util.concurrent.CopyOnWriteArrayList; public class CacheData { @Getter - public volatile String md5; + private volatile String md5; - public volatile String content; + private volatile String content; - public final String tenantId; + @Getter + private final String tenantId; - public final String itemId; + @Getter + private final String itemId; - public final String threadPoolId; + @Getter + private final String threadPoolId; @Setter private volatile boolean isInitializing = true; @@ -56,7 +59,7 @@ public class CacheData { this.threadPoolId = threadPoolId; this.content = ContentUtil.getPoolContent(GlobalThreadPoolManage.getPoolParameter(threadPoolId)); this.md5 = getMd5String(content); - this.listeners = new CopyOnWriteArrayList(); + this.listeners = new CopyOnWriteArrayList<>(); } public void addListener(Listener listener) { diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/ClientShutdown.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/ClientShutdown.java index bd12c789..1434d4b2 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/ClientShutdown.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/ClientShutdown.java @@ -33,7 +33,7 @@ public class ClientShutdown { @Getter private volatile boolean prepareClose = false; - private final static Long TIME_OUT_SECOND = 1L; + private static final Long TIME_OUT_SECOND = 1L; private static final int DEFAULT_COUNT = 1; private final CountDownLatch countDownLatch = new CountDownLatch(DEFAULT_COUNT); diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/ClientWorker.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/ClientWorker.java index 1e5af1a7..8656c5ce 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/ClientWorker.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/ClientWorker.java @@ -30,7 +30,6 @@ import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.DisposableBean; import org.springframework.util.StringUtils; - import java.net.URLDecoder; import java.util.ArrayList; import java.util.Collections; @@ -43,20 +42,21 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; - -import static cn.hippo4j.common.constant.Constants.CLIENT_VERSION; -import static cn.hippo4j.common.constant.Constants.CONFIG_CONTROLLER_PATH; import static cn.hippo4j.common.constant.Constants.CONFIG_LONG_POLL_TIMEOUT; import static cn.hippo4j.common.constant.Constants.GROUP_KEY_DELIMITER_TRANSLATION; +import static cn.hippo4j.common.constant.Constants.WORD_SEPARATOR; import static cn.hippo4j.common.constant.Constants.LINE_SEPARATOR; -import static cn.hippo4j.common.constant.Constants.LISTENER_PATH; -import static cn.hippo4j.common.constant.Constants.LONG_PULLING_CLIENT_IDENTIFICATION; +import static cn.hippo4j.common.constant.Constants.PROBE_MODIFY_REQUEST; +import static cn.hippo4j.common.constant.Constants.WEIGHT_CONFIGS; import static cn.hippo4j.common.constant.Constants.LONG_PULLING_TIMEOUT; +import static cn.hippo4j.common.constant.Constants.LONG_PULLING_CLIENT_IDENTIFICATION; import static cn.hippo4j.common.constant.Constants.LONG_PULLING_TIMEOUT_NO_HANGUP; +import static cn.hippo4j.common.constant.Constants.CLIENT_VERSION; +import static cn.hippo4j.common.constant.Constants.LISTENER_PATH; +import static cn.hippo4j.common.constant.Constants.INITIAL_CAPACITY; +import static cn.hippo4j.common.constant.Constants.DATA_GROUP_TENANT_SIZE; +import static cn.hippo4j.common.constant.Constants.CONFIG_CONTROLLER_PATH; import static cn.hippo4j.common.constant.Constants.NULL; -import static cn.hippo4j.common.constant.Constants.PROBE_MODIFY_REQUEST; -import static cn.hippo4j.common.constant.Constants.WEIGHT_CONFIGS; -import static cn.hippo4j.common.constant.Constants.WORD_SEPARATOR; /** * Client worker. @@ -77,6 +77,8 @@ public class ClientWorker implements DisposableBean { private final CountDownLatch cacheCondition = new CountDownLatch(1); private final ConcurrentHashMap cacheMap = new ConcurrentHashMap<>(16); + private final long defaultTimedOut = 3000L; + @SuppressWarnings("all") public ClientWorker(HttpAgent httpAgent, String identify, @@ -113,13 +115,16 @@ public class ClientWorker implements DisposableBean { executorService.shutdownNow(); } + /** + * LongPollingRunnable + */ class LongPollingRunnable implements Runnable { private boolean cacheMapInitEmptyFlag; private final CountDownLatch cacheCondition; - public LongPollingRunnable(boolean cacheMapInitEmptyFlag, CountDownLatch cacheCondition) { + LongPollingRunnable(boolean cacheMapInitEmptyFlag, CountDownLatch cacheCondition) { this.cacheMapInitEmptyFlag = cacheMapInitEmptyFlag; this.cacheCondition = cacheCondition; } @@ -147,7 +152,7 @@ public class ClientWorker implements DisposableBean { String itemId = keys[1]; String namespace = keys[2]; try { - String content = getServerConfig(namespace, itemId, tpId, 3000L); + String content = getServerConfig(namespace, itemId, tpId, defaultTimedOut); CacheData cacheData = cacheMap.get(tpId); String poolContent = ContentUtil.getPoolContent(JSONUtil.parseObject(content, ThreadPoolParameterInfo.class)); cacheData.setContent(poolContent); @@ -157,7 +162,7 @@ public class ClientWorker implements DisposableBean { } for (CacheData cacheData : cacheDataList) { if (!cacheData.isInitializing() || inInitializingCacheList - .contains(GroupKey.getKeyTenant(cacheData.threadPoolId, cacheData.itemId, cacheData.tenantId))) { + .contains(GroupKey.getKeyTenant(cacheData.getThreadPoolId(), cacheData.getItemId(), cacheData.getTenantId()))) { cacheData.checkListenerMd5(); cacheData.setInitializing(false); } @@ -170,13 +175,13 @@ public class ClientWorker implements DisposableBean { private List checkUpdateDataIds(List cacheDataList, List inInitializingCacheList) { StringBuilder sb = new StringBuilder(); for (CacheData cacheData : cacheDataList) { - sb.append(cacheData.threadPoolId).append(WORD_SEPARATOR); - sb.append(cacheData.itemId).append(WORD_SEPARATOR); - sb.append(cacheData.tenantId).append(WORD_SEPARATOR); + sb.append(cacheData.getThreadPoolId()).append(WORD_SEPARATOR); + sb.append(cacheData.getItemId()).append(WORD_SEPARATOR); + sb.append(cacheData.getTenantId()).append(WORD_SEPARATOR); sb.append(identify).append(WORD_SEPARATOR); sb.append(cacheData.getMd5()).append(LINE_SEPARATOR); if (cacheData.isInitializing()) { - inInitializingCacheList.add(GroupKey.getKeyTenant(cacheData.threadPoolId, cacheData.itemId, cacheData.tenantId)); + inInitializingCacheList.add(GroupKey.getKeyTenant(cacheData.getThreadPoolId(), cacheData.getItemId(), cacheData.getTenantId())); } } boolean isInitializingCacheList = !inInitializingCacheList.isEmpty(); @@ -213,7 +218,7 @@ public class ClientWorker implements DisposableBean { } public String getServerConfig(String namespace, String itemId, String threadPoolId, long readTimeout) { - Map params = new HashMap<>(3); + Map params = new HashMap<>(INITIAL_CAPACITY); params.put("namespace", namespace); params.put("itemId", itemId); params.put("tpId", threadPoolId); @@ -241,7 +246,7 @@ public class ClientWorker implements DisposableBean { String[] keyArr = dataIdAndGroup.split(WORD_SEPARATOR); String dataId = keyArr[0]; String group = keyArr[1]; - if (keyArr.length == 3) { + if (keyArr.length == DATA_GROUP_TENANT_SIZE) { String tenant = keyArr[2]; updateList.add(GroupKey.getKeyTenant(dataId, group, tenant)); log.info("[{}] Refresh thread pool changed.", dataId); @@ -274,7 +279,7 @@ public class ClientWorker implements DisposableBean { if (lastCacheData == null) { String serverConfig; try { - serverConfig = getServerConfig(namespace, itemId, threadPoolId, 3000L); + serverConfig = getServerConfig(namespace, itemId, threadPoolId, defaultTimedOut); ThreadPoolParameterInfo poolInfo = JSONUtil.parseObject(serverConfig, ThreadPoolParameterInfo.class); cacheData.setContent(ContentUtil.getPoolContent(poolInfo)); } catch (Exception ex) { diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/DiscoveryClient.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/DiscoveryClient.java index 38a22368..ce01cbcd 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/DiscoveryClient.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/DiscoveryClient.java @@ -51,6 +51,8 @@ public class DiscoveryClient implements DisposableBean { private static final String PREFIX = "DiscoveryClient_"; private final String appPathIdentifier; + private final int delayTime = 30; + public DiscoveryClient(HttpAgent httpAgent, InstanceInfo instanceInfo, ClientShutdown hippo4jClientShutdown) { this.httpAgent = httpAgent; this.instanceInfo = instanceInfo; @@ -65,7 +67,7 @@ public class DiscoveryClient implements DisposableBean { } private void initScheduledTasks() { - scheduler.scheduleWithFixedDelay(new HeartbeatThread(), 30, 30, TimeUnit.SECONDS); + scheduler.scheduleWithFixedDelay(new HeartbeatThread(), delayTime, delayTime, TimeUnit.SECONDS); } boolean register() { @@ -118,6 +120,9 @@ public class DiscoveryClient implements DisposableBean { hippo4jClientShutdown.prepareDestroy(); } + /** + * HeartbeatThread + */ public class HeartbeatThread implements Runnable { @Override diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/DynamicThreadPoolSubscribeConfig.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/DynamicThreadPoolSubscribeConfig.java index 804c0864..db646ff8 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/DynamicThreadPoolSubscribeConfig.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/DynamicThreadPoolSubscribeConfig.java @@ -41,10 +41,12 @@ public class DynamicThreadPoolSubscribeConfig { private final BootstrapProperties properties; + private final int defaultAliveTime = 2000; + private final ExecutorService configRefreshExecutorService = ThreadPoolBuilder.builder() .corePoolSize(1) .maximumPoolSize(2) - .keepAliveTime(2000) + .keepAliveTime(defaultAliveTime) .timeUnit(TimeUnit.MILLISECONDS) .workQueue(BlockingQueueTypeEnum.SYNCHRONOUS_QUEUE) .allowCoreThreadTimeOut(true) diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/remote/AbstractHealthCheck.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/remote/AbstractHealthCheck.java index 1fd78118..246f80ad 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/remote/AbstractHealthCheck.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/remote/AbstractHealthCheck.java @@ -31,8 +31,10 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; - +import static cn.hippo4j.common.constant.Constants.MAX_CHECK_FAILURE_COUNT; +import static cn.hippo4j.common.constant.Constants.SECONDS_IN_MILLISECONDS; import static cn.hippo4j.common.constant.Constants.HEALTH_CHECK_INTERVAL; +import static cn.hippo4j.common.constant.Constants.FAILURE_SLEEP_INTERVAL; /** * Abstract health check. @@ -99,10 +101,10 @@ public abstract class AbstractHealthCheck implements ServerHealthCheck, Initiali } else { healthStatus = false; checkFailureCount++; - if (checkFailureCount > 1 && checkFailureCount < 4) { - ThreadUtil.sleep(HEALTH_CHECK_INTERVAL * 1000 * (checkFailureCount - 1)); - } else if (checkFailureCount >= 4) { - ThreadUtil.sleep(25000L); + if (checkFailureCount > 1 && checkFailureCount < MAX_CHECK_FAILURE_COUNT) { + ThreadUtil.sleep((long) HEALTH_CHECK_INTERVAL * SECONDS_IN_MILLISECONDS * (checkFailureCount - 1)); + } else if (checkFailureCount >= MAX_CHECK_FAILURE_COUNT) { + ThreadUtil.sleep(FAILURE_SLEEP_INTERVAL); } } } diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/remote/ServerListManager.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/remote/ServerListManager.java index cc5abd36..3202869c 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/remote/ServerListManager.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/remote/ServerListManager.java @@ -88,13 +88,16 @@ public class ServerListManager { return new ServerAddressIterator(serverUrls); } + /** + * Server Address Iterator + */ private static class ServerAddressIterator implements Iterator { final List sorted; final Iterator iter; - public ServerAddressIterator(List source) { + ServerAddressIterator(List source) { sorted = new ArrayList(); for (String address : source) { sorted.add(new RandomizedServerAddress(address)); @@ -113,6 +116,9 @@ public class ServerListManager { return null; } + /** + * Randomized Server Address + */ static class RandomizedServerAddress implements Comparable { static Random random = new Random(); @@ -123,7 +129,7 @@ public class ServerListManager { int seed; - public RandomizedServerAddress(String ip) { + RandomizedServerAddress(String ip) { try { this.serverIp = ip; /* diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/security/SecurityProxy.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/security/SecurityProxy.java index 72c5bd07..c03d518a 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/security/SecurityProxy.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/security/SecurityProxy.java @@ -39,6 +39,8 @@ public class SecurityProxy { private static final String APPLY_TOKEN_URL = Constants.BASE_PATH + "/auth/users/apply/token"; + private final int refreshWindowDuration = 10; + private final String username; private final String password; @@ -88,7 +90,7 @@ public class SecurityProxy { TokenInfo tokenInfo = JSONUtil.parseObject(tokenJsonStr, TokenInfo.class); accessToken = tokenInfo.getAccessToken(); tokenTtl = tokenInfo.getTokenTtl(); - tokenRefreshWindow = tokenTtl / 10; + tokenRefreshWindow = tokenTtl / refreshWindowDuration; } catch (Throwable ex) { log.error("Failed to apply for token. message: {}", ex.getMessage()); return false; diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/support/DynamicThreadPoolPostProcessor.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/support/DynamicThreadPoolPostProcessor.java index 3d94628c..c3bd82e1 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/support/DynamicThreadPoolPostProcessor.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/support/DynamicThreadPoolPostProcessor.java @@ -53,10 +53,14 @@ import java.util.Optional; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; - +import static cn.hippo4j.common.constant.Constants.INITIAL_CAPACITY; +import static cn.hippo4j.common.constant.Constants.TP_ID; import static cn.hippo4j.common.constant.Constants.ITEM_ID; import static cn.hippo4j.common.constant.Constants.NAMESPACE; -import static cn.hippo4j.common.constant.Constants.TP_ID; +import static cn.hippo4j.common.constant.Constants.ACTIVE_ALARM; +import static cn.hippo4j.common.constant.Constants.CAPACITY_ALARM; +import static cn.hippo4j.common.constant.Constants.EXECUTE_TIME_OUT; +import static cn.hippo4j.common.constant.Constants.HTTP_EXECUTE_TIMEOUT; /** * Dynamic thread-pool post processor. @@ -94,7 +98,8 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor { return bean; } DynamicThreadPoolExecutor dynamicThreadPoolExecutor; - if ((dynamicThreadPoolExecutor = DynamicThreadPoolAdapterChoose.unwrap(bean)) == null) { + dynamicThreadPoolExecutor = DynamicThreadPoolAdapterChoose.unwrap(bean); + if ((dynamicThreadPoolExecutor) == null) { dynamicThreadPoolExecutor = (DynamicThreadPoolExecutor) bean; } DynamicThreadPoolWrapper dynamicThreadPoolWrapper = new DynamicThreadPoolWrapper(dynamicThreadPoolExecutor.getThreadPoolId(), dynamicThreadPoolExecutor); @@ -128,16 +133,17 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor { protected ThreadPoolExecutor fillPoolAndRegister(DynamicThreadPoolWrapper dynamicThreadPoolWrapper) { String threadPoolId = dynamicThreadPoolWrapper.getThreadPoolId(); ThreadPoolExecutor executor = dynamicThreadPoolWrapper.getExecutor(); - Map queryStrMap = new HashMap(3); + Map queryStrMap = new HashMap<>(INITIAL_CAPACITY); queryStrMap.put(TP_ID, threadPoolId); queryStrMap.put(ITEM_ID, properties.getItemId()); queryStrMap.put(NAMESPACE, properties.getNamespace()); ThreadPoolParameterInfo threadPoolParameterInfo = new ThreadPoolParameterInfo(); try { - Result result = httpAgent.httpGetByConfig(Constants.CONFIG_CONTROLLER_PATH, null, queryStrMap, 5000L); + Result result = httpAgent.httpGetByConfig(Constants.CONFIG_CONTROLLER_PATH, null, queryStrMap, HTTP_EXECUTE_TIMEOUT); if (result.isSuccess() && result.getData() != null) { String resultJsonStr = JSONUtil.toJSONString(result.getData()); - if ((threadPoolParameterInfo = JSONUtil.parseObject(resultJsonStr, ThreadPoolParameterInfo.class)) != null) { + threadPoolParameterInfo = JSONUtil.parseObject(resultJsonStr, ThreadPoolParameterInfo.class); + if (threadPoolParameterInfo != null) { threadPoolParamReplace(executor, threadPoolParameterInfo); registerNotifyAlarm(threadPoolParameterInfo); } @@ -153,9 +159,9 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor { .allowCoreThreadTimeOut(executor.allowsCoreThreadTimeOut()) .keepAliveTime(executor.getKeepAliveTime(TimeUnit.MILLISECONDS)) .isAlarm(false) - .activeAlarm(80) - .capacityAlarm(80) - .executeTimeOut(10000L) + .activeAlarm(ACTIVE_ALARM) + .capacityAlarm(CAPACITY_ALARM) + .executeTimeOut(EXECUTE_TIME_OUT) .rejectedPolicyType(RejectedPolicyTypeEnum.getRejectedPolicyTypeEnumByName(executor.getRejectedExecutionHandler().getClass().getSimpleName())) .build(); DynamicThreadPoolRegisterWrapper registerWrapper = DynamicThreadPoolRegisterWrapper.builder()