1. 删掉传统配置的线程池(代码和配置信息)

2. 优化cron模块线程池相关代码
3. cron模块当读取完文件后,释放单线程池资源(后续可做通知模块)
4. 注册动态线程池/spring管理工具类
pull/6/head
3y 3 years ago
parent ecaedf8cd5
commit 9ed6b06343

@ -0,0 +1,37 @@
package com.java3y.austin.common.constant;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
/**
* 线
* 使线apollo)
*
* @author 3y
*/
public class ThreadPoolConstant {
/**
* small
*/
public static final Integer SINGLE_CORE_POOL_SIZE = 1;
public static final Integer SINGLE_MAX_POOL_SIZE = 1;
public static final Integer SMALL_KEEP_LIVE_TIME = 10;
/**
* medium
*/
public static final Integer COMMON_CORE_POOL_SIZE = 2;
public static final Integer COMMON_MAX_POOL_SIZE = 2;
public static final Integer COMMON_KEEP_LIVE_TIME = 60;
public static final Integer COMMON_QUEUE_SIZE = 20;
/**
* big
*/
public static final Integer BIG_QUEUE_SIZE = 1024;
public static final BlockingQueue BIG_BLOCKING_QUEUE = new LinkedBlockingQueue(BIG_QUEUE_SIZE);
}

@ -1,49 +0,0 @@
package com.java3y.austin.cron.config;
import com.google.common.base.Throwables;
import lombok.extern.slf4j.Slf4j;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.boot.autoconfigure.task.TaskExecutionAutoConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
/**
* 线@Async
* 线
*
* @author 3y
* @see TaskExecutionAutoConfiguration
*/
@Slf4j
@Configuration
@EnableAsync
@EnableConfigurationProperties(AsyncExecutionProperties.class)
public class AsyncConfiguration implements AsyncConfigurer {
@Bean("austinExecutor")
@Primary
public ThreadPoolTaskExecutor executor(AsyncExecutionProperties properties) {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(properties.getCoreSize());
executor.setMaxPoolSize(properties.getMaxSize());
executor.setKeepAliveSeconds(properties.getKeepAlive());
executor.setQueueCapacity(properties.getQueueCapacity());
executor.setThreadNamePrefix(properties.getThreadNamePrefix());
executor.setRejectedExecutionHandler(properties.getRejectedHandler().getHandler());
executor.setAllowCoreThreadTimeOut(properties.isAllowCoreThreadTimeout());
executor.setWaitForTasksToCompleteOnShutdown(properties.isWaitForTasksToCompleteOnShutDown());
executor.setAwaitTerminationSeconds(properties.getAwaitTerminationSeconds());
executor.initialize();
return executor;
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return (ex, method, params) -> log.error("austinExecutor execute fail!method:{},params:{},ex:{}", method, params, Throwables.getStackTraceAsString(ex));
}
}

@ -1,116 +0,0 @@
package com.java3y.austin.cron.config;
import lombok.Data;
import org.springframework.boot.autoconfigure.task.TaskExecutionProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import javax.annotation.PostConstruct;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
/**
* @program: austin
* @description: spring 线
* @author: WhyWhatHow
* @create: 2022-02-27 09:41
* @see TaskExecutionProperties
**/
@Data
@ConfigurationProperties("austin.async.task")
public class AsyncExecutionProperties {
/**
* 线,cpu线
*/
int coreSize;
/**
* 线 ,coreSize*2
*/
int maxSize;
/**
* 线 eg: "austinAsyncExecutor-"
*/
private String threadNamePrefix = "austinAsyncExecutor-";
/**
* queue capacity
*/
private int queueCapacity = 1000;
/**
* 线,s
*/
private int keepAlive = 60;
/**
* 线
*/
private boolean allowCoreThreadTimeout = false;
/**
* ,callRun
*/
private RejectedEnum rejectedHandler = RejectedEnum.CALLRUNSPOLICY;
/**
* ,true
*/
private boolean waitForTasksToCompleteOnShutDown = true;
/**
* ,10s
*/
private int awaitTerminationSeconds = 10;
/**
* 线, 线,
*/
@PostConstruct
void init() {
if (coreSize <= 0) {
this.coreSize = Runtime.getRuntime().availableProcessors();
}
if (maxSize <= 0) {
this.maxSize = coreSize << 1;
}
}
/**
*
*/
public enum RejectedEnum {
/**
*
*/
ABORTPOLICY(new ThreadPoolExecutor.AbortPolicy()),
/**
* run_thread
*/
CALLRUNSPOLICY(new ThreadPoolExecutor.CallerRunsPolicy()),
/***
*
*/
DISCARDPOLICY(new ThreadPoolExecutor.DiscardPolicy()),
/**
*
*/
DISCARDOLDESTPOLICY(new ThreadPoolExecutor.DiscardOldestPolicy());
/**
* 线
*/
private RejectedExecutionHandler handler;
RejectedEnum(RejectedExecutionHandler handler) {
this.handler = handler;
}
public RejectedExecutionHandler getHandler() {
return handler;
}
public void setHandler(RejectedExecutionHandler handler) {
this.handler = handler;
}
}
}

