feat: 功能持续更新.

pull/161/head
chen.ma 4 years ago
parent e21ac470b0
commit e02e528a88

@ -1,4 +1,4 @@
package io.dynamic.threadpool.starter.core;
package io.dynamic.threadpool.starter.banner;
import org.springframework.boot.ansi.AnsiColor;
import org.springframework.boot.ansi.AnsiOutput;
@ -14,25 +14,27 @@ public class DynamicThreadPoolBanner {
private static final String DYNAMIC_THREAD_POOL = " :: Dynamic ThreadPool :: ";
private static final int STRAP_LINE_SIZE = 42;
private static final int STRAP_LINE_SIZE = 50;
public static void printBanner() {
public static void printBanner(boolean isBanner) {
String banner = "\n___ _ _____ ___ \n" +
"| \\ _ _ _ _ __ _ _ __ (_)__ |_ _| _ \\\n" +
"| |) | || | ' \\/ _` | ' \\| / _| | | | _/\n" +
"|___/ \\_, |_||_\\__,_|_|_|_|_\\__| |_| |_| \n" +
" |__/ \n";
String version = getVersion();
version = (version != null) ? " (v" + version + ")" : "no version.";
if (isBanner) {
String version = getVersion();
version = (version != null) ? " (v" + version + ")" : "no version.";
StringBuilder padding = new StringBuilder();
while (padding.length() < STRAP_LINE_SIZE - (version.length() + DYNAMIC_THREAD_POOL.length())) {
padding.append(" ");
}
StringBuilder padding = new StringBuilder();
while (padding.length() < STRAP_LINE_SIZE - (version.length() + DYNAMIC_THREAD_POOL.length())) {
padding.append(" ");
}
System.out.println(AnsiOutput.toString(banner, AnsiColor.GREEN, DYNAMIC_THREAD_POOL, AnsiColor.DEFAULT,
padding.toString(), AnsiStyle.FAINT, version, "\n"));
System.out.println(AnsiOutput.toString(banner, AnsiColor.GREEN, DYNAMIC_THREAD_POOL, AnsiColor.DEFAULT,
padding.toString(), AnsiStyle.FAINT, version, "\n"));
}
}
public static String getVersion() {

@ -37,5 +37,10 @@ public class DynamicThreadPoolProperties {
/**
* 线
*/
private String enabled;
private boolean enabled;
/**
* banner
*/
private boolean banner;
}

@ -10,7 +10,7 @@ import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CopyOnWriteArrayList;
/**
* CacheData.
* Cache Data.
*
* @author chen.ma
* @date 2021/6/22 20:46

@ -9,6 +9,7 @@ import io.dynamic.threadpool.common.toolkit.GroupKey;
import io.dynamic.threadpool.common.web.base.Result;
import io.dynamic.threadpool.starter.core.CacheData;
import io.dynamic.threadpool.starter.remote.HttpAgent;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
@ -98,12 +99,19 @@ public class ClientWorker {
class LongPollingRunnable implements Runnable {
@Override
@SneakyThrows
public void run() {
List<CacheData> cacheDataList = new ArrayList();
List<CacheData> queryCacheDataList = cacheMap.entrySet()
.stream().map(each -> each.getValue()).collect(Collectors.toList());
List<String> changedTpIds = checkUpdateDataIds(queryCacheDataList);
List<String> changedTpIds = null;
try {
changedTpIds = checkUpdateDataIds(queryCacheDataList);
} catch (Exception ex) {
log.error("[Long polling] Error. exception message :: {}, Thread sleep 30 s.", ex.getMessage(), ex);
Thread.sleep(30000);
}
if (!CollectionUtils.isEmpty(changedTpIds)) {
log.info("[dynamic threadPool] tpIds changed :: {}", changedTpIds);
for (String each : changedTpIds) {
@ -281,6 +289,7 @@ public class ClientWorker {
cacheData = new CacheData(namespace, itemId, tpId);
CacheData lastCacheData = cacheMap.putIfAbsent(tpId, cacheData);
if (lastCacheData == null) {
// TODO 连接不到 server 端报错
String serverConfig = getServerConfig(namespace, itemId, tpId, 3000L);
PoolParameterInfo poolInfo = JSON.parseObject(serverConfig, PoolParameterInfo.class);
cacheData.setContent(ContentUtil.getPoolContent(poolInfo));

@ -5,14 +5,15 @@ import io.dynamic.threadpool.common.config.ApplicationContextHolder;
import io.dynamic.threadpool.common.constant.Constants;
import io.dynamic.threadpool.common.model.PoolParameterInfo;
import io.dynamic.threadpool.common.web.base.Result;
import io.dynamic.threadpool.starter.banner.DynamicThreadPoolBanner;
import io.dynamic.threadpool.starter.common.CommonThreadPool;
import io.dynamic.threadpool.starter.config.DynamicThreadPoolProperties;
import io.dynamic.threadpool.starter.core.DynamicThreadPoolBanner;
import io.dynamic.threadpool.starter.core.GlobalThreadPoolManage;
import io.dynamic.threadpool.starter.remote.HttpAgent;
import io.dynamic.threadpool.starter.remote.ServerHttpAgent;
import io.dynamic.threadpool.starter.toolkit.BlockingQueueUtil;
import io.dynamic.threadpool.starter.wrap.DynamicThreadPoolWrap;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.annotation.Order;
import javax.annotation.PostConstruct;
@ -28,18 +29,19 @@ import java.util.concurrent.TimeUnit;
* @author chen.ma
* @date 2021/6/20 16:34
*/
@Slf4j
public class ThreadPoolRunListener {
private final DynamicThreadPoolProperties dynamicThreadPoolProperties;
private final DynamicThreadPoolProperties properties;
public ThreadPoolRunListener(DynamicThreadPoolProperties properties) {
this.dynamicThreadPoolProperties = properties;
this.properties = properties;
}
@Order(1024)
@PostConstruct
public void run() {
DynamicThreadPoolBanner.printBanner();
DynamicThreadPoolBanner.printBanner(properties.isBanner());
Map<String, DynamicThreadPoolWrap> executorMap =
ApplicationContextHolder.getBeansOfType(DynamicThreadPoolWrap.class);
@ -47,19 +49,25 @@ public class ThreadPoolRunListener {
executorMap.forEach((key, val) -> {
Map<String, String> queryStrMap = new HashMap(3);
queryStrMap.put("tpId", val.getTpId());
queryStrMap.put("itemId", dynamicThreadPoolProperties.getItemId());
queryStrMap.put("namespace", dynamicThreadPoolProperties.getNamespace());
queryStrMap.put("itemId", properties.getItemId());
queryStrMap.put("namespace", properties.getNamespace());
PoolParameterInfo ppi = null;
HttpAgent httpAgent = new ServerHttpAgent(dynamicThreadPoolProperties);
Result result = httpAgent.httpGet(Constants.CONFIG_CONTROLLER_PATH, null, queryStrMap, 3000L);
if (result.isSuccess() && (ppi = JSON.toJavaObject((JSON) result.getData(), PoolParameterInfo.class)) != null) {
// 使用相关参数创建线程池
TimeUnit unit = TimeUnit.SECONDS;
BlockingQueue workQueue = BlockingQueueUtil.createBlockingQueue(ppi.getQueueType(), ppi.getCapacity());
ThreadPoolExecutor resultTpe = new ThreadPoolExecutor(ppi.getCoreSize(), ppi.getMaxSize(), ppi.getKeepAliveTime(), unit, workQueue);
val.setPool(resultTpe);
} else if (val.getPool() == null) {
PoolParameterInfo ppi = new PoolParameterInfo();
HttpAgent httpAgent = new ServerHttpAgent(properties);
Result result = null;
try {
result = httpAgent.httpGet(Constants.CONFIG_CONTROLLER_PATH, null, queryStrMap, 3000L);
if (result.isSuccess() && (ppi = JSON.toJavaObject((JSON) result.getData(), PoolParameterInfo.class)) != null) {
// 使用相关参数创建线程池
TimeUnit unit = TimeUnit.SECONDS;
BlockingQueue workQueue = BlockingQueueUtil.createBlockingQueue(ppi.getQueueType(), ppi.getCapacity());
ThreadPoolExecutor resultTpe = new ThreadPoolExecutor(ppi.getCoreSize(), ppi.getMaxSize(), ppi.getKeepAliveTime(), unit, workQueue);
val.setPool(resultTpe);
} else if (val.getPool() == null) {
val.setPool(CommonThreadPool.getInstance(val.getTpId()));
}
} catch (Exception ex) {
log.error(ex.getMessage(), ex);
val.setPool(CommonThreadPool.getInstance(val.getTpId()));
}

@ -8,8 +8,8 @@
},
{
"name": "spring.dynamic.thread-pool.enabled",
"type": "java.lang.String",
"defaultValue": "false",
"type": "java.lang.Boolean",
"defaultValue": false,
"description": "dynamic thread-pool enabled."
},
{
@ -22,6 +22,12 @@
"name": "spring.dynamic.thread-pool.item-id",
"type": "java.lang.String",
"description": "dynamic thread-pool item-id."
},
{
"name": "spring.dynamic.thread-pool.banner",
"type": "java.lang.Boolean",
"defaultValue": true,
"description": "dynamic thread-pool banner."
}
]
}
Loading…
Cancel
Save