|
|
@ -17,6 +17,12 @@
|
|
|
|
|
|
|
|
|
|
|
|
package cn.hippo4j.adapter.rabbitmq;
|
|
|
|
package cn.hippo4j.adapter.rabbitmq;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import java.util.List;
|
|
|
|
|
|
|
|
import java.util.Map;
|
|
|
|
|
|
|
|
import java.util.Objects;
|
|
|
|
|
|
|
|
import java.util.concurrent.ExecutorService;
|
|
|
|
|
|
|
|
import java.util.concurrent.ThreadPoolExecutor;
|
|
|
|
|
|
|
|
|
|
|
|
import cn.hippo4j.adapter.base.ThreadPoolAdapter;
|
|
|
|
import cn.hippo4j.adapter.base.ThreadPoolAdapter;
|
|
|
|
import cn.hippo4j.adapter.base.ThreadPoolAdapterParameter;
|
|
|
|
import cn.hippo4j.adapter.base.ThreadPoolAdapterParameter;
|
|
|
|
import cn.hippo4j.adapter.base.ThreadPoolAdapterState;
|
|
|
|
import cn.hippo4j.adapter.base.ThreadPoolAdapterState;
|
|
|
@ -25,16 +31,11 @@ import com.google.common.collect.Lists;
|
|
|
|
import com.google.common.collect.Maps;
|
|
|
|
import com.google.common.collect.Maps;
|
|
|
|
import lombok.RequiredArgsConstructor;
|
|
|
|
import lombok.RequiredArgsConstructor;
|
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
|
|
|
|
|
|
|
|
|
import org.springframework.amqp.rabbit.connection.AbstractConnectionFactory;
|
|
|
|
import org.springframework.amqp.rabbit.connection.AbstractConnectionFactory;
|
|
|
|
import org.springframework.boot.context.event.ApplicationStartedEvent;
|
|
|
|
import org.springframework.boot.context.event.ApplicationStartedEvent;
|
|
|
|
import org.springframework.context.ApplicationListener;
|
|
|
|
import org.springframework.context.ApplicationListener;
|
|
|
|
|
|
|
|
|
|
|
|
import java.util.List;
|
|
|
|
|
|
|
|
import java.util.Map;
|
|
|
|
|
|
|
|
import java.util.Objects;
|
|
|
|
|
|
|
|
import java.util.concurrent.ExecutorService;
|
|
|
|
|
|
|
|
import java.util.concurrent.ThreadPoolExecutor;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_DELIMITER;
|
|
|
|
import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_DELIMITER;
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
/**
|
|
|
@ -48,12 +49,7 @@ public class RabbitMQThreadPoolAdapter implements ThreadPoolAdapter, Application
|
|
|
|
|
|
|
|
|
|
|
|
private static final String FiledName = "executorService";
|
|
|
|
private static final String FiledName = "executorService";
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
private final Map<String, AbstractConnectionFactory> abstractConnectionFactoryMap;
|
|
|
|
* TODO Configurable name
|
|
|
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
private static final String RABBITMQ_EXECUTOR_SERVICE = "Rabbitmq_Executor_Service";
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private final AbstractConnectionFactory abstractConnectionFactory;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private final Map<String, ThreadPoolExecutor> RABBITMQ_THREAD_POOL_TASK_EXECUTOR = Maps.newHashMap();
|
|
|
|
private final Map<String, ThreadPoolExecutor> RABBITMQ_THREAD_POOL_TASK_EXECUTOR = Maps.newHashMap();
|
|
|
|
|
|
|
|
|
|
|
@ -103,15 +99,19 @@ public class RabbitMQThreadPoolAdapter implements ThreadPoolAdapter, Application
|
|
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
@Override
|
|
|
|
public void onApplicationEvent(ApplicationStartedEvent event) {
|
|
|
|
public void onApplicationEvent(ApplicationStartedEvent event) {
|
|
|
|
ExecutorService executor = (ExecutorService) ReflectUtil.getFieldValue(abstractConnectionFactory, FiledName);
|
|
|
|
abstractConnectionFactoryMap.forEach((beanName, abstractConnectionFactor) -> {
|
|
|
|
|
|
|
|
ExecutorService executor = (ExecutorService) ReflectUtil.getFieldValue(abstractConnectionFactor, FiledName);
|
|
|
|
if (Objects.nonNull(executor)) {
|
|
|
|
if (Objects.nonNull(executor)) {
|
|
|
|
if (executor instanceof ThreadPoolExecutor) {
|
|
|
|
if (executor instanceof ThreadPoolExecutor) {
|
|
|
|
ThreadPoolExecutor threadPoolTaskExecutor = (ThreadPoolExecutor) executor;
|
|
|
|
ThreadPoolExecutor threadPoolTaskExecutor = (ThreadPoolExecutor) executor;
|
|
|
|
RABBITMQ_THREAD_POOL_TASK_EXECUTOR.put(RABBITMQ_EXECUTOR_SERVICE, threadPoolTaskExecutor);
|
|
|
|
RABBITMQ_THREAD_POOL_TASK_EXECUTOR.put(beanName, threadPoolTaskExecutor);
|
|
|
|
log.info("Rabbitmq executor name {}", RABBITMQ_EXECUTOR_SERVICE);
|
|
|
|
log.info("Rabbitmq executor name {}", beanName);
|
|
|
|
} else {
|
|
|
|
}
|
|
|
|
|
|
|
|
else {
|
|
|
|
log.warn("Custom thread pools only support ThreadPoolExecutor");
|
|
|
|
log.warn("Custom thread pools only support ThreadPoolExecutor");
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
});
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|