pull/1546/head
bhuang 4 weeks ago
parent 1e81da1979
commit 6ab0f94966

@ -60,6 +60,13 @@ public class DynamicThreadPoolConfig {
private static final int MAX_POOL_SIZE = AVAILABLE_PROCESSORS * 4;
/**
* SpringBoot DynamicThreadPoolExecutor SpringBoot
* @DynamicThreadPool
* DynamicThreadPoolPostProcessor postProcessAfterInitialization
* 线线
* @return
*/
@Bean
@DynamicThreadPool
public Executor messageConsumeTtlDynamicThreadPool() {
@ -78,6 +85,22 @@ public class DynamicThreadPoolConfig {
return ttlExecutor;
}
@Bean
@DynamicThreadPool
public Executor messageConsumeTestDynamicThreadPool() {
String threadPoolId = "test";
ThreadPoolExecutor customExecutor = ThreadPoolBuilder.builder()
.dynamicPool()
.threadFactory(threadPoolId)
.threadPoolId(threadPoolId)
.executeTimeOut(EXECUTE_TIMEOUT)
.waitForTasksToCompleteOnShutdown(true)
.awaitTerminationMillis(AWAIT_TERMINATION_MILLIS)
.taskDecorator(new TaskTraceBuilderHandler())
.build();
return customExecutor;
}
/**
* {@link Bean @Bean} and {@link DynamicThreadPool @DynamicThreadPool}.
*/

@ -33,18 +33,25 @@ public class TransmittableThreadLocalExecutorAdapter implements DynamicThreadPoo
private static final String FIELD_NAME = "executor";
// 判断传进来的对象是否和当前适配器器对象匹配
@Override
public boolean match(Object executor) {
// 其实就是判断对象的类名是否为ExecutorTtlWrapper如果是就意味着是第三方线程池
// 这个线程池中持有者动态线程池对象
return Objects.equals(MATCH_CLASS_NAME, executor.getClass().getSimpleName());
}
// 从ExecutorTtlWrapper对象中获得其持有的DynamicThreadPoolExecutor对象
@Override
public ThreadPoolExecutor unwrap(Object executor) {
// 通过反射获得ExecutorTtlWrapper对象的executor成员变量
// 在之前展示的ExecutorTtlWrapper类的代码中可以看到动态线程池会赋值给
// ExecutorTtlWrapper的executor成员变量
return (ThreadPoolExecutor) ReflectUtil.getFieldValue(executor, FIELD_NAME);
}
@Override
public void replace(Object executor, Executor dynamicThreadPoolExecutor) {
// 将dynamicThreadPoolExecutor对象替换到executor中
ReflectUtil.setFieldValue(executor, FIELD_NAME, dynamicThreadPoolExecutor);
}
}

@ -33,23 +33,24 @@ import lombok.extern.slf4j.Slf4j;
public class InstanceInfo {
private static final String UNKNOWN = "unknown";
//应用名称,未设置就是未知
private String appName = UNKNOWN;
//地址
private String hostName;
//这个就是命名空间+项目Id
private String groupKey;
//端口号
private String port;
//客户端服务实例Id其实就是客户端地址+uuid (127.0.0.1:8088_eceeab1ab6a0471b838b97a47cfa1268)
private String instanceId;
private String ipApplicationName;
//客户端在配置文件中定义的上下文路径
private String clientBasePath;
//客户端回调地址,这个地址非常重要,一会就会为大家解释说明
private String callBackUrl;
//客户端唯一标识符其实和instanceId一样、
//只不过这个标识符是要在web界面展示给用户的
private String identify;
private String active;
@ -61,11 +62,11 @@ public class InstanceInfo {
private volatile ActionType actionType;
private volatile boolean isInstanceInfoDirty = false;
//客户端最后更新时间戳
private volatile Long lastUpdatedTimestamp;
private volatile Long lastDirtyTimestamp;
//服务实例的默认状态为up也就是上线状态
private volatile InstanceStatus status = InstanceStatus.UP;
private volatile InstanceStatus overriddenStatus = InstanceStatus.UNKNOWN;

@ -30,6 +30,7 @@ import java.util.concurrent.ThreadFactory;
/**
* Dynamic thread-pool register parameter.
* 线
*/
@Data
@Builder

@ -47,11 +47,16 @@ import java.util.concurrent.TimeUnit;
/**
* Dynamic thread-pool post processor.
* BeanPostProcessor
* 1. postProcessBeforeInitialization(Object bean, String beanName) bean
* @PostConstruct InitializingBean afterPropertiesSet
* 2. postProcessAfterInitialization(Object bean, String beanName) bean
*/
@Slf4j
@AllArgsConstructor
public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
// 配置信息对象
private final BootstrapConfigProperties configProperties;
private static final int DEFAULT_ACTIVE_ALARM = 80;
@ -62,21 +67,37 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
private static final String DEFAULT_RECEIVES = "";
// bean前置处理方法
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) {
return bean;
}
// bean后置处理方法
// 在这里判断bean是否为动态线程池对象如果是的话就可以把动态线程池信息注册到服务端
// 这个方法就是本类最核心的方法用来处理DynamicThreadPoolExecutor对象
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
// 这里会先判断一下传进来的bean是否属于DynamicThreadPoolExecutor类型如果大家看了我在DynamicThreadPoolConfig类提供的几个例子
// 就会发现我创建动态线程池对象最终是以Executor或者ThreadPoolExecutor形式返回的如果是以Executor形式返回的这个Executor接收的还并不是一个DynamicThreadPoolExecutor对象
// 而是一个ExecutorTtlWrapper对象这个ExecutorTtlWrapper对象的作用我已经在DynamicThreadPoolConfig类中解释了这时候ExecutorTtlWrapper对象肯定就不属于DynamicThreadPoolExecutor类型了
// 但是先别急虽然ExecutorTtlWrapper对象不属于DynamicThreadPoolExecutor类型但是后面的DynamicThreadPoolAdapterChoose.match(bean)这个条件还是可以通过的,所以仍然可以进入下面的分支
// 那为什么要执行DynamicThreadPoolAdapterChoose.match(bean)这行代码呢原因也很简单因为有时候用户可能会使用spring本身的线程池或者其他第三方形式的线程池比如ExecutorTtl比如spring的ThreadPoolTaskExecutor
// 该动态线程池框架也想收集这些线程池的信息所以就会在DynamicThreadPoolAdapterChoose.match(bean)中判断程序内是否有这些第三方线程池的适配器如果有就可以使用这些适配器把这些第三方线程池转换成DynamicThreadPoolExecutor对象
// 之后的逻辑就和处理真正的DynamicThreadPoolExecutor对象一样了无非就是把线程池信息注册到服务端然后把线程池保存在线程池全局管理器中
// DynamicThreadPoolAdapterChoose.match(bean)就是判断bean的类型是否为ThreadPoolTaskExecutor、ExecutorTtlWrapper、ExecutorServiceTtlWrapper中的一个这些都是第三方的线程池
if (bean instanceof DynamicThreadPoolExecutor || DynamicThreadPoolAdapterChoose.match(bean)) {
DynamicThreadPool dynamicThreadPool;
try {
// 判断该线程池bean对象上是否存在DynamicThreadPool注解
dynamicThreadPool = ApplicationContextHolder.findAnnotationOnBean(beanName, DynamicThreadPool.class);
// 如果找不到该注解,就进入下面这个分支
if (Objects.isNull(dynamicThreadPool)) {
// Adapt to lower versions of SpringBoot.
// 这里就是为了适配SpringBoot低版本使用DynamicThreadPoolAnnotationUtil工具再次查找注解
dynamicThreadPool = DynamicThreadPoolAnnotationUtil.findAnnotationOnBean(beanName, DynamicThreadPool.class);
if (Objects.isNull(dynamicThreadPool)) {
// 还是找不到则直接返回bean即可
return bean;
}
}
@ -84,10 +105,18 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
log.error("Failed to create dynamic thread pool in annotation mode.", ex);
return bean;
}
// 走到这里意味着当前的bean上有DynamicThreadPool注解也就意味着是一个动态线程池下面就要收集动态线程池配置信息了
// 定义一个动态线程池
ThreadPoolExecutor dynamicThreadPoolExecutor = DynamicThreadPoolAdapterChoose.unwrap(bean);
// 下面的if分支会先从适配器中获得真正的动态线程池如果获得的线程池为空说明当前bean本身就是动态线程池如果不为空则正好得到了真正的动态线程池并且赋值给dynamicThreadPoolExecutor了
// 将bean转换为dynamicThreadPoolExecutor类型确切地说不是把当前要交给容器的这个bean转换成dynamicThreadPoolExecutor对象
// 实际上ExecutorTtlWrapper只是持有了dynamicThreadPoolExecutor的引用这里只不过是直接利用反射从ExecutorTtlWrapper把dynamicThreadPoolExecutor对象取出来了
if (dynamicThreadPoolExecutor == null) {
dynamicThreadPoolExecutor = (DynamicThreadPoolExecutor) bean;
}
// 将刚刚得到的dynamicThreadPoolExecutor对象包装成一个DynamicThreadPoolWrapper对象这个对象会被交给线程池全局管理器来管理
// 之后收集线程池运行信息时都要用到这个对象
// 在这里把动态线程池的信息注册给服务端了
ThreadPoolExecutor remoteThreadPoolExecutor = fillPoolAndRegister(((DynamicThreadPoolExecutor) dynamicThreadPoolExecutor).getThreadPoolId(), dynamicThreadPoolExecutor);
DynamicThreadPoolAdapterChoose.replace(bean, remoteThreadPoolExecutor);
return DynamicThreadPoolAdapterChoose.match(bean) ? bean : remoteThreadPoolExecutor;
@ -100,10 +129,15 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
*
* @param threadPoolId dynamic thread-pool id
* @param executor dynamic thread-pool executor
* fillPoolAndRegister
* 线
* DynamicThreadPoolRegisterParameter
* HttpAgent
*/
protected ThreadPoolExecutor fillPoolAndRegister(String threadPoolId, ThreadPoolExecutor executor) {
ExecutorProperties executorProperties = null;
if (configProperties.getExecutors() != null) {
// 从配置文件中获取线程池配置信息
executorProperties = configProperties.getExecutors()
.stream()
.filter(each -> Objects.equals(threadPoolId, each.getThreadPoolId()))

@ -26,11 +26,26 @@ import org.springframework.boot.context.properties.ConfigurationProperties;
/**
* Bootstrap properties.
* //动态线程池服务端的地址
* spring.dynamic.thread-pool.server-addr=http://localhost:6691
* //客户端namespace
* spring.dynamic.thread-pool.namespace=prescription
* //itemId
* spring.dynamic.thread-pool.item-id=dynamic-threadpool-example
* //访问服务端时需要的用户名和密码
* spring.dynamic.thread-pool.username=admin
* spring.dynamic.thread-pool.password=123456
*/
@Slf4j
@Getter
@Setter
@ConfigurationProperties(prefix = Constants.CONFIGURATION_PROPERTIES_PREFIX)
/**
* 使 BootstrapProperties BootstrapProperties
* SpringBoot
* BootstrapProperties SpringBoot
* DynamicThreadPoolAutoConfiguration
*/
public class BootstrapProperties implements BootstrapPropertiesInterface {
/**
@ -50,31 +65,40 @@ public class BootstrapProperties implements BootstrapPropertiesInterface {
/**
* Netty server port
* //netty服务器的端口号这个是可配置的
* //在hippo4j框架提供了两种通信方式一种是http一种就是netty
* //在该框架中默认使用的是http所以我就不引入netty了
*/
private String nettyServerPort;
/**
* Report type
* //客户端上报给服务端线程池历史信息的方法这个也可以使用netty的方式上报
* //我仍然使用内部默认的http了不引入netty
*/
private String reportType;
/**
* Namespace
* //命名空间
*/
private String namespace;
/**
* Item id
* //项目Id
*/
private String itemId;
/**
* Whether to enable dynamic thread pool
* //是否启动动态线程池
*/
private Boolean enable = true;
/**
* Print dynamic thread pool banner
* //是否在控制台打印hippo4j的启动图案
*/
private Boolean banner = true;

@ -91,11 +91,15 @@ import org.springframework.core.env.ConfigurableEnvironment;
@Configuration
@AllArgsConstructor
@ConditionalOnBean(MarkerConfiguration.Marker.class)
//这个注解会使BootstrapProperties类上的ConfigurationProperties注解生效
// BootstrapProperties对象就可以被SpringBoot容器管理了
@EnableConfigurationProperties(BootstrapProperties.class)
@ConditionalOnProperty(prefix = Constants.CONFIGURATION_PROPERTIES_PREFIX, value = "enable", matchIfMissing = true, havingValue = "true")
@ImportAutoConfiguration({WebAdapterConfiguration.class, NettyClientConfiguration.class, DiscoveryConfiguration.class, MessageConfiguration.class, UtilAutoConfiguration.class})
public class DynamicThreadPoolAutoConfiguration {
//在这里把配置文件中的相关信息封封装到这个成员变量中了
//properties对象会被自动注入
private final BootstrapProperties properties;
private final ConfigurableEnvironment environment;
@ -137,6 +141,8 @@ public class DynamicThreadPoolAutoConfiguration {
return new AdaptedThreadPoolDestroyPostProcessor(applicationContext);
}
//动态线程池处理器这个处理器其实是就是spring中的一个bean处理器在这个bean处理器中把动态线程池包装成了DynamicThreadPoolRegisterWrapper对象
//然后开始向服务端注册该动态线程池的信息
@Bean
@ConditionalOnProperty(prefix = Constants.CONFIGURATION_PROPERTIES_PREFIX, value = "enable", matchIfMissing = true, havingValue = "true")
@SuppressWarnings("all")
@ -242,6 +248,7 @@ public class DynamicThreadPoolAutoConfiguration {
return new DynamicThreadPoolSubscribeConfig(threadPoolDynamicRefresh, clientWorker, properties);
}
//远程通信组件使用的是http通信方式
@Bean
public HttpAgent httpAgent(BootstrapProperties properties) {
return new ServerHttpAgent(properties);

@ -34,30 +34,43 @@ import java.util.concurrent.TimeUnit;
/**
* Server http agent.
* 访httpServerHttpAgentHttpAgent
*
* DynamicThreadPoolAutoConfiguration
*/
public class ServerHttpAgent implements HttpAgent {
//配置信息对象
private final BootstrapProperties dynamicThreadPoolProperties;
//服务地址管理器,这个对象中封装着可用的服务端地址列表,当然,服务端地址的列表信息,需要用户提前定义在配置文件中
//nacos中也有这个类
private final ServerListManager serverListManager;
//安全代理类如果有朋友自己看过nacos客户端源码肯定对这个类的作用不陌生这就是要给安全代理类
//主要作用就是用来访问服务端的通过这个类从服务端获得token以后每次访问服务端都会携带这个token
//但是在我迭代的nacos客户端代码中我把这个功能给省略了如果大家感兴趣可以自己去看看nacos客户端的源码
private SecurityProxy securityProxy;
private ServerHealthCheck serverHealthCheck;
//定时任务执行器这个定时任务执行器会定期刷新本地缓存的token
private ScheduledExecutorService executorService;
//定时任务执行间隔
private final long securityInfoRefreshIntervalMills = TimeUnit.SECONDS.toMillis(5);
public ServerHttpAgent(BootstrapProperties properties) {
this.dynamicThreadPoolProperties = properties;
this.serverListManager = new ServerListManager(dynamicThreadPoolProperties);
this.securityProxy = new SecurityProxy(properties);
//在这里已经先从服务端获取到了token了
this.securityProxy.applyToken(this.serverListManager.getServerUrls());
//创建定时任务执行器
this.executorService = new ScheduledThreadPoolExecutor(
new Integer(1),
ThreadFactoryBuilder.builder().daemon(true).prefix("client.scheduled.token.security.updater").build());
//向定时任务执行器提交了定期执行的任务
this.executorService.scheduleWithFixedDelay(
//定期访问服务端刷新本地token
() -> securityProxy.applyToken(serverListManager.getServerUrls()),
0,
securityInfoRefreshIntervalMills,
@ -85,6 +98,8 @@ public class ServerHttpAgent implements HttpAgent {
return HttpUtil.get(buildUrl(path), Result.class);
}
//下面就是具体的访问服务端的方法了,这里只需要强调一点,那就是访问服务端之前
//会调用本类的injectSecurityInfo方法把本地token封装到请求中一起发送给服务端
@Override
public Result httpPost(String path, Object body) {
isHealthStatus();
@ -129,6 +144,7 @@ public class ServerHttpAgent implements HttpAgent {
serverHealthCheck.isHealthStatus();
}
//把本地token封装到map中的方法
private Map injectSecurityInfo(Map<String, String> params) {
if (StringUtil.isNotBlank(securityProxy.getAccessToken())) {
params.put(Constants.ACCESS_TOKEN, securityProxy.getAccessToken());

@ -66,6 +66,7 @@ import static cn.hippo4j.common.constant.Constants.TP_ID;
/**
* Dynamic thread-pool post processor.
* DynamicThreadPoolAutoConfiguration bean
*/
@Slf4j
@AllArgsConstructor
@ -101,6 +102,7 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
if ((dynamicThreadPoolExecutor) == null) {
dynamicThreadPoolExecutor = (DynamicThreadPoolExecutor) bean;
}
//在这里把动态线程池的信息注册给服务端了
ThreadPoolExecutor remoteThreadPoolExecutor = fillPoolAndRegister(((DynamicThreadPoolExecutor) dynamicThreadPoolExecutor).getThreadPoolId(), dynamicThreadPoolExecutor);
DynamicThreadPoolAdapterChoose.replace(bean, remoteThreadPoolExecutor);
subscribeConfig(((DynamicThreadPoolExecutor) dynamicThreadPoolExecutor).getThreadPoolId());
@ -125,24 +127,40 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
*
* @param threadPoolId dynamic thread-pool id
* @param executor dynamic thread-pool executor
* 线fillPoolAndRegister
* dynamicThreadPoolWrapperdynamicThreadPoolWrapper
*
*/
protected ThreadPoolExecutor fillPoolAndRegister(String threadPoolId, ThreadPoolExecutor executor) {
//封装线程池Id命名空间项目Id信息
Map<String, String> queryStrMap = new HashMap<>(INITIAL_CAPACITY);
queryStrMap.put(TP_ID, threadPoolId);
queryStrMap.put(ITEM_ID, properties.getItemId());
queryStrMap.put(NAMESPACE, properties.getNamespace());
//创建封装线程池参数信息的对象
ThreadPoolParameterInfo threadPoolParameterInfo = new ThreadPoolParameterInfo();
//下面就是首先访问服务端,看看服务端是否存在动态线程池的配置信息的操作,如果存在就是用服务端的信息刷新本地动态线程池的配置信息
try {
//这里做了一个访问服务端的操作这是因为也许用户通过web界面已经实现在服务端定义好了线程池的配置信息
//所以要以服务端的配置信息为主,因此在这里先访问服务端,看看服务端有没有设置好的动态线程池信息,其实就是去服务端查询数据库而已
//这里访问的就是服务端的ConfigController类的detailConfigInfo方法
//Constants.CONFIG_CONTROLLER_PATH就是要访问的服务端的接口路径为"/hippo4j/v1/cs/configs"
Result result = httpAgent.httpGetByConfig(Constants.CONFIG_CONTROLLER_PATH, null, queryStrMap, HTTP_EXECUTE_TIMEOUT);
//判断返回的结果中是否存在最新的线程池配置信息
if (result.isSuccess() && result.getData() != null) {
String resultJsonStr = JSONUtil.toJSONString(result.getData());
//如果存在就获取信息然后转换成threadPoolParameterInfo对象
threadPoolParameterInfo = JSONUtil.parseObject(resultJsonStr, ThreadPoolParameterInfo.class);
if (threadPoolParameterInfo != null) {
//在这里刷新本地动态线程池的信息
threadPoolParamReplace(executor, threadPoolParameterInfo);
registerNotifyAlarm(threadPoolParameterInfo);
}
} else {
// DynamicThreadPool configuration undefined in server
//下面就是第一次把动态线程池注册到服务端的操作
//如果走到这里就意味着服务端没有当前动态线程池的任何信息那就要在客户端构建一个DynamicThreadPoolRegisterWrapper对象然后把这个对象直接发送给服务端进行注册即可
//这里创建的这个DynamicThreadPoolRegisterParameter对象封装了动态线程池的核心参数信息
DynamicThreadPoolRegisterParameter parameterInfo = DynamicThreadPoolRegisterParameter.builder()
.threadPoolId(threadPoolId)
.corePoolSize(executor.getCorePoolSize())
@ -158,9 +176,14 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
.executeTimeOut(EXECUTE_TIME_OUT)
.rejectedPolicyType(RejectedPolicyTypeEnum.getRejectedPolicyTypeEnumByName(executor.getRejectedExecutionHandler().getClass().getSimpleName()))
.build();
//在这里创建了DynamicThreadPoolRegisterWrapper对象并且把刚才创建的parameterInfo交给registerWrapper对象
// 这个registerWrapper对象要发送给服务端进行注册
DynamicThreadPoolRegisterWrapper registerWrapper = DynamicThreadPoolRegisterWrapper.builder()
.parameter(parameterInfo)
.build();
//将线程池信息注册到服务端,这里是通过线程池全局管理器来注册的
//还记得我之前在展示GlobalThreadPoolManage代码的时候让大家对dynamicRegister方法混个眼熟这里就用到了dynamicRegister方法
//开始真正把客户端线程池信息注册到服务端了
GlobalThreadPoolManage.dynamicRegister(registerWrapper);
}
} catch (Exception ex) {

@ -59,6 +59,7 @@ public class DynamicThreadPoolExecutor extends ExtensibleThreadPoolExecutor impl
/**
* Wait for tasks to complete on shutdown
* //关闭线程池时是否等待任务执行完毕
*/
@Getter
@Setter
@ -102,6 +103,9 @@ public class DynamicThreadPoolExecutor extends ExtensibleThreadPoolExecutor impl
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/
//构造方法
//这个构造方法中有两个参数一个是executeTimeOut这个就是用户设置的任务执行的超时时间这个参数该怎么发挥作用呢
//另一个是awaitTerminationMillis代表线程池关闭时等待剩余任务执行的最大时间那这个参数又该怎么发挥作用呢
public DynamicThreadPoolExecutor(
int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
@ -110,13 +114,19 @@ public class DynamicThreadPoolExecutor extends ExtensibleThreadPoolExecutor impl
@NonNull String threadPoolId,
@NonNull ThreadFactory threadFactory,
@NonNull RejectedExecutionHandler rejectedExecutionHandler) {
// 可以看到向父类构造方法传入了一个 threadPoolId 参数threadPoolId 就是动态线程池的 Id。
// 每一个动态线程池都有自己的 Id用来和其他动态线程池做区分
super(
// 这里给ExtensibleThreadPoolExecutor 的构造方法传入了一个 DefaultThreadPoolPluginManager 参数
threadPoolId, new DefaultThreadPoolPluginManager().setPluginComparator(AnnotationAwareOrderComparator.INSTANCE),
corePoolSize, maximumPoolSize, keepAliveTime, unit,
blockingQueue, threadFactory, rejectedExecutionHandler);
log.info("Initializing ExecutorService '{}'", threadPoolId);
this.waitForTasksToCompleteOnShutdown = waitForTasksToCompleteOnShutdown;
// Init default plugins.
//在这里创建了默认的插件管理器DefaultThreadPoolPluginManager对象
//注意,这里有一行非常重要的代码,这里的操作非常重要,就是在这里,把所有插件都注册到当前动态线程池对象的插件管理器中了
//这时候executeTimeOut和awaitTerminationMillis参数也就可以发挥作用了
new DefaultThreadPoolPluginRegistrar(executeTimeOut, awaitTerminationMillis)
.doRegister(this);
this.active = new AtomicBoolean(true);
@ -135,6 +145,7 @@ public class DynamicThreadPoolExecutor extends ExtensibleThreadPoolExecutor impl
/**
* Invoked by the containing {@code BeanFactory} on destruction of a bean.
*/
//销毁动态线程池的方法
@Override
public void destroy() {
// instance has been destroyed, not need to call this method again
@ -142,11 +153,13 @@ public class DynamicThreadPoolExecutor extends ExtensibleThreadPoolExecutor impl
log.warn("Failed to destroy ExecutorService '{}' because it has already been destroyed", getThreadPoolId());
return;
}
//如果等待任务执行完毕再终止线程池工作就调用shutdown方法
if (isWaitForTasksToCompleteOnShutdown()) {
super.shutdown();
} else {
super.shutdownNow();
}
//在这里清空插件管理器中的插件
getThreadPoolPluginManager().clear();
log.info("ExecutorService '{}' has been destroyed", getThreadPoolId());

@ -50,17 +50,21 @@ import java.util.concurrent.TimeUnit;
*
* @see ThreadPoolPluginManager
* @see ThreadPoolPlugin
* 线jdk线
*
*/
public class ExtensibleThreadPoolExecutor extends ThreadPoolExecutor implements ThreadPoolPluginSupport {
/**
* Thread pool id
* 线Id
*/
@Getter
private final String threadPoolId;
/**
* Action aware registry
* 线
*/
@Getter
private final ThreadPoolPluginManager threadPoolPluginManager;
@ -108,6 +112,7 @@ public class ExtensibleThreadPoolExecutor extends ThreadPoolExecutor implements
@NonNull RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
// pool extended info.
//给线程池Id赋值
this.threadPoolId = threadPoolId;
this.threadPoolPluginManager = threadPoolPluginManager;
// proxy handler to support callback, repeated packaging of the same rejection policy should be avoided here.
@ -122,6 +127,7 @@ public class ExtensibleThreadPoolExecutor extends ThreadPoolExecutor implements
*
* @param thread the thread that will run task {@code r}
* @param runnable the task that will be executed
*
*/
@Override
protected void beforeExecute(Thread thread, Runnable runnable) {
@ -158,6 +164,7 @@ public class ExtensibleThreadPoolExecutor extends ThreadPoolExecutor implements
* @param runnable the runnable that has completed
* @param throwable the exception that caused termination, or null if
* execution completed normally
*
*/
@Override
protected void afterExecute(Runnable runnable, Throwable throwable) {

@ -34,7 +34,10 @@ public class GlobalThreadPoolManage {
* @param registerWrapper register wrapper
*/
public static ThreadPoolExecutor dynamicRegister(DynamicThreadPoolRegisterWrapper registerWrapper) {
//这里从ApplicationContext中得到了DynamicThreadPoolService对象
//ApplicationContextHolder其实就是SpringBoot的ApplicationContext
DynamicThreadPoolService dynamicThreadPoolService = ApplicationContextHolder.getBean(DynamicThreadPoolService.class);
//DynamicThreadPoolService对象把动态线程池信息注册到服务端
return dynamicThreadPoolService.registerDynamicThreadPool(registerWrapper);
}
}

@ -93,6 +93,7 @@ public class DefaultThreadPoolPluginManager implements ThreadPoolPluginManager {
/**
* All registered {@link ThreadPoolPlugin}
* 线registeredPlugins
*/
private final Map<String, ThreadPoolPlugin> registeredPlugins = new ConcurrentHashMap<>(16);

@ -35,17 +35,22 @@ import lombok.NoArgsConstructor;
* @see TaskRejectNotifyAlarmPlugin
* @see ThreadPoolExecutorShutdownPlugin
*/
// 默认的线程池插件注册器组件,通过这个注册器对象,把线程池的所有插件都注册到线程池的插件管理器中
// DefaultThreadPoolPluginManager是一个线程池插件管理器它的作用是管理线程池的插件
// 这个组件在DynamicThreadPoolExecutor对象的构造方法中被创建然后开始将插件注册到插件管理器中
@NoArgsConstructor
@AllArgsConstructor
public class DefaultThreadPoolPluginRegistrar implements ThreadPoolPluginRegistrar {
/**
* Execute time out
* //这个就是用户设置的任务执行的超时时间
*/
private long executeTimeOut;
/**
* Await termination millis
* //线程池关闭时,等待剩余任务执行的最大时间
*/
private long awaitTerminationMillis;
@ -54,12 +59,17 @@ public class DefaultThreadPoolPluginRegistrar implements ThreadPoolPluginRegistr
*
* @param support thread pool plugin manager delegate
*/
//这个就是把动态线程池各个内置的功能扩展插件注册到动态线程池的内部的插件管理器成员变量中的方法
@Override
public void doRegister(ThreadPoolPluginSupport support) {
//这里调用了DynamicThreadPoolExecutor对象的register方法但是在我们目前的程序中
//动态线程池中根本没有register方法所以还需要在动态线程池中再添加一个register方法
//具体实现我就写在当前这个代码块中了,请大家继续向下看
support.register(new TaskDecoratorPlugin());
support.register(new TaskTimeoutNotifyAlarmPlugin(support.getThreadPoolId(), executeTimeOut, support.getThreadPoolExecutor()));
support.register(new TaskRejectCountRecordPlugin());
support.register(new TaskRejectNotifyAlarmPlugin());
//awaitTerminationMillis就是等待剩余任务执行的最大时间这两个参数是怎么被赋值的呢
support.register(new ThreadPoolExecutorShutdownPlugin(awaitTerminationMillis));
}
}

@ -73,6 +73,10 @@ public interface ThreadPoolPluginSupport extends ThreadPoolPluginManager {
*/
@Override
default void register(ThreadPoolPlugin plugin) {
//动态线程池持有了插件管理器这个插件管理器是定义在ExtensibleThreadPoolExecutor类中的
//并且使用了lombok的@Getter注解所以就可以直接调用getThreadPoolPluginManager()方法得到插件管理器
//然后把插件注册到插件管理器中即可
// ThreadPoolPluginManager 是DynamicThreadPoolExecutor 构造器中传入了一个DefaultThreadPoolPluginManager
getThreadPoolPluginManager().register(plugin);
}

@ -85,6 +85,7 @@ public class AbstractBuildThreadPoolTemplate {
*
* @param initParam init param
* @return dynamic monitor thread-pool
* 线
*/
public static DynamicThreadPoolExecutor buildDynamicPool(ThreadPoolInitParam initParam) {
Assert.notNull(initParam);
@ -105,6 +106,7 @@ public class AbstractBuildThreadPoolTemplate {
} catch (IllegalArgumentException ex) {
throw new IllegalArgumentException(String.format("Error creating thread pool parameter. threadPool id: %s", initParam.getThreadPoolId()), ex);
}
//在这里设置了任务装饰器
dynamicThreadPoolExecutor.setTaskDecorator(initParam.getTaskDecorator());
dynamicThreadPoolExecutor.allowCoreThreadTimeOut(initParam.allowCoreThreadTimeOut);
return dynamicThreadPoolExecutor;
@ -112,6 +114,7 @@ public class AbstractBuildThreadPoolTemplate {
/**
* Thread-pool init param.
* 线线
*/
@Data
@Accessors(chain = true)

@ -35,14 +35,27 @@ import java.util.concurrent.TimeUnit;
*/
public class ThreadPoolBuilder implements Builder<ThreadPoolExecutor> {
//这个表示线程池构建器是否要创建快速线程池,所谓快速线程池
//其实就是使用的任务队列特殊一点使用了TaskQueue任务队列
//这个队列有一个特点,那就是只要当提交给任务队列的数量大于线程池当前线程的数量了
//但是当前线程数量仍然小于线程池的最大线程数量,这时候就可以不把任务添加到队列中
//而是让线程池直接创建线程执行任务所谓快速就快速在这里。实际上在hippo4j框架中
//不仅提供了动态线程池还提供了FastThreadPoolExecutor快速线程池最后还提供了普通线程池也就是原生的ThreadPoolExecutor。
//这三种线程池想创建哪一种就创建哪一种只不过只有DynamicThreadPoolExecutor动态线程池会被注册到服务单并且可以动态更新配置信息
//快速线程池和普通线程池我就不在文章中展示了反正所有线程池的创建都在ThreadPoolBuilder体系下大家掌握了DynamicThreadPoolExecutor是如何创建的
//其他的也就都掌握了
private boolean isFastPool;
//表示这个线程池构建器是否要创建动态线程池
private boolean isDynamicPool;
//默认的线程池的核心线程数量这个数量是根据CPU核心数量计算出来的
private int corePoolSize = calculateCoreNum();
//默认的池最大线程数量
private int maximumPoolSize = corePoolSize + (corePoolSize >> 1);
//默认存活时间
private long keepAliveTime = 30000L;
private TimeUnit timeUnit = TimeUnit.MILLISECONDS;
@ -418,6 +431,11 @@ public class ThreadPoolBuilder implements Builder<ThreadPoolExecutor> {
* @return dynamic thread-pool executor
*/
private static ThreadPoolExecutor buildDynamicPool(ThreadPoolBuilder builder) {
//AbstractBuildThreadPoolTemplate.buildDynamicPool()方法中又调用了当前构建器对象的buildInitParam(builder)方法
//buildInitParam(builder)方法会得到一个ThreadPoolInitParam线程池初始化参数对象该对象封装了线程池需要的配置参数
//线程池构建器模板就会使用这个参数对象创建动态线程池
//而线程池构建器模板就是AbstractBuildThreadPoolTemplate至于为什么是抽象的是因为这个模板不仅可以创建动态线程池
//还可以创建动态线程池,还有普通线程池,这个就不再过多解释了
return AbstractBuildThreadPoolTemplate.buildDynamicPool(buildInitParam(builder));
}
@ -428,13 +446,19 @@ public class ThreadPoolBuilder implements Builder<ThreadPoolExecutor> {
* @return thread-pool init param
*/
private static AbstractBuildThreadPoolTemplate.ThreadPoolInitParam buildInitParam(ThreadPoolBuilder builder) {
//定义一个ThreadPoolInitParam对象
AbstractBuildThreadPoolTemplate.ThreadPoolInitParam initParam;
if (builder.threadFactory == null) {
//判断线程工厂是否为空,如果线程工厂为空,就判断线程池中线程前缀是否为空
//如果线程前缀不为空则直接创建ThreadPoolInitParam对象即可这里创建ThreadPoolInitParam对象的过程中其实就把ThreadPoolInitParam
//对象内部的ThreadFactory成员变量创建成功了
Assert.notEmpty(builder.threadNamePrefix, "The thread name prefix cannot be empty or an empty string.");
initParam = new AbstractBuildThreadPoolTemplate.ThreadPoolInitParam(builder.threadNamePrefix, builder.isDaemon);
} else {
//如果线程工厂不为空直接创建ThreadPoolInitParam对象即可
initParam = new AbstractBuildThreadPoolTemplate.ThreadPoolInitParam(builder.threadFactory);
}
//接下来就要使用刚才得到的构造器对象给initParam中的其他成员变量赋值即可
initParam.setCorePoolNum(builder.corePoolSize)
.setMaximumPoolSize(builder.maximumPoolSize)
.setKeepAliveTime(builder.keepAliveTime)
@ -444,10 +468,15 @@ public class ThreadPoolBuilder implements Builder<ThreadPoolExecutor> {
.setTimeUnit(builder.timeUnit)
.setAllowCoreThreadTimeOut(builder.allowCoreThreadTimeOut)
.setTaskDecorator(builder.taskDecorator);
//判断用户要创建的是什么线程池
if (builder.isDynamicPool) {
//这里创建的就是动态线程池得到线程池Id
String threadPoolId = Optional.ofNullable(builder.threadPoolId).orElse(builder.threadNamePrefix);
//设置线程池Id
initParam.setThreadPoolId(threadPoolId);
//设置线程池关闭时是否等待正在执行的任务执行完毕
initParam.setWaitForTasksToCompleteOnShutdown(builder.waitForTasksToCompleteOnShutdown);
//设置线程池关闭时,等待剩余任务执行的最大时间
initParam.setAwaitTerminationMillis(builder.awaitTerminationMillis);
}
if (!builder.isFastPool) {

@ -30,11 +30,14 @@ import java.util.concurrent.ThreadPoolExecutor;
/**
* Dynamic thread pool adapter choose.
* 线
*/
public class DynamicThreadPoolAdapterChoose {
// 存储了所有执行器适配器对象的集合
private static final List<DynamicThreadPoolAdapter> DYNAMIC_THREAD_POOL_ADAPTERS = new ArrayList<>();
// 添加三个动态线程池适配器对象
static {
DYNAMIC_THREAD_POOL_ADAPTERS.add(new TransmittableThreadLocalExecutorAdapter());
DYNAMIC_THREAD_POOL_ADAPTERS.add(new TransmittableThreadLocalExecutorServiceAdapter());
@ -49,6 +52,7 @@ public class DynamicThreadPoolAdapterChoose {
* @param executor objects where there may be instances
* of dynamic thread pools
* @return matching results
* //匹配执行器的适配器对象
*/
public static boolean match(Object executor) {
return DYNAMIC_THREAD_POOL_ADAPTERS.stream().anyMatch(each -> each.match(executor));
@ -60,6 +64,7 @@ public class DynamicThreadPoolAdapterChoose {
* @param executor objects where there may be instances
* of dynamic thread pools
* @return get the real dynamic thread pool instance
* //使用执行器的适配器对象得到执行器中的动态线程池
*/
public static ThreadPoolExecutor unwrap(Object executor) {
Optional<DynamicThreadPoolAdapter> dynamicThreadPoolAdapterOptional = DYNAMIC_THREAD_POOL_ADAPTERS.stream().filter(each -> each.match(executor)).findFirst();
@ -73,6 +78,7 @@ public class DynamicThreadPoolAdapterChoose {
* @param executor objects where there may be instances
* of dynamic thread pools
* @param dynamicThreadPoolExecutor dynamic thread-pool executor
* //使用dynamicThreadPoolExecutor替代executor中的线程池成员变量
*/
public static void replace(Object executor, Executor dynamicThreadPoolExecutor) {
Optional<DynamicThreadPoolAdapter> dynamicThreadPoolAdapterOptional = DYNAMIC_THREAD_POOL_ADAPTERS.stream().filter(each -> each.match(executor)).findFirst();

@ -22,9 +22,11 @@ import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor;
import cn.hippo4j.core.executor.plugin.*;
import cn.hippo4j.core.executor.plugin.manager.DefaultThreadPoolPluginManager;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.junit.Assert;
import org.junit.Test;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@ -33,6 +35,7 @@ import java.util.concurrent.atomic.AtomicInteger;
/**
* test for default method of {@link ThreadPoolPlugin} and it's subclass
*/
@Slf4j
public class ThreadPoolPluginTest {
@Test
@ -88,6 +91,16 @@ public class ThreadPoolPluginTest {
private final static class TestShutdownAwarePlugin implements ShutdownAwarePlugin {
private final String id = this.getClass().getSimpleName();
@Override
public void beforeShutdown(ThreadPoolExecutor executor) {
log.info("before shutdown", id);
}
@Override
public void afterShutdown(ThreadPoolExecutor executor, List<Runnable> remainingTasks) {
log.info("after shutdown", id);
}
}
}

Loading…
Cancel
Save