Fix some module CheckStyle Errors (#1240)

* fix checkType errors about module hippo4j-spring-boot-starter.

* fix checkType errors about module hippo4j-adapter-dubbo.

* fix checkType errors about module hippo4j-adapter-alibaba-dubbo.

* fix checkType errors about module hippo4j-adapter-hystrix.

* fix checkType errors about module hippo4j-spring-boot-starter.
pull/1244/head
Xin Chen 2 years ago committed by GitHub
parent 0f3b33fbcc
commit f5126e3b68
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -42,7 +42,7 @@ import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_DELIMI
@Slf4j @Slf4j
public class AlibabaDubboThreadPoolAdapter implements ThreadPoolAdapter, ApplicationListener<ApplicationStartedEvent> { public class AlibabaDubboThreadPoolAdapter implements ThreadPoolAdapter, ApplicationListener<ApplicationStartedEvent> {
private final Map<String, ThreadPoolExecutor> DUBBO_PROTOCOL_EXECUTOR = new HashMap<>(); private final Map<String, ThreadPoolExecutor> dubboProtocolExecutor = new HashMap<>();
@Override @Override
public String mark() { public String mark() {
@ -52,7 +52,7 @@ public class AlibabaDubboThreadPoolAdapter implements ThreadPoolAdapter, Applica
@Override @Override
public ThreadPoolAdapterState getThreadPoolState(String identify) { public ThreadPoolAdapterState getThreadPoolState(String identify) {
ThreadPoolAdapterState threadPoolAdapterState = new ThreadPoolAdapterState(); ThreadPoolAdapterState threadPoolAdapterState = new ThreadPoolAdapterState();
ThreadPoolExecutor executor = DUBBO_PROTOCOL_EXECUTOR.get(identify); ThreadPoolExecutor executor = dubboProtocolExecutor.get(identify);
if (executor == null) { if (executor == null) {
log.warn("[{}] Alibaba Dubbo consuming thread pool not found.", identify); log.warn("[{}] Alibaba Dubbo consuming thread pool not found.", identify);
return threadPoolAdapterState; return threadPoolAdapterState;
@ -66,14 +66,14 @@ public class AlibabaDubboThreadPoolAdapter implements ThreadPoolAdapter, Applica
@Override @Override
public List<ThreadPoolAdapterState> getThreadPoolStates() { public List<ThreadPoolAdapterState> getThreadPoolStates() {
List<ThreadPoolAdapterState> threadPoolAdapterStates = new ArrayList<>(); List<ThreadPoolAdapterState> threadPoolAdapterStates = new ArrayList<>();
DUBBO_PROTOCOL_EXECUTOR.forEach((key, val) -> threadPoolAdapterStates.add(getThreadPoolState(String.valueOf(key)))); dubboProtocolExecutor.forEach((key, val) -> threadPoolAdapterStates.add(getThreadPoolState(String.valueOf(key))));
return threadPoolAdapterStates; return threadPoolAdapterStates;
} }
@Override @Override
public boolean updateThreadPool(ThreadPoolAdapterParameter threadPoolAdapterParameter) { public boolean updateThreadPool(ThreadPoolAdapterParameter threadPoolAdapterParameter) {
String threadPoolKey = threadPoolAdapterParameter.getThreadPoolKey(); String threadPoolKey = threadPoolAdapterParameter.getThreadPoolKey();
ThreadPoolExecutor executor = DUBBO_PROTOCOL_EXECUTOR.get(threadPoolAdapterParameter.getThreadPoolKey()); ThreadPoolExecutor executor = dubboProtocolExecutor.get(threadPoolAdapterParameter.getThreadPoolKey());
if (executor == null) { if (executor == null) {
log.warn("[{}] Alibaba Dubbo consuming thread pool not found.", threadPoolKey); log.warn("[{}] Alibaba Dubbo consuming thread pool not found.", threadPoolKey);
return false; return false;
@ -94,7 +94,7 @@ public class AlibabaDubboThreadPoolAdapter implements ThreadPoolAdapter, Applica
try { try {
DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension(); DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
Map<String, Object> executors = dataStore.get(poolKey); Map<String, Object> executors = dataStore.get(poolKey);
executors.forEach((key, value) -> DUBBO_PROTOCOL_EXECUTOR.put(key, (ThreadPoolExecutor) value)); executors.forEach((key, value) -> dubboProtocolExecutor.put(key, (ThreadPoolExecutor) value));
} catch (Exception ex) { } catch (Exception ex) {
log.error("Failed to get Alibaba Dubbo protocol thread pool", ex); log.error("Failed to get Alibaba Dubbo protocol thread pool", ex);
} }

@ -46,7 +46,7 @@ import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_DELIMI
@Slf4j @Slf4j
public class DubboThreadPoolAdapter implements ThreadPoolAdapter, ApplicationListener<ApplicationStartedEvent> { public class DubboThreadPoolAdapter implements ThreadPoolAdapter, ApplicationListener<ApplicationStartedEvent> {
private final Map<String, ThreadPoolExecutor> DUBBO_PROTOCOL_EXECUTOR = new HashMap<>(); private final Map<String, ThreadPoolExecutor> dubboProtocolExecutor = new HashMap<>();
@Override @Override
public String mark() { public String mark() {
@ -56,7 +56,7 @@ public class DubboThreadPoolAdapter implements ThreadPoolAdapter, ApplicationLis
@Override @Override
public ThreadPoolAdapterState getThreadPoolState(String identify) { public ThreadPoolAdapterState getThreadPoolState(String identify) {
ThreadPoolAdapterState threadPoolAdapterState = new ThreadPoolAdapterState(); ThreadPoolAdapterState threadPoolAdapterState = new ThreadPoolAdapterState();
ThreadPoolExecutor executor = DUBBO_PROTOCOL_EXECUTOR.get(identify); ThreadPoolExecutor executor = dubboProtocolExecutor.get(identify);
if (executor == null) { if (executor == null) {
log.warn("[{}] Dubbo consuming thread pool not found.", identify); log.warn("[{}] Dubbo consuming thread pool not found.", identify);
return threadPoolAdapterState; return threadPoolAdapterState;
@ -70,14 +70,14 @@ public class DubboThreadPoolAdapter implements ThreadPoolAdapter, ApplicationLis
@Override @Override
public List<ThreadPoolAdapterState> getThreadPoolStates() { public List<ThreadPoolAdapterState> getThreadPoolStates() {
List<ThreadPoolAdapterState> threadPoolAdapterStates = new ArrayList<>(); List<ThreadPoolAdapterState> threadPoolAdapterStates = new ArrayList<>();
DUBBO_PROTOCOL_EXECUTOR.forEach((key, val) -> threadPoolAdapterStates.add(getThreadPoolState(String.valueOf(key)))); dubboProtocolExecutor.forEach((key, val) -> threadPoolAdapterStates.add(getThreadPoolState(String.valueOf(key))));
return threadPoolAdapterStates; return threadPoolAdapterStates;
} }
@Override @Override
public boolean updateThreadPool(ThreadPoolAdapterParameter threadPoolAdapterParameter) { public boolean updateThreadPool(ThreadPoolAdapterParameter threadPoolAdapterParameter) {
String threadPoolKey = threadPoolAdapterParameter.getThreadPoolKey(); String threadPoolKey = threadPoolAdapterParameter.getThreadPoolKey();
ThreadPoolExecutor executor = DUBBO_PROTOCOL_EXECUTOR.get(threadPoolAdapterParameter.getThreadPoolKey()); ThreadPoolExecutor executor = dubboProtocolExecutor.get(threadPoolAdapterParameter.getThreadPoolKey());
if (executor == null) { if (executor == null) {
log.warn("[{}] Dubbo consuming thread pool not found.", threadPoolKey); log.warn("[{}] Dubbo consuming thread pool not found.", threadPoolKey);
return false; return false;
@ -105,14 +105,14 @@ public class DubboThreadPoolAdapter implements ThreadPoolAdapter, ApplicationLis
if (isLegacyVersion) { if (isLegacyVersion) {
DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension(); DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
Map<String, Object> executors = dataStore.get(poolKey); Map<String, Object> executors = dataStore.get(poolKey);
executors.forEach((key, value) -> DUBBO_PROTOCOL_EXECUTOR.put(key, (ThreadPoolExecutor) value)); executors.forEach((key, value) -> dubboProtocolExecutor.put(key, (ThreadPoolExecutor) value));
return; return;
} }
ExecutorRepository executorRepository = ExtensionLoader.getExtensionLoader(ExecutorRepository.class).getDefaultExtension(); ExecutorRepository executorRepository = ExtensionLoader.getExtensionLoader(ExecutorRepository.class).getDefaultExtension();
ConcurrentMap<String, ConcurrentMap<Integer, ExecutorService>> data = ConcurrentMap<String, ConcurrentMap<Integer, ExecutorService>> data =
(ConcurrentMap<String, ConcurrentMap<Integer, ExecutorService>>) ReflectUtil.getFieldValue(executorRepository, "data"); (ConcurrentMap<String, ConcurrentMap<Integer, ExecutorService>>) ReflectUtil.getFieldValue(executorRepository, "data");
ConcurrentMap<Integer, ExecutorService> executorServiceMap = data.get(poolKey); ConcurrentMap<Integer, ExecutorService> executorServiceMap = data.get(poolKey);
executorServiceMap.forEach((key, value) -> DUBBO_PROTOCOL_EXECUTOR.put(String.valueOf(key), (ThreadPoolExecutor) value)); executorServiceMap.forEach((key, value) -> dubboProtocolExecutor.put(String.valueOf(key), (ThreadPoolExecutor) value));
} catch (Exception ex) { } catch (Exception ex) {
log.error("Failed to get Dubbo {} protocol thread pool", Version.getVersion(), ex); log.error("Failed to get Dubbo {} protocol thread pool", Version.getVersion(), ex);
} }

@ -113,6 +113,9 @@ public abstract class AbstractHystrixThreadPoolAdapter implements ThreadPoolAdap
scheduler.schedule(hystrixThreadPoolRefreshTask, taskIntervalSeconds, TimeUnit.SECONDS); scheduler.schedule(hystrixThreadPoolRefreshTask, taskIntervalSeconds, TimeUnit.SECONDS);
} }
/**
* hystrix thread-pool refresh task
*/
class HystrixThreadPoolRefreshTask implements Runnable { class HystrixThreadPoolRefreshTask implements Runnable {
private final ScheduledExecutorService scheduler; private final ScheduledExecutorService scheduler;

@ -102,6 +102,22 @@ public class Constants {
public static final int HEALTH_CHECK_INTERVAL = 5; public static final int HEALTH_CHECK_INTERVAL = 5;
public static final int MAX_CHECK_FAILURE_COUNT = 4;
public static final int INITIAL_CAPACITY = 3;
public static final int DATA_GROUP_TENANT_SIZE = 3;
public static final int ACTIVE_ALARM = 80;
public static final int CAPACITY_ALARM = 80;
public static final long EXECUTE_TIME_OUT = 10000L;
public static final int SECONDS_IN_MILLISECONDS = 1000;
public static final long FAILURE_SLEEP_INTERVAL = 25000L;
public static final int AVAILABLE_PROCESSORS = Runtime.getRuntime().availableProcessors(); public static final int AVAILABLE_PROCESSORS = Runtime.getRuntime().availableProcessors();
public static final String DEFAULT_GROUP = "default group"; public static final String DEFAULT_GROUP = "default group";
@ -110,7 +126,7 @@ public class Constants {
public static final String EXECUTE_TIMEOUT_TRACE = "executeTimeoutTrace"; public static final String EXECUTE_TIMEOUT_TRACE = "executeTimeoutTrace";
public static final int HTTP_EXECUTE_TIMEOUT = 5000; public static final long HTTP_EXECUTE_TIMEOUT = 5000L;
public static final String CLIENT_VERSION = "Client-Version"; public static final String CLIENT_VERSION = "Client-Version";

@ -38,9 +38,9 @@ import java.util.concurrent.ThreadPoolExecutor;
@Slf4j @Slf4j
public class BaseThreadDetailStateHandler implements ThreadDetailState { public class BaseThreadDetailStateHandler implements ThreadDetailState {
private final String WORKERS = "workers"; private final String workersName = "workers";
private final String THREAD = "thread"; private final String threadName = "thread";
@Override @Override
public List<ThreadDetailStateInfo> getThreadDetailStateInfo(String threadPoolId) { public List<ThreadDetailStateInfo> getThreadDetailStateInfo(String threadPoolId) {
@ -53,14 +53,14 @@ public class BaseThreadDetailStateHandler implements ThreadDetailState {
public List<ThreadDetailStateInfo> getThreadDetailStateInfo(ThreadPoolExecutor threadPoolExecutor) { public List<ThreadDetailStateInfo> getThreadDetailStateInfo(ThreadPoolExecutor threadPoolExecutor) {
List<ThreadDetailStateInfo> resultThreadStates = new ArrayList(); List<ThreadDetailStateInfo> resultThreadStates = new ArrayList();
try { try {
HashSet<Object> workers = (HashSet<Object>) ReflectUtil.getFieldValue(threadPoolExecutor, WORKERS); HashSet<Object> workers = (HashSet<Object>) ReflectUtil.getFieldValue(threadPoolExecutor, workersName);
if (CollectionUtil.isEmpty(workers)) { if (CollectionUtil.isEmpty(workers)) {
return resultThreadStates; return resultThreadStates;
} }
for (Object worker : workers) { for (Object worker : workers) {
Thread thread; Thread thread;
try { try {
thread = (Thread) ReflectUtil.getFieldValue(worker, THREAD); thread = (Thread) ReflectUtil.getFieldValue(worker, threadName);
if (thread == null) { if (thread == null) {
log.warn("Reflection get worker thread is null. Worker: {}", worker); log.warn("Reflection get worker thread is null. Worker: {}", worker);
continue; continue;

@ -35,15 +35,18 @@ import java.util.concurrent.CopyOnWriteArrayList;
public class CacheData { public class CacheData {
@Getter @Getter
public volatile String md5; private volatile String md5;
public volatile String content; private volatile String content;
public final String tenantId; @Getter
private final String tenantId;
public final String itemId; @Getter
private final String itemId;
public final String threadPoolId; @Getter
private final String threadPoolId;
@Setter @Setter
private volatile boolean isInitializing = true; private volatile boolean isInitializing = true;
@ -56,7 +59,7 @@ public class CacheData {
this.threadPoolId = threadPoolId; this.threadPoolId = threadPoolId;
this.content = ContentUtil.getPoolContent(GlobalThreadPoolManage.getPoolParameter(threadPoolId)); this.content = ContentUtil.getPoolContent(GlobalThreadPoolManage.getPoolParameter(threadPoolId));
this.md5 = getMd5String(content); this.md5 = getMd5String(content);
this.listeners = new CopyOnWriteArrayList(); this.listeners = new CopyOnWriteArrayList<>();
} }
public void addListener(Listener listener) { public void addListener(Listener listener) {

@ -33,7 +33,7 @@ public class ClientShutdown {
@Getter @Getter
private volatile boolean prepareClose = false; private volatile boolean prepareClose = false;
private final static Long TIME_OUT_SECOND = 1L; private static final Long TIME_OUT_SECOND = 1L;
private static final int DEFAULT_COUNT = 1; private static final int DEFAULT_COUNT = 1;
private final CountDownLatch countDownLatch = new CountDownLatch(DEFAULT_COUNT); private final CountDownLatch countDownLatch = new CountDownLatch(DEFAULT_COUNT);

@ -30,7 +30,6 @@ import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.DisposableBean;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
import java.net.URLDecoder; import java.net.URLDecoder;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
@ -43,20 +42,21 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static cn.hippo4j.common.constant.Constants.CLIENT_VERSION;
import static cn.hippo4j.common.constant.Constants.CONFIG_CONTROLLER_PATH;
import static cn.hippo4j.common.constant.Constants.CONFIG_LONG_POLL_TIMEOUT; import static cn.hippo4j.common.constant.Constants.CONFIG_LONG_POLL_TIMEOUT;
import static cn.hippo4j.common.constant.Constants.GROUP_KEY_DELIMITER_TRANSLATION; import static cn.hippo4j.common.constant.Constants.GROUP_KEY_DELIMITER_TRANSLATION;
import static cn.hippo4j.common.constant.Constants.WORD_SEPARATOR;
import static cn.hippo4j.common.constant.Constants.LINE_SEPARATOR; import static cn.hippo4j.common.constant.Constants.LINE_SEPARATOR;
import static cn.hippo4j.common.constant.Constants.LISTENER_PATH; import static cn.hippo4j.common.constant.Constants.PROBE_MODIFY_REQUEST;
import static cn.hippo4j.common.constant.Constants.LONG_PULLING_CLIENT_IDENTIFICATION; import static cn.hippo4j.common.constant.Constants.WEIGHT_CONFIGS;
import static cn.hippo4j.common.constant.Constants.LONG_PULLING_TIMEOUT; import static cn.hippo4j.common.constant.Constants.LONG_PULLING_TIMEOUT;
import static cn.hippo4j.common.constant.Constants.LONG_PULLING_CLIENT_IDENTIFICATION;
import static cn.hippo4j.common.constant.Constants.LONG_PULLING_TIMEOUT_NO_HANGUP; import static cn.hippo4j.common.constant.Constants.LONG_PULLING_TIMEOUT_NO_HANGUP;
import static cn.hippo4j.common.constant.Constants.CLIENT_VERSION;
import static cn.hippo4j.common.constant.Constants.LISTENER_PATH;
import static cn.hippo4j.common.constant.Constants.INITIAL_CAPACITY;
import static cn.hippo4j.common.constant.Constants.DATA_GROUP_TENANT_SIZE;
import static cn.hippo4j.common.constant.Constants.CONFIG_CONTROLLER_PATH;
import static cn.hippo4j.common.constant.Constants.NULL; import static cn.hippo4j.common.constant.Constants.NULL;
import static cn.hippo4j.common.constant.Constants.PROBE_MODIFY_REQUEST;
import static cn.hippo4j.common.constant.Constants.WEIGHT_CONFIGS;
import static cn.hippo4j.common.constant.Constants.WORD_SEPARATOR;
/** /**
* Client worker. * Client worker.
@ -77,6 +77,8 @@ public class ClientWorker implements DisposableBean {
private final CountDownLatch cacheCondition = new CountDownLatch(1); private final CountDownLatch cacheCondition = new CountDownLatch(1);
private final ConcurrentHashMap<String, CacheData> cacheMap = new ConcurrentHashMap<>(16); private final ConcurrentHashMap<String, CacheData> cacheMap = new ConcurrentHashMap<>(16);
private final long defaultTimedOut = 3000L;
@SuppressWarnings("all") @SuppressWarnings("all")
public ClientWorker(HttpAgent httpAgent, public ClientWorker(HttpAgent httpAgent,
String identify, String identify,
@ -113,13 +115,16 @@ public class ClientWorker implements DisposableBean {
executorService.shutdownNow(); executorService.shutdownNow();
} }
/**
* LongPollingRunnable
*/
class LongPollingRunnable implements Runnable { class LongPollingRunnable implements Runnable {
private boolean cacheMapInitEmptyFlag; private boolean cacheMapInitEmptyFlag;
private final CountDownLatch cacheCondition; private final CountDownLatch cacheCondition;
public LongPollingRunnable(boolean cacheMapInitEmptyFlag, CountDownLatch cacheCondition) { LongPollingRunnable(boolean cacheMapInitEmptyFlag, CountDownLatch cacheCondition) {
this.cacheMapInitEmptyFlag = cacheMapInitEmptyFlag; this.cacheMapInitEmptyFlag = cacheMapInitEmptyFlag;
this.cacheCondition = cacheCondition; this.cacheCondition = cacheCondition;
} }
@ -147,7 +152,7 @@ public class ClientWorker implements DisposableBean {
String itemId = keys[1]; String itemId = keys[1];
String namespace = keys[2]; String namespace = keys[2];
try { try {
String content = getServerConfig(namespace, itemId, tpId, 3000L); String content = getServerConfig(namespace, itemId, tpId, defaultTimedOut);
CacheData cacheData = cacheMap.get(tpId); CacheData cacheData = cacheMap.get(tpId);
String poolContent = ContentUtil.getPoolContent(JSONUtil.parseObject(content, ThreadPoolParameterInfo.class)); String poolContent = ContentUtil.getPoolContent(JSONUtil.parseObject(content, ThreadPoolParameterInfo.class));
cacheData.setContent(poolContent); cacheData.setContent(poolContent);
@ -157,7 +162,7 @@ public class ClientWorker implements DisposableBean {
} }
for (CacheData cacheData : cacheDataList) { for (CacheData cacheData : cacheDataList) {
if (!cacheData.isInitializing() || inInitializingCacheList if (!cacheData.isInitializing() || inInitializingCacheList
.contains(GroupKey.getKeyTenant(cacheData.threadPoolId, cacheData.itemId, cacheData.tenantId))) { .contains(GroupKey.getKeyTenant(cacheData.getThreadPoolId(), cacheData.getItemId(), cacheData.getTenantId()))) {
cacheData.checkListenerMd5(); cacheData.checkListenerMd5();
cacheData.setInitializing(false); cacheData.setInitializing(false);
} }
@ -170,13 +175,13 @@ public class ClientWorker implements DisposableBean {
private List<String> checkUpdateDataIds(List<CacheData> cacheDataList, List<String> inInitializingCacheList) { private List<String> checkUpdateDataIds(List<CacheData> cacheDataList, List<String> inInitializingCacheList) {
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
for (CacheData cacheData : cacheDataList) { for (CacheData cacheData : cacheDataList) {
sb.append(cacheData.threadPoolId).append(WORD_SEPARATOR); sb.append(cacheData.getThreadPoolId()).append(WORD_SEPARATOR);
sb.append(cacheData.itemId).append(WORD_SEPARATOR); sb.append(cacheData.getItemId()).append(WORD_SEPARATOR);
sb.append(cacheData.tenantId).append(WORD_SEPARATOR); sb.append(cacheData.getTenantId()).append(WORD_SEPARATOR);
sb.append(identify).append(WORD_SEPARATOR); sb.append(identify).append(WORD_SEPARATOR);
sb.append(cacheData.getMd5()).append(LINE_SEPARATOR); sb.append(cacheData.getMd5()).append(LINE_SEPARATOR);
if (cacheData.isInitializing()) { if (cacheData.isInitializing()) {
inInitializingCacheList.add(GroupKey.getKeyTenant(cacheData.threadPoolId, cacheData.itemId, cacheData.tenantId)); inInitializingCacheList.add(GroupKey.getKeyTenant(cacheData.getThreadPoolId(), cacheData.getItemId(), cacheData.getTenantId()));
} }
} }
boolean isInitializingCacheList = !inInitializingCacheList.isEmpty(); boolean isInitializingCacheList = !inInitializingCacheList.isEmpty();
@ -213,7 +218,7 @@ public class ClientWorker implements DisposableBean {
} }
public String getServerConfig(String namespace, String itemId, String threadPoolId, long readTimeout) { public String getServerConfig(String namespace, String itemId, String threadPoolId, long readTimeout) {
Map<String, String> params = new HashMap<>(3); Map<String, String> params = new HashMap<>(INITIAL_CAPACITY);
params.put("namespace", namespace); params.put("namespace", namespace);
params.put("itemId", itemId); params.put("itemId", itemId);
params.put("tpId", threadPoolId); params.put("tpId", threadPoolId);
@ -241,7 +246,7 @@ public class ClientWorker implements DisposableBean {
String[] keyArr = dataIdAndGroup.split(WORD_SEPARATOR); String[] keyArr = dataIdAndGroup.split(WORD_SEPARATOR);
String dataId = keyArr[0]; String dataId = keyArr[0];
String group = keyArr[1]; String group = keyArr[1];
if (keyArr.length == 3) { if (keyArr.length == DATA_GROUP_TENANT_SIZE) {
String tenant = keyArr[2]; String tenant = keyArr[2];
updateList.add(GroupKey.getKeyTenant(dataId, group, tenant)); updateList.add(GroupKey.getKeyTenant(dataId, group, tenant));
log.info("[{}] Refresh thread pool changed.", dataId); log.info("[{}] Refresh thread pool changed.", dataId);
@ -274,7 +279,7 @@ public class ClientWorker implements DisposableBean {
if (lastCacheData == null) { if (lastCacheData == null) {
String serverConfig; String serverConfig;
try { try {
serverConfig = getServerConfig(namespace, itemId, threadPoolId, 3000L); serverConfig = getServerConfig(namespace, itemId, threadPoolId, defaultTimedOut);
ThreadPoolParameterInfo poolInfo = JSONUtil.parseObject(serverConfig, ThreadPoolParameterInfo.class); ThreadPoolParameterInfo poolInfo = JSONUtil.parseObject(serverConfig, ThreadPoolParameterInfo.class);
cacheData.setContent(ContentUtil.getPoolContent(poolInfo)); cacheData.setContent(ContentUtil.getPoolContent(poolInfo));
} catch (Exception ex) { } catch (Exception ex) {

@ -51,6 +51,8 @@ public class DiscoveryClient implements DisposableBean {
private static final String PREFIX = "DiscoveryClient_"; private static final String PREFIX = "DiscoveryClient_";
private final String appPathIdentifier; private final String appPathIdentifier;
private final int delayTime = 30;
public DiscoveryClient(HttpAgent httpAgent, InstanceInfo instanceInfo, ClientShutdown hippo4jClientShutdown) { public DiscoveryClient(HttpAgent httpAgent, InstanceInfo instanceInfo, ClientShutdown hippo4jClientShutdown) {
this.httpAgent = httpAgent; this.httpAgent = httpAgent;
this.instanceInfo = instanceInfo; this.instanceInfo = instanceInfo;
@ -65,7 +67,7 @@ public class DiscoveryClient implements DisposableBean {
} }
private void initScheduledTasks() { private void initScheduledTasks() {
scheduler.scheduleWithFixedDelay(new HeartbeatThread(), 30, 30, TimeUnit.SECONDS); scheduler.scheduleWithFixedDelay(new HeartbeatThread(), delayTime, delayTime, TimeUnit.SECONDS);
} }
boolean register() { boolean register() {
@ -118,6 +120,9 @@ public class DiscoveryClient implements DisposableBean {
hippo4jClientShutdown.prepareDestroy(); hippo4jClientShutdown.prepareDestroy();
} }
/**
* HeartbeatThread
*/
public class HeartbeatThread implements Runnable { public class HeartbeatThread implements Runnable {
@Override @Override

@ -41,10 +41,12 @@ public class DynamicThreadPoolSubscribeConfig {
private final BootstrapProperties properties; private final BootstrapProperties properties;
private final int defaultAliveTime = 2000;
private final ExecutorService configRefreshExecutorService = ThreadPoolBuilder.builder() private final ExecutorService configRefreshExecutorService = ThreadPoolBuilder.builder()
.corePoolSize(1) .corePoolSize(1)
.maximumPoolSize(2) .maximumPoolSize(2)
.keepAliveTime(2000) .keepAliveTime(defaultAliveTime)
.timeUnit(TimeUnit.MILLISECONDS) .timeUnit(TimeUnit.MILLISECONDS)
.workQueue(BlockingQueueTypeEnum.SYNCHRONOUS_QUEUE) .workQueue(BlockingQueueTypeEnum.SYNCHRONOUS_QUEUE)
.allowCoreThreadTimeOut(true) .allowCoreThreadTimeOut(true)

@ -31,8 +31,10 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import static cn.hippo4j.common.constant.Constants.MAX_CHECK_FAILURE_COUNT;
import static cn.hippo4j.common.constant.Constants.SECONDS_IN_MILLISECONDS;
import static cn.hippo4j.common.constant.Constants.HEALTH_CHECK_INTERVAL; import static cn.hippo4j.common.constant.Constants.HEALTH_CHECK_INTERVAL;
import static cn.hippo4j.common.constant.Constants.FAILURE_SLEEP_INTERVAL;
/** /**
* Abstract health check. * Abstract health check.
@ -99,10 +101,10 @@ public abstract class AbstractHealthCheck implements ServerHealthCheck, Initiali
} else { } else {
healthStatus = false; healthStatus = false;
checkFailureCount++; checkFailureCount++;
if (checkFailureCount > 1 && checkFailureCount < 4) { if (checkFailureCount > 1 && checkFailureCount < MAX_CHECK_FAILURE_COUNT) {
ThreadUtil.sleep(HEALTH_CHECK_INTERVAL * 1000 * (checkFailureCount - 1)); ThreadUtil.sleep((long) HEALTH_CHECK_INTERVAL * SECONDS_IN_MILLISECONDS * (checkFailureCount - 1));
} else if (checkFailureCount >= 4) { } else if (checkFailureCount >= MAX_CHECK_FAILURE_COUNT) {
ThreadUtil.sleep(25000L); ThreadUtil.sleep(FAILURE_SLEEP_INTERVAL);
} }
} }
} }

@ -88,13 +88,16 @@ public class ServerListManager {
return new ServerAddressIterator(serverUrls); return new ServerAddressIterator(serverUrls);
} }
/**
* Server Address Iterator
*/
private static class ServerAddressIterator implements Iterator<String> { private static class ServerAddressIterator implements Iterator<String> {
final List<RandomizedServerAddress> sorted; final List<RandomizedServerAddress> sorted;
final Iterator<RandomizedServerAddress> iter; final Iterator<RandomizedServerAddress> iter;
public ServerAddressIterator(List<String> source) { ServerAddressIterator(List<String> source) {
sorted = new ArrayList(); sorted = new ArrayList();
for (String address : source) { for (String address : source) {
sorted.add(new RandomizedServerAddress(address)); sorted.add(new RandomizedServerAddress(address));
@ -113,6 +116,9 @@ public class ServerListManager {
return null; return null;
} }
/**
* Randomized Server Address
*/
static class RandomizedServerAddress implements Comparable<RandomizedServerAddress> { static class RandomizedServerAddress implements Comparable<RandomizedServerAddress> {
static Random random = new Random(); static Random random = new Random();
@ -123,7 +129,7 @@ public class ServerListManager {
int seed; int seed;
public RandomizedServerAddress(String ip) { RandomizedServerAddress(String ip) {
try { try {
this.serverIp = ip; this.serverIp = ip;
/* /*

@ -39,6 +39,8 @@ public class SecurityProxy {
private static final String APPLY_TOKEN_URL = Constants.BASE_PATH + "/auth/users/apply/token"; private static final String APPLY_TOKEN_URL = Constants.BASE_PATH + "/auth/users/apply/token";
private final int refreshWindowDuration = 10;
private final String username; private final String username;
private final String password; private final String password;
@ -88,7 +90,7 @@ public class SecurityProxy {
TokenInfo tokenInfo = JSONUtil.parseObject(tokenJsonStr, TokenInfo.class); TokenInfo tokenInfo = JSONUtil.parseObject(tokenJsonStr, TokenInfo.class);
accessToken = tokenInfo.getAccessToken(); accessToken = tokenInfo.getAccessToken();
tokenTtl = tokenInfo.getTokenTtl(); tokenTtl = tokenInfo.getTokenTtl();
tokenRefreshWindow = tokenTtl / 10; tokenRefreshWindow = tokenTtl / refreshWindowDuration;
} catch (Throwable ex) { } catch (Throwable ex) {
log.error("Failed to apply for token. message: {}", ex.getMessage()); log.error("Failed to apply for token. message: {}", ex.getMessage());
return false; return false;

@ -53,10 +53,14 @@ import java.util.Optional;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static cn.hippo4j.common.constant.Constants.INITIAL_CAPACITY;
import static cn.hippo4j.common.constant.Constants.TP_ID;
import static cn.hippo4j.common.constant.Constants.ITEM_ID; import static cn.hippo4j.common.constant.Constants.ITEM_ID;
import static cn.hippo4j.common.constant.Constants.NAMESPACE; import static cn.hippo4j.common.constant.Constants.NAMESPACE;
import static cn.hippo4j.common.constant.Constants.TP_ID; import static cn.hippo4j.common.constant.Constants.ACTIVE_ALARM;
import static cn.hippo4j.common.constant.Constants.CAPACITY_ALARM;
import static cn.hippo4j.common.constant.Constants.EXECUTE_TIME_OUT;
import static cn.hippo4j.common.constant.Constants.HTTP_EXECUTE_TIMEOUT;
/** /**
* Dynamic thread-pool post processor. * Dynamic thread-pool post processor.
@ -94,7 +98,8 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
return bean; return bean;
} }
DynamicThreadPoolExecutor dynamicThreadPoolExecutor; DynamicThreadPoolExecutor dynamicThreadPoolExecutor;
if ((dynamicThreadPoolExecutor = DynamicThreadPoolAdapterChoose.unwrap(bean)) == null) { dynamicThreadPoolExecutor = DynamicThreadPoolAdapterChoose.unwrap(bean);
if ((dynamicThreadPoolExecutor) == null) {
dynamicThreadPoolExecutor = (DynamicThreadPoolExecutor) bean; dynamicThreadPoolExecutor = (DynamicThreadPoolExecutor) bean;
} }
DynamicThreadPoolWrapper dynamicThreadPoolWrapper = new DynamicThreadPoolWrapper(dynamicThreadPoolExecutor.getThreadPoolId(), dynamicThreadPoolExecutor); DynamicThreadPoolWrapper dynamicThreadPoolWrapper = new DynamicThreadPoolWrapper(dynamicThreadPoolExecutor.getThreadPoolId(), dynamicThreadPoolExecutor);
@ -128,16 +133,17 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
protected ThreadPoolExecutor fillPoolAndRegister(DynamicThreadPoolWrapper dynamicThreadPoolWrapper) { protected ThreadPoolExecutor fillPoolAndRegister(DynamicThreadPoolWrapper dynamicThreadPoolWrapper) {
String threadPoolId = dynamicThreadPoolWrapper.getThreadPoolId(); String threadPoolId = dynamicThreadPoolWrapper.getThreadPoolId();
ThreadPoolExecutor executor = dynamicThreadPoolWrapper.getExecutor(); ThreadPoolExecutor executor = dynamicThreadPoolWrapper.getExecutor();
Map<String, String> queryStrMap = new HashMap(3); Map<String, String> queryStrMap = new HashMap<>(INITIAL_CAPACITY);
queryStrMap.put(TP_ID, threadPoolId); queryStrMap.put(TP_ID, threadPoolId);
queryStrMap.put(ITEM_ID, properties.getItemId()); queryStrMap.put(ITEM_ID, properties.getItemId());
queryStrMap.put(NAMESPACE, properties.getNamespace()); queryStrMap.put(NAMESPACE, properties.getNamespace());
ThreadPoolParameterInfo threadPoolParameterInfo = new ThreadPoolParameterInfo(); ThreadPoolParameterInfo threadPoolParameterInfo = new ThreadPoolParameterInfo();
try { try {
Result result = httpAgent.httpGetByConfig(Constants.CONFIG_CONTROLLER_PATH, null, queryStrMap, 5000L); Result result = httpAgent.httpGetByConfig(Constants.CONFIG_CONTROLLER_PATH, null, queryStrMap, HTTP_EXECUTE_TIMEOUT);
if (result.isSuccess() && result.getData() != null) { if (result.isSuccess() && result.getData() != null) {
String resultJsonStr = JSONUtil.toJSONString(result.getData()); String resultJsonStr = JSONUtil.toJSONString(result.getData());
if ((threadPoolParameterInfo = JSONUtil.parseObject(resultJsonStr, ThreadPoolParameterInfo.class)) != null) { threadPoolParameterInfo = JSONUtil.parseObject(resultJsonStr, ThreadPoolParameterInfo.class);
if (threadPoolParameterInfo != null) {
threadPoolParamReplace(executor, threadPoolParameterInfo); threadPoolParamReplace(executor, threadPoolParameterInfo);
registerNotifyAlarm(threadPoolParameterInfo); registerNotifyAlarm(threadPoolParameterInfo);
} }
@ -153,9 +159,9 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
.allowCoreThreadTimeOut(executor.allowsCoreThreadTimeOut()) .allowCoreThreadTimeOut(executor.allowsCoreThreadTimeOut())
.keepAliveTime(executor.getKeepAliveTime(TimeUnit.MILLISECONDS)) .keepAliveTime(executor.getKeepAliveTime(TimeUnit.MILLISECONDS))
.isAlarm(false) .isAlarm(false)
.activeAlarm(80) .activeAlarm(ACTIVE_ALARM)
.capacityAlarm(80) .capacityAlarm(CAPACITY_ALARM)
.executeTimeOut(10000L) .executeTimeOut(EXECUTE_TIME_OUT)
.rejectedPolicyType(RejectedPolicyTypeEnum.getRejectedPolicyTypeEnumByName(executor.getRejectedExecutionHandler().getClass().getSimpleName())) .rejectedPolicyType(RejectedPolicyTypeEnum.getRejectedPolicyTypeEnumByName(executor.getRejectedExecutionHandler().getClass().getSimpleName()))
.build(); .build();
DynamicThreadPoolRegisterWrapper registerWrapper = DynamicThreadPoolRegisterWrapper.builder() DynamicThreadPoolRegisterWrapper registerWrapper = DynamicThreadPoolRegisterWrapper.builder()

Loading…
Cancel
Save