@ -0,0 +1,64 @@
package com.java3y.austin.cron.config;
import cn.hutool.core.thread.ExecutorBuilder;
import com.dtp.common.em.QueueTypeEnum;
import com.dtp.common.em.RejectedTypeEnum;
import com.dtp.core.thread.DtpExecutor;
import com.dtp.core.thread.ThreadPoolBuilder;
import com.java3y.austin.common.constant.ThreadPoolConstant;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @author 3y
* 线apolloapolloapollo
*/
public class CronAsyncThreadPoolConfig {
/**
* xxl-job线
*/
public static final String EXECUTE_XXL_THREAD_POOL_NAME = "execute-xxl-thread-pool";
/**
* pending线
* 线线线
* 线Springfalse
* @return
*/
public static ExecutorService getConsumePendingThreadPool() {
return ExecutorBuilder.create()
.setCorePoolSize(ThreadPoolConstant.COMMON_CORE_POOL_SIZE)
.setMaxPoolSize(ThreadPoolConstant.COMMON_MAX_POOL_SIZE)
.setWorkQueue(ThreadPoolConstant.BIG_BLOCKING_QUEUE)
.setHandler(new ThreadPoolExecutor.CallerRunsPolicy())
.setAllowCoreThreadTimeOut(true)
.setKeepAliveTime(ThreadPoolConstant.SMALL_KEEP_LIVE_TIME, TimeUnit.SECONDS)
.build();
}
/**
* xxl-job线
* 线keepAliveTime()
* 线Springtrue
*
* @return
*/
public static DtpExecutor getXxlCronExecutor() {
return ThreadPoolBuilder.newBuilder()
.threadPoolName(EXECUTE_XXL_THREAD_POOL_NAME)
.corePoolSize(ThreadPoolConstant.COMMON_CORE_POOL_SIZE)
.maximumPoolSize(ThreadPoolConstant.COMMON_MAX_POOL_SIZE)
.keepAliveTime(ThreadPoolConstant.COMMON_KEEP_LIVE_TIME)
.timeUnit(TimeUnit.SECONDS)
.rejectedExecutionHandler(RejectedTypeEnum.CALLER_RUNS_POLICY.getName())
.allowCoreThreadTimeOut(false)
.workQueue(QueueTypeEnum.VARIABLE_LINKED_BLOCKING_QUEUE.getName(), ThreadPoolConstant.COMMON_QUEUE_SIZE, false)
.buildDynamic();
}
}

@ -1,8 +1,5 @@
package com.java3y.austin.cron.constants; package com.java3y.austin.cron.constants;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
/** /**
* @author 3y * @author 3y
* @date 2022/2/13 * @date 2022/2/13
@ -25,12 +22,5 @@ public class PendingConstant {
*/ */
public static final Long TIME_THRESHOLD = 1000L; public static final Long TIME_THRESHOLD = 1000L;
/**
* 线
*/
public static final Integer CORE_POOL_SIZE = 2;
public static final Integer MAX_POOL_SIZE = 2;
public static final Integer KEEP_LIVE_TIME = 20;
public static final BlockingQueue BLOCKING_QUEUE = new LinkedBlockingQueue<>(5);
} }

