diff --git a/hippo4j-adapter/hippo4j-adapter-rabbitmq/src/main/java/cn/hippo4j/adapter/rabbitmq/RabbitMQThreadPoolAdapter.java b/hippo4j-adapter/hippo4j-adapter-rabbitmq/src/main/java/cn/hippo4j/adapter/rabbitmq/RabbitMQThreadPoolAdapter.java index 5af7fafa..6194f564 100644 --- a/hippo4j-adapter/hippo4j-adapter-rabbitmq/src/main/java/cn/hippo4j/adapter/rabbitmq/RabbitMQThreadPoolAdapter.java +++ b/hippo4j-adapter/hippo4j-adapter-rabbitmq/src/main/java/cn/hippo4j/adapter/rabbitmq/RabbitMQThreadPoolAdapter.java @@ -36,6 +36,8 @@ import java.util.Map; import java.util.Objects; import java.util.concurrent.Executor; +import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_DELIMITER; + /** * RabbitMQ thread-pool adapter. */ @@ -62,11 +64,45 @@ public class RabbitMQThreadPoolAdapter implements ThreadPoolAdapter, Application @Override public ThreadPoolAdapterState getThreadPoolState(String identify) { - return null; + ThreadPoolAdapterState threadPoolAdapterState = new ThreadPoolAdapterState(); + SimpleAsyncTaskExecutor simpleAsyncTaskExecutor = RABBITMQ_EXECUTOR.get(identify); + ThreadPoolTaskExecutor threadPoolTaskExecutor = RABBITMQ_THREAD_POOL_TASK_EXECUTOR.get(identify); + threadPoolAdapterState.setThreadPoolKey(identify); + if (Objects.nonNull(simpleAsyncTaskExecutor)) { + threadPoolAdapterState.setCoreSize(simpleAsyncTaskExecutor.getConcurrencyLimit()); + } + if (Objects.nonNull(threadPoolTaskExecutor)) { + threadPoolAdapterState.setCoreSize(threadPoolTaskExecutor.getCorePoolSize()); + threadPoolAdapterState.setMaximumSize(threadPoolTaskExecutor.getMaxPoolSize()); + } + return threadPoolAdapterState; } @Override public boolean updateThreadPool(ThreadPoolAdapterParameter threadPoolAdapterParameter) { + String threadPoolKey = threadPoolAdapterParameter.getThreadPoolKey(); + SimpleAsyncTaskExecutor simpleAsyncTaskExecutor = RABBITMQ_EXECUTOR.get(threadPoolKey); + ThreadPoolTaskExecutor threadPoolTaskExecutor = RABBITMQ_THREAD_POOL_TASK_EXECUTOR.get(threadPoolKey); + if (Objects.nonNull(threadPoolTaskExecutor)) { + int originalCoreSize = threadPoolTaskExecutor.getCorePoolSize(); + int originalMaximumPoolSize = threadPoolTaskExecutor.getMaxPoolSize(); + threadPoolTaskExecutor.setCorePoolSize(threadPoolAdapterParameter.getCorePoolSize()); + threadPoolTaskExecutor.setMaxPoolSize(threadPoolAdapterParameter.getMaximumPoolSize()); + log.info("[{}] rabbitmq consumption thread pool parameter change. coreSize :: {}, maximumSize :: {}", + threadPoolKey, + String.format(CHANGE_DELIMITER, originalCoreSize, threadPoolAdapterParameter.getCorePoolSize()), + String.format(CHANGE_DELIMITER, originalMaximumPoolSize, threadPoolAdapterParameter.getMaximumPoolSize())); + return true; + } + if (Objects.nonNull(simpleAsyncTaskExecutor)) { + int concurrencyLimit = simpleAsyncTaskExecutor.getConcurrencyLimit(); + simpleAsyncTaskExecutor.setConcurrencyLimit(threadPoolAdapterParameter.getCorePoolSize()); + log.info("[{}] rabbitmq consumption thread pool parameter change. coreSize :: {}", + threadPoolKey, + String.format(CHANGE_DELIMITER, concurrencyLimit, threadPoolAdapterParameter.getCorePoolSize())); + return true; + } + log.warn("[{}] rabbitmq consuming thread pool not found.", threadPoolKey); return false; } @@ -88,8 +124,8 @@ public class RabbitMQThreadPoolAdapter implements ThreadPoolAdapter, Application RABBITMQ_THREAD_POOL_TASK_EXECUTOR.put(beanName, threadPoolTaskExecutor); } else { log.warn("Custom thread pools only support ThreadPoolTaskExecutor"); - } - } + } + } } }