diff --git a/hippo4j-common/src/main/java/cn/hippo4j/common/constant/ChangeThreadPoolConstants.java b/hippo4j-common/src/main/java/cn/hippo4j/common/constant/ChangeThreadPoolConstants.java new file mode 100644 index 00000000..f18496d8 --- /dev/null +++ b/hippo4j-common/src/main/java/cn/hippo4j/common/constant/ChangeThreadPoolConstants.java @@ -0,0 +1,22 @@ +package cn.hippo4j.common.constant; + +/** + * Change thread-pool constants. + * + * @author chen.ma + * @date 2022/5/1 17:25 + */ +public class ChangeThreadPoolConstants { + + public static final String CHANGE_THREAD_POOL_TEXT = "[{}] Changed thread pool. " + + "\n coreSize :: [{}]" + + "\n maxSize :: [{}]" + + "\n queueType :: [{}]" + + "\n capacity :: [{}]" + + "\n keepAliveTime :: [{}]" + + "\n executeTimeOut :: [{}]" + + "\n rejectedType :: [{}]" + + "\n allowCoreThreadTimeOut :: [{}]"; + + public static final String CHANGE_DELIMITER = "%s => %s"; +} diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/config/BootstrapPropertiesInterface.java b/hippo4j-core/src/main/java/cn/hippo4j/core/config/BootstrapPropertiesInterface.java index d70d24ce..1c5e2d05 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/config/BootstrapPropertiesInterface.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/config/BootstrapPropertiesInterface.java @@ -87,5 +87,4 @@ public interface BootstrapPropertiesInterface { default Boolean getBanner() { return null; } - } diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/config/ConfigEmptyException.java b/hippo4j-core/src/main/java/cn/hippo4j/core/config/ConfigEmptyException.java index 0e596d0a..913c092e 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/config/ConfigEmptyException.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/config/ConfigEmptyException.java @@ -39,5 +39,4 @@ public class ConfigEmptyException extends RuntimeException { * action */ private String action; - } diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/config/UtilAutoConfiguration.java b/hippo4j-core/src/main/java/cn/hippo4j/core/config/UtilAutoConfiguration.java index 596704f8..3420f044 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/config/UtilAutoConfiguration.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/config/UtilAutoConfiguration.java @@ -37,5 +37,4 @@ public class UtilAutoConfiguration { public InetUtils hippo4JInetUtils(InetUtilsProperties properties) { return new InetUtils(properties); } - } diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/config/WebThreadPoolConfiguration.java b/hippo4j-core/src/main/java/cn/hippo4j/core/config/WebThreadPoolConfiguration.java index cb181388..8e6e821d 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/config/WebThreadPoolConfiguration.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/config/WebThreadPoolConfiguration.java @@ -77,5 +77,4 @@ public class WebThreadPoolConfiguration { public WebThreadPoolHandlerChoose webThreadPoolServiceChoose() { return new WebThreadPoolHandlerChoose(); } - } diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/enable/BeforeCheckConfiguration.java b/hippo4j-core/src/main/java/cn/hippo4j/core/enable/BeforeCheckConfiguration.java index 91ed0a8c..2160ffe2 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/enable/BeforeCheckConfiguration.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/enable/BeforeCheckConfiguration.java @@ -51,35 +51,30 @@ public class BeforeCheckConfiguration { "Web server failed to start. The dynamic thread pool username is empty.", "Please check whether the [spring.dynamic.thread-pool.username] configuration is empty or an empty string."); } - String password = properties.getPassword(); if (StringUtil.isBlank(password)) { throw new ConfigEmptyException( "Web server failed to start. The dynamic thread pool password is empty.", "Please check whether the [spring.dynamic.thread-pool.password] configuration is empty or an empty string."); } - String namespace = properties.getNamespace(); if (StringUtil.isBlank(namespace)) { throw new ConfigEmptyException( "Web server failed to start. The dynamic thread pool namespace is empty.", "Please check whether the [spring.dynamic.thread-pool.namespace] configuration is empty or an empty string."); } - String itemId = properties.getItemId(); if (StringUtil.isBlank(itemId)) { throw new ConfigEmptyException( "Web server failed to start. The dynamic thread pool item id is empty.", "Please check whether the [spring.dynamic.thread-pool.item-id] configuration is empty or an empty string."); } - String serverAddr = properties.getServerAddr(); if (StringUtil.isBlank(serverAddr)) { throw new ConfigEmptyException( "Web server failed to start. The dynamic thread pool server addr is empty.", "Please check whether the [spring.dynamic.thread-pool.server-addr] configuration is empty or an empty string."); } - String applicationName = environment.getProperty("spring.application.name"); if (StringUtil.isBlank(applicationName)) { throw new ConfigEmptyException( @@ -94,5 +89,4 @@ public class BeforeCheckConfiguration { public class BeforeCheck { } - } diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/enable/MarkerConfiguration.java b/hippo4j-core/src/main/java/cn/hippo4j/core/enable/MarkerConfiguration.java index 8ca7f74c..3e8d8f37 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/enable/MarkerConfiguration.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/enable/MarkerConfiguration.java @@ -37,5 +37,4 @@ public class MarkerConfiguration { public class Marker { } - } diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPoolExecutor.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPoolExecutor.java index 9bd3b0e1..519c15ce 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPoolExecutor.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPoolExecutor.java @@ -71,11 +71,9 @@ public class DynamicThreadPoolExecutor extends AbstractDynamicExecutorSupport { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, waitForTasksToCompleteOnShutdown, awaitTerminationMillis, workQueue, threadPoolId, threadFactory, handler); this.threadPoolId = threadPoolId; this.executeTimeOut = executeTimeOut; - // Number of dynamic proxy denial policies. RejectedExecutionHandler rejectedProxy = RejectedProxyUtil.createProxy(handler, threadPoolId, rejectCount); setRejectedExecutionHandler(rejectedProxy); - // Redundant fields to avoid reflecting the acquired fields when sending change information. redundancyHandler = handler; } diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPoolWrapper.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPoolWrapper.java index 456e0b26..5141e62c 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPoolWrapper.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPoolWrapper.java @@ -35,83 +35,29 @@ import java.util.concurrent.ThreadPoolExecutor; @Data public class DynamicThreadPoolWrapper implements DisposableBean { - /** - * Tenant id - */ - private String tenantId; + private String tenantId, itemId, threadPoolId; - /** - * Item id - */ - private String itemId; + private boolean subscribeFlag, initFlag; - /** - * Thread pool id - */ - private String tpId; - - /** - * Subscribe flag - */ - private boolean subscribeFlag; - - /** - * Init flag - */ - private boolean initFlag; - - /** - * executor - * {@link DynamicThreadPoolExecutor} - */ private ThreadPoolExecutor executor; - /** - * 首选服务端线程池, 为空使用默认线程池 {@link CommonDynamicThreadPool#getInstance(String)} - * - * @param threadPoolId - */ public DynamicThreadPoolWrapper(String threadPoolId) { this(threadPoolId, CommonDynamicThreadPool.getInstance(threadPoolId)); } - /** - * 首选服务端线程池, 为空使用 threadPoolExecutor. - * - * @param threadPoolId - * @param threadPoolExecutor - */ public DynamicThreadPoolWrapper(String threadPoolId, ThreadPoolExecutor threadPoolExecutor) { - this.tpId = threadPoolId; + this.threadPoolId = threadPoolId; this.executor = threadPoolExecutor; } - /** - * 提交任务. - * - * @param command - */ public void execute(Runnable command) { executor.execute(command); } - /** - * 提交任务. - * - * @param task - * @return - */ public Future submit(Runnable task) { return executor.submit(task); } - /** - * 提交任务. - * - * @param task - * @param - * @return - */ public Future submit(Callable task) { return executor.submit(task); } @@ -122,5 +68,4 @@ public class DynamicThreadPoolWrapper implements DisposableBean { ((AbstractDynamicExecutorSupport) executor).destroy(); } } - } diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/ThreadPoolNotifyAlarmHandler.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/ThreadPoolNotifyAlarmHandler.java index 02305e20..015855d1 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/ThreadPoolNotifyAlarmHandler.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/ThreadPoolNotifyAlarmHandler.java @@ -107,10 +107,8 @@ public class ThreadPoolNotifyAlarmHandler implements Runnable, CommandLineRunner if (hippoSendMessageService == null) { return; } - ThreadPoolNotifyAlarm threadPoolNotifyAlarm = GlobalNotifyAlarmManage.get(threadPoolId); BlockingQueue blockIngQueue = threadPoolExecutor.getQueue(); - int queueSize = blockIngQueue.size(); int capacity = queueSize + blockIngQueue.remainingCapacity(); int divide = CalculateUtil.divide(queueSize, capacity); @@ -133,7 +131,6 @@ public class ThreadPoolNotifyAlarmHandler implements Runnable, CommandLineRunner int activeCount = threadPoolExecutor.getActiveCount(); int maximumPoolSize = threadPoolExecutor.getMaximumPoolSize(); int divide = CalculateUtil.divide(activeCount, maximumPoolSize); - ThreadPoolNotifyAlarm threadPoolNotifyAlarm = GlobalNotifyAlarmManage.get(threadPoolId); boolean isSend = threadPoolNotifyAlarm.getIsAlarm() && divide > threadPoolNotifyAlarm.getActiveAlarm(); @@ -154,7 +151,6 @@ public class ThreadPoolNotifyAlarmHandler implements Runnable, CommandLineRunner if (Objects.isNull(threadPoolNotifyAlarm) || !threadPoolNotifyAlarm.getIsAlarm()) { return; } - ThreadPoolExecutor threadPoolExecutor = GlobalThreadPoolManage.getExecutorService(threadPoolId).getExecutor(); checkPoolRejectedAlarm(threadPoolId, threadPoolExecutor); } @@ -186,19 +182,16 @@ public class ThreadPoolNotifyAlarmHandler implements Runnable, CommandLineRunner if (Objects.isNull(threadPoolNotifyAlarm) || !threadPoolNotifyAlarm.getIsAlarm()) { return; } - if (threadPoolExecutor instanceof DynamicThreadPoolExecutor) { try { AlarmNotifyRequest alarmNotifyRequest = buildAlarmNotifyReq(threadPoolExecutor); alarmNotifyRequest.setThreadPoolId(threadPoolId); alarmNotifyRequest.setExecuteTime(executeTime); alarmNotifyRequest.setExecuteTimeOut(executeTimeOut); - String executeTimeoutTrace = TraceContextUtil.getAndRemove(); if (StringUtil.isNotBlank(executeTimeoutTrace)) { alarmNotifyRequest.setExecuteTimeoutTrace(executeTimeoutTrace); } - Runnable task = () -> hippoSendMessageService.sendAlarmMessage(NotifyTypeEnum.TIMEOUT, alarmNotifyRequest); EXECUTE_TIMEOUT_EXECUTOR.execute(task); } catch (Throwable ex) { @@ -217,7 +210,6 @@ public class ThreadPoolNotifyAlarmHandler implements Runnable, CommandLineRunner String appName = StrUtil.isBlank(itemId) ? applicationName : itemId; request.setAppName(appName); request.setIdentify(IdentifyUtil.getIdentify()); - hippoSendMessageService.sendChangeMessage(request); } @@ -229,10 +221,8 @@ public class ThreadPoolNotifyAlarmHandler implements Runnable, CommandLineRunner */ public AlarmNotifyRequest buildAlarmNotifyReq(ThreadPoolExecutor threadPoolExecutor) { AlarmNotifyRequest request = new AlarmNotifyRequest(); - String appName = StrUtil.isBlank(itemId) ? applicationName : itemId; request.setAppName(appName); - // 核心线程数 int corePoolSize = threadPoolExecutor.getCorePoolSize(); // 最大线程数 @@ -245,7 +235,6 @@ public class ThreadPoolNotifyAlarmHandler implements Runnable, CommandLineRunner int largestPoolSize = threadPoolExecutor.getLargestPoolSize(); // 线程池中执行任务总数量 (有锁) long completedTaskCount = threadPoolExecutor.getCompletedTaskCount(); - request.setActive(active.toUpperCase()); request.setIdentify(IdentifyUtil.getIdentify()); request.setCorePoolSize(corePoolSize); @@ -254,7 +243,6 @@ public class ThreadPoolNotifyAlarmHandler implements Runnable, CommandLineRunner request.setActiveCount(activeCount); request.setLargestPoolSize(largestPoolSize); request.setCompletedTaskCount(completedTaskCount); - BlockingQueue queue = threadPoolExecutor.getQueue(); // 队列元素个数 int queueSize = queue.size(); @@ -268,18 +256,14 @@ public class ThreadPoolNotifyAlarmHandler implements Runnable, CommandLineRunner request.setCapacity(queueCapacity); request.setQueueSize(queueSize); request.setRemainingCapacity(remainingCapacity); - RejectedExecutionHandler rejectedExecutionHandler = threadPoolExecutor instanceof DynamicThreadPoolExecutor ? ((DynamicThreadPoolExecutor) threadPoolExecutor).getRedundancyHandler() : threadPoolExecutor.getRejectedExecutionHandler(); request.setRejectedExecutionHandlerName(rejectedExecutionHandler.getClass().getSimpleName()); - long rejectCount = threadPoolExecutor instanceof DynamicThreadPoolExecutor ? ((DynamicThreadPoolExecutor) threadPoolExecutor).getRejectCountNum() : -1L; request.setRejectCountNum(rejectCount); - return request; } - } diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/manage/GlobalNotifyAlarmManage.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/manage/GlobalNotifyAlarmManage.java index 1d1c15d6..414f595b 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/manage/GlobalNotifyAlarmManage.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/manage/GlobalNotifyAlarmManage.java @@ -54,5 +54,4 @@ public class GlobalNotifyAlarmManage { public static void put(String key, ThreadPoolNotifyAlarm val) { NOTIFY_ALARM_MAP.put(key, val); } - } diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/manage/GlobalThreadPoolManage.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/manage/GlobalThreadPoolManage.java index 885d775d..d17c1620 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/manage/GlobalThreadPoolManage.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/manage/GlobalThreadPoolManage.java @@ -113,5 +113,4 @@ public class GlobalThreadPoolManage { public static Integer getThreadPoolNum() { return listThreadPoolId().size(); } - } diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/state/AbstractThreadPoolRuntime.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/state/AbstractThreadPoolRuntime.java index 5eb2162e..b023c62b 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/state/AbstractThreadPoolRuntime.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/state/AbstractThreadPoolRuntime.java @@ -83,7 +83,6 @@ public abstract class AbstractThreadPoolRuntime { String currentLoad = CalculateUtil.divide(activeCount, maximumPoolSize) + ""; // 峰值负载 String peakLoad = CalculateUtil.divide(largestPoolSize, maximumPoolSize) + ""; - BlockingQueue queue = pool.getQueue(); // 队列元素个数 int queueSize = queue.size(); @@ -93,7 +92,6 @@ public abstract class AbstractThreadPoolRuntime { int remainingCapacity = queue.remainingCapacity(); // 队列容量 int queueCapacity = queueSize + remainingCapacity; - stateInfo.setCoreSize(corePoolSize); stateInfo.setTpId(threadPoolId); stateInfo.setPoolSize(poolSize); @@ -107,7 +105,6 @@ public abstract class AbstractThreadPoolRuntime { stateInfo.setQueueRemainingCapacity(remainingCapacity); stateInfo.setLargestPoolSize(largestPoolSize); stateInfo.setCompletedTaskCount(completedTaskCount); - long rejectCount = pool instanceof DynamicThreadPoolExecutor ? ((DynamicThreadPoolExecutor) pool).getRejectCountNum() : -1L; stateInfo.setRejectCount(rejectCount); @@ -115,5 +112,4 @@ public abstract class AbstractThreadPoolRuntime { stateInfo.setTimestamp(System.currentTimeMillis()); return supplement(stateInfo); } - } diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/state/ThreadPoolRunStateHandler.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/state/ThreadPoolRunStateHandler.java index b3f78493..4d441ca5 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/state/ThreadPoolRunStateHandler.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/state/ThreadPoolRunStateHandler.java @@ -59,19 +59,15 @@ public class ThreadPoolRunStateHandler extends AbstractThreadPoolRuntime { ByteConvertUtil.getPrintSize(runtimeInfo.getTotalMemory()), " / 最大可用: ", ByteConvertUtil.getPrintSize(runtimeInfo.getMaxMemory())).toString(); - poolRunStateInfo.setCurrentLoad(poolRunStateInfo.getCurrentLoad() + "%"); poolRunStateInfo.setPeakLoad(poolRunStateInfo.getPeakLoad() + "%"); - String ipAddress = hippo4JInetUtils.findFirstNonLoopbackHostInfo().getIpAddress(); poolRunStateInfo.setHost(ipAddress); poolRunStateInfo.setMemoryProportion(memoryProportion); poolRunStateInfo.setFreeMemory(ByteConvertUtil.getPrintSize(runtimeInfo.getFreeMemory())); - String threadPoolId = poolRunStateInfo.getTpId(); DynamicThreadPoolWrapper executorService = GlobalThreadPoolManage.getExecutorService(threadPoolId); ThreadPoolExecutor pool = executorService.getExecutor(); - String rejectedName; if (pool instanceof AbstractDynamicExecutorSupport) { rejectedName = ((DynamicThreadPoolExecutor) pool).getRedundancyHandler().getClass().getSimpleName(); @@ -79,17 +75,12 @@ public class ThreadPoolRunStateHandler extends AbstractThreadPoolRuntime { rejectedName = pool.getRejectedExecutionHandler().getClass().getSimpleName(); } poolRunStateInfo.setRejectedName(rejectedName); - ManyPoolRunStateInfo manyPoolRunStateInfo = BeanUtil.toBean(poolRunStateInfo, ManyPoolRunStateInfo.class); manyPoolRunStateInfo.setIdentify(CLIENT_IDENTIFICATION_VALUE); - String active = environment.getProperty("spring.profiles.active", "UNKNOWN"); manyPoolRunStateInfo.setActive(active.toUpperCase()); - String threadPoolState = ThreadPoolStatusHandler.getThreadPoolState(pool); manyPoolRunStateInfo.setState(threadPoolState); - return manyPoolRunStateInfo; } - } diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/state/ThreadPoolStatusHandler.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/state/ThreadPoolStatusHandler.java index 58c4a3bd..62eb34d9 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/state/ThreadPoolStatusHandler.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/state/ThreadPoolStatusHandler.java @@ -53,14 +53,12 @@ public class ThreadPoolStatusHandler { try { Method runStateLessThan = ReflectUtil.getMethodByName(ThreadPoolExecutor.class, "runStateLessThan"); cn.hippo4j.common.toolkit.ReflectUtil.setAccessible(runStateLessThan); - AtomicInteger ctl = (AtomicInteger) ReflectUtil.getFieldValue(executor, "ctl"); int shutdown = (int) ReflectUtil.getFieldValue(executor, "SHUTDOWN"); boolean runStateLessThanBool = ReflectUtil.invoke(executor, runStateLessThan, ctl.get(), shutdown); if (runStateLessThanBool) { return RUNNING; } - Method runStateAtLeast = ReflectUtil.getMethodByName(ThreadPoolExecutor.class, "runStateAtLeast"); cn.hippo4j.common.toolkit.ReflectUtil.setAccessible(runStateAtLeast); int terminated = (int) ReflectUtil.getFieldValue(executor, "TERMINATED"); @@ -68,12 +66,9 @@ public class ThreadPoolStatusHandler { return resultStatus; } catch (Exception ex) { log.error("Failed to get thread pool status.", ex); - EXCEPTION_FLAG.set(Boolean.FALSE); } } - return "UNKNOWN"; } - } diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/AbstractBuildThreadPoolTemplate.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/AbstractBuildThreadPoolTemplate.java index c9ca3a2b..ef4940f0 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/AbstractBuildThreadPoolTemplate.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/AbstractBuildThreadPoolTemplate.java @@ -77,7 +77,6 @@ public class AbstractBuildThreadPoolTemplate { } catch (IllegalArgumentException ex) { throw new IllegalArgumentException("Error creating thread pool parameter.", ex); } - executorService.allowCoreThreadTimeOut(initParam.allowCoreThreadTimeOut); return executorService; } @@ -111,7 +110,6 @@ public class AbstractBuildThreadPoolTemplate { } catch (IllegalArgumentException ex) { throw new IllegalArgumentException("Error creating thread pool parameter.", ex); } - taskQueue.setExecutor(fastThreadPoolExecutor); fastThreadPoolExecutor.allowCoreThreadTimeOut(initParam.allowCoreThreadTimeOut); return fastThreadPoolExecutor; @@ -142,7 +140,6 @@ public class AbstractBuildThreadPoolTemplate { } catch (IllegalArgumentException ex) { throw new IllegalArgumentException(String.format("Error creating thread pool parameter. threadPool id :: %s", initParam.getThreadPoolId()), ex); } - dynamicThreadPoolExecutor.setTaskDecorator(initParam.getTaskDecorator()); dynamicThreadPoolExecutor.allowCoreThreadTimeOut(initParam.allowCoreThreadTimeOut); return dynamicThreadPoolExecutor; diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/AbstractDynamicExecutorSupport.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/AbstractDynamicExecutorSupport.java index cc28f860..aeb39c88 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/AbstractDynamicExecutorSupport.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/AbstractDynamicExecutorSupport.java @@ -169,5 +169,4 @@ public abstract class AbstractDynamicExecutorSupport extends ThreadPoolExecutor } } } - } diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/CommonDynamicThreadPool.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/CommonDynamicThreadPool.java index 1c8d25fa..0e5af10a 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/CommonDynamicThreadPool.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/CommonDynamicThreadPool.java @@ -45,5 +45,4 @@ public class CommonDynamicThreadPool { .build(); return poolExecutor; } - } diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/QueueTypeEnum.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/QueueTypeEnum.java index c07db712..6046f9a3 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/QueueTypeEnum.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/QueueTypeEnum.java @@ -96,14 +96,12 @@ public enum QueueTypeEnum { .filter(each -> Objects.equals(each.name, blockingQueueName)) .findFirst() .orElse(null); - if (queueTypeEnum != null) { blockingQueue = createBlockingQueue(queueTypeEnum.type, capacity); if (Objects.equals(blockingQueue.getClass().getSimpleName(), blockingQueueName)) { return blockingQueue; } } - Collection customBlockingQueues = DynamicThreadPoolServiceLoader .getSingletonServiceInstances(CustomBlockingQueue.class); blockingQueue = Optional.ofNullable(blockingQueue) @@ -120,7 +118,6 @@ public enum QueueTypeEnum { return new LinkedBlockingQueue(temCapacity); })); - return blockingQueue; } @@ -148,7 +145,6 @@ public enum QueueTypeEnum { } else if (Objects.equals(type, RESIZABLE_LINKED_BLOCKING_QUEUE.type)) { blockingQueue = new ResizableCapacityLinkedBlockIngQueue(capacity); } - Collection customBlockingQueues = DynamicThreadPoolServiceLoader .getSingletonServiceInstances(CustomBlockingQueue.class); blockingQueue = Optional.ofNullable(blockingQueue).orElseGet(() -> customBlockingQueues.stream() @@ -156,7 +152,6 @@ public enum QueueTypeEnum { .map(each -> each.generateBlockingQueue()) .findFirst() .orElse(new LinkedBlockingQueue(capacity))); - return blockingQueue; } @@ -170,8 +165,6 @@ public enum QueueTypeEnum { Optional queueTypeEnum = Arrays.stream(QueueTypeEnum.values()) .filter(each -> each.type == type) .findFirst(); - return queueTypeEnum.map(each -> each.name).orElse(""); } - } diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/RejectedPolicies.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/RejectedPolicies.java index d1612876..ba312ffd 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/RejectedPolicies.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/RejectedPolicies.java @@ -52,7 +52,6 @@ public class RejectedPolicies { executor.execute(r); } } - } /** @@ -71,7 +70,5 @@ public class RejectedPolicies { log.error("Adding Queue task to thread pool failed.", e); } } - } - } diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/RejectedTypeEnum.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/RejectedTypeEnum.java index c457e171..0dce3174 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/RejectedTypeEnum.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/RejectedTypeEnum.java @@ -101,18 +101,15 @@ public enum RejectedTypeEnum { .filter(each -> Objects.equals(each.name, name)) .findFirst() .orElse(null); - if (rejectedTypeEnum != null) { return rejectedTypeEnum.rejectedHandler; } - Collection customRejectedExecutionHandlers = DynamicThreadPoolServiceLoader .getSingletonServiceInstances(CustomRejectedExecutionHandler.class); Optional customRejected = customRejectedExecutionHandlers.stream() .filter(each -> Objects.equals(name, each.getName())) .map(each -> each.generateRejected()) .findFirst(); - return customRejected.orElse(ABORT_POLICY.rejectedHandler); } @@ -127,7 +124,6 @@ public enum RejectedTypeEnum { .filter(each -> Objects.equals(type, each.type)) .map(each -> each.rejectedHandler) .findFirst(); - // 使用 SPI 匹配拒绝策略 RejectedExecutionHandler resultRejected = rejectedTypeEnum.orElseGet(() -> { Collection customRejectedExecutionHandlers = DynamicThreadPoolServiceLoader @@ -136,10 +132,8 @@ public enum RejectedTypeEnum { .filter(each -> Objects.equals(type, each.getType())) .map(each -> each.generateRejected()) .findFirst(); - return customRejected.orElse(ABORT_POLICY.rejectedHandler); }); - return resultRejected; } @@ -152,5 +146,4 @@ public enum RejectedTypeEnum { public static String getRejectedNameByType(int type) { return createPolicy(type).getClass().getSimpleName(); } - } diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/ThreadFactoryBuilder.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/ThreadFactoryBuilder.java index f5e6e037..09ed9052 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/ThreadFactoryBuilder.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/ThreadFactoryBuilder.java @@ -171,5 +171,4 @@ public class ThreadFactoryBuilder implements Builder { return thread; }; } - } diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/ThreadPoolBuilder.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/ThreadPoolBuilder.java index 71cf9487..5fc7e475 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/ThreadPoolBuilder.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/ThreadPoolBuilder.java @@ -173,6 +173,14 @@ public class ThreadPoolBuilder implements Builder { return this; } + public ThreadPoolBuilder singlePool(String threadNamePrefix) { + int singleNum = 1; + this.corePoolSize = singleNum; + this.maxPoolSize = singleNum; + this.threadNamePrefix = threadNamePrefix; + return this; + } + public ThreadPoolBuilder poolThreadSize(int corePoolSize, int maxPoolSize) { this.corePoolSize = corePoolSize; this.maxPoolSize = maxPoolSize; diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/ThreadPoolExecutorTemplate.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/ThreadPoolExecutorTemplate.java index 4564a2a5..7e41ea0e 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/ThreadPoolExecutorTemplate.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/ThreadPoolExecutorTemplate.java @@ -79,5 +79,4 @@ public class ThreadPoolExecutorTemplate extends ThreadPoolExecutor { } }; } - } diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/web/AbstractWebThreadPoolService.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/web/AbstractWebThreadPoolService.java index 2f257c8c..5d7d185b 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/web/AbstractWebThreadPoolService.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/web/AbstractWebThreadPoolService.java @@ -60,7 +60,6 @@ public abstract class AbstractWebThreadPoolService implements WebThreadPoolServi } } } - return executor; } @@ -72,5 +71,4 @@ public abstract class AbstractWebThreadPoolService implements WebThreadPoolServi // ignore. Adaptation unit test. } } - } diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/web/JettyWebThreadPoolHandler.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/web/JettyWebThreadPoolHandler.java index 617eb496..c02a1ce2 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/web/JettyWebThreadPoolHandler.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/web/JettyWebThreadPoolHandler.java @@ -30,6 +30,8 @@ import org.springframework.boot.web.server.WebServer; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executor; +import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_DELIMITER; + /** * @author : wh * @date : 2022/2/28 16:55 @@ -50,10 +52,8 @@ public class JettyWebThreadPoolHandler extends AbstractWebThreadPoolService { QueuedThreadPool queuedThreadPool = (QueuedThreadPool) executor; poolBaseInfo.setCoreSize(queuedThreadPool.getMinThreads()); poolBaseInfo.setMaximumSize(queuedThreadPool.getMaxThreads()); - BlockingQueue jobs = (BlockingQueue) ReflectUtil.getFieldValue(queuedThreadPool, "_jobs"); int queueCapacity = jobs.remainingCapacity() + jobs.size(); - poolBaseInfo.setQueueCapacity(queueCapacity); poolBaseInfo.setQueueType(jobs.getClass().getSimpleName()); poolBaseInfo.setKeepAliveTime((long) queuedThreadPool.getIdleTimeout()); @@ -67,16 +67,13 @@ public class JettyWebThreadPoolHandler extends AbstractWebThreadPoolService { try { parameterInfo = new PoolParameterInfo(); QueuedThreadPool jettyExecutor = (QueuedThreadPool) executor; - int minThreads = jettyExecutor.getMinThreads(); int maxThreads = jettyExecutor.getMaxThreads(); - parameterInfo.setCoreSize(minThreads); parameterInfo.setMaxSize(maxThreads); } catch (Exception ex) { log.error("Failed to get the jetty thread pool parameter.", ex); } - return parameterInfo; } @@ -89,25 +86,20 @@ public class JettyWebThreadPoolHandler extends AbstractWebThreadPoolService { public void updateWebThreadPool(PoolParameterInfo poolParameterInfo) { try { QueuedThreadPool jettyExecutor = (QueuedThreadPool) executor; - int minThreads = jettyExecutor.getMinThreads(); int maxThreads = jettyExecutor.getMaxThreads(); - Integer coreSize = poolParameterInfo.getCoreSize(); Integer maxSize = poolParameterInfo.getMaxSize(); - jettyExecutor.setMinThreads(coreSize); jettyExecutor.setMaxThreads(maxSize); - log.info( "[JETTY] Changed web thread pool. " + "\n coreSize :: [{}]" + "\n maxSize :: [{}]", - String.format("%s => %s", minThreads, jettyExecutor.getMinThreads()), - String.format("%s => %s", maxThreads, jettyExecutor.getMaxThreads())); + String.format(CHANGE_DELIMITER, minThreads, jettyExecutor.getMinThreads()), + String.format(CHANGE_DELIMITER, maxThreads, jettyExecutor.getMaxThreads())); } catch (Exception ex) { log.error("Failed to modify the jetty thread pool parameter.", ex); } } - } diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/web/TomcatWebThreadPoolHandler.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/web/TomcatWebThreadPoolHandler.java index efcb9f8e..5025e000 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/web/TomcatWebThreadPoolHandler.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/web/TomcatWebThreadPoolHandler.java @@ -30,6 +30,8 @@ import org.springframework.boot.web.server.WebServer; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; +import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_DELIMITER; + /** * Tomcat web thread pool handler. * @@ -52,7 +54,6 @@ public class TomcatWebThreadPoolHandler extends AbstractWebThreadPoolService { log.warn("Exception getting Tomcat thread pool. Exception message :: {}", EXCEPTION_MESSAGE); return null; } - Executor tomcatExecutor = null; try { tomcatExecutor = ((TomcatWebServer) webServer).getTomcat().getConnector().getProtocolHandler().getExecutor(); @@ -61,7 +62,6 @@ public class TomcatWebThreadPoolHandler extends AbstractWebThreadPoolService { EXCEPTION_MESSAGE = ex.getMessage(); log.error("Failed to get Tomcat thread pool. Message :: {}", EXCEPTION_MESSAGE); } - return tomcatExecutor; } @@ -85,7 +85,6 @@ public class TomcatWebThreadPoolHandler extends AbstractWebThreadPoolService { poolBaseInfo.setQueueType(queue.getClass().getSimpleName()); poolBaseInfo.setQueueCapacity(queueCapacity); poolBaseInfo.setRejectedName(rejectedExecutionHandler.getClass().getSimpleName()); - return poolBaseInfo; } @@ -105,7 +104,6 @@ public class TomcatWebThreadPoolHandler extends AbstractWebThreadPoolService { } catch (Exception ex) { log.error("Failed to get the tomcat thread pool parameter.", ex); } - return parameterInfo; } @@ -121,7 +119,6 @@ public class TomcatWebThreadPoolHandler extends AbstractWebThreadPoolService { int originalCoreSize = tomcatExecutor.getCorePoolSize(); int originalMaximumPoolSize = tomcatExecutor.getMaximumPoolSize(); long originalKeepAliveTime = tomcatExecutor.getKeepAliveTime(TimeUnit.SECONDS); - tomcatExecutor.setCorePoolSize(poolParameterInfo.getCoreSize()); tomcatExecutor.setMaximumPoolSize(poolParameterInfo.getMaxSize()); tomcatExecutor.setKeepAliveTime(poolParameterInfo.getKeepAliveTime(), TimeUnit.SECONDS); @@ -130,12 +127,11 @@ public class TomcatWebThreadPoolHandler extends AbstractWebThreadPoolService { "\n coreSize :: [{}]" + "\n maxSize :: [{}]" + "\n keepAliveTime :: [{}]", - String.format("%s => %s", originalCoreSize, poolParameterInfo.getCoreSize()), - String.format("%s => %s", originalMaximumPoolSize, poolParameterInfo.getMaxSize()), - String.format("%s => %s", originalKeepAliveTime, poolParameterInfo.getKeepAliveTime())); + String.format(CHANGE_DELIMITER, originalCoreSize, poolParameterInfo.getCoreSize()), + String.format(CHANGE_DELIMITER, originalMaximumPoolSize, poolParameterInfo.getMaxSize()), + String.format(CHANGE_DELIMITER, originalKeepAliveTime, poolParameterInfo.getKeepAliveTime())); } catch (Exception ex) { log.error("Failed to modify the Tomcat thread pool parameter.", ex); } } - } diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/web/UndertowWebThreadPoolHandler.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/web/UndertowWebThreadPoolHandler.java index dab8f7f8..f54f36d6 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/web/UndertowWebThreadPoolHandler.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/web/UndertowWebThreadPoolHandler.java @@ -38,6 +38,8 @@ import java.util.Date; import java.util.Objects; import java.util.concurrent.Executor; +import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_DELIMITER; + /** * Undertow web thread pool handler. * @@ -77,7 +79,6 @@ public class UndertowWebThreadPoolHandler extends AbstractWebThreadPoolService { } catch (Exception ex) { log.error("The undertow container failed to get thread pool parameters.", ex); } - return poolBaseInfo; } @@ -97,7 +98,6 @@ public class UndertowWebThreadPoolHandler extends AbstractWebThreadPoolService { } catch (Exception ex) { log.error("Failed to get the undertow thread pool parameter.", ex); } - return parameterInfo; } @@ -106,7 +106,6 @@ public class UndertowWebThreadPoolHandler extends AbstractWebThreadPoolService { PoolRunStateInfo stateInfo = new PoolRunStateInfo(); XnioWorker xnioWorker = (XnioWorker) executor; - // private final TaskPool taskPool; Field field = ReflectionUtils.findField(XnioWorker.class, "taskPool"); ReflectionUtils.makeAccessible(field); Object fieldObject = ReflectionUtils.getField(field, xnioWorker); @@ -153,15 +152,12 @@ public class UndertowWebThreadPoolHandler extends AbstractWebThreadPoolService { public void updateWebThreadPool(PoolParameterInfo poolParameterInfo) { try { XnioWorker xnioWorker = (XnioWorker) executor; - Integer coreSize = poolParameterInfo.getCoreSize(); Integer maxSize = poolParameterInfo.getMaxSize(); Integer keepAliveTime = poolParameterInfo.getKeepAliveTime(); - int originalCoreSize = xnioWorker.getOption(Options.WORKER_TASK_CORE_THREADS); int originalMaximumPoolSize = xnioWorker.getOption(Options.WORKER_TASK_MAX_THREADS); int originalKeepAliveTime = xnioWorker.getOption(Options.WORKER_TASK_KEEPALIVE); - xnioWorker.setOption(Options.WORKER_TASK_CORE_THREADS, coreSize); xnioWorker.setOption(Options.WORKER_TASK_MAX_THREADS, maxSize); xnioWorker.setOption(Options.WORKER_TASK_KEEPALIVE, keepAliveTime); @@ -170,13 +166,11 @@ public class UndertowWebThreadPoolHandler extends AbstractWebThreadPoolService { "\n coreSize :: [{}]" + "\n maxSize :: [{}]" + "\n keepAliveTime :: [{}]", - String.format("%s => %s", originalCoreSize, coreSize), - String.format("%s => %s", originalMaximumPoolSize, maxSize), - String.format("%s => %s", originalKeepAliveTime, keepAliveTime)); - + String.format(CHANGE_DELIMITER, originalCoreSize, coreSize), + String.format(CHANGE_DELIMITER, originalMaximumPoolSize, maxSize), + String.format(CHANGE_DELIMITER, originalKeepAliveTime, keepAliveTime)); } catch (Exception ex) { log.error("Failed to modify the undertow thread pool parameter.", ex); } } - } diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/web/WebThreadPoolHandlerChoose.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/web/WebThreadPoolHandlerChoose.java index d1a60fa9..f42da476 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/web/WebThreadPoolHandlerChoose.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/web/WebThreadPoolHandlerChoose.java @@ -42,8 +42,6 @@ public class WebThreadPoolHandlerChoose { } catch (Exception ex) { throw new ServiceException("Web thread pool service bean not found.", ex); } - return webThreadPoolService; } - } diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/web/WebThreadPoolRunStateHandler.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/web/WebThreadPoolRunStateHandler.java index 88c3caff..fa272dc2 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/web/WebThreadPoolRunStateHandler.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/web/WebThreadPoolRunStateHandler.java @@ -51,5 +51,4 @@ public class WebThreadPoolRunStateHandler extends AbstractThreadPoolRuntime { return poolRunStateInfo; } - } diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/web/WebThreadPoolService.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/web/WebThreadPoolService.java index 99e39ec3..17bb4edf 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/web/WebThreadPoolService.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/web/WebThreadPoolService.java @@ -66,5 +66,4 @@ public interface WebThreadPoolService { * @param poolParameterInfo */ void updateWebThreadPool(PoolParameterInfo poolParameterInfo); - } diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/handler/DynamicThreadPoolBannerHandler.java b/hippo4j-core/src/main/java/cn/hippo4j/core/handler/DynamicThreadPoolBannerHandler.java index a7f80daa..2c32bc3b 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/handler/DynamicThreadPoolBannerHandler.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/handler/DynamicThreadPoolBannerHandler.java @@ -58,16 +58,13 @@ public class DynamicThreadPoolBannerHandler implements InitializingBean { "| |) | || | ' \\/ _` | ' \\| / _| | | | _/\n" + "|___/ \\_, |_||_\\__,_|_|_|_|_\\__| |_| |_| \n" + " |__/ \n"; - if (properties.getBanner()) { String version = getVersion(); version = (version != null) ? " (v" + version + ")" : "no version."; - StringBuilder padding = new StringBuilder(); while (padding.length() < STRAP_LINE_SIZE - (version.length() + DYNAMIC_THREAD_POOL.length())) { padding.append(" "); } - System.out.println(AnsiOutput.toString(banner, AnsiColor.GREEN, DYNAMIC_THREAD_POOL, AnsiColor.DEFAULT, padding.toString(), AnsiStyle.FAINT, version, "\n\n", HIPPO4J_GITHUB, "\n", HIPPO4J_SITE, "\n")); @@ -78,5 +75,4 @@ public class DynamicThreadPoolBannerHandler implements InitializingBean { final Package pkg = DynamicThreadPoolBannerHandler.class.getPackage(); return pkg != null ? pkg.getImplementationVersion() : ""; } - } diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/proxy/RejectedProxyInvocationHandler.java b/hippo4j-core/src/main/java/cn/hippo4j/core/proxy/RejectedProxyInvocationHandler.java index 9466a63e..101cd427 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/proxy/RejectedProxyInvocationHandler.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/proxy/RejectedProxyInvocationHandler.java @@ -44,17 +44,14 @@ public class RejectedProxyInvocationHandler implements InvocationHandler { @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { rejectCount.incrementAndGet(); - if (ApplicationContextHolder.getInstance() != null) { ThreadPoolNotifyAlarmHandler alarmHandler = ApplicationContextHolder.getBean(ThreadPoolNotifyAlarmHandler.class); alarmHandler.checkPoolRejectedAlarm(threadPoolId); } - try { return method.invoke(target, args); } catch (InvocationTargetException ex) { throw ex.getCause(); } } - } diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/proxy/RejectedProxyUtil.java b/hippo4j-core/src/main/java/cn/hippo4j/core/proxy/RejectedProxyUtil.java index 8becab3c..c5acc952 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/proxy/RejectedProxyUtil.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/proxy/RejectedProxyUtil.java @@ -43,8 +43,6 @@ public class RejectedProxyUtil { rejectedExecutionHandler.getClass().getClassLoader(), new Class[]{RejectedExecutionHandler.class}, new RejectedProxyInvocationHandler(rejectedExecutionHandler, threadPoolId, rejectedNum)); - return rejectedProxy; } - } diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/spi/CustomBlockingQueue.java b/hippo4j-core/src/main/java/cn/hippo4j/core/spi/CustomBlockingQueue.java index 8a42bde8..3288e184 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/spi/CustomBlockingQueue.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/spi/CustomBlockingQueue.java @@ -49,5 +49,4 @@ public interface CustomBlockingQueue { * @return */ BlockingQueue generateBlockingQueue(); - } diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/spi/CustomRejectedExecutionHandler.java b/hippo4j-core/src/main/java/cn/hippo4j/core/spi/CustomRejectedExecutionHandler.java index d2d73105..b8b0793c 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/spi/CustomRejectedExecutionHandler.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/spi/CustomRejectedExecutionHandler.java @@ -49,5 +49,4 @@ public interface CustomRejectedExecutionHandler { * @return */ RejectedExecutionHandler generateRejected(); - } diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/spi/DynamicThreadPoolServiceLoader.java b/hippo4j-core/src/main/java/cn/hippo4j/core/spi/DynamicThreadPoolServiceLoader.java index c78607a0..3ff0b12c 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/spi/DynamicThreadPoolServiceLoader.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/spi/DynamicThreadPoolServiceLoader.java @@ -92,5 +92,4 @@ public class DynamicThreadPoolServiceLoader { throw new ServiceLoaderInstantiationException(clazz, ex); } } - } diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/spi/ServiceLoaderInstantiationException.java b/hippo4j-core/src/main/java/cn/hippo4j/core/spi/ServiceLoaderInstantiationException.java index 5bf3059e..41d491b2 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/spi/ServiceLoaderInstantiationException.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/spi/ServiceLoaderInstantiationException.java @@ -28,5 +28,4 @@ public class ServiceLoaderInstantiationException extends RuntimeException { public ServiceLoaderInstantiationException(final Class clazz, final Exception cause) { super(String.format("Can not find public default constructor for SPI class `%s`", clazz.getName()), cause); } - } diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/toolkit/CalculateUtil.java b/hippo4j-core/src/main/java/cn/hippo4j/core/toolkit/CalculateUtil.java index 024f0d28..524e209e 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/toolkit/CalculateUtil.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/toolkit/CalculateUtil.java @@ -35,5 +35,4 @@ public class CalculateUtil { public static int divide(int num1, int num2) { return ((int) (Double.parseDouble(num1 + "") / Double.parseDouble(num2 + "") * 100)); } - } diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/toolkit/IdentifyUtil.java b/hippo4j-core/src/main/java/cn/hippo4j/core/toolkit/IdentifyUtil.java index 101b3d50..9fe2b7fd 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/toolkit/IdentifyUtil.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/toolkit/IdentifyUtil.java @@ -61,7 +61,6 @@ public class IdentifyUtil { port, IDENTIFY_SLICER_SYMBOL, CLIENT_IDENTIFICATION_VALUE).toString(); - IDENTIFY = identification; return identification; } @@ -76,15 +75,12 @@ public class IdentifyUtil { while (StrUtil.isBlank(IDENTIFY)) { ConfigurableEnvironment environment = ApplicationContextHolder.getBean(ConfigurableEnvironment.class); InetUtils inetUtils = ApplicationContextHolder.getBean(InetUtils.class); - if (environment != null && inetUtils != null) { String identify = generate(environment, inetUtils); return identify; } - Thread.sleep(500); } - return IDENTIFY; } @@ -99,8 +95,6 @@ public class IdentifyUtil { public static String getThreadPoolIdentify(String threadPoolId, String itemId, String namespace) { ArrayList params = Lists.newArrayList( threadPoolId, itemId, namespace, getIdentify()); - return Joiner.on(GROUP_KEY_DELIMITER).join(params); } - } diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/toolkit/TraceContextUtil.java b/hippo4j-core/src/main/java/cn/hippo4j/core/toolkit/TraceContextUtil.java index 75d79225..fe44da6e 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/toolkit/TraceContextUtil.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/toolkit/TraceContextUtil.java @@ -53,5 +53,4 @@ public class TraceContextUtil { public static void setExecuteTimeoutTraceKey(String key) { EXECUTE_TIMEOUT_TRACE_KEY = key; } - } diff --git a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/refresher/AbstractCoreThreadPoolDynamicRefresh.java b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/refresher/AbstractCoreThreadPoolDynamicRefresh.java index 06d5fff5..6cad3b01 100644 --- a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/refresher/AbstractCoreThreadPoolDynamicRefresh.java +++ b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/refresher/AbstractCoreThreadPoolDynamicRefresh.java @@ -54,6 +54,9 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_DELIMITER; +import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_THREAD_POOL_TEXT; + /** * Abstract core thread pool dynamic refresh. * @@ -68,10 +71,7 @@ public abstract class AbstractCoreThreadPoolDynamicRefresh implements ThreadPool protected final BootstrapCoreProperties bootstrapCoreProperties; - protected final ExecutorService dynamicRefreshExecutorService = ThreadPoolBuilder.builder() - .threadFactory("client.dynamic.refresh") - .singlePool() - .build(); + protected final ExecutorService dynamicRefreshExecutorService = ThreadPoolBuilder.builder().singlePool("client.dynamic.refresh").build(); @Override public void dynamicRefresh(String content) { @@ -176,26 +176,16 @@ public abstract class AbstractCoreThreadPoolDynamicRefresh implements ThreadPool ExecutorProperties beforeProperties = GlobalCoreThreadPoolManage.getProperties(properties.getThreadPoolId()); // refresh executor properties GlobalCoreThreadPoolManage.refresh(threadPoolId, properties); - log.info( - "[{}] Changed thread pool. " + - "\n coreSize :: [{}]" + - "\n maxSize :: [{}]" + - "\n queueType :: [{}]" + - "\n capacity :: [{}]" + - "\n keepAliveTime :: [{}]" + - "\n executeTimeOut :: [{}]" + - "\n rejectedType :: [{}]" + - "\n allowCoreThreadTimeOut :: [{}]", + log.info(CHANGE_THREAD_POOL_TEXT, threadPoolId.toUpperCase(), - String.format("%s => %s", beforeProperties.getCorePoolSize(), properties.getCorePoolSize()), - String.format("%s => %s", beforeProperties.getMaximumPoolSize(), properties.getMaximumPoolSize()), - String.format("%s => %s", beforeProperties.getBlockingQueue(), properties.getBlockingQueue()), - String.format("%s => %s", beforeProperties.getQueueCapacity(), properties.getQueueCapacity()), - String.format("%s => %s", beforeProperties.getKeepAliveTime(), properties.getKeepAliveTime()), - String.format("%s => %s", beforeProperties.getExecuteTimeOut(), properties.getExecuteTimeOut()), - String.format("%s => %s", beforeProperties.getRejectedHandler(), properties.getRejectedHandler()), - String.format("%s => %s", beforeProperties.getAllowCoreThreadTimeOut(), properties.getAllowCoreThreadTimeOut())); - + String.format(CHANGE_DELIMITER, beforeProperties.getCorePoolSize(), properties.getCorePoolSize()), + String.format(CHANGE_DELIMITER, beforeProperties.getMaximumPoolSize(), properties.getMaximumPoolSize()), + String.format(CHANGE_DELIMITER, beforeProperties.getBlockingQueue(), properties.getBlockingQueue()), + String.format(CHANGE_DELIMITER, beforeProperties.getQueueCapacity(), properties.getQueueCapacity()), + String.format(CHANGE_DELIMITER, beforeProperties.getKeepAliveTime(), properties.getKeepAliveTime()), + String.format(CHANGE_DELIMITER, beforeProperties.getExecuteTimeOut(), properties.getExecuteTimeOut()), + String.format(CHANGE_DELIMITER, beforeProperties.getRejectedHandler(), properties.getRejectedHandler()), + String.format(CHANGE_DELIMITER, beforeProperties.getAllowCoreThreadTimeOut(), properties.getAllowCoreThreadTimeOut())); try { threadPoolNotifyAlarmHandler.sendPoolConfigChange(newChangeRequest(beforeProperties, properties)); } catch (Throwable ex) { @@ -241,7 +231,6 @@ public abstract class AbstractCoreThreadPoolDynamicRefresh implements ThreadPool private boolean checkConsistency(String threadPoolId, ExecutorProperties properties) { ExecutorProperties beforeProperties = GlobalCoreThreadPoolManage.getProperties(properties.getThreadPoolId()); ThreadPoolExecutor executor = GlobalThreadPoolManage.getExecutorService(threadPoolId).getExecutor(); - boolean result = !Objects.equals(beforeProperties.getCorePoolSize(), properties.getCorePoolSize()) || !Objects.equals(beforeProperties.getMaximumPoolSize(), properties.getMaximumPoolSize()) || !Objects.equals(beforeProperties.getAllowCoreThreadTimeOut(), properties.getAllowCoreThreadTimeOut()) diff --git a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/refresher/ApolloRefresherHandler.java b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/refresher/ApolloRefresherHandler.java index 167ad7d6..8acafd67 100644 --- a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/refresher/ApolloRefresherHandler.java +++ b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/refresher/ApolloRefresherHandler.java @@ -62,5 +62,4 @@ public class ApolloRefresherHandler extends AbstractCoreThreadPoolDynamicRefresh config.addChangeListener(configChangeListener); log.info("dynamic-thread-pool refresher, add apollo listener success, namespace: {}", namespace); } - } diff --git a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/refresher/BootstrapCorePropertiesBinderAdapt.java b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/refresher/BootstrapCorePropertiesBinderAdapt.java index 2b38723d..c84c6b3b 100644 --- a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/refresher/BootstrapCorePropertiesBinderAdapt.java +++ b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/refresher/BootstrapCorePropertiesBinderAdapt.java @@ -66,7 +66,6 @@ public class BootstrapCorePropertiesBinderAdapt { bindableCoreProperties = adapt(configInfo); } } - return bindableCoreProperties; } @@ -86,14 +85,13 @@ public class BootstrapCorePropertiesBinderAdapt { boolean containFlag = key != null && StringUtil.isNotBlank((String) key) && (((String) key).indexOf(PREFIX + ".executors") != -1 - || ((String) key).indexOf(PREFIX + ".notify-platforms") != -1 - || ((String) key).indexOf(PREFIX + ".notifyPlatforms") != -1); + || ((String) key).indexOf(PREFIX + ".notify-platforms") != -1 + || ((String) key).indexOf(PREFIX + ".notifyPlatforms") != -1); if (containFlag) { String targetKey = key.toString().replace(PREFIX + ".", ""); targetMap.put(targetKey, val); } }); - // convert List executorPropertiesList = Lists.newArrayList(); List notifyPropertiesList = Lists.newArrayList(); @@ -111,7 +109,6 @@ public class BootstrapCorePropertiesBinderAdapt { if (notifyKeySplit != null && notifyKeySplit.length > 0) { key = key.replace("-", "_"); } - notifySingleMap.put(key, entry.getValue()); } else { key = key.replace("executors[" + i + "].", ""); @@ -123,27 +120,22 @@ public class BootstrapCorePropertiesBinderAdapt { executorSingleMap.put(key, entry.getValue()); } } - if (key.indexOf("notify-platforms[" + i + "].") != -1 || key.indexOf("notifyPlatforms[" + i + "].") != -1) { if (key.indexOf("notify-platforms[" + i + "].") != -1) { key = key.replace("notify-platforms[" + i + "].", ""); } else { key = key.replace("notifyPlatforms[" + i + "].", ""); } - String[] keySplit = key.split("-"); if (keySplit != null && keySplit.length > 0) { key = key.replace("-", "_"); } - platformSingleMap.put(key, entry.getValue()); } } - if (CollectionUtil.isEmpty(executorSingleMap) && CollectionUtil.isEmpty(platformSingleMap)) { break; } - if (CollectionUtil.isNotEmpty(executorSingleMap)) { ExecutorProperties executorProperties = BeanUtil.mapToBean(executorSingleMap, ExecutorProperties.class, true, CopyOptions.create()); if (executorProperties != null) { @@ -156,15 +148,12 @@ public class BootstrapCorePropertiesBinderAdapt { notifyReceivesMap.put(value.name(), (String) receives); } } - alarm.setReceives(notifyReceivesMap); executorProperties.setNotify(alarm); } - executorPropertiesList.add(executorProperties); } } - if (CollectionUtil.isNotEmpty(platformSingleMap)) { NotifyPlatformProperties notifyPlatformProperties = BeanUtil.mapToBean(platformSingleMap, NotifyPlatformProperties.class, true, CopyOptions.create()); if (notifyPlatformProperties != null) { @@ -172,15 +161,12 @@ public class BootstrapCorePropertiesBinderAdapt { } } } - bindableCoreProperties = new BootstrapCoreProperties(); bindableCoreProperties.setExecutors(executorPropertiesList); bindableCoreProperties.setNotifyPlatforms(notifyPropertiesList); } catch (Exception ex) { throw ex; } - return bindableCoreProperties; } - } diff --git a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/refresher/NacosCloudRefresherHandler.java b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/refresher/NacosCloudRefresherHandler.java index 6f4314df..da70021e 100644 --- a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/refresher/NacosCloudRefresherHandler.java +++ b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/refresher/NacosCloudRefresherHandler.java @@ -61,5 +61,4 @@ public class NacosCloudRefresherHandler extends AbstractCoreThreadPoolDynamicRef } }); } - } diff --git a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/refresher/NacosRefresherHandler.java b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/refresher/NacosRefresherHandler.java index 7f46c31f..9a038abc 100644 --- a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/refresher/NacosRefresherHandler.java +++ b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/refresher/NacosRefresherHandler.java @@ -61,5 +61,4 @@ public class NacosRefresherHandler extends AbstractCoreThreadPoolDynamicRefresh } }); } - } diff --git a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/refresher/ZookeeperRefresherHandler.java b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/refresher/ZookeeperRefresherHandler.java index 6f8feb08..a4f580b4 100644 --- a/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/refresher/ZookeeperRefresherHandler.java +++ b/hippo4j-spring-boot/hippo4j-core-spring-boot-starter/src/main/java/cn/hippo4j/core/starter/refresher/ZookeeperRefresherHandler.java @@ -112,5 +112,4 @@ public class ZookeeperRefresherHandler extends AbstractCoreThreadPoolDynamicRefr log.error("load zk node error, nodePath is {}", nodePath, e); } } - } diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/core/ServerThreadPoolDynamicRefresh.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/core/ServerThreadPoolDynamicRefresh.java index 38792de9..5c6ee99a 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/core/ServerThreadPoolDynamicRefresh.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/core/ServerThreadPoolDynamicRefresh.java @@ -40,6 +40,9 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_DELIMITER; +import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_THREAD_POOL_TEXT; + /** * Thread pool dynamic refresh. * @@ -110,26 +113,17 @@ public class ServerThreadPoolDynamicRefresh implements ThreadPoolDynamicRefresh request.setNowExecuteTimeOut(originalExecuteTimeOut); threadPoolNotifyAlarmHandler.sendPoolConfigChange(request); - log.info( - "[{}] Changed thread pool. " + - "\n coreSize :: [{}]" + - "\n maxSize :: [{}]" + - "\n queueType :: [{}]" + - "\n capacity :: [{}]" + - "\n keepAliveTime :: [{}]" + - "\n executeTimeOut :: [{}]" + - "\n rejectedType :: [{}]" + - "\n allowCoreThreadTimeOut :: [{}]", + log.info(CHANGE_THREAD_POOL_TEXT, threadPoolId.toUpperCase(), - String.format("%s => %s", originalCoreSize, afterExecutor.getCorePoolSize()), - String.format("%s => %s", originalMaximumPoolSize, afterExecutor.getMaximumPoolSize()), - String.format("%s => %s", originalQuery, QueueTypeEnum.getBlockingQueueNameByType(parameter.getQueueType())), - String.format("%s => %s", originalCapacity, + String.format(CHANGE_DELIMITER, originalCoreSize, afterExecutor.getCorePoolSize()), + String.format(CHANGE_DELIMITER, originalMaximumPoolSize, afterExecutor.getMaximumPoolSize()), + String.format(CHANGE_DELIMITER, originalQuery, QueueTypeEnum.getBlockingQueueNameByType(parameter.getQueueType())), + String.format(CHANGE_DELIMITER, originalCapacity, (afterExecutor.getQueue().remainingCapacity() + afterExecutor.getQueue().size())), - String.format("%s => %s", originalKeepAliveTime, afterExecutor.getKeepAliveTime(TimeUnit.SECONDS)), - String.format("%s => %s", originalExecuteTimeOut, originalExecuteTimeOut), - String.format("%s => %s", originalRejected, RejectedTypeEnum.getRejectedNameByType(parameter.getRejectedType())), - String.format("%s => %s", originalAllowCoreThreadTimeOut, EnableEnum.getBool(parameter.getAllowCoreThreadTimeOut()))); + String.format(CHANGE_DELIMITER, originalKeepAliveTime, afterExecutor.getKeepAliveTime(TimeUnit.SECONDS)), + String.format(CHANGE_DELIMITER, originalExecuteTimeOut, originalExecuteTimeOut), + String.format(CHANGE_DELIMITER, originalRejected, RejectedTypeEnum.getRejectedNameByType(parameter.getRejectedType())), + String.format(CHANGE_DELIMITER, originalAllowCoreThreadTimeOut, EnableEnum.getBool(parameter.getAllowCoreThreadTimeOut()))); } /**