feat: 功能持续更新.

pull/161/head
chen.ma 4 years ago
parent 2bb07c11c3
commit 0e713a370e

@ -29,7 +29,6 @@
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
</dependency>
</dependencies>
</project>

@ -0,0 +1,52 @@
package io.dynamic.threadpool.common.enums;
/**
*
*
* @author chen.ma
* @date 2021/6/25 12:30
*/
public enum QueueTypeEnum {
/**
* {@link java.util.concurrent.ArrayBlockingQueue}
*/
ARRAY_BLOCKING_QUEUE(1),
/**
* {@link java.util.concurrent.LinkedBlockingQueue}
*/
Linked_Blocking_QUEUE(2),
/**
* {@link java.util.concurrent.LinkedBlockingDeque}
*/
Linked_Blocking_Deque(3),
/**
* {@link java.util.concurrent.SynchronousQueue}
*/
SynchronousQueue(4),
/**
* {@link java.util.concurrent.LinkedTransferQueue}
*/
LINKED_TRANSFER_QUEUE(5),
/**
* {@link java.util.concurrent.PriorityBlockingQueue}
*/
PriorityBlockingQueue(6),
/**
* {@link "io.dynamic.threadpool.starter.core.ResizableCapacityLinkedBlockIngQueue"}
*/
Resizable_LINKED_Blocking_QUEUE(9);
public Integer type;
QueueTypeEnum(int type) {
this.type = type;
}
}

