diff --git a/examples/threadpool-example/example-core/src/main/java/cn/hippo4j/example/core/config/DynamicThreadPoolConfig.java b/examples/threadpool-example/example-core/src/main/java/cn/hippo4j/example/core/config/DynamicThreadPoolConfig.java index 11855427..ff63fe95 100644 --- a/examples/threadpool-example/example-core/src/main/java/cn/hippo4j/example/core/config/DynamicThreadPoolConfig.java +++ b/examples/threadpool-example/example-core/src/main/java/cn/hippo4j/example/core/config/DynamicThreadPoolConfig.java @@ -60,6 +60,13 @@ public class DynamicThreadPoolConfig { private static final int MAX_POOL_SIZE = AVAILABLE_PROCESSORS * 4; + /** + * 我定义了一个 SpringBoot 的配置类,在配置类中定义了一个 DynamicThreadPoolExecutor 对象交给了 SpringBoot 容器来管理, + * 并且还在该对象上面添加了 @DynamicThreadPool 注解。这样一来,我肯定就可以在 + * DynamicThreadPoolPostProcessor 对象处理器的 postProcessAfterInitialization + * 方法中得到该对象,然后判断该对象是否为动态线程池对象,如果是的话,就可以把这个动态线程池的信息注册到服务端 + * @return + */ @Bean @DynamicThreadPool public Executor messageConsumeTtlDynamicThreadPool() { @@ -78,6 +85,22 @@ public class DynamicThreadPoolConfig { return ttlExecutor; } + @Bean + @DynamicThreadPool + public Executor messageConsumeTestDynamicThreadPool() { + String threadPoolId = "test"; + ThreadPoolExecutor customExecutor = ThreadPoolBuilder.builder() + .dynamicPool() + .threadFactory(threadPoolId) + .threadPoolId(threadPoolId) + .executeTimeOut(EXECUTE_TIMEOUT) + .waitForTasksToCompleteOnShutdown(true) + .awaitTerminationMillis(AWAIT_TERMINATION_MILLIS) + .taskDecorator(new TaskTraceBuilderHandler()) + .build(); + return customExecutor; + } + /** * {@link Bean @Bean} and {@link DynamicThreadPool @DynamicThreadPool}. */ diff --git a/infra/common/src/main/java/cn/hippo4j/common/handler/TransmittableThreadLocalExecutorAdapter.java b/infra/common/src/main/java/cn/hippo4j/common/handler/TransmittableThreadLocalExecutorAdapter.java index 079fc11e..1867b9e4 100644 --- a/infra/common/src/main/java/cn/hippo4j/common/handler/TransmittableThreadLocalExecutorAdapter.java +++ b/infra/common/src/main/java/cn/hippo4j/common/handler/TransmittableThreadLocalExecutorAdapter.java @@ -33,18 +33,25 @@ public class TransmittableThreadLocalExecutorAdapter implements DynamicThreadPoo private static final String FIELD_NAME = "executor"; + // 判断传进来的对象是否和当前适配器器对象匹配 @Override public boolean match(Object executor) { + // 其实就是判断对象的类名是否为ExecutorTtlWrapper,如果是就意味着是第三方线程池 + // 这个线程池中持有者动态线程池对象 return Objects.equals(MATCH_CLASS_NAME, executor.getClass().getSimpleName()); } - + // 从ExecutorTtlWrapper对象中获得其持有的DynamicThreadPoolExecutor对象 @Override public ThreadPoolExecutor unwrap(Object executor) { + // 通过反射获得ExecutorTtlWrapper对象的executor成员变量 + // 在之前展示的ExecutorTtlWrapper类的代码中,可以看到,动态线程池会赋值给 + // ExecutorTtlWrapper的executor成员变量 return (ThreadPoolExecutor) ReflectUtil.getFieldValue(executor, FIELD_NAME); } @Override public void replace(Object executor, Executor dynamicThreadPoolExecutor) { + // 将dynamicThreadPoolExecutor对象替换到executor中 ReflectUtil.setFieldValue(executor, FIELD_NAME, dynamicThreadPoolExecutor); } } diff --git a/infra/common/src/main/java/cn/hippo4j/common/model/InstanceInfo.java b/infra/common/src/main/java/cn/hippo4j/common/model/InstanceInfo.java index 204e856a..804ac189 100644 --- a/infra/common/src/main/java/cn/hippo4j/common/model/InstanceInfo.java +++ b/infra/common/src/main/java/cn/hippo4j/common/model/InstanceInfo.java @@ -33,23 +33,24 @@ import lombok.extern.slf4j.Slf4j; public class InstanceInfo { private static final String UNKNOWN = "unknown"; - + //应用名称,未设置就是未知 private String appName = UNKNOWN; - + //地址 private String hostName; - + //这个就是命名空间+项目Id private String groupKey; - + //端口号 private String port; - + //客户端服务实例Id,其实就是客户端地址+uuid (127.0.0.1:8088_eceeab1ab6a0471b838b97a47cfa1268) private String instanceId; private String ipApplicationName; - + //客户端在配置文件中定义的上下文路径 private String clientBasePath; - + //客户端回调地址,这个地址非常重要,一会就会为大家解释说明 private String callBackUrl; - + //客户端唯一标识符,其实和instanceId一样、 + //只不过这个标识符是要在web界面展示给用户的 private String identify; private String active; @@ -61,11 +62,11 @@ public class InstanceInfo { private volatile ActionType actionType; private volatile boolean isInstanceInfoDirty = false; - + //客户端最后更新时间戳 private volatile Long lastUpdatedTimestamp; private volatile Long lastDirtyTimestamp; - + //服务实例的默认状态为up,也就是上线状态 private volatile InstanceStatus status = InstanceStatus.UP; private volatile InstanceStatus overriddenStatus = InstanceStatus.UNKNOWN; diff --git a/infra/common/src/main/java/cn/hippo4j/common/model/register/DynamicThreadPoolRegisterParameter.java b/infra/common/src/main/java/cn/hippo4j/common/model/register/DynamicThreadPoolRegisterParameter.java index 48671de5..293d8a7c 100644 --- a/infra/common/src/main/java/cn/hippo4j/common/model/register/DynamicThreadPoolRegisterParameter.java +++ b/infra/common/src/main/java/cn/hippo4j/common/model/register/DynamicThreadPoolRegisterParameter.java @@ -30,6 +30,7 @@ import java.util.concurrent.ThreadFactory; /** * Dynamic thread-pool register parameter. + * 封装动态线程池核心信息的对象 */ @Data @Builder diff --git a/starters/threadpool/config/src/main/java/cn/hippo4j/config/springboot/starter/support/DynamicThreadPoolPostProcessor.java b/starters/threadpool/config/src/main/java/cn/hippo4j/config/springboot/starter/support/DynamicThreadPoolPostProcessor.java index 864c7a8d..c8cd0edc 100644 --- a/starters/threadpool/config/src/main/java/cn/hippo4j/config/springboot/starter/support/DynamicThreadPoolPostProcessor.java +++ b/starters/threadpool/config/src/main/java/cn/hippo4j/config/springboot/starter/support/DynamicThreadPoolPostProcessor.java @@ -47,11 +47,16 @@ import java.util.concurrent.TimeUnit; /** * Dynamic thread-pool post processor. + * BeanPostProcessor 接口定义了两个主要方法: + * 1. postProcessBeforeInitialization(Object bean, String beanName):在 bean 初始化方法 + * (如 @PostConstruct 注解的方法或 InitializingBean 接口的 afterPropertiesSet 方法)调用之前执行。 + * 2. postProcessAfterInitialization(Object bean, String beanName):在 bean 初始化方法调用之后执行。 */ @Slf4j @AllArgsConstructor public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor { + // 配置信息对象 private final BootstrapConfigProperties configProperties; private static final int DEFAULT_ACTIVE_ALARM = 80; @@ -62,21 +67,37 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor { private static final String DEFAULT_RECEIVES = ""; + // bean前置处理方法 @Override public Object postProcessBeforeInitialization(Object bean, String beanName) { return bean; } + // bean后置处理方法 + // 在这里判断bean是否为动态线程池对象,如果是的话就可以把动态线程池信息注册到服务端 + // 这个方法就是本类最核心的方法,用来处理DynamicThreadPoolExecutor对象 @Override public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { + // 这里会先判断一下传进来的bean是否属于DynamicThreadPoolExecutor类型,如果大家看了我在DynamicThreadPoolConfig类提供的几个例子 + // 就会发现我创建动态线程池对象最终是以Executor或者ThreadPoolExecutor形式返回的,如果是以Executor形式返回的,这个Executor接收的还并不是一个DynamicThreadPoolExecutor对象 + // 而是一个ExecutorTtlWrapper对象,这个ExecutorTtlWrapper对象的作用我已经在DynamicThreadPoolConfig类中解释了,这时候,ExecutorTtlWrapper对象肯定就不属于DynamicThreadPoolExecutor类型了 + // 但是先别急,虽然ExecutorTtlWrapper对象不属于DynamicThreadPoolExecutor类型,但是后面的DynamicThreadPoolAdapterChoose.match(bean)这个条件还是可以通过的,所以仍然可以进入下面的分支 + // 那为什么要执行DynamicThreadPoolAdapterChoose.match(bean)这行代码呢?原因也很简单,因为有时候用户可能会使用spring本身的线程池,或者其他第三方形式的线程池,比如ExecutorTtl,比如spring的ThreadPoolTaskExecutor + // 该动态线程池框架也想收集这些线程池的信息,所以就会在DynamicThreadPoolAdapterChoose.match(bean)中判断程序内是否有这些第三方线程池的适配器,如果有,就可以使用这些适配器把这些第三方线程池转换成DynamicThreadPoolExecutor对象 + // 之后的逻辑就和处理真正的DynamicThreadPoolExecutor对象一样了,无非就是把线程池信息注册到服务端,然后把线程池保存在线程池全局管理器中 + // DynamicThreadPoolAdapterChoose.match(bean)就是判断bean的类型是否为ThreadPoolTaskExecutor、ExecutorTtlWrapper、ExecutorServiceTtlWrapper中的一个,这些都是第三方的线程池 if (bean instanceof DynamicThreadPoolExecutor || DynamicThreadPoolAdapterChoose.match(bean)) { DynamicThreadPool dynamicThreadPool; try { + // 判断该线程池bean对象上是否存在DynamicThreadPool注解 dynamicThreadPool = ApplicationContextHolder.findAnnotationOnBean(beanName, DynamicThreadPool.class); + // 如果找不到该注解,就进入下面这个分支 if (Objects.isNull(dynamicThreadPool)) { // Adapt to lower versions of SpringBoot. + // 这里就是为了适配SpringBoot低版本,使用DynamicThreadPoolAnnotationUtil工具再次查找注解 dynamicThreadPool = DynamicThreadPoolAnnotationUtil.findAnnotationOnBean(beanName, DynamicThreadPool.class); if (Objects.isNull(dynamicThreadPool)) { + // 还是找不到则直接返回bean即可 return bean; } } @@ -84,10 +105,18 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor { log.error("Failed to create dynamic thread pool in annotation mode.", ex); return bean; } + // 走到这里意味着当前的bean上有DynamicThreadPool注解,也就意味着是一个动态线程池,下面就要收集动态线程池配置信息了 + // 定义一个动态线程池 ThreadPoolExecutor dynamicThreadPoolExecutor = DynamicThreadPoolAdapterChoose.unwrap(bean); + // 下面的if分支会先从适配器中获得真正的动态线程池,如果获得的线程池为空,说明当前bean本身就是动态线程池,如果不为空,则正好得到了真正的动态线程池,并且赋值给dynamicThreadPoolExecutor了 + // 将bean转换为dynamicThreadPoolExecutor类型,确切地说不是把当前要交给容器的这个bean转换成dynamicThreadPoolExecutor对象 + // 实际上ExecutorTtlWrapper只是持有了dynamicThreadPoolExecutor的引用,这里只不过是直接利用反射从ExecutorTtlWrapper把dynamicThreadPoolExecutor对象取出来了 if (dynamicThreadPoolExecutor == null) { dynamicThreadPoolExecutor = (DynamicThreadPoolExecutor) bean; } + // 将刚刚得到的dynamicThreadPoolExecutor对象包装成一个DynamicThreadPoolWrapper对象,这个对象会被交给线程池全局管理器来管理 + // 之后收集线程池运行信息时都要用到这个对象 + // 在这里把动态线程池的信息注册给服务端了 ThreadPoolExecutor remoteThreadPoolExecutor = fillPoolAndRegister(((DynamicThreadPoolExecutor) dynamicThreadPoolExecutor).getThreadPoolId(), dynamicThreadPoolExecutor); DynamicThreadPoolAdapterChoose.replace(bean, remoteThreadPoolExecutor); return DynamicThreadPoolAdapterChoose.match(bean) ? bean : remoteThreadPoolExecutor; @@ -100,10 +129,15 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor { * * @param threadPoolId dynamic thread-pool id * @param executor dynamic thread-pool executor + * fillPoolAndRegister 方法实现思路非常简单,但要执行的操作就稍微多一些了, + * 我之所以说该方法实现思路简单,是因为在该方法中,只需要把动态线程池的配置信息封装到一个新的对象, + * 就是我即将要定义的 DynamicThreadPoolRegisterParameter 对象中, + * 然后将这个对象直接通过 HttpAgent 通信组件发送给服务端即可 */ protected ThreadPoolExecutor fillPoolAndRegister(String threadPoolId, ThreadPoolExecutor executor) { ExecutorProperties executorProperties = null; if (configProperties.getExecutors() != null) { + // 从配置文件中获取线程池配置信息 executorProperties = configProperties.getExecutors() .stream() .filter(each -> Objects.equals(threadPoolId, each.getThreadPoolId())) diff --git a/starters/threadpool/server/src/main/java/cn/hippo4j/springboot/starter/config/BootstrapProperties.java b/starters/threadpool/server/src/main/java/cn/hippo4j/springboot/starter/config/BootstrapProperties.java index 0669aed1..39ba16c1 100644 --- a/starters/threadpool/server/src/main/java/cn/hippo4j/springboot/starter/config/BootstrapProperties.java +++ b/starters/threadpool/server/src/main/java/cn/hippo4j/springboot/starter/config/BootstrapProperties.java @@ -26,11 +26,26 @@ import org.springframework.boot.context.properties.ConfigurationProperties; /** * Bootstrap properties. + * //动态线程池服务端的地址 + * spring.dynamic.thread-pool.server-addr=http://localhost:6691 + * //客户端namespace + * spring.dynamic.thread-pool.namespace=prescription + * //itemId + * spring.dynamic.thread-pool.item-id=dynamic-threadpool-example + * //访问服务端时需要的用户名和密码 + * spring.dynamic.thread-pool.username=admin + * spring.dynamic.thread-pool.password=123456 */ @Slf4j @Getter @Setter @ConfigurationProperties(prefix = Constants.CONFIGURATION_PROPERTIES_PREFIX) +/** + * 既然是要使用 BootstrapProperties 来封装配置类中的信息,那么 BootstrapProperties 类的对象肯定要被 + * SpringBoot 容器来管理,所以接下来我要定义一个配置类,在这个配置类中, + * 把 BootstrapProperties 的对象交给 SpringBoot 的容器来管理。 + * 这个配置类我也定义好了,DynamicThreadPoolAutoConfiguration + */ public class BootstrapProperties implements BootstrapPropertiesInterface { /** @@ -50,31 +65,40 @@ public class BootstrapProperties implements BootstrapPropertiesInterface { /** * Netty server port + * //netty服务器的端口号,这个是可配置的 + * //在hippo4j框架,提供了两种通信方式,一种是http,一种就是netty + * //在该框架中默认使用的是http,所以我就不引入netty了 */ private String nettyServerPort; /** * Report type + * //客户端上报给服务端线程池历史信息的方法,这个也可以使用netty的方式上报 + * //我仍然使用内部默认的http了,不引入netty */ private String reportType; /** * Namespace + * //命名空间 */ private String namespace; /** * Item id + * //项目Id */ private String itemId; /** * Whether to enable dynamic thread pool + * //是否启动动态线程池 */ private Boolean enable = true; /** * Print dynamic thread pool banner + * //是否在控制台打印hippo4j的启动图案 */ private Boolean banner = true; diff --git a/starters/threadpool/server/src/main/java/cn/hippo4j/springboot/starter/config/DynamicThreadPoolAutoConfiguration.java b/starters/threadpool/server/src/main/java/cn/hippo4j/springboot/starter/config/DynamicThreadPoolAutoConfiguration.java index 50d59116..337d2b49 100644 --- a/starters/threadpool/server/src/main/java/cn/hippo4j/springboot/starter/config/DynamicThreadPoolAutoConfiguration.java +++ b/starters/threadpool/server/src/main/java/cn/hippo4j/springboot/starter/config/DynamicThreadPoolAutoConfiguration.java @@ -91,11 +91,15 @@ import org.springframework.core.env.ConfigurableEnvironment; @Configuration @AllArgsConstructor @ConditionalOnBean(MarkerConfiguration.Marker.class) +//这个注解会使BootstrapProperties类上的ConfigurationProperties注解生效, +// BootstrapProperties对象就可以被SpringBoot容器管理了 @EnableConfigurationProperties(BootstrapProperties.class) @ConditionalOnProperty(prefix = Constants.CONFIGURATION_PROPERTIES_PREFIX, value = "enable", matchIfMissing = true, havingValue = "true") @ImportAutoConfiguration({WebAdapterConfiguration.class, NettyClientConfiguration.class, DiscoveryConfiguration.class, MessageConfiguration.class, UtilAutoConfiguration.class}) public class DynamicThreadPoolAutoConfiguration { + //在这里把配置文件中的相关信息封封装到这个成员变量中了 + //properties对象会被自动注入 private final BootstrapProperties properties; private final ConfigurableEnvironment environment; @@ -137,6 +141,8 @@ public class DynamicThreadPoolAutoConfiguration { return new AdaptedThreadPoolDestroyPostProcessor(applicationContext); } + //动态线程池处理器,这个处理器其实是就是spring中的一个bean处理器,在这个bean处理器中把动态线程池包装成了DynamicThreadPoolRegisterWrapper对象 + //然后开始向服务端注册该动态线程池的信息 @Bean @ConditionalOnProperty(prefix = Constants.CONFIGURATION_PROPERTIES_PREFIX, value = "enable", matchIfMissing = true, havingValue = "true") @SuppressWarnings("all") @@ -242,6 +248,7 @@ public class DynamicThreadPoolAutoConfiguration { return new DynamicThreadPoolSubscribeConfig(threadPoolDynamicRefresh, clientWorker, properties); } + //远程通信组件,使用的是http通信方式 @Bean public HttpAgent httpAgent(BootstrapProperties properties) { return new ServerHttpAgent(properties); diff --git a/starters/threadpool/server/src/main/java/cn/hippo4j/springboot/starter/remote/ServerHttpAgent.java b/starters/threadpool/server/src/main/java/cn/hippo4j/springboot/starter/remote/ServerHttpAgent.java index 498aedd4..e478d34f 100644 --- a/starters/threadpool/server/src/main/java/cn/hippo4j/springboot/starter/remote/ServerHttpAgent.java +++ b/starters/threadpool/server/src/main/java/cn/hippo4j/springboot/starter/remote/ServerHttpAgent.java @@ -34,30 +34,43 @@ import java.util.concurrent.TimeUnit; /** * Server http agent. + * 用来访问服务端的http代理类,ServerHttpAgent实现了HttpAgent接口 + * + * 这个组件将会在DynamicThreadPoolAutoConfiguration 中被注入为组件 */ public class ServerHttpAgent implements HttpAgent { + //配置信息对象 private final BootstrapProperties dynamicThreadPoolProperties; + //服务地址管理器,这个对象中封装着可用的服务端地址列表,当然,服务端地址的列表信息,需要用户提前定义在配置文件中 + //nacos中也有这个类 private final ServerListManager serverListManager; + //安全代理类,如果有朋友自己看过nacos客户端源码,肯定对这个类的作用不陌生,这就是要给安全代理类 + //主要作用就是用来访问服务端的,通过这个类从服务端获得token,以后每次访问服务端都会携带这个token + //但是在我迭代的nacos客户端代码中,我把这个功能给省略了,如果大家感兴趣可以自己去看看nacos客户端的源码 private SecurityProxy securityProxy; private ServerHealthCheck serverHealthCheck; - + //定时任务执行器,这个定时任务执行器会定期刷新本地缓存的token private ScheduledExecutorService executorService; - + //定时任务执行间隔 private final long securityInfoRefreshIntervalMills = TimeUnit.SECONDS.toMillis(5); public ServerHttpAgent(BootstrapProperties properties) { this.dynamicThreadPoolProperties = properties; this.serverListManager = new ServerListManager(dynamicThreadPoolProperties); this.securityProxy = new SecurityProxy(properties); + //在这里已经先从服务端获取到了token了 this.securityProxy.applyToken(this.serverListManager.getServerUrls()); + //创建定时任务执行器 this.executorService = new ScheduledThreadPoolExecutor( new Integer(1), ThreadFactoryBuilder.builder().daemon(true).prefix("client.scheduled.token.security.updater").build()); + //向定时任务执行器提交了定期执行的任务 this.executorService.scheduleWithFixedDelay( + //定期访问服务端,刷新本地token () -> securityProxy.applyToken(serverListManager.getServerUrls()), 0, securityInfoRefreshIntervalMills, @@ -85,6 +98,8 @@ public class ServerHttpAgent implements HttpAgent { return HttpUtil.get(buildUrl(path), Result.class); } + //下面就是具体的访问服务端的方法了,这里只需要强调一点,那就是访问服务端之前 + //会调用本类的injectSecurityInfo方法,把本地token封装到请求中一起发送给服务端 @Override public Result httpPost(String path, Object body) { isHealthStatus(); @@ -129,6 +144,7 @@ public class ServerHttpAgent implements HttpAgent { serverHealthCheck.isHealthStatus(); } + //把本地token封装到map中的方法 private Map injectSecurityInfo(Map params) { if (StringUtil.isNotBlank(securityProxy.getAccessToken())) { params.put(Constants.ACCESS_TOKEN, securityProxy.getAccessToken()); diff --git a/starters/threadpool/server/src/main/java/cn/hippo4j/springboot/starter/support/DynamicThreadPoolPostProcessor.java b/starters/threadpool/server/src/main/java/cn/hippo4j/springboot/starter/support/DynamicThreadPoolPostProcessor.java index e2ea8e8f..237984ef 100644 --- a/starters/threadpool/server/src/main/java/cn/hippo4j/springboot/starter/support/DynamicThreadPoolPostProcessor.java +++ b/starters/threadpool/server/src/main/java/cn/hippo4j/springboot/starter/support/DynamicThreadPoolPostProcessor.java @@ -66,6 +66,7 @@ import static cn.hippo4j.common.constant.Constants.TP_ID; /** * Dynamic thread-pool post processor. + * 这个组件在DynamicThreadPoolAutoConfiguration 中被配置为bean 组件 */ @Slf4j @AllArgsConstructor @@ -101,6 +102,7 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor { if ((dynamicThreadPoolExecutor) == null) { dynamicThreadPoolExecutor = (DynamicThreadPoolExecutor) bean; } + //在这里把动态线程池的信息注册给服务端了 ThreadPoolExecutor remoteThreadPoolExecutor = fillPoolAndRegister(((DynamicThreadPoolExecutor) dynamicThreadPoolExecutor).getThreadPoolId(), dynamicThreadPoolExecutor); DynamicThreadPoolAdapterChoose.replace(bean, remoteThreadPoolExecutor); subscribeConfig(((DynamicThreadPoolExecutor) dynamicThreadPoolExecutor).getThreadPoolId()); @@ -125,24 +127,40 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor { * * @param threadPoolId dynamic thread-pool id * @param executor dynamic thread-pool executor + * 注册线程池信息到服务端的方法,注意,这里交给fillPoolAndRegister方法的 + * 已经是dynamicThreadPoolWrapper对象了,而dynamicThreadPoolWrapper对象的代码之前已经展示过了 + * */ protected ThreadPoolExecutor fillPoolAndRegister(String threadPoolId, ThreadPoolExecutor executor) { + //封装线程池Id,命名空间,项目Id信息 Map queryStrMap = new HashMap<>(INITIAL_CAPACITY); queryStrMap.put(TP_ID, threadPoolId); queryStrMap.put(ITEM_ID, properties.getItemId()); queryStrMap.put(NAMESPACE, properties.getNamespace()); + //创建封装线程池参数信息的对象 ThreadPoolParameterInfo threadPoolParameterInfo = new ThreadPoolParameterInfo(); + //下面就是首先访问服务端,看看服务端是否存在动态线程池的配置信息的操作,如果存在就是用服务端的信息刷新本地动态线程池的配置信息 try { + //这里做了一个访问服务端的操作,这是因为也许用户通过web界面,已经实现在服务端定义好了线程池的配置信息 + //所以要以服务端的配置信息为主,因此在这里先访问服务端,看看服务端有没有设置好的动态线程池信息,其实就是去服务端查询数据库而已 + //这里访问的就是服务端的ConfigController类的detailConfigInfo方法 + //Constants.CONFIG_CONTROLLER_PATH就是要访问的服务端的接口,路径为"/hippo4j/v1/cs/configs" Result result = httpAgent.httpGetByConfig(Constants.CONFIG_CONTROLLER_PATH, null, queryStrMap, HTTP_EXECUTE_TIMEOUT); + //判断返回的结果中是否存在最新的线程池配置信息 if (result.isSuccess() && result.getData() != null) { String resultJsonStr = JSONUtil.toJSONString(result.getData()); + //如果存在就获取信息,然后转换成threadPoolParameterInfo对象 threadPoolParameterInfo = JSONUtil.parseObject(resultJsonStr, ThreadPoolParameterInfo.class); if (threadPoolParameterInfo != null) { + //在这里刷新本地动态线程池的信息 threadPoolParamReplace(executor, threadPoolParameterInfo); registerNotifyAlarm(threadPoolParameterInfo); } } else { // DynamicThreadPool configuration undefined in server + //下面就是第一次把动态线程池注册到服务端的操作 + //如果走到这里就意味着服务端没有当前动态线程池的任何信息,那就要在客户端构建一个DynamicThreadPoolRegisterWrapper对象,然后把这个对象直接发送给服务端,进行注册即可 + //这里创建的这个DynamicThreadPoolRegisterParameter对象封装了动态线程池的核心参数信息 DynamicThreadPoolRegisterParameter parameterInfo = DynamicThreadPoolRegisterParameter.builder() .threadPoolId(threadPoolId) .corePoolSize(executor.getCorePoolSize()) @@ -158,9 +176,14 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor { .executeTimeOut(EXECUTE_TIME_OUT) .rejectedPolicyType(RejectedPolicyTypeEnum.getRejectedPolicyTypeEnumByName(executor.getRejectedExecutionHandler().getClass().getSimpleName())) .build(); + //在这里创建了DynamicThreadPoolRegisterWrapper对象,并且把刚才创建的parameterInfo交给registerWrapper对象, + // 这个registerWrapper对象要发送给服务端进行注册 DynamicThreadPoolRegisterWrapper registerWrapper = DynamicThreadPoolRegisterWrapper.builder() .parameter(parameterInfo) .build(); + //将线程池信息注册到服务端,这里是通过线程池全局管理器来注册的 + //还记得我之前在展示GlobalThreadPoolManage代码的时候,让大家对dynamicRegister方法混个眼熟,这里就用到了dynamicRegister方法 + //开始真正把客户端线程池信息注册到服务端了 GlobalThreadPoolManage.dynamicRegister(registerWrapper); } } catch (Exception ex) { diff --git a/threadpool/core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPoolExecutor.java b/threadpool/core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPoolExecutor.java index 596cfd0b..f2691510 100644 --- a/threadpool/core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPoolExecutor.java +++ b/threadpool/core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPoolExecutor.java @@ -59,6 +59,7 @@ public class DynamicThreadPoolExecutor extends ExtensibleThreadPoolExecutor impl /** * Wait for tasks to complete on shutdown + * //关闭线程池时是否等待任务执行完毕 */ @Getter @Setter @@ -102,6 +103,9 @@ public class DynamicThreadPoolExecutor extends ExtensibleThreadPoolExecutor impl * @throws NullPointerException if {@code workQueue} * or {@code threadFactory} or {@code handler} is null */ + //构造方法 + //这个构造方法中有两个参数,一个是executeTimeOut,这个就是用户设置的任务执行的超时时间,这个参数该怎么发挥作用呢? + //另一个是awaitTerminationMillis,代表线程池关闭时,等待剩余任务执行的最大时间,那这个参数又该怎么发挥作用呢? public DynamicThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, @@ -110,13 +114,19 @@ public class DynamicThreadPoolExecutor extends ExtensibleThreadPoolExecutor impl @NonNull String threadPoolId, @NonNull ThreadFactory threadFactory, @NonNull RejectedExecutionHandler rejectedExecutionHandler) { + // 可以看到向父类构造方法传入了一个 threadPoolId 参数,threadPoolId 就是动态线程池的 Id。 + // 每一个动态线程池都有自己的 Id,用来和其他动态线程池做区分 super( + // 这里给ExtensibleThreadPoolExecutor 的构造方法传入了一个 DefaultThreadPoolPluginManager 参数 threadPoolId, new DefaultThreadPoolPluginManager().setPluginComparator(AnnotationAwareOrderComparator.INSTANCE), corePoolSize, maximumPoolSize, keepAliveTime, unit, blockingQueue, threadFactory, rejectedExecutionHandler); log.info("Initializing ExecutorService '{}'", threadPoolId); this.waitForTasksToCompleteOnShutdown = waitForTasksToCompleteOnShutdown; // Init default plugins. + //在这里创建了默认的插件管理器DefaultThreadPoolPluginManager对象 + //注意,这里有一行非常重要的代码,这里的操作非常重要,就是在这里,把所有插件都注册到当前动态线程池对象的插件管理器中了 + //这时候,executeTimeOut和awaitTerminationMillis参数也就可以发挥作用了 new DefaultThreadPoolPluginRegistrar(executeTimeOut, awaitTerminationMillis) .doRegister(this); this.active = new AtomicBoolean(true); @@ -135,6 +145,7 @@ public class DynamicThreadPoolExecutor extends ExtensibleThreadPoolExecutor impl /** * Invoked by the containing {@code BeanFactory} on destruction of a bean. */ + //销毁动态线程池的方法 @Override public void destroy() { // instance has been destroyed, not need to call this method again @@ -142,11 +153,13 @@ public class DynamicThreadPoolExecutor extends ExtensibleThreadPoolExecutor impl log.warn("Failed to destroy ExecutorService '{}' because it has already been destroyed", getThreadPoolId()); return; } + //如果等待任务执行完毕再终止线程池工作,就调用shutdown方法 if (isWaitForTasksToCompleteOnShutdown()) { super.shutdown(); } else { super.shutdownNow(); } + //在这里清空插件管理器中的插件 getThreadPoolPluginManager().clear(); log.info("ExecutorService '{}' has been destroyed", getThreadPoolId()); diff --git a/threadpool/core/src/main/java/cn/hippo4j/core/executor/ExtensibleThreadPoolExecutor.java b/threadpool/core/src/main/java/cn/hippo4j/core/executor/ExtensibleThreadPoolExecutor.java index 1cedb30a..7bb7ff3f 100644 --- a/threadpool/core/src/main/java/cn/hippo4j/core/executor/ExtensibleThreadPoolExecutor.java +++ b/threadpool/core/src/main/java/cn/hippo4j/core/executor/ExtensibleThreadPoolExecutor.java @@ -50,17 +50,21 @@ import java.util.concurrent.TimeUnit; * * @see ThreadPoolPluginManager * @see ThreadPoolPlugin + * 扩展线程池类,这个类为jdk的原生线程池提供了非常多的扩展点,基本上每一个重要操作都提供了拓展点 + * 插件体系 */ public class ExtensibleThreadPoolExecutor extends ThreadPoolExecutor implements ThreadPoolPluginSupport { /** * Thread pool id + * 线程池Id */ @Getter private final String threadPoolId; /** * Action aware registry + * 插件管理器对象,当前线程池用到的所有插件都会注册到这个管理器中 */ @Getter private final ThreadPoolPluginManager threadPoolPluginManager; @@ -108,6 +112,7 @@ public class ExtensibleThreadPoolExecutor extends ThreadPoolExecutor implements @NonNull RejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); // pool extended info. + //给线程池Id赋值 this.threadPoolId = threadPoolId; this.threadPoolPluginManager = threadPoolPluginManager; // proxy handler to support callback, repeated packaging of the same rejection policy should be avoided here. @@ -122,6 +127,7 @@ public class ExtensibleThreadPoolExecutor extends ThreadPoolExecutor implements * * @param thread the thread that will run task {@code r} * @param runnable the task that will be executed + * 执行任务之前的回调 */ @Override protected void beforeExecute(Thread thread, Runnable runnable) { @@ -158,6 +164,7 @@ public class ExtensibleThreadPoolExecutor extends ThreadPoolExecutor implements * @param runnable the runnable that has completed * @param throwable the exception that caused termination, or null if * execution completed normally + * 执行任务之后的回调 */ @Override protected void afterExecute(Runnable runnable, Throwable throwable) { diff --git a/threadpool/core/src/main/java/cn/hippo4j/core/executor/manage/GlobalThreadPoolManage.java b/threadpool/core/src/main/java/cn/hippo4j/core/executor/manage/GlobalThreadPoolManage.java index 40820e14..9f728786 100644 --- a/threadpool/core/src/main/java/cn/hippo4j/core/executor/manage/GlobalThreadPoolManage.java +++ b/threadpool/core/src/main/java/cn/hippo4j/core/executor/manage/GlobalThreadPoolManage.java @@ -34,7 +34,10 @@ public class GlobalThreadPoolManage { * @param registerWrapper register wrapper */ public static ThreadPoolExecutor dynamicRegister(DynamicThreadPoolRegisterWrapper registerWrapper) { + //这里从ApplicationContext中得到了DynamicThreadPoolService对象 + //ApplicationContextHolder其实就是SpringBoot的ApplicationContext DynamicThreadPoolService dynamicThreadPoolService = ApplicationContextHolder.getBean(DynamicThreadPoolService.class); + //DynamicThreadPoolService对象把动态线程池信息注册到服务端 return dynamicThreadPoolService.registerDynamicThreadPool(registerWrapper); } } diff --git a/threadpool/core/src/main/java/cn/hippo4j/core/executor/plugin/manager/DefaultThreadPoolPluginManager.java b/threadpool/core/src/main/java/cn/hippo4j/core/executor/plugin/manager/DefaultThreadPoolPluginManager.java index 319e7ef5..920d5b8c 100644 --- a/threadpool/core/src/main/java/cn/hippo4j/core/executor/plugin/manager/DefaultThreadPoolPluginManager.java +++ b/threadpool/core/src/main/java/cn/hippo4j/core/executor/plugin/manager/DefaultThreadPoolPluginManager.java @@ -93,6 +93,7 @@ public class DefaultThreadPoolPluginManager implements ThreadPoolPluginManager { /** * All registered {@link ThreadPoolPlugin} + * 线程池的插件被注册到registeredPlugins这里 */ private final Map registeredPlugins = new ConcurrentHashMap<>(16); diff --git a/threadpool/core/src/main/java/cn/hippo4j/core/executor/plugin/manager/DefaultThreadPoolPluginRegistrar.java b/threadpool/core/src/main/java/cn/hippo4j/core/executor/plugin/manager/DefaultThreadPoolPluginRegistrar.java index 3bfc911c..a0c09f8e 100644 --- a/threadpool/core/src/main/java/cn/hippo4j/core/executor/plugin/manager/DefaultThreadPoolPluginRegistrar.java +++ b/threadpool/core/src/main/java/cn/hippo4j/core/executor/plugin/manager/DefaultThreadPoolPluginRegistrar.java @@ -35,17 +35,22 @@ import lombok.NoArgsConstructor; * @see TaskRejectNotifyAlarmPlugin * @see ThreadPoolExecutorShutdownPlugin */ +// 默认的线程池插件注册器组件,通过这个注册器对象,把线程池的所有插件都注册到线程池的插件管理器中 +// DefaultThreadPoolPluginManager是一个线程池插件管理器,它的作用是管理线程池的插件 +// 这个组件在DynamicThreadPoolExecutor对象的构造方法中被创建,然后开始将插件注册到插件管理器中 @NoArgsConstructor @AllArgsConstructor public class DefaultThreadPoolPluginRegistrar implements ThreadPoolPluginRegistrar { /** * Execute time out + * //这个就是用户设置的任务执行的超时时间 */ private long executeTimeOut; /** * Await termination millis + * //线程池关闭时,等待剩余任务执行的最大时间 */ private long awaitTerminationMillis; @@ -54,12 +59,17 @@ public class DefaultThreadPoolPluginRegistrar implements ThreadPoolPluginRegistr * * @param support thread pool plugin manager delegate */ + //这个就是把动态线程池各个内置的功能扩展插件注册到动态线程池的内部的插件管理器成员变量中的方法 @Override public void doRegister(ThreadPoolPluginSupport support) { + //这里调用了DynamicThreadPoolExecutor对象的register方法,但是在我们目前的程序中 + //动态线程池中根本没有register方法,所以还需要在动态线程池中再添加一个register方法 + //具体实现我就写在当前这个代码块中了,请大家继续向下看 support.register(new TaskDecoratorPlugin()); support.register(new TaskTimeoutNotifyAlarmPlugin(support.getThreadPoolId(), executeTimeOut, support.getThreadPoolExecutor())); support.register(new TaskRejectCountRecordPlugin()); support.register(new TaskRejectNotifyAlarmPlugin()); + //awaitTerminationMillis就是等待剩余任务执行的最大时间,这两个参数是怎么被赋值的呢? support.register(new ThreadPoolExecutorShutdownPlugin(awaitTerminationMillis)); } } diff --git a/threadpool/core/src/main/java/cn/hippo4j/core/executor/plugin/manager/ThreadPoolPluginSupport.java b/threadpool/core/src/main/java/cn/hippo4j/core/executor/plugin/manager/ThreadPoolPluginSupport.java index 89818bb2..cd763a74 100644 --- a/threadpool/core/src/main/java/cn/hippo4j/core/executor/plugin/manager/ThreadPoolPluginSupport.java +++ b/threadpool/core/src/main/java/cn/hippo4j/core/executor/plugin/manager/ThreadPoolPluginSupport.java @@ -73,6 +73,10 @@ public interface ThreadPoolPluginSupport extends ThreadPoolPluginManager { */ @Override default void register(ThreadPoolPlugin plugin) { + //动态线程池持有了插件管理器,这个插件管理器是定义在ExtensibleThreadPoolExecutor类中的 + //并且使用了lombok的@Getter注解,所以就可以直接调用getThreadPoolPluginManager()方法得到插件管理器 + //然后把插件注册到插件管理器中即可 + // ThreadPoolPluginManager 是DynamicThreadPoolExecutor 构造器中传入了一个DefaultThreadPoolPluginManager getThreadPoolPluginManager().register(plugin); } diff --git a/threadpool/core/src/main/java/cn/hippo4j/core/executor/support/AbstractBuildThreadPoolTemplate.java b/threadpool/core/src/main/java/cn/hippo4j/core/executor/support/AbstractBuildThreadPoolTemplate.java index c7f8df03..e8239837 100644 --- a/threadpool/core/src/main/java/cn/hippo4j/core/executor/support/AbstractBuildThreadPoolTemplate.java +++ b/threadpool/core/src/main/java/cn/hippo4j/core/executor/support/AbstractBuildThreadPoolTemplate.java @@ -85,6 +85,7 @@ public class AbstractBuildThreadPoolTemplate { * * @param initParam init param * @return dynamic monitor thread-pool + * 创建动态线程池的方法 */ public static DynamicThreadPoolExecutor buildDynamicPool(ThreadPoolInitParam initParam) { Assert.notNull(initParam); @@ -105,6 +106,7 @@ public class AbstractBuildThreadPoolTemplate { } catch (IllegalArgumentException ex) { throw new IllegalArgumentException(String.format("Error creating thread pool parameter. threadPool id: %s", initParam.getThreadPoolId()), ex); } + //在这里设置了任务装饰器 dynamicThreadPoolExecutor.setTaskDecorator(initParam.getTaskDecorator()); dynamicThreadPoolExecutor.allowCoreThreadTimeOut(initParam.allowCoreThreadTimeOut); return dynamicThreadPoolExecutor; @@ -112,6 +114,7 @@ public class AbstractBuildThreadPoolTemplate { /** * Thread-pool init param. + * 这个内部类的对象封装了线程池的核心参数,最后就是用它来创建线程池的 */ @Data @Accessors(chain = true) diff --git a/threadpool/core/src/main/java/cn/hippo4j/core/executor/support/ThreadPoolBuilder.java b/threadpool/core/src/main/java/cn/hippo4j/core/executor/support/ThreadPoolBuilder.java index 3d432362..830e5549 100644 --- a/threadpool/core/src/main/java/cn/hippo4j/core/executor/support/ThreadPoolBuilder.java +++ b/threadpool/core/src/main/java/cn/hippo4j/core/executor/support/ThreadPoolBuilder.java @@ -35,14 +35,27 @@ import java.util.concurrent.TimeUnit; */ public class ThreadPoolBuilder implements Builder { + //这个表示线程池构建器是否要创建快速线程池,所谓快速线程池 + //其实就是使用的任务队列特殊一点,使用了TaskQueue任务队列 + //这个队列有一个特点,那就是只要当提交给任务队列的数量大于线程池当前线程的数量了 + //但是当前线程数量仍然小于线程池的最大线程数量,这时候就可以不把任务添加到队列中 + //而是让线程池直接创建线程执行任务,所谓快速,就快速在这里。实际上在hippo4j框架中 + //不仅提供了动态线程池,还提供了FastThreadPoolExecutor快速线程池,最后还提供了普通线程池,也就是原生的ThreadPoolExecutor。 + //这三种线程池想创建哪一种就创建哪一种,只不过只有DynamicThreadPoolExecutor动态线程池会被注册到服务单,并且可以动态更新配置信息 + //快速线程池和普通线程池我就不在文章中展示了,反正所有线程池的创建都在ThreadPoolBuilder体系下,大家掌握了DynamicThreadPoolExecutor是如何创建的 + //其他的也就都掌握了 private boolean isFastPool; + //表示这个线程池构建器是否要创建动态线程池 private boolean isDynamicPool; + //默认的线程池的核心线程数量,这个数量是根据CPU核心数量计算出来的 private int corePoolSize = calculateCoreNum(); + //默认的池最大线程数量 private int maximumPoolSize = corePoolSize + (corePoolSize >> 1); + //默认存活时间 private long keepAliveTime = 30000L; private TimeUnit timeUnit = TimeUnit.MILLISECONDS; @@ -418,6 +431,11 @@ public class ThreadPoolBuilder implements Builder { * @return dynamic thread-pool executor */ private static ThreadPoolExecutor buildDynamicPool(ThreadPoolBuilder builder) { + //AbstractBuildThreadPoolTemplate.buildDynamicPool()方法中又调用了当前构建器对象的buildInitParam(builder)方法 + //buildInitParam(builder)方法会得到一个ThreadPoolInitParam线程池初始化参数对象,该对象封装了线程池需要的配置参数 + //线程池构建器模板就会使用这个参数对象创建动态线程池 + //而线程池构建器模板就是AbstractBuildThreadPoolTemplate,至于为什么是抽象的,是因为这个模板不仅可以创建动态线程池 + //还可以创建动态线程池,还有普通线程池,这个就不再过多解释了 return AbstractBuildThreadPoolTemplate.buildDynamicPool(buildInitParam(builder)); } @@ -428,13 +446,19 @@ public class ThreadPoolBuilder implements Builder { * @return thread-pool init param */ private static AbstractBuildThreadPoolTemplate.ThreadPoolInitParam buildInitParam(ThreadPoolBuilder builder) { + //定义一个ThreadPoolInitParam对象 AbstractBuildThreadPoolTemplate.ThreadPoolInitParam initParam; if (builder.threadFactory == null) { + //判断线程工厂是否为空,如果线程工厂为空,就判断线程池中线程前缀是否为空 + //如果线程前缀不为空,则直接创建ThreadPoolInitParam对象即可,这里创建ThreadPoolInitParam对象的过程中,其实就把ThreadPoolInitParam + //对象内部的ThreadFactory成员变量创建成功了 Assert.notEmpty(builder.threadNamePrefix, "The thread name prefix cannot be empty or an empty string."); initParam = new AbstractBuildThreadPoolTemplate.ThreadPoolInitParam(builder.threadNamePrefix, builder.isDaemon); } else { + //如果线程工厂不为空,直接创建ThreadPoolInitParam对象即可 initParam = new AbstractBuildThreadPoolTemplate.ThreadPoolInitParam(builder.threadFactory); } + //接下来就要使用刚才得到的构造器对象给initParam中的其他成员变量赋值即可 initParam.setCorePoolNum(builder.corePoolSize) .setMaximumPoolSize(builder.maximumPoolSize) .setKeepAliveTime(builder.keepAliveTime) @@ -444,10 +468,15 @@ public class ThreadPoolBuilder implements Builder { .setTimeUnit(builder.timeUnit) .setAllowCoreThreadTimeOut(builder.allowCoreThreadTimeOut) .setTaskDecorator(builder.taskDecorator); + //判断用户要创建的是什么线程池 if (builder.isDynamicPool) { + //这里创建的就是动态线程池得到线程池Id String threadPoolId = Optional.ofNullable(builder.threadPoolId).orElse(builder.threadNamePrefix); + //设置线程池Id initParam.setThreadPoolId(threadPoolId); + //设置线程池关闭时是否等待正在执行的任务执行完毕 initParam.setWaitForTasksToCompleteOnShutdown(builder.waitForTasksToCompleteOnShutdown); + //设置线程池关闭时,等待剩余任务执行的最大时间 initParam.setAwaitTerminationMillis(builder.awaitTerminationMillis); } if (!builder.isFastPool) { diff --git a/threadpool/core/src/main/java/cn/hippo4j/core/executor/support/adpter/DynamicThreadPoolAdapterChoose.java b/threadpool/core/src/main/java/cn/hippo4j/core/executor/support/adpter/DynamicThreadPoolAdapterChoose.java index b4f75716..0e2670ae 100644 --- a/threadpool/core/src/main/java/cn/hippo4j/core/executor/support/adpter/DynamicThreadPoolAdapterChoose.java +++ b/threadpool/core/src/main/java/cn/hippo4j/core/executor/support/adpter/DynamicThreadPoolAdapterChoose.java @@ -30,11 +30,14 @@ import java.util.concurrent.ThreadPoolExecutor; /** * Dynamic thread pool adapter choose. + * 动态线程池适配器选择类 */ public class DynamicThreadPoolAdapterChoose { + // 存储了所有执行器适配器对象的集合 private static final List DYNAMIC_THREAD_POOL_ADAPTERS = new ArrayList<>(); + // 添加三个动态线程池适配器对象 static { DYNAMIC_THREAD_POOL_ADAPTERS.add(new TransmittableThreadLocalExecutorAdapter()); DYNAMIC_THREAD_POOL_ADAPTERS.add(new TransmittableThreadLocalExecutorServiceAdapter()); @@ -49,6 +52,7 @@ public class DynamicThreadPoolAdapterChoose { * @param executor objects where there may be instances * of dynamic thread pools * @return matching results + * //匹配执行器的适配器对象 */ public static boolean match(Object executor) { return DYNAMIC_THREAD_POOL_ADAPTERS.stream().anyMatch(each -> each.match(executor)); @@ -60,6 +64,7 @@ public class DynamicThreadPoolAdapterChoose { * @param executor objects where there may be instances * of dynamic thread pools * @return get the real dynamic thread pool instance + * //使用执行器的适配器对象得到执行器中的动态线程池 */ public static ThreadPoolExecutor unwrap(Object executor) { Optional dynamicThreadPoolAdapterOptional = DYNAMIC_THREAD_POOL_ADAPTERS.stream().filter(each -> each.match(executor)).findFirst(); @@ -73,6 +78,7 @@ public class DynamicThreadPoolAdapterChoose { * @param executor objects where there may be instances * of dynamic thread pools * @param dynamicThreadPoolExecutor dynamic thread-pool executor + * //使用dynamicThreadPoolExecutor替代executor中的线程池成员变量 */ public static void replace(Object executor, Executor dynamicThreadPoolExecutor) { Optional dynamicThreadPoolAdapterOptional = DYNAMIC_THREAD_POOL_ADAPTERS.stream().filter(each -> each.match(executor)).findFirst(); diff --git a/threadpool/core/src/test/java/cn/hippo4j/core/executor/plugin/ThreadPoolPluginTest.java b/threadpool/core/src/test/java/cn/hippo4j/core/executor/plugin/ThreadPoolPluginTest.java index d0c2968b..ff8574da 100644 --- a/threadpool/core/src/test/java/cn/hippo4j/core/executor/plugin/ThreadPoolPluginTest.java +++ b/threadpool/core/src/test/java/cn/hippo4j/core/executor/plugin/ThreadPoolPluginTest.java @@ -22,9 +22,11 @@ import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor; import cn.hippo4j.core.executor.plugin.*; import cn.hippo4j.core.executor.plugin.manager.DefaultThreadPoolPluginManager; import lombok.Getter; +import lombok.extern.slf4j.Slf4j; import org.junit.Assert; import org.junit.Test; +import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -33,6 +35,7 @@ import java.util.concurrent.atomic.AtomicInteger; /** * test for default method of {@link ThreadPoolPlugin} and it's subclass */ +@Slf4j public class ThreadPoolPluginTest { @Test @@ -88,6 +91,16 @@ public class ThreadPoolPluginTest { private final static class TestShutdownAwarePlugin implements ShutdownAwarePlugin { private final String id = this.getClass().getSimpleName(); + + @Override + public void beforeShutdown(ThreadPoolExecutor executor) { + log.info("before shutdown", id); + } + + @Override + public void afterShutdown(ThreadPoolExecutor executor, List remainingTasks) { + log.info("after shutdown", id); + } } }