添加rabbitmq线程池支持

pull/244/head
weihu 2 years ago
parent a09093eadd
commit 174ed5d5b6

@ -36,6 +36,8 @@ import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Executor;
import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_DELIMITER;
/**
* RabbitMQ thread-pool adapter.
*/
@ -62,11 +64,45 @@ public class RabbitMQThreadPoolAdapter implements ThreadPoolAdapter, Application
@Override
public ThreadPoolAdapterState getThreadPoolState(String identify) {
return null;
ThreadPoolAdapterState threadPoolAdapterState = new ThreadPoolAdapterState();
SimpleAsyncTaskExecutor simpleAsyncTaskExecutor = RABBITMQ_EXECUTOR.get(identify);
ThreadPoolTaskExecutor threadPoolTaskExecutor = RABBITMQ_THREAD_POOL_TASK_EXECUTOR.get(identify);
threadPoolAdapterState.setThreadPoolKey(identify);
if (Objects.nonNull(simpleAsyncTaskExecutor)) {
threadPoolAdapterState.setCoreSize(simpleAsyncTaskExecutor.getConcurrencyLimit());
}
if (Objects.nonNull(threadPoolTaskExecutor)) {
threadPoolAdapterState.setCoreSize(threadPoolTaskExecutor.getCorePoolSize());
threadPoolAdapterState.setMaximumSize(threadPoolTaskExecutor.getMaxPoolSize());
}
return threadPoolAdapterState;
}
@Override
public boolean updateThreadPool(ThreadPoolAdapterParameter threadPoolAdapterParameter) {
String threadPoolKey = threadPoolAdapterParameter.getThreadPoolKey();
SimpleAsyncTaskExecutor simpleAsyncTaskExecutor = RABBITMQ_EXECUTOR.get(threadPoolKey);
ThreadPoolTaskExecutor threadPoolTaskExecutor = RABBITMQ_THREAD_POOL_TASK_EXECUTOR.get(threadPoolKey);
if (Objects.nonNull(threadPoolTaskExecutor)) {
int originalCoreSize = threadPoolTaskExecutor.getCorePoolSize();
int originalMaximumPoolSize = threadPoolTaskExecutor.getMaxPoolSize();
threadPoolTaskExecutor.setCorePoolSize(threadPoolAdapterParameter.getCorePoolSize());
threadPoolTaskExecutor.setMaxPoolSize(threadPoolAdapterParameter.getMaximumPoolSize());
log.info("[{}] rabbitmq consumption thread pool parameter change. coreSize :: {}, maximumSize :: {}",
threadPoolKey,
String.format(CHANGE_DELIMITER, originalCoreSize, threadPoolAdapterParameter.getCorePoolSize()),
String.format(CHANGE_DELIMITER, originalMaximumPoolSize, threadPoolAdapterParameter.getMaximumPoolSize()));
return true;
}
if (Objects.nonNull(simpleAsyncTaskExecutor)) {
int concurrencyLimit = simpleAsyncTaskExecutor.getConcurrencyLimit();
simpleAsyncTaskExecutor.setConcurrencyLimit(threadPoolAdapterParameter.getCorePoolSize());
log.info("[{}] rabbitmq consumption thread pool parameter change. coreSize :: {}",
threadPoolKey,
String.format(CHANGE_DELIMITER, concurrencyLimit, threadPoolAdapterParameter.getCorePoolSize()));
return true;
}
log.warn("[{}] rabbitmq consuming thread pool not found.", threadPoolKey);
return false;
}
@ -88,8 +124,8 @@ public class RabbitMQThreadPoolAdapter implements ThreadPoolAdapter, Application
RABBITMQ_THREAD_POOL_TASK_EXECUTOR.put(beanName, threadPoolTaskExecutor);
} else {
log.warn("Custom thread pools only support ThreadPoolTaskExecutor");
}
}
}
}
}
}

Loading…
Cancel
Save