@ -2,14 +2,14 @@ package io.dynamic.threadpool.starter.core;
import com.alibaba.fastjson.JSON;
import io.dynamic.threadpool.common.model.PoolParameterInfo;
import io.dynamic.threadpool.starter.wrap.DynamicThreadPoolWrap;
import io.dynamic.threadpool.starter.toolkit.ThreadPoolChangeUtil;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* 线
* 线
*
* @author chen.ma
* @date 2021/6/20 15:51
@ -18,26 +18,23 @@ import java.util.concurrent.TimeUnit;
public class ThreadPoolDynamicRefresh {
public static void refreshDynamicPool(String content) {
log.info("[🔥] Start refreshing configuration. content :: {}", content);
PoolParameterInfo parameter = JSON.parseObject(content, PoolParameterInfo.class);
refreshDynamicPool(parameter.getTpId(), parameter.getCoreSize(), parameter.getMaxSize(), parameter.getCapacity(), parameter.getKeepAliveTime());
String tpId = parameter.getTpId();
Integer coreSize, maxSize, queueType, capacity, keepAliveTime;
log.info("[🔥] Start refreshing configuration. tpId :: {}, coreSize :: {}, maxSize :: {}, queueType :: {}, capacity :: {}, keepAliveTime :: {}",
tpId, coreSize = parameter.getCoreSize(), maxSize = parameter.getMaxSize(),
queueType = parameter.getQueueType(), capacity = parameter.getCapacity(), keepAliveTime = parameter.getKeepAliveTime());
refreshDynamicPool(tpId, coreSize, maxSize, queueType, capacity, keepAliveTime);
}
public static void refreshDynamicPool(String threadPoolId, Integer coreSize, Integer maxSize, Integer capacity, Integer keepAliveTime) {
DynamicThreadPoolWrap wrap = GlobalThreadPoolManage.getExecutorService(threadPoolId);
ThreadPoolExecutor executor = wrap.getPool();
if (coreSize != null) {
executor.setCorePoolSize(coreSize);
}
if (maxSize != null) {
executor.setMaximumPoolSize(maxSize);
}
if (capacity != null) {
ResizableCapacityLinkedBlockIngQueue queue = (ResizableCapacityLinkedBlockIngQueue) executor.getQueue();
queue.setCapacity(capacity);
}
if (keepAliveTime != null) {
executor.setKeepAliveTime(keepAliveTime, TimeUnit.SECONDS);
}
public static void refreshDynamicPool(String threadPoolId, Integer coreSize, Integer maxSize, Integer queueType, Integer capacity, Integer keepAliveTime) {
ThreadPoolExecutor executor = GlobalThreadPoolManage.getExecutorService(threadPoolId).getPool();
log.info("[✈️] Original thread pool. coreSize :: {}, maxSize :: {}, queueType :: {}, capacity :: {}, keepAliveTime :: {}",
executor.getCorePoolSize(), executor.getMaximumPoolSize(), queueType, executor.getQueue().remainingCapacity(), executor.getKeepAliveTime(TimeUnit.MILLISECONDS));
ThreadPoolChangeUtil.changePool(executor, coreSize, maxSize, queueType, capacity, keepAliveTime);
ThreadPoolExecutor afterExecutor = GlobalThreadPoolManage.getExecutorService(threadPoolId).getPool();
log.info("[🚀] Changed thread pool. coreSize :: {}, maxSize :: {}, queueType :: {}, capacity :: {}, keepAliveTime :: {}",
afterExecutor.getCorePoolSize(), afterExecutor.getMaximumPoolSize(), queueType, afterExecutor.getQueue().remainingCapacity(), afterExecutor.getKeepAliveTime(TimeUnit.MILLISECONDS));
}
}

@ -86,7 +86,7 @@ public class ClientWorker {
if (longingTaskCount > currentLongingTaskCount) {
for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
executorService.execute(new LongPollingRunnable(i));
executorService.execute(new LongPollingRunnable());
}
currentLongingTaskCount = longingTaskCount;
}
@ -97,12 +97,6 @@ public class ClientWorker {
*/
class LongPollingRunnable implements Runnable {
private final int taskId;
public LongPollingRunnable(int taskId) {
this.taskId = taskId;
}
@Override
public void run() {
List<CacheData> cacheDataList = new ArrayList();
@ -110,9 +104,8 @@ public class ClientWorker {
.stream().map(each -> each.getValue()).collect(Collectors.toList());
List<String> changedTpIds = checkUpdateDataIds(queryCacheDataList);
if (CollectionUtils.isEmpty(changedTpIds)) {
if (!CollectionUtils.isEmpty(changedTpIds)) {
log.info("[dynamic threadPool] tpIds changed :: {}", changedTpIds);
} else {
for (String each : changedTpIds) {
String[] keys = StrUtil.split(each, Constants.GROUP_KEY_DELIMITER);
String tpId = keys[0];

@ -1,10 +1,10 @@
package io.dynamic.threadpool.starter.toolkit;
import io.dynamic.threadpool.common.enums.QueueTypeEnum;
import io.dynamic.threadpool.starter.core.ResizableCapacityLinkedBlockIngQueue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.Objects;
import java.util.concurrent.*;
/**
*
@ -16,17 +16,20 @@ public class BlockingQueueUtil {
public static BlockingQueue createBlockingQueue(Integer type, Integer capacity) {
BlockingQueue blockingQueue = null;
switch (type) {
case 1:
blockingQueue = new ArrayBlockingQueue(capacity);
break;
case 2:
blockingQueue = new LinkedBlockingQueue(capacity);
break;
case 3:
blockingQueue = new ResizableCapacityLinkedBlockIngQueue(capacity);
default:
break;
if (Objects.equals(type, QueueTypeEnum.ARRAY_BLOCKING_QUEUE.type)) {
blockingQueue = new ArrayBlockingQueue(capacity);
} else if (Objects.equals(type, QueueTypeEnum.Linked_Blocking_QUEUE.type)) {
blockingQueue = new LinkedBlockingQueue(capacity);
} else if (Objects.equals(type, QueueTypeEnum.Linked_Blocking_Deque.type)) {
blockingQueue = new LinkedBlockingDeque(capacity);
} else if (Objects.equals(type, QueueTypeEnum.SynchronousQueue.type)) {
blockingQueue = new SynchronousQueue();
} else if (Objects.equals(type, QueueTypeEnum.LINKED_TRANSFER_QUEUE.type)) {
blockingQueue = new LinkedTransferQueue();
} else if (Objects.equals(type, QueueTypeEnum.PriorityBlockingQueue.type)) {
blockingQueue = new PriorityBlockingQueue(capacity);
} else if (Objects.equals(type, QueueTypeEnum.Resizable_LINKED_Blocking_QUEUE.type)) {
blockingQueue = new ResizableCapacityLinkedBlockIngQueue(capacity);
}
return blockingQueue;
}

@ -0,0 +1,33 @@
package io.dynamic.threadpool.starter.toolkit;
import io.dynamic.threadpool.common.enums.QueueTypeEnum;
import io.dynamic.threadpool.starter.core.ResizableCapacityLinkedBlockIngQueue;
import java.util.Objects;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* 线
*
* @author chen.ma
* @date 2021/6/25 17:19
*/
public class ThreadPoolChangeUtil {
public static void changePool(ThreadPoolExecutor executor, Integer coreSize, Integer maxSize, Integer queueType, Integer capacity, Integer keepAliveTime) {
if (coreSize != null) {
executor.setCorePoolSize(coreSize);
}
if (maxSize != null) {
executor.setMaximumPoolSize(maxSize);
}
if (capacity != null && Objects.equals(QueueTypeEnum.Resizable_LINKED_Blocking_QUEUE.type, queueType)) {
ResizableCapacityLinkedBlockIngQueue queue = (ResizableCapacityLinkedBlockIngQueue) executor.getQueue();
queue.setCapacity(capacity);
}
if (keepAliveTime != null) {
executor.setKeepAliveTime(keepAliveTime, TimeUnit.SECONDS);
}
}
}

@ -0,0 +1,17 @@
package io.dynamic.threadpool.server.controller;
import io.dynamic.threadpool.common.constant.Constants;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
*
*
* @author chen.ma
* @date 2021/6/25 18:31
*/
@RestController
@RequestMapping(Constants.BASE_PATH)
public class BizController {
}
Loading…
Cancel
Save