Support annotation subscription dynamic configuration.

pull/161/head
chen.ma 3 years ago
parent 2970ed6a8f
commit d329900599

@ -2,10 +2,7 @@ package com.github.dynamic.threadpool.starter.config;
import com.github.dynamic.threadpool.common.config.ApplicationContextHolder;
import com.github.dynamic.threadpool.starter.controller.PoolRunStateController;
import com.github.dynamic.threadpool.starter.core.ConfigService;
import com.github.dynamic.threadpool.starter.core.DynamicThreadPoolPostProcessor;
import com.github.dynamic.threadpool.starter.core.ThreadPoolConfigService;
import com.github.dynamic.threadpool.starter.core.ThreadPoolOperation;
import com.github.dynamic.threadpool.starter.core.*;
import com.github.dynamic.threadpool.starter.enable.MarkerConfiguration;
import com.github.dynamic.threadpool.starter.handler.DynamicThreadPoolBannerHandler;
import com.github.dynamic.threadpool.starter.remote.HttpAgent;
@ -57,7 +54,8 @@ public class DynamicThreadPoolAutoConfiguration {
@Bean
@SuppressWarnings("all")
public DynamicThreadPoolPostProcessor threadPoolBeanPostProcessor(HttpAgent httpAgent, ThreadPoolOperation threadPoolOperation) {
public DynamicThreadPoolPostProcessor threadPoolBeanPostProcessor(HttpAgent httpAgent, ThreadPoolOperation threadPoolOperation,
ApplicationContextHolder applicationContextHolder) {
return new DynamicThreadPoolPostProcessor(properties, httpAgent, threadPoolOperation);
}

@ -0,0 +1,18 @@
package com.github.dynamic.threadpool.starter.core;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* Dynamic thread pool.
*
* @author chen.ma
* @date 2021/10/13 21:50
*/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface DynamicThreadPool {
}

@ -1,6 +1,7 @@
package com.github.dynamic.threadpool.starter.core;
import com.alibaba.fastjson.JSON;
import com.github.dynamic.threadpool.common.config.ApplicationContextHolder;
import com.github.dynamic.threadpool.common.constant.Constants;
import com.github.dynamic.threadpool.common.model.PoolParameterInfo;
import com.github.dynamic.threadpool.common.web.base.Result;
@ -12,7 +13,9 @@ import com.github.dynamic.threadpool.starter.toolkit.thread.QueueTypeEnum;
import com.github.dynamic.threadpool.starter.toolkit.thread.RejectedTypeEnum;
import com.github.dynamic.threadpool.starter.toolkit.thread.ThreadPoolBuilder;
import com.github.dynamic.threadpool.starter.wrap.DynamicThreadPoolWrap;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import lombok.var;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
@ -24,6 +27,8 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import static com.github.dynamic.threadpool.common.constant.Constants.*;
/**
* Dynamic threadPool post processor.
*
@ -31,20 +36,14 @@ import java.util.concurrent.TimeUnit;
* @date 2021/8/2 20:40
*/
@Slf4j
@AllArgsConstructor
public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
private final BootstrapProperties properties;
private final ThreadPoolOperation threadPoolOperation;
private final HttpAgent httpAgent;
public DynamicThreadPoolPostProcessor(BootstrapProperties properties, HttpAgent httpAgent,
ThreadPoolOperation threadPoolOperation) {
this.properties = properties;
this.httpAgent = httpAgent;
this.threadPoolOperation = threadPoolOperation;
}
private final ThreadPoolOperation threadPoolOperation;
private final ExecutorService executorService = ThreadPoolBuilder.builder()
.poolThreadSize(2, 4)
@ -56,23 +55,32 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if (!(bean instanceof DynamicThreadPoolWrap)) {
return bean;
if (bean instanceof CustomThreadPoolExecutor) {
var dynamicThreadPool = ApplicationContextHolder.findAnnotationOnBean(beanName, DynamicThreadPool.class);
if (Objects.isNull(dynamicThreadPool)) {
return bean;
}
var customExecutor = (CustomThreadPoolExecutor) bean;
var wrap = new DynamicThreadPoolWrap(customExecutor.getThreadPoolId(), customExecutor);
CustomThreadPoolExecutor remoteExecutor = fillPoolAndRegister(wrap);
subscribeConfig(wrap);
return remoteExecutor;
} else if (bean instanceof DynamicThreadPoolWrap) {
var wrap = (DynamicThreadPoolWrap) bean;
registerAndSubscribe(wrap);
}
DynamicThreadPoolWrap dynamicThreadPoolWrap = (DynamicThreadPoolWrap) bean;
/**
* TpId Server ,
* Server 使 ${@link CommonThreadPool#getInstance(String)}
*/
fillPoolAndRegister(dynamicThreadPoolWrap);
return bean;
}
/**
* Server , 线
*/
/**
* Register and subscribe.
*
* @param dynamicThreadPoolWrap
*/
protected void registerAndSubscribe(DynamicThreadPoolWrap dynamicThreadPoolWrap) {
fillPoolAndRegister(dynamicThreadPoolWrap);
subscribeConfig(dynamicThreadPoolWrap);
return bean;
}
/**
@ -80,14 +88,14 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
*
* @param dynamicThreadPoolWrap
*/
private void fillPoolAndRegister(DynamicThreadPoolWrap dynamicThreadPoolWrap) {
protected CustomThreadPoolExecutor fillPoolAndRegister(DynamicThreadPoolWrap dynamicThreadPoolWrap) {
String tpId = dynamicThreadPoolWrap.getTpId();
Map<String, String> queryStrMap = new HashMap(3);
queryStrMap.put("tpId", tpId);
queryStrMap.put("itemId", properties.getItemId());
queryStrMap.put("namespace", properties.getNamespace());
queryStrMap.put(TP_ID, tpId);
queryStrMap.put(ITEM_ID, properties.getItemId());
queryStrMap.put(NAMESPACE, properties.getNamespace());
Result result = null;
Result result;
boolean isSubscribe = false;
CustomThreadPoolExecutor poolExecutor = null;
PoolParameterInfo ppi = new PoolParameterInfo();
@ -126,9 +134,10 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
}
GlobalThreadPoolManage.register(dynamicThreadPoolWrap.getTpId(), ppi, dynamicThreadPoolWrap);
return poolExecutor;
}
private void subscribeConfig(DynamicThreadPoolWrap dynamicThreadPoolWrap) {
protected void subscribeConfig(DynamicThreadPoolWrap dynamicThreadPoolWrap) {
if (dynamicThreadPoolWrap.isSubscribeFlag()) {
threadPoolOperation.subscribeConfig(dynamicThreadPoolWrap.getTpId(), executorService, config -> ThreadPoolDynamicRefresh.refreshDynamicPool(config));
}

Loading…
Cancel
Save