@ -0,0 +1,25 @@
package com.java3y.austin.cron.csv;
import cn.hutool.core.text.csv.CsvRow;
import cn.hutool.core.text.csv.CsvRowHandler;
import lombok.Data;
/**
* @author 3y
* @date 2022/3/10
*
*/
@Data
public class CountFileRowHandler implements CsvRowHandler {
private long rowSize;
@Override
public void handle(CsvRow row) {
rowSize++;
}
public long getRowSize() {
return rowSize;
}
}

@ -1,6 +1,9 @@
package com.java3y.austin.cron.handler; package com.java3y.austin.cron.handler;
import com.dtp.core.thread.DtpExecutor;
import com.java3y.austin.cron.config.CronAsyncThreadPoolConfig;
import com.java3y.austin.cron.service.TaskHandler; import com.java3y.austin.cron.service.TaskHandler;
import com.java3y.austin.support.utils.ThreadPoolUtils;
import com.xxl.job.core.context.XxlJobHelper; import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob; import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -19,14 +22,21 @@ public class CronTaskHandler {
@Autowired @Autowired
private TaskHandler taskHandler; private TaskHandler taskHandler;
@Autowired
private ThreadPoolUtils threadPoolUtils;
private DtpExecutor dtpExecutor = CronAsyncThreadPoolConfig.getXxlCronExecutor();
/** /**
* austin * austin
*/ */
@XxlJob("austinJob") @XxlJob("austinJob")
public void execute() { public void execute() {
log.info("CronTaskHandler#execute messageTemplateId:{} cron exec!", XxlJobHelper.getJobParam()); log.info("CronTaskHandler#execute messageTemplateId:{} cron exec!", XxlJobHelper.getJobParam());
threadPoolUtils.register(dtpExecutor);
Long messageTemplateId = Long.valueOf(XxlJobHelper.getJobParam()); Long messageTemplateId = Long.valueOf(XxlJobHelper.getJobParam());
taskHandler.handle(messageTemplateId); dtpExecutor.execute(() -> taskHandler.handle(messageTemplateId));
} }
} }

