Registered thread pool defaults to specify expiration time and thread factory (#976)

pull/983/head
chen.ma 2 years ago
parent a124f6ee27
commit 33b08102d7

@ -25,6 +25,8 @@ import lombok.Builder;
import lombok.Data; import lombok.Data;
import lombok.NoArgsConstructor; import lombok.NoArgsConstructor;
import java.util.concurrent.ThreadFactory;
/** /**
* Dynamic thread-pool register parameter. * Dynamic thread-pool register parameter.
*/ */
@ -101,6 +103,11 @@ public class DynamicThreadPoolRegisterParameter {
*/ */
private String threadNamePrefix; private String threadNamePrefix;
/**
* Thread factory
*/
private ThreadFactory threadFactory;
/** /**
* Execute timeout * Execute timeout
*/ */

@ -43,6 +43,7 @@ public abstract class AbstractDynamicThreadPoolService implements DynamicThreadP
.maxPoolNum(registerParameter.getMaximumPoolSize()) .maxPoolNum(registerParameter.getMaximumPoolSize())
.workQueue(BlockingQueueTypeEnum.createBlockingQueue(registerParameter.getBlockingQueueType().getType(), registerParameter.getCapacity())) .workQueue(BlockingQueueTypeEnum.createBlockingQueue(registerParameter.getBlockingQueueType().getType(), registerParameter.getCapacity()))
.threadFactory(registerParameter.getThreadNamePrefix()) .threadFactory(registerParameter.getThreadNamePrefix())
.threadFactory(registerParameter.getThreadFactory())
.keepAliveTime(registerParameter.getKeepAliveTime(), TimeUnit.SECONDS) .keepAliveTime(registerParameter.getKeepAliveTime(), TimeUnit.SECONDS)
.executeTimeOut(registerParameter.getExecuteTimeOut()) .executeTimeOut(registerParameter.getExecuteTimeOut())
.rejected(RejectedPolicyTypeEnum.createPolicy(registerParameter.getRejectedPolicyType().getType())) .rejected(RejectedPolicyTypeEnum.createPolicy(registerParameter.getRejectedPolicyType().getType()))

@ -67,10 +67,10 @@ public class TaskTimeRecordPluginTest {
Assert.assertTrue(summary.getMaxTaskTimeMillis() > 0L); Assert.assertTrue(summary.getMaxTaskTimeMillis() > 0L);
Assert.assertTrue(summary.getAvgTaskTimeMillis() > 0L); Assert.assertTrue(summary.getAvgTaskTimeMillis() > 0L);
Assert.assertTrue(summary.getTotalTaskTimeMillis() > 0L); Assert.assertTrue(summary.getTotalTaskTimeMillis() > 0L);
//Assert.assertTrue(testInDeviation(summary.getMinTaskTimeMillis(), 1000L, 300L)); // Assert.assertTrue(testInDeviation(summary.getMinTaskTimeMillis(), 1000L, 300L));
//Assert.assertTrue(testInDeviation(summary.getMaxTaskTimeMillis(), 3000L, 300L)); // Assert.assertTrue(testInDeviation(summary.getMaxTaskTimeMillis(), 3000L, 300L));
//Assert.assertTrue(testInDeviation(summary.getAvgTaskTimeMillis(), 2000L, 300L)); // Assert.assertTrue(testInDeviation(summary.getAvgTaskTimeMillis(), 2000L, 300L));
//Assert.assertTrue(testInDeviation(summary.getTotalTaskTimeMillis(), 8000L, 300L)); // Assert.assertTrue(testInDeviation(summary.getTotalTaskTimeMillis(), 8000L, 300L));
} }
private boolean testInDeviation(long except, long actual, long offer) { private boolean testInDeviation(long except, long actual, long offer) {

@ -67,7 +67,7 @@ public class ConfigServletInner {
* This problem belongs to an extremely individual scenario. Since it cannot be reproduced, so first solve the problem in this way. * This problem belongs to an extremely individual scenario. Since it cannot be reproduced, so first solve the problem in this way.
* *
* @param request http servlet request * @param request http servlet request
* @return * @return weight verification
*/ */
private boolean weightVerification(HttpServletRequest request) { private boolean weightVerification(HttpServletRequest request) {
String clientIdentify = request.getParameter(WEIGHT_CONFIGS); String clientIdentify = request.getParameter(WEIGHT_CONFIGS);

@ -83,6 +83,9 @@ public class LongPollingService {
}); });
} }
/**
* Stat task.
*/
class StatTask implements Runnable { class StatTask implements Runnable {
@Override @Override
@ -93,6 +96,9 @@ public class LongPollingService {
final Queue<ClientLongPolling> allSubs; final Queue<ClientLongPolling> allSubs;
/**
* Data change task.
*/
class DataChangeTask implements Runnable { class DataChangeTask implements Runnable {
final String identify; final String identify;
@ -107,8 +113,8 @@ public class LongPollingService {
@Override @Override
public void run() { public void run() {
try { try {
for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext(); ) { for (Iterator<ClientLongPolling> iterator = allSubs.iterator(); iterator.hasNext();) {
ClientLongPolling clientSub = iter.next(); ClientLongPolling clientSub = iterator.next();
String identity = groupKey + GROUP_KEY_DELIMITER + identify; String identity = groupKey + GROUP_KEY_DELIMITER + identify;
List<String> parseMapForFilter = CollectionUtil.newArrayList(identity); List<String> parseMapForFilter = CollectionUtil.newArrayList(identity);
if (StringUtil.isBlank(identify)) { if (StringUtil.isBlank(identify)) {
@ -118,7 +124,7 @@ public class LongPollingService {
if (clientSub.clientMd5Map.containsKey(each)) { if (clientSub.clientMd5Map.containsKey(each)) {
getRetainIps().put(clientSub.clientIdentify, System.currentTimeMillis()); getRetainIps().put(clientSub.clientIdentify, System.currentTimeMillis());
ConfigCacheService.updateMd5(each, clientSub.clientIdentify, ConfigCacheService.getContentMd5(each)); ConfigCacheService.updateMd5(each, clientSub.clientIdentify, ConfigCacheService.getContentMd5(each));
iter.remove(); iterator.remove();
clientSub.sendResponse(Collections.singletonList(groupKey)); clientSub.sendResponse(Collections.singletonList(groupKey));
} }
}); });
@ -132,10 +138,10 @@ public class LongPollingService {
/** /**
* Add long polling client. * Add long polling client.
* *
* @param req * @param req http servlet request
* @param rsp * @param rsp http servlet response
* @param clientMd5Map * @param clientMd5Map client md5 map
* @param probeRequestSize * @param probeRequestSize probe request size
*/ */
public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map, public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map,
int probeRequestSize) { int probeRequestSize) {
@ -236,7 +242,7 @@ public class LongPollingService {
/** /**
* Generate async response. * Generate async response.
* *
* @param changedGroups Changed thread pool group key * @param changedGroups changed thread pool group key
*/ */
private void generateResponse(List<String> changedGroups) { private void generateResponse(List<String> changedGroups) {
HttpServletResponse response = (HttpServletResponse) asyncContext.getResponse(); HttpServletResponse response = (HttpServletResponse) asyncContext.getResponse();
@ -265,6 +271,11 @@ public class LongPollingService {
} }
} }
/**
* Get retain ips.
*
* @return retain ips
*/
public Map<String, Long> getRetainIps() { public Map<String, Long> getRetainIps() {
return retainIps; return retainIps;
} }
@ -294,7 +305,7 @@ public class LongPollingService {
* Build resp str. * Build resp str.
* *
* @param changedGroups Changed thread pool group key * @param changedGroups Changed thread pool group key
* @return * @return resp str
*/ */
@SneakyThrows @SneakyThrows
private String buildRespStr(List<String> changedGroups) { private String buildRespStr(List<String> changedGroups) {
@ -305,8 +316,8 @@ public class LongPollingService {
/** /**
* Is support long polling. * Is support long polling.
* *
* @param request * @param request http servlet request
* @return * @return is support long polling
*/ */
public static boolean isSupportLongPolling(HttpServletRequest request) { public static boolean isSupportLongPolling(HttpServletRequest request) {
return request.getHeader(LONG_POLLING_HEADER) != null; return request.getHeader(LONG_POLLING_HEADER) != null;
@ -315,7 +326,7 @@ public class LongPollingService {
/** /**
* Is fixed polling. * Is fixed polling.
* *
* @return * @return is fixed polling
*/ */
private static boolean isFixedPolling() { private static boolean isFixedPolling() {
return SwitchService.getSwitchBoolean(SwitchService.FIXED_POLLING, false); return SwitchService.getSwitchBoolean(SwitchService.FIXED_POLLING, false);
@ -324,7 +335,7 @@ public class LongPollingService {
/** /**
* Get fixed polling interval. * Get fixed polling interval.
* *
* @return * @return fixed polling interval
*/ */
private static int getFixedPollingInterval() { private static int getFixedPollingInterval() {
return SwitchService.getSwitchInteger(SwitchService.FIXED_POLLING_INTERVAL, FIXED_POLLING_INTERVAL_MS); return SwitchService.getSwitchInteger(SwitchService.FIXED_POLLING_INTERVAL, FIXED_POLLING_INTERVAL_MS);

@ -148,11 +148,13 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
.maximumPoolSize(executor.getMaximumPoolSize()) .maximumPoolSize(executor.getMaximumPoolSize())
.blockingQueueType(BlockingQueueTypeEnum.getBlockingQueueTypeEnumByName(executor.getQueue().getClass().getSimpleName())) .blockingQueueType(BlockingQueueTypeEnum.getBlockingQueueTypeEnumByName(executor.getQueue().getClass().getSimpleName()))
.capacity(executor.getQueue().remainingCapacity()) .capacity(executor.getQueue().remainingCapacity())
.threadFactory(executor.getThreadFactory())
.allowCoreThreadTimeOut(executor.allowsCoreThreadTimeOut()) .allowCoreThreadTimeOut(executor.allowsCoreThreadTimeOut())
.keepAliveTime(executor.getKeepAliveTime(TimeUnit.MILLISECONDS)) .keepAliveTime(executor.getKeepAliveTime(TimeUnit.MILLISECONDS))
.isAlarm(false) .isAlarm(false)
.activeAlarm(80) .activeAlarm(80)
.capacityAlarm(80) .capacityAlarm(80)
.executeTimeOut(10000L)
.rejectedPolicyType(RejectedPolicyTypeEnum.getRejectedPolicyTypeEnumByName(executor.getRejectedExecutionHandler().getClass().getSimpleName())) .rejectedPolicyType(RejectedPolicyTypeEnum.getRejectedPolicyTypeEnumByName(executor.getRejectedExecutionHandler().getClass().getSimpleName()))
.build(); .build();
DynamicThreadPoolRegisterWrapper registerWrapper = DynamicThreadPoolRegisterWrapper.builder() DynamicThreadPoolRegisterWrapper registerWrapper = DynamicThreadPoolRegisterWrapper.builder()

Loading…
Cancel
Save