From 174ed5d5b616979c48bd782ec633a8bb2b06f399 Mon Sep 17 00:00:00 2001 From: weihu Date: Tue, 24 May 2022 10:40:08 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0rabbitmq=E7=BA=BF=E7=A8=8B?= =?UTF-8?q?=E6=B1=A0=E6=94=AF=E6=8C=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../rabbitmq/RabbitMQThreadPoolAdapter.java | 42 +++++++++++++++++-- 1 file changed, 39 insertions(+), 3 deletions(-) diff --git a/hippo4j-adapter/hippo4j-adapter-rabbitmq/src/main/java/cn/hippo4j/adapter/rabbitmq/RabbitMQThreadPoolAdapter.java b/hippo4j-adapter/hippo4j-adapter-rabbitmq/src/main/java/cn/hippo4j/adapter/rabbitmq/RabbitMQThreadPoolAdapter.java index 5af7fafa..6194f564 100644 --- a/hippo4j-adapter/hippo4j-adapter-rabbitmq/src/main/java/cn/hippo4j/adapter/rabbitmq/RabbitMQThreadPoolAdapter.java +++ b/hippo4j-adapter/hippo4j-adapter-rabbitmq/src/main/java/cn/hippo4j/adapter/rabbitmq/RabbitMQThreadPoolAdapter.java @@ -36,6 +36,8 @@ import java.util.Map; import java.util.Objects; import java.util.concurrent.Executor; +import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_DELIMITER; + /** * RabbitMQ thread-pool adapter. */ @@ -62,11 +64,45 @@ public class RabbitMQThreadPoolAdapter implements ThreadPoolAdapter, Application @Override public ThreadPoolAdapterState getThreadPoolState(String identify) { - return null; + ThreadPoolAdapterState threadPoolAdapterState = new ThreadPoolAdapterState(); + SimpleAsyncTaskExecutor simpleAsyncTaskExecutor = RABBITMQ_EXECUTOR.get(identify); + ThreadPoolTaskExecutor threadPoolTaskExecutor = RABBITMQ_THREAD_POOL_TASK_EXECUTOR.get(identify); + threadPoolAdapterState.setThreadPoolKey(identify); + if (Objects.nonNull(simpleAsyncTaskExecutor)) { + threadPoolAdapterState.setCoreSize(simpleAsyncTaskExecutor.getConcurrencyLimit()); + } + if (Objects.nonNull(threadPoolTaskExecutor)) { + threadPoolAdapterState.setCoreSize(threadPoolTaskExecutor.getCorePoolSize()); + threadPoolAdapterState.setMaximumSize(threadPoolTaskExecutor.getMaxPoolSize()); + } + return threadPoolAdapterState; } @Override public boolean updateThreadPool(ThreadPoolAdapterParameter threadPoolAdapterParameter) { + String threadPoolKey = threadPoolAdapterParameter.getThreadPoolKey(); + SimpleAsyncTaskExecutor simpleAsyncTaskExecutor = RABBITMQ_EXECUTOR.get(threadPoolKey); + ThreadPoolTaskExecutor threadPoolTaskExecutor = RABBITMQ_THREAD_POOL_TASK_EXECUTOR.get(threadPoolKey); + if (Objects.nonNull(threadPoolTaskExecutor)) { + int originalCoreSize = threadPoolTaskExecutor.getCorePoolSize(); + int originalMaximumPoolSize = threadPoolTaskExecutor.getMaxPoolSize(); + threadPoolTaskExecutor.setCorePoolSize(threadPoolAdapterParameter.getCorePoolSize()); + threadPoolTaskExecutor.setMaxPoolSize(threadPoolAdapterParameter.getMaximumPoolSize()); + log.info("[{}] rabbitmq consumption thread pool parameter change. coreSize :: {}, maximumSize :: {}", + threadPoolKey, + String.format(CHANGE_DELIMITER, originalCoreSize, threadPoolAdapterParameter.getCorePoolSize()), + String.format(CHANGE_DELIMITER, originalMaximumPoolSize, threadPoolAdapterParameter.getMaximumPoolSize())); + return true; + } + if (Objects.nonNull(simpleAsyncTaskExecutor)) { + int concurrencyLimit = simpleAsyncTaskExecutor.getConcurrencyLimit(); + simpleAsyncTaskExecutor.setConcurrencyLimit(threadPoolAdapterParameter.getCorePoolSize()); + log.info("[{}] rabbitmq consumption thread pool parameter change. coreSize :: {}", + threadPoolKey, + String.format(CHANGE_DELIMITER, concurrencyLimit, threadPoolAdapterParameter.getCorePoolSize())); + return true; + } + log.warn("[{}] rabbitmq consuming thread pool not found.", threadPoolKey); return false; } @@ -88,8 +124,8 @@ public class RabbitMQThreadPoolAdapter implements ThreadPoolAdapter, Application RABBITMQ_THREAD_POOL_TASK_EXECUTOR.put(beanName, threadPoolTaskExecutor); } else { log.warn("Custom thread pools only support ThreadPoolTaskExecutor"); - } - } + } + } } }