@ -2,9 +2,9 @@ package com.java3y.austin.cron.pending;
import cn.hutool.core.collection.CollUtil; import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.map.MapUtil; import cn.hutool.core.map.MapUtil;
import cn.hutool.core.thread.ExecutorBuilder;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.java3y.austin.cron.config.CronAsyncThreadPoolConfig;
import com.java3y.austin.cron.constants.PendingConstant; import com.java3y.austin.cron.constants.PendingConstant;
import com.java3y.austin.cron.vo.CrowdInfoVo; import com.java3y.austin.cron.vo.CrowdInfoVo;
import com.java3y.austin.service.api.domain.BatchSendRequest; import com.java3y.austin.service.api.domain.BatchSendRequest;
@ -23,8 +23,6 @@ import org.springframework.stereotype.Component;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/** /**
* *
@ -45,15 +43,7 @@ public class CrowdBatchTaskPending extends AbstractLazyPending<CrowdInfoVo> {
pendingParam.setNumThreshold(PendingConstant.NUM_THRESHOLD) pendingParam.setNumThreshold(PendingConstant.NUM_THRESHOLD)
.setQueue(new LinkedBlockingQueue(PendingConstant.QUEUE_SIZE)) .setQueue(new LinkedBlockingQueue(PendingConstant.QUEUE_SIZE))
.setTimeThreshold(PendingConstant.TIME_THRESHOLD) .setTimeThreshold(PendingConstant.TIME_THRESHOLD)
.setExecutorService(ExecutorBuilder.create() .setExecutorService(CronAsyncThreadPoolConfig.getConsumePendingThreadPool());
.setCorePoolSize(PendingConstant.CORE_POOL_SIZE)
.setMaxPoolSize(PendingConstant.MAX_POOL_SIZE)
.setWorkQueue(PendingConstant.BLOCKING_QUEUE)
.setHandler(new ThreadPoolExecutor.CallerRunsPolicy())
.setAllowCoreThreadTimeOut(true)
.setKeepAliveTime(PendingConstant.KEEP_LIVE_TIME, TimeUnit.SECONDS)
.build());
this.pendingParam = pendingParam; this.pendingParam = pendingParam;
} }
@ -89,4 +79,5 @@ public class CrowdBatchTaskPending extends AbstractLazyPending<CrowdInfoVo> {
.build(); .build();
sendService.batchSend(batchSendRequest); sendService.batchSend(batchSendRequest);
} }
} }

@ -1,17 +1,19 @@
package com.java3y.austin.cron.service.impl; package com.java3y.austin.cron.service.impl;
import cn.hutool.core.collection.CollUtil; import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.text.csv.CsvRow;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;
import com.java3y.austin.cron.csv.CountFileRowHandler;
import com.java3y.austin.cron.pending.CrowdBatchTaskPending; import com.java3y.austin.cron.pending.CrowdBatchTaskPending;
import com.java3y.austin.cron.service.TaskHandler; import com.java3y.austin.cron.service.TaskHandler;
import com.java3y.austin.cron.utils.ReadFileUtils; import com.java3y.austin.cron.utils.ReadFileUtils;
import com.java3y.austin.cron.vo.CrowdInfoVo; import com.java3y.austin.cron.vo.CrowdInfoVo;
import com.java3y.austin.support.dao.MessageTemplateDao; import com.java3y.austin.support.dao.MessageTemplateDao;
import com.java3y.austin.support.domain.MessageTemplate; import com.java3y.austin.support.domain.MessageTemplate;
import com.java3y.austin.support.pending.AbstractLazyPending;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContext;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.util.HashMap; import java.util.HashMap;
@ -29,10 +31,9 @@ public class TaskHandlerImpl implements TaskHandler {
@Autowired @Autowired
private ApplicationContext context; private ApplicationContext context;
@Override @Override
@Async
public void handle(Long messageTemplateId) { public void handle(Long messageTemplateId) {
log.info("TaskHandler handle:{}", Thread.currentThread().getName());
MessageTemplate messageTemplate = messageTemplateDao.findById(messageTemplateId).get(); MessageTemplate messageTemplate = messageTemplateDao.findById(messageTemplateId).get();
if (messageTemplate == null || StrUtil.isBlank(messageTemplate.getCronCrowdPath())) { if (messageTemplate == null || StrUtil.isBlank(messageTemplate.getCronCrowdPath())) {
@ -40,19 +41,42 @@ public class TaskHandlerImpl implements TaskHandler {
return; return;
} }
// 1. 获取文件行数大小
long countCsvRow = ReadFileUtils.countCsvRow(messageTemplate.getCronCrowdPath(), new CountFileRowHandler());
// 2. 读取文件得到每一行记录给到队列做lazy batch处理
CrowdBatchTaskPending crowdBatchTaskPending = context.getBean(CrowdBatchTaskPending.class); CrowdBatchTaskPending crowdBatchTaskPending = context.getBean(CrowdBatchTaskPending.class);
// 读取文件得到每一行记录给到队列做lazy batch处理
ReadFileUtils.getCsvRow(messageTemplate.getCronCrowdPath(), row -> { ReadFileUtils.getCsvRow(messageTemplate.getCronCrowdPath(), row -> {
if (CollUtil.isEmpty(row.getFieldMap()) if (CollUtil.isEmpty(row.getFieldMap())
|| StrUtil.isBlank(row.getFieldMap().get(ReadFileUtils.RECEIVER_KEY))) { || StrUtil.isBlank(row.getFieldMap().get(ReadFileUtils.RECEIVER_KEY))) {
return; return;
} }
// 3. 每一行处理交给LazyPending
HashMap<String, String> params = ReadFileUtils.getParamFromLine(row.getFieldMap()); HashMap<String, String> params = ReadFileUtils.getParamFromLine(row.getFieldMap());
CrowdInfoVo crowdInfoVo = CrowdInfoVo.builder().receiver(row.getFieldMap().get(ReadFileUtils.RECEIVER_KEY)) CrowdInfoVo crowdInfoVo = CrowdInfoVo.builder().receiver(row.getFieldMap().get(ReadFileUtils.RECEIVER_KEY))
.params(params).messageTemplateId(messageTemplateId).build(); .params(params).messageTemplateId(messageTemplateId).build();
crowdBatchTaskPending.pending(crowdInfoVo); crowdBatchTaskPending.pending(crowdInfoVo);
});
// 4. 判断是否读取文件完成回收资源且更改状态
onComplete(row, countCsvRow, crowdBatchTaskPending, messageTemplateId);
});
} }
/**
*
* 1. 线(线)
* 2. ()
*
* @param row
* @param countCsvRow
* @param crowdBatchTaskPending
* @param messageTemplateId
*/
private void onComplete(CsvRow row, long countCsvRow, AbstractLazyPending crowdBatchTaskPending, Long messageTemplateId) {
if (row.getOriginalLineNumber() == countCsvRow) {
crowdBatchTaskPending.setStop(true);
log.info("messageTemplate:[{}] read csv file complete!", messageTemplateId);
}
}
} }

