diff --git a/hippo4j-common/src/main/java/cn/hippo4j/common/constant/Constants.java b/hippo4j-common/src/main/java/cn/hippo4j/common/constant/Constants.java index 2b651ffc..80e7951d 100644 --- a/hippo4j-common/src/main/java/cn/hippo4j/common/constant/Constants.java +++ b/hippo4j-common/src/main/java/cn/hippo4j/common/constant/Constants.java @@ -102,6 +102,22 @@ public class Constants { public static final int HEALTH_CHECK_INTERVAL = 5; + public static final int MAX_CHECK_FAILURE_COUNT = 4; + + public static final int INITIAL_CAPACITY = 3; + + public static final int DATA_GROUP_TENANT_SIZE = 3; + + public static final int ACTIVE_ALARM = 80; + + public static final int CAPACITY_ALARM = 80; + + public static final long EXECUTE_TIME_OUT = 10000L; + + public static final int SECONDS_IN_MILLISECONDS = 1000; + + public static final long FAILURE_SLEEP_INTERVAL = 25000L; + public static final int AVAILABLE_PROCESSORS = Runtime.getRuntime().availableProcessors(); public static final String DEFAULT_GROUP = "default group"; @@ -110,7 +126,7 @@ public class Constants { public static final String EXECUTE_TIMEOUT_TRACE = "executeTimeoutTrace"; - public static final int HTTP_EXECUTE_TIMEOUT = 5000; + public static final long HTTP_EXECUTE_TIMEOUT = 5000L; public static final String CLIENT_VERSION = "Client-Version"; diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/BaseThreadDetailStateHandler.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/BaseThreadDetailStateHandler.java index 2e3f1051..72772f8b 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/BaseThreadDetailStateHandler.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/BaseThreadDetailStateHandler.java @@ -38,9 +38,9 @@ import java.util.concurrent.ThreadPoolExecutor; @Slf4j public class BaseThreadDetailStateHandler implements ThreadDetailState { - private final String WORKERS = "workers"; + private final String workersName = "workers"; - private final String THREAD = "thread"; + private final String threadName = "thread"; @Override public List getThreadDetailStateInfo(String threadPoolId) { @@ -53,14 +53,14 @@ public class BaseThreadDetailStateHandler implements ThreadDetailState { public List getThreadDetailStateInfo(ThreadPoolExecutor threadPoolExecutor) { List resultThreadStates = new ArrayList(); try { - HashSet workers = (HashSet) ReflectUtil.getFieldValue(threadPoolExecutor, WORKERS); + HashSet workers = (HashSet) ReflectUtil.getFieldValue(threadPoolExecutor, workersName); if (CollectionUtil.isEmpty(workers)) { return resultThreadStates; } for (Object worker : workers) { Thread thread; try { - thread = (Thread) ReflectUtil.getFieldValue(worker, THREAD); + thread = (Thread) ReflectUtil.getFieldValue(worker, threadName); if (thread == null) { log.warn("Reflection get worker thread is null. Worker: {}", worker); continue; diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/CacheData.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/CacheData.java index acd88ef2..19813c49 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/CacheData.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/CacheData.java @@ -35,15 +35,18 @@ import java.util.concurrent.CopyOnWriteArrayList; public class CacheData { @Getter - public volatile String md5; + private volatile String md5; - public volatile String content; + private volatile String content; - public final String tenantId; + @Getter + private final String tenantId; - public final String itemId; + @Getter + private final String itemId; - public final String threadPoolId; + @Getter + private final String threadPoolId; @Setter private volatile boolean isInitializing = true; @@ -56,7 +59,7 @@ public class CacheData { this.threadPoolId = threadPoolId; this.content = ContentUtil.getPoolContent(GlobalThreadPoolManage.getPoolParameter(threadPoolId)); this.md5 = getMd5String(content); - this.listeners = new CopyOnWriteArrayList(); + this.listeners = new CopyOnWriteArrayList<>(); } public void addListener(Listener listener) { diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/ClientShutdown.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/ClientShutdown.java index bd12c789..1434d4b2 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/ClientShutdown.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/ClientShutdown.java @@ -33,7 +33,7 @@ public class ClientShutdown { @Getter private volatile boolean prepareClose = false; - private final static Long TIME_OUT_SECOND = 1L; + private static final Long TIME_OUT_SECOND = 1L; private static final int DEFAULT_COUNT = 1; private final CountDownLatch countDownLatch = new CountDownLatch(DEFAULT_COUNT); diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/ClientWorker.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/ClientWorker.java index 1e5af1a7..5a21ca6f 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/ClientWorker.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/ClientWorker.java @@ -44,19 +44,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import static cn.hippo4j.common.constant.Constants.CLIENT_VERSION; -import static cn.hippo4j.common.constant.Constants.CONFIG_CONTROLLER_PATH; -import static cn.hippo4j.common.constant.Constants.CONFIG_LONG_POLL_TIMEOUT; -import static cn.hippo4j.common.constant.Constants.GROUP_KEY_DELIMITER_TRANSLATION; -import static cn.hippo4j.common.constant.Constants.LINE_SEPARATOR; -import static cn.hippo4j.common.constant.Constants.LISTENER_PATH; -import static cn.hippo4j.common.constant.Constants.LONG_PULLING_CLIENT_IDENTIFICATION; -import static cn.hippo4j.common.constant.Constants.LONG_PULLING_TIMEOUT; -import static cn.hippo4j.common.constant.Constants.LONG_PULLING_TIMEOUT_NO_HANGUP; -import static cn.hippo4j.common.constant.Constants.NULL; -import static cn.hippo4j.common.constant.Constants.PROBE_MODIFY_REQUEST; -import static cn.hippo4j.common.constant.Constants.WEIGHT_CONFIGS; -import static cn.hippo4j.common.constant.Constants.WORD_SEPARATOR; +import static cn.hippo4j.common.constant.Constants.*; /** * Client worker. @@ -113,13 +101,16 @@ public class ClientWorker implements DisposableBean { executorService.shutdownNow(); } + /** + * LongPollingRunnable + */ class LongPollingRunnable implements Runnable { private boolean cacheMapInitEmptyFlag; private final CountDownLatch cacheCondition; - public LongPollingRunnable(boolean cacheMapInitEmptyFlag, CountDownLatch cacheCondition) { + LongPollingRunnable(boolean cacheMapInitEmptyFlag, CountDownLatch cacheCondition) { this.cacheMapInitEmptyFlag = cacheMapInitEmptyFlag; this.cacheCondition = cacheCondition; } @@ -147,7 +138,7 @@ public class ClientWorker implements DisposableBean { String itemId = keys[1]; String namespace = keys[2]; try { - String content = getServerConfig(namespace, itemId, tpId, 3000L); + String content = getServerConfig(namespace, itemId, tpId, HTTP_EXECUTE_TIMEOUT); CacheData cacheData = cacheMap.get(tpId); String poolContent = ContentUtil.getPoolContent(JSONUtil.parseObject(content, ThreadPoolParameterInfo.class)); cacheData.setContent(poolContent); @@ -157,7 +148,7 @@ public class ClientWorker implements DisposableBean { } for (CacheData cacheData : cacheDataList) { if (!cacheData.isInitializing() || inInitializingCacheList - .contains(GroupKey.getKeyTenant(cacheData.threadPoolId, cacheData.itemId, cacheData.tenantId))) { + .contains(GroupKey.getKeyTenant(cacheData.getThreadPoolId(), cacheData.getItemId(), cacheData.getTenantId()))) { cacheData.checkListenerMd5(); cacheData.setInitializing(false); } @@ -170,13 +161,13 @@ public class ClientWorker implements DisposableBean { private List checkUpdateDataIds(List cacheDataList, List inInitializingCacheList) { StringBuilder sb = new StringBuilder(); for (CacheData cacheData : cacheDataList) { - sb.append(cacheData.threadPoolId).append(WORD_SEPARATOR); - sb.append(cacheData.itemId).append(WORD_SEPARATOR); - sb.append(cacheData.tenantId).append(WORD_SEPARATOR); + sb.append(cacheData.getThreadPoolId()).append(WORD_SEPARATOR); + sb.append(cacheData.getItemId()).append(WORD_SEPARATOR); + sb.append(cacheData.getTenantId()).append(WORD_SEPARATOR); sb.append(identify).append(WORD_SEPARATOR); sb.append(cacheData.getMd5()).append(LINE_SEPARATOR); if (cacheData.isInitializing()) { - inInitializingCacheList.add(GroupKey.getKeyTenant(cacheData.threadPoolId, cacheData.itemId, cacheData.tenantId)); + inInitializingCacheList.add(GroupKey.getKeyTenant(cacheData.getThreadPoolId(), cacheData.getItemId(), cacheData.getTenantId())); } } boolean isInitializingCacheList = !inInitializingCacheList.isEmpty(); @@ -213,7 +204,7 @@ public class ClientWorker implements DisposableBean { } public String getServerConfig(String namespace, String itemId, String threadPoolId, long readTimeout) { - Map params = new HashMap<>(3); + Map params = new HashMap<>(INITIAL_CAPACITY); params.put("namespace", namespace); params.put("itemId", itemId); params.put("tpId", threadPoolId); @@ -241,7 +232,7 @@ public class ClientWorker implements DisposableBean { String[] keyArr = dataIdAndGroup.split(WORD_SEPARATOR); String dataId = keyArr[0]; String group = keyArr[1]; - if (keyArr.length == 3) { + if (keyArr.length == DATA_GROUP_TENANT_SIZE) { String tenant = keyArr[2]; updateList.add(GroupKey.getKeyTenant(dataId, group, tenant)); log.info("[{}] Refresh thread pool changed.", dataId); @@ -274,7 +265,7 @@ public class ClientWorker implements DisposableBean { if (lastCacheData == null) { String serverConfig; try { - serverConfig = getServerConfig(namespace, itemId, threadPoolId, 3000L); + serverConfig = getServerConfig(namespace, itemId, threadPoolId, HTTP_EXECUTE_TIMEOUT); ThreadPoolParameterInfo poolInfo = JSONUtil.parseObject(serverConfig, ThreadPoolParameterInfo.class); cacheData.setContent(ContentUtil.getPoolContent(poolInfo)); } catch (Exception ex) { diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/DiscoveryClient.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/DiscoveryClient.java index 38a22368..ce01cbcd 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/DiscoveryClient.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/DiscoveryClient.java @@ -51,6 +51,8 @@ public class DiscoveryClient implements DisposableBean { private static final String PREFIX = "DiscoveryClient_"; private final String appPathIdentifier; + private final int delayTime = 30; + public DiscoveryClient(HttpAgent httpAgent, InstanceInfo instanceInfo, ClientShutdown hippo4jClientShutdown) { this.httpAgent = httpAgent; this.instanceInfo = instanceInfo; @@ -65,7 +67,7 @@ public class DiscoveryClient implements DisposableBean { } private void initScheduledTasks() { - scheduler.scheduleWithFixedDelay(new HeartbeatThread(), 30, 30, TimeUnit.SECONDS); + scheduler.scheduleWithFixedDelay(new HeartbeatThread(), delayTime, delayTime, TimeUnit.SECONDS); } boolean register() { @@ -118,6 +120,9 @@ public class DiscoveryClient implements DisposableBean { hippo4jClientShutdown.prepareDestroy(); } + /** + * HeartbeatThread + */ public class HeartbeatThread implements Runnable { @Override diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/DynamicThreadPoolSubscribeConfig.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/DynamicThreadPoolSubscribeConfig.java index 804c0864..db646ff8 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/DynamicThreadPoolSubscribeConfig.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/DynamicThreadPoolSubscribeConfig.java @@ -41,10 +41,12 @@ public class DynamicThreadPoolSubscribeConfig { private final BootstrapProperties properties; + private final int defaultAliveTime = 2000; + private final ExecutorService configRefreshExecutorService = ThreadPoolBuilder.builder() .corePoolSize(1) .maximumPoolSize(2) - .keepAliveTime(2000) + .keepAliveTime(defaultAliveTime) .timeUnit(TimeUnit.MILLISECONDS) .workQueue(BlockingQueueTypeEnum.SYNCHRONOUS_QUEUE) .allowCoreThreadTimeOut(true) diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/remote/AbstractHealthCheck.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/remote/AbstractHealthCheck.java index 1fd78118..fc2cb69c 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/remote/AbstractHealthCheck.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/remote/AbstractHealthCheck.java @@ -32,7 +32,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; -import static cn.hippo4j.common.constant.Constants.HEALTH_CHECK_INTERVAL; +import static cn.hippo4j.common.constant.Constants.*; /** * Abstract health check. @@ -99,10 +99,10 @@ public abstract class AbstractHealthCheck implements ServerHealthCheck, Initiali } else { healthStatus = false; checkFailureCount++; - if (checkFailureCount > 1 && checkFailureCount < 4) { - ThreadUtil.sleep(HEALTH_CHECK_INTERVAL * 1000 * (checkFailureCount - 1)); - } else if (checkFailureCount >= 4) { - ThreadUtil.sleep(25000L); + if (checkFailureCount > 1 && checkFailureCount < MAX_CHECK_FAILURE_COUNT) { + ThreadUtil.sleep((long) HEALTH_CHECK_INTERVAL * SECONDS_IN_MILLISECONDS * (checkFailureCount - 1)); + } else if (checkFailureCount >= MAX_CHECK_FAILURE_COUNT) { + ThreadUtil.sleep(FAILURE_SLEEP_INTERVAL); } } } diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/remote/ServerListManager.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/remote/ServerListManager.java index cc5abd36..3202869c 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/remote/ServerListManager.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/remote/ServerListManager.java @@ -88,13 +88,16 @@ public class ServerListManager { return new ServerAddressIterator(serverUrls); } + /** + * Server Address Iterator + */ private static class ServerAddressIterator implements Iterator { final List sorted; final Iterator iter; - public ServerAddressIterator(List source) { + ServerAddressIterator(List source) { sorted = new ArrayList(); for (String address : source) { sorted.add(new RandomizedServerAddress(address)); @@ -113,6 +116,9 @@ public class ServerListManager { return null; } + /** + * Randomized Server Address + */ static class RandomizedServerAddress implements Comparable { static Random random = new Random(); @@ -123,7 +129,7 @@ public class ServerListManager { int seed; - public RandomizedServerAddress(String ip) { + RandomizedServerAddress(String ip) { try { this.serverIp = ip; /* diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/security/SecurityProxy.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/security/SecurityProxy.java index 72c5bd07..c03d518a 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/security/SecurityProxy.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/security/SecurityProxy.java @@ -39,6 +39,8 @@ public class SecurityProxy { private static final String APPLY_TOKEN_URL = Constants.BASE_PATH + "/auth/users/apply/token"; + private final int refreshWindowDuration = 10; + private final String username; private final String password; @@ -88,7 +90,7 @@ public class SecurityProxy { TokenInfo tokenInfo = JSONUtil.parseObject(tokenJsonStr, TokenInfo.class); accessToken = tokenInfo.getAccessToken(); tokenTtl = tokenInfo.getTokenTtl(); - tokenRefreshWindow = tokenTtl / 10; + tokenRefreshWindow = tokenTtl / refreshWindowDuration; } catch (Throwable ex) { log.error("Failed to apply for token. message: {}", ex.getMessage()); return false; diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/support/DynamicThreadPoolPostProcessor.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/support/DynamicThreadPoolPostProcessor.java index 3d94628c..00a1dd67 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/support/DynamicThreadPoolPostProcessor.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/support/DynamicThreadPoolPostProcessor.java @@ -54,9 +54,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import static cn.hippo4j.common.constant.Constants.ITEM_ID; -import static cn.hippo4j.common.constant.Constants.NAMESPACE; -import static cn.hippo4j.common.constant.Constants.TP_ID; +import static cn.hippo4j.common.constant.Constants.*; /** * Dynamic thread-pool post processor. @@ -94,7 +92,8 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor { return bean; } DynamicThreadPoolExecutor dynamicThreadPoolExecutor; - if ((dynamicThreadPoolExecutor = DynamicThreadPoolAdapterChoose.unwrap(bean)) == null) { + dynamicThreadPoolExecutor = DynamicThreadPoolAdapterChoose.unwrap(bean); + if ((dynamicThreadPoolExecutor) == null) { dynamicThreadPoolExecutor = (DynamicThreadPoolExecutor) bean; } DynamicThreadPoolWrapper dynamicThreadPoolWrapper = new DynamicThreadPoolWrapper(dynamicThreadPoolExecutor.getThreadPoolId(), dynamicThreadPoolExecutor); @@ -128,16 +127,17 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor { protected ThreadPoolExecutor fillPoolAndRegister(DynamicThreadPoolWrapper dynamicThreadPoolWrapper) { String threadPoolId = dynamicThreadPoolWrapper.getThreadPoolId(); ThreadPoolExecutor executor = dynamicThreadPoolWrapper.getExecutor(); - Map queryStrMap = new HashMap(3); + Map queryStrMap = new HashMap(INITIAL_CAPACITY); queryStrMap.put(TP_ID, threadPoolId); queryStrMap.put(ITEM_ID, properties.getItemId()); queryStrMap.put(NAMESPACE, properties.getNamespace()); ThreadPoolParameterInfo threadPoolParameterInfo = new ThreadPoolParameterInfo(); try { - Result result = httpAgent.httpGetByConfig(Constants.CONFIG_CONTROLLER_PATH, null, queryStrMap, 5000L); + Result result = httpAgent.httpGetByConfig(Constants.CONFIG_CONTROLLER_PATH, null, queryStrMap, HTTP_EXECUTE_TIMEOUT); if (result.isSuccess() && result.getData() != null) { String resultJsonStr = JSONUtil.toJSONString(result.getData()); - if ((threadPoolParameterInfo = JSONUtil.parseObject(resultJsonStr, ThreadPoolParameterInfo.class)) != null) { + threadPoolParameterInfo = JSONUtil.parseObject(resultJsonStr, ThreadPoolParameterInfo.class); + if (threadPoolParameterInfo != null) { threadPoolParamReplace(executor, threadPoolParameterInfo); registerNotifyAlarm(threadPoolParameterInfo); } @@ -153,9 +153,9 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor { .allowCoreThreadTimeOut(executor.allowsCoreThreadTimeOut()) .keepAliveTime(executor.getKeepAliveTime(TimeUnit.MILLISECONDS)) .isAlarm(false) - .activeAlarm(80) - .capacityAlarm(80) - .executeTimeOut(10000L) + .activeAlarm(ACTIVE_ALARM) + .capacityAlarm(CAPACITY_ALARM) + .executeTimeOut(EXECUTE_TIME_OUT) .rejectedPolicyType(RejectedPolicyTypeEnum.getRejectedPolicyTypeEnumByName(executor.getRejectedExecutionHandler().getClass().getSimpleName())) .build(); DynamicThreadPoolRegisterWrapper registerWrapper = DynamicThreadPoolRegisterWrapper.builder()