@ -4,6 +4,7 @@ import cn.hutool.core.io.FileUtil;
import cn.hutool.core.map.MapUtil; import cn.hutool.core.map.MapUtil;
import cn.hutool.core.text.csv.*; import cn.hutool.core.text.csv.*;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.java3y.austin.cron.csv.CountFileRowHandler;
import com.java3y.austin.cron.vo.CrowdInfoVo; import com.java3y.austin.cron.vo.CrowdInfoVo;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -45,9 +46,28 @@ public class ReadFileUtils {
} }
} }
/**
* csv
*
* @param path
* @param countFileRowHandler
*/
public static long countCsvRow(String path, CountFileRowHandler countFileRowHandler) {
try {
// 把首行当做是标题获取reader
CsvReader reader = CsvUtil.getReader(new FileReader(path),
new CsvReadConfig().setContainsHeader(true));
reader.read(countFileRowHandler);
} catch (Exception e) {
log.error("ReadFileUtils#getCsvRow fail!{}", Throwables.getStackTraceAsString(e));
}
return countFileRowHandler.getRowSize();
}
/** /**
* params * params
* [{key:value},{key:value}] * [{key:value},{key:value}]
*
* @param fieldMap * @param fieldMap
* @return * @return
*/ */

@ -0,0 +1,33 @@
package com.java3y.austin.support.config;
import cn.hutool.core.thread.ExecutorBuilder;
import com.java3y.austin.common.constant.ThreadPoolConstant;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @author 3y
* support 线
*
*/
public class SupportThreadPoolConfig {
/**
* pending线
* 线线线
* 线Springfalse
*/
public static ExecutorService getPendingSingleThreadPool() {
return ExecutorBuilder.create()
.setCorePoolSize(ThreadPoolConstant.SINGLE_CORE_POOL_SIZE)
.setMaxPoolSize(ThreadPoolConstant.SINGLE_MAX_POOL_SIZE)
.setWorkQueue(ThreadPoolConstant.BIG_BLOCKING_QUEUE)
.setHandler(new ThreadPoolExecutor.CallerRunsPolicy())
.setAllowCoreThreadTimeOut(true)
.setKeepAliveTime(ThreadPoolConstant.SMALL_KEEP_LIVE_TIME, TimeUnit.SECONDS)
.build();
}
}

@ -1,36 +0,0 @@
package com.java3y.austin.support.config;
import com.dtp.common.em.QueueTypeEnum;
import com.dtp.core.support.ThreadPoolCreator;
import com.dtp.core.thread.DtpExecutor;
import com.dtp.core.thread.ThreadPoolBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @author Redick01
*/
@Configuration
public class ThreadPoolConfiguration {
@Bean
public DtpExecutor dtpExecutor() {
return ThreadPoolCreator.createDynamicFast("dynamic-tp-test-1");
}
@Bean
public ThreadPoolExecutor threadPoolExecutor() {
return ThreadPoolBuilder.newBuilder()
.threadPoolName("dynamic-tp-test-2")
.corePoolSize(10)
.maximumPoolSize(15)
.keepAliveTime(15000)
.timeUnit(TimeUnit.MILLISECONDS)
.workQueue(QueueTypeEnum.SYNCHRONOUS_QUEUE.getName(), null, false)
.buildDynamic();
}
}

@ -1,15 +1,16 @@
package com.java3y.austin.support.pending; package com.java3y.austin.support.pending;
import cn.hutool.core.collection.CollUtil; import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.thread.ThreadUtil;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.java3y.austin.support.config.SupportThreadPoolConfig;
import lombok.Data; import lombok.Data;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
/** /**
@ -36,12 +37,18 @@ public abstract class AbstractLazyPending<T> {
*/ */
private Long lastHandleTime = System.currentTimeMillis(); private Long lastHandleTime = System.currentTimeMillis();
/**
* 线
*/
private Boolean stop = false;
/** /**
* 线 * 线
*/ */
@PostConstruct @PostConstruct
public void initConsumePending() { public void initConsumePending() {
ThreadUtil.newSingleExecutor().execute(() -> { ExecutorService executorService = SupportThreadPoolConfig.getPendingSingleThreadPool();
executorService.execute(() -> {
while (true) { while (true) {
try { try {
T obj = pendingParam.getQueue().poll(pendingParam.getTimeThreshold(), TimeUnit.MILLISECONDS); T obj = pendingParam.getQueue().poll(pendingParam.getTimeThreshold(), TimeUnit.MILLISECONDS);
@ -58,16 +65,23 @@ public abstract class AbstractLazyPending<T> {
// 具体执行逻辑 // 具体执行逻辑
pendingParam.getExecutorService().execute(() -> this.handle(taskRef)); pendingParam.getExecutorService().execute(() -> this.handle(taskRef));
} }
// 判断是否停止当前线程
if (stop && CollUtil.isEmpty(tasks)) {
break;
}
} catch (Exception e) { } catch (Exception e) {
log.error("Pending#initConsumePending failed:{}", Throwables.getStackTraceAsString(e)); log.error("Pending#initConsumePending failed:{}", Throwables.getStackTraceAsString(e));
} }
} }
}); });
executorService.shutdown();
} }
/** /**
* 1. * 1.
* 2. * 2.
*
* @return * @return
*/ */
private boolean dataReady() { private boolean dataReady() {
@ -110,4 +124,5 @@ public abstract class AbstractLazyPending<T> {
* @param list * @param list
*/ */
public abstract void doHandle(List<T> list); public abstract void doHandle(List<T> list);
} }

@ -0,0 +1,31 @@
package com.java3y.austin.support.utils;
import com.dtp.core.DtpRegistry;
import com.dtp.core.thread.DtpExecutor;
import com.java3y.austin.support.config.ThreadPoolExecutorShutdownDefinition;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* 线
*
* @author 3y
*/
@Component
public class ThreadPoolUtils {
@Autowired
private ThreadPoolExecutorShutdownDefinition shutdownDefinition;
private static final String SOURCE_NAME = "austin";
/**
* 1. 线 线
* 2. 线 Spring
*/
public void register(DtpExecutor dtpExecutor) {
DtpRegistry.register(dtpExecutor, SOURCE_NAME);
shutdownDefinition.registryExecutor(dtpExecutor);
}
}

@ -13,10 +13,12 @@ public class ThreadPoolTest {
@GetMapping("/tp") @GetMapping("/tp")
public void send() { public void send() {
DtpExecutor dtpExecutor1 = DtpRegistry.getExecutor("austin-im.notice"); DtpExecutor dtpExecutor1 = DtpRegistry.getExecutor("austin-im.notice");
DtpExecutor dtpExecutor2 = DtpRegistry.getExecutor("dynamic-tp-test-2"); DtpExecutor dtpExecutor2 = DtpRegistry.getExecutor("execute-xxl-thread-pool");
DtpExecutor dtpExecutor3 = DtpRegistry.getExecutor("dynamic-tp-test-2");
System.out.println(dtpExecutor1); System.out.println(dtpExecutor1);
System.out.println(dtpExecutor2); System.out.println(dtpExecutor2);
System.out.println(dtpExecutor3);
} }
} }

@ -52,18 +52,6 @@ austin.business.log.topic.name=austinLog
austin.business.graylog.ip=${austin-grayLog-ip} austin.business.graylog.ip=${austin-grayLog-ip}
# TODO if windows os ,replace path ! # TODO if windows os ,replace path !
austin.business.upload.crowd.path=/Users/3y/temp austin.business.upload.crowd.path=/Users/3y/temp
##################### business cron async properties #####################
austin.async.task.thread-name-prefix=austinAsyncExecutor-
austin.async.task.max-size=2
austin.async.task.core-size=2
austin.async.task.queue-capacity=20
austin.async.task.keep-alive=60
austin.async.task.rejected-handler=callrunspolicy
austin.async.task.allow-core-thread-timeout=false
austin.async.task.await-termination-seconds=10
austin.async.task.wait-for-tasks-to-complete-on-shut-down=true
##################### xxl properties ##################### ##################### xxl properties #####################
xxl.job.admin.addresses=http://${austin-xxl-job-ip}:${austin-xxl-job-port}/xxl-job-admin xxl.job.admin.addresses=http://${austin-xxl-job-ip}:${austin-xxl-job-port}/xxl-job-admin

@ -19,22 +19,24 @@ spring:
secret: SECb544445a6a34f0315d08b17de41 secret: SECb544445a6a34f0315d08b17de41
receivers: 18888888888 receivers: 18888888888
executors: executors:
- threadPoolName: dynamic-tp-test-1 - threadPoolName: austin-im.notice
corePoolSize: 5 corePoolSize: 6
maximumPoolSize: 8 maximumPoolSize: 8
keepAliveTime: 40 queueCapacity: 200
queueType: VariableLinkedBlockingQueue queueType: VariableLinkedBlockingQueue # 任务队列查看源码QueueTypeEnum枚举类
queueCapacity: 500 rejectedHandlerType: CallerRunsPolicy # 拒绝策略查看RejectedTypeEnum枚举类
rejectedHandlerType: CallerRunsPolicy keepAliveTime: 50
threadNamePrefix: test-1 allowCoreThreadTimeOut: false
threadNamePrefix: austin- # 线程名前缀
- threadPoolName: dynamic-tp-test-2 - threadPoolName: execute-xxl-thread-pool
corePoolSize: 3 corePoolSize: 3
maximumPoolSize: 4 maximumPoolSize: 3
queueCapacity: 200
queueType: VariableLinkedBlockingQueue # 任务队列查看源码QueueTypeEnum枚举类
rejectedHandlerType: CallerRunsPolicy # 拒绝策略查看RejectedTypeEnum枚举类
keepAliveTime: 50 keepAliveTime: 50
queueType: VariableLinkedBlockingQueue allowCoreThreadTimeOut: false
queueCapacity: 5000 threadNamePrefix: austin- # 线程名前缀
threadNamePrefix: test2
notifyItems: # 报警项,不配置自动会配置(变更通知、容量报警、活性报警、拒绝报警) notifyItems: # 报警项,不配置自动会配置(变更通知、容量报警、活性报警、拒绝报警)
- type: capacity # 报警项类型,查看源码 NotifyTypeEnum枚举类 - type: capacity # 报警项类型,查看源码 NotifyTypeEnum枚举类
enabled: true enabled: true
@ -48,5 +50,4 @@ spring:
threshold: 80 threshold: 80
- type: reject - type: reject
enabled: true enabled: true
threshold: 1 threshold: 1

@ -164,7 +164,7 @@
<dependency> <dependency>
<groupId>io.github.lyh200</groupId> <groupId>io.github.lyh200</groupId>
<artifactId>dynamic-tp-spring-boot-starter-apollo</artifactId> <artifactId>dynamic-tp-spring-boot-starter-apollo</artifactId>
<version>1.0.1</version> <version>1.0.2</version>
</dependency> </dependency>
</dependencies> </dependencies>

Loading…
Cancel
Save