diff --git a/austin-cron/src/main/java/com/java3y/austin/cron/config/AsyncConfiguration.java b/austin-cron/src/main/java/com/java3y/austin/cron/config/AsyncConfiguration.java new file mode 100644 index 0000000..e415dd3 --- /dev/null +++ b/austin-cron/src/main/java/com/java3y/austin/cron/config/AsyncConfiguration.java @@ -0,0 +1,48 @@ +package com.java3y.austin.cron.config; + +import com.google.common.base.Throwables; +import lombok.extern.slf4j.Slf4j; +import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.annotation.AsyncConfigurer; +import org.springframework.scheduling.annotation.EnableAsync; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +import java.util.concurrent.Executor; +import java.util.concurrent.ThreadPoolExecutor; + +/** + * 处理定时任务的线程池配置信息,为@Async注解服务 + * + * @author 3y + */ +@Slf4j +@Configuration +@EnableAsync +public class AsyncConfiguration implements AsyncConfigurer { + + @Bean("austinExecutor") + public ThreadPoolTaskExecutor executor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(10); + executor.setMaxPoolSize(10); + executor.setQueueCapacity(30); + executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); + executor.setThreadNamePrefix("austinAsyncExecutor-"); + executor.setWaitForTasksToCompleteOnShutdown(true); + executor.setAwaitTerminationSeconds(10); + executor.initialize(); + return executor; + } + + @Override + public Executor getAsyncExecutor() { + return executor(); + } + + @Override + public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { + return (ex, method, params) -> log.error("austinExecutor execute fail!method:{},params:{},ex:{}", method, params, Throwables.getStackTraceAsString(ex)); + } +} diff --git a/austin-cron/src/main/java/com/java3y/austin/cron/constants/PendingConstant.java b/austin-cron/src/main/java/com/java3y/austin/cron/constants/PendingConstant.java new file mode 100644 index 0000000..488d5ef --- /dev/null +++ b/austin-cron/src/main/java/com/java3y/austin/cron/constants/PendingConstant.java @@ -0,0 +1,31 @@ +package com.java3y.austin.cron.constants; + +/** + * @author 3y + * @date 2022/2/13 + * 缓冲pending 常量 + */ +public class PendingConstant { + + /** + * 阻塞队列大小 + */ + public static final Integer QUEUE_SIZE = 100; + + /** + * 触发执行的数量阈值 + */ + public static final Integer NUM_THRESHOLD = 100; + + /** + * batch 触发执行的时间阈值,单位毫秒【必填】 + */ + public static final Long TIME_THRESHOLD = 1000L; + + /** + * 消费线程数 + */ + public static final Integer THREAD_NUM = 2; + + +} diff --git a/austin-cron/src/main/java/com/java3y/austin/cron/domain/CrowdInfoVo.java b/austin-cron/src/main/java/com/java3y/austin/cron/domain/CrowdInfoVo.java index 7e994c4..ae17152 100644 --- a/austin-cron/src/main/java/com/java3y/austin/cron/domain/CrowdInfoVo.java +++ b/austin-cron/src/main/java/com/java3y/austin/cron/domain/CrowdInfoVo.java @@ -24,7 +24,7 @@ public class CrowdInfoVo implements Serializable { /** * 接收者id */ - private String id; + private String receiver; /** * 参数信息 diff --git a/austin-cron/src/main/java/com/java3y/austin/cron/handler/CronTaskHandler.java b/austin-cron/src/main/java/com/java3y/austin/cron/handler/CronTaskHandler.java index 2f830a7..317bd20 100644 --- a/austin-cron/src/main/java/com/java3y/austin/cron/handler/CronTaskHandler.java +++ b/austin-cron/src/main/java/com/java3y/austin/cron/handler/CronTaskHandler.java @@ -24,9 +24,8 @@ public class CronTaskHandler { */ @XxlJob("austinJob") public void execute() { - log.info("XXL-JOB, Hello World."); + log.info("CronTaskHandler#execute messageTemplateId:{} cron exec!", XxlJobHelper.getJobParam()); Long messageTemplateId = Long.valueOf(XxlJobHelper.getJobParam()); - taskHandler.handle(messageTemplateId); } diff --git a/austin-cron/src/main/java/com/java3y/austin/cron/pending/CrowdBatchTaskPending.java b/austin-cron/src/main/java/com/java3y/austin/cron/pending/CrowdBatchTaskPending.java new file mode 100644 index 0000000..eb24d97 --- /dev/null +++ b/austin-cron/src/main/java/com/java3y/austin/cron/pending/CrowdBatchTaskPending.java @@ -0,0 +1,39 @@ +package com.java3y.austin.cron.pending; + +import com.java3y.austin.cron.domain.CrowdInfoVo; +import com.java3y.austin.support.pending.BatchPendingThread; +import com.java3y.austin.support.pending.Pending; +import com.java3y.austin.support.pending.PendingParam; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.util.List; + +/** + * 批量处理任务信息 + * + * @author 3y + */ +@Component +@Slf4j +public class CrowdBatchTaskPending extends Pending { + + @Override + public void initAndStart(PendingParam pendingParam) { + threadNum = pendingParam.getThreadNum() == null ? threadNum : pendingParam.getThreadNum(); + queue = pendingParam.getQueue(); + + for (int i = 0; i < threadNum; ++i) { + BatchPendingThread batchPendingThread = new BatchPendingThread(); + batchPendingThread.setPendingParam(pendingParam); + batchPendingThread.setName("batchPendingThread-" + i); + batchPendingThread.start(); + } + } + + @Override + public void doHandle(List list) { + log.info("theadName:{},doHandle:{}", Thread.currentThread().getName(), list.size()); + + } +} diff --git a/austin-cron/src/main/java/com/java3y/austin/cron/service/impl/TaskHandlerImpl.java b/austin-cron/src/main/java/com/java3y/austin/cron/service/impl/TaskHandlerImpl.java index d299bd9..d4a3e02 100644 --- a/austin-cron/src/main/java/com/java3y/austin/cron/service/impl/TaskHandlerImpl.java +++ b/austin-cron/src/main/java/com/java3y/austin/cron/service/impl/TaskHandlerImpl.java @@ -2,18 +2,21 @@ package com.java3y.austin.cron.service.impl; import cn.hutool.core.collection.CollUtil; import cn.hutool.core.util.StrUtil; -import com.alibaba.fastjson.JSON; +import com.java3y.austin.cron.constants.PendingConstant; import com.java3y.austin.cron.domain.CrowdInfoVo; +import com.java3y.austin.cron.pending.CrowdBatchTaskPending; import com.java3y.austin.cron.service.TaskHandler; import com.java3y.austin.cron.utils.ReadFileUtils; import com.java3y.austin.support.dao.MessageTemplateDao; import com.java3y.austin.support.domain.MessageTemplate; +import com.java3y.austin.support.pending.PendingParam; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; -import java.util.List; +import java.util.HashMap; +import java.util.concurrent.LinkedBlockingQueue; /** * @author 3y @@ -24,23 +27,42 @@ import java.util.List; public class TaskHandlerImpl implements TaskHandler { @Autowired private MessageTemplateDao messageTemplateDao; + @Autowired + private CrowdBatchTaskPending crowdBatchTaskPending; @Override @Async public void handle(Long messageTemplateId) { + log.info("start:{}", Thread.currentThread().getName()); + MessageTemplate messageTemplate = messageTemplateDao.findById(messageTemplateId).get(); if (messageTemplate == null || StrUtil.isBlank(messageTemplate.getCronCrowdPath())) { - log.error("TaskHandler#handle crowdPath empty!"); + log.error("TaskHandler#handle crowdPath empty! messageTemplateId:{}", messageTemplateId); return; } - List csvRowList = ReadFileUtils.getCsvRowList(messageTemplate.getCronCrowdPath()); - if (CollUtil.isNotEmpty(csvRowList)) { + // 初始化pending的信息 + PendingParam pendingParam = new PendingParam<>(); + pendingParam.setNumThreshold(PendingConstant.NUM_THRESHOLD) + .setQueue(new LinkedBlockingQueue(PendingConstant.QUEUE_SIZE)) + .setTimeThreshold(PendingConstant.TIME_THRESHOLD) + .setThreadNum(PendingConstant.THREAD_NUM) + .setPending(crowdBatchTaskPending); + crowdBatchTaskPending.initAndStart(pendingParam); - } + // 读取文件得到每一行记录给到队列做batch处理 + ReadFileUtils.getCsvRow(messageTemplate.getCronCrowdPath(), row -> { + if (CollUtil.isEmpty(row.getFieldMap()) + || StrUtil.isBlank(row.getFieldMap().get(ReadFileUtils.RECEIVER_KEY))) { + return; + } + HashMap params = ReadFileUtils.getParamFromLine(row.getFieldMap()); + CrowdInfoVo crowdInfoVo = CrowdInfoVo.builder().receiver(row.getFieldMap().get(ReadFileUtils.RECEIVER_KEY)) + .params(params).build(); + crowdBatchTaskPending.pending(crowdInfoVo); + }); - log.info("csv info:", JSON.toJSONString(csvRowList)); } } diff --git a/austin-cron/src/main/java/com/java3y/austin/cron/utils/ReadFileUtils.java b/austin-cron/src/main/java/com/java3y/austin/cron/utils/ReadFileUtils.java index f9279aa..61a3706 100644 --- a/austin-cron/src/main/java/com/java3y/austin/cron/utils/ReadFileUtils.java +++ b/austin-cron/src/main/java/com/java3y/austin/cron/utils/ReadFileUtils.java @@ -2,14 +2,14 @@ package com.java3y.austin.cron.utils; import cn.hutool.core.io.FileUtil; import cn.hutool.core.map.MapUtil; -import cn.hutool.core.text.csv.CsvData; -import cn.hutool.core.text.csv.CsvRow; -import cn.hutool.core.text.csv.CsvUtil; +import cn.hutool.core.text.csv.*; import com.google.common.base.Throwables; import com.java3y.austin.cron.domain.CrowdInfoVo; import lombok.extern.slf4j.Slf4j; +import java.io.FileReader; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -23,7 +23,47 @@ import java.util.Map; public class ReadFileUtils { /** - * 读取csv文件 + * csv文件 存储 接收者 的列名 + */ + public static final String RECEIVER_KEY = "userId"; + + /** + * 读取csv文件,每读取一行都会调用 csvRowHandler 对应的方法 + * + * @param path + * @param csvRowHandler + */ + public static void getCsvRow(String path, CsvRowHandler csvRowHandler) { + try { + // 把首行当做是标题,获取reader + CsvReader reader = CsvUtil.getReader(new FileReader(path), + new CsvReadConfig().setContainsHeader(true)); + reader.read(csvRowHandler); + } catch (Exception e) { + log.error("ReadFileUtils#getCsvRow fail!{}", Throwables.getStackTraceAsString(e)); + + } + } + + /** + * 从文件的每一行数据获取到params信息 + * [{key:value},{key:value}] + * @param fieldMap + * @return + */ + public static HashMap getParamFromLine(Map fieldMap) { + HashMap params = MapUtil.newHashMap(); + for (Map.Entry entry : fieldMap.entrySet()) { + if (!ReadFileUtils.RECEIVER_KEY.equals(entry.getKey())) { + params.put(entry.getKey(), entry.getValue()); + } + } + return params; + } + + + /** + * 一次性读取csv文件整个内容 * 1. 获取第一行信息(id,paramsKey1,params2Key2),第一列默认为接收者Id * 2. 把文件信息塞进对象内 * 3. 把对象返回 @@ -46,12 +86,14 @@ public class ReadFileUtils { for (int j = 1; j < headerInfo.size(); j++) { param.put(headerInfo.get(j), row.get(j)); } - result.add(CrowdInfoVo.builder().id(row.get(0)).params(param).build()); + result.add(CrowdInfoVo.builder().receiver(row.get(0)).params(param).build()); } } catch (Exception e) { - log.error("TaskHandler#getCsvRowList fail!{}", Throwables.getStackTraceAsString(e)); + log.error("ReadFileUtils#getCsvRowList fail!{}", Throwables.getStackTraceAsString(e)); } return result; } + + } diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/pending/Task.java b/austin-handler/src/main/java/com/java3y/austin/handler/pending/Task.java index 8a63bbf..3cc49ac 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/pending/Task.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/pending/Task.java @@ -44,7 +44,7 @@ public class Task implements Runnable { return; } - // 1.平台通用去重 test + // 1.平台通用去重 deduplicationRuleService.duplication(taskInfo); // 2. 真正发送消息 diff --git a/austin-support/src/main/java/com/java3y/austin/support/pending/BatchPendingThread.java b/austin-support/src/main/java/com/java3y/austin/support/pending/BatchPendingThread.java new file mode 100644 index 0000000..fac0842 --- /dev/null +++ b/austin-support/src/main/java/com/java3y/austin/support/pending/BatchPendingThread.java @@ -0,0 +1,64 @@ +package com.java3y.austin.support.pending; + +import com.google.common.base.Throwables; +import com.google.common.collect.Lists; +import lombok.Data; +import lombok.experimental.Accessors; +import lombok.extern.slf4j.Slf4j; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +/** + * 延迟消费的线程 实现 + * 积攒一定的数量 或 时间 才消费,达到批量消费的效果 + * + * @author 3y + */ +@Data +@Accessors(chain = true) +@Slf4j +public class BatchPendingThread extends Thread { + + private PendingParam pendingParam; + + /** + * 批量装载任务 + */ + private List tasks = new ArrayList<>(); + + /** + * 当前装载任务的大小 + */ + private Integer total = 0; + + /** + * 上次执行的时间 + */ + private Long lastHandleTime = System.currentTimeMillis(); + + + @Override + public void run() { + while (true) { + try { + T obj = pendingParam.getQueue().poll(pendingParam.getTimeThreshold(), TimeUnit.MILLISECONDS); + if (null != obj) { + tasks.add(obj); + } + + // 处理条件:1. 数量超限 2. 时间超限 + if ((tasks.size() >= pendingParam.getNumThreshold()) + || (System.currentTimeMillis() - lastHandleTime >= pendingParam.getTimeThreshold())) { + List taskRef = tasks; + tasks = Lists.newArrayList(); + lastHandleTime = System.currentTimeMillis(); + pendingParam.getPending().handle(taskRef); + } + } catch (Exception e) { + log.error("BatchPendingThread#run failed:{}", Throwables.getStackTraceAsString(e)); + } + } + } +} diff --git a/austin-support/src/main/java/com/java3y/austin/support/pending/Pending.java b/austin-support/src/main/java/com/java3y/austin/support/pending/Pending.java new file mode 100644 index 0000000..6bf043e --- /dev/null +++ b/austin-support/src/main/java/com/java3y/austin/support/pending/Pending.java @@ -0,0 +1,79 @@ +package com.java3y.austin.support.pending; + +import com.google.common.base.Throwables; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; + +import java.util.List; +import java.util.concurrent.BlockingQueue; + +/** + * 阻塞队列-消费者和生产者实现 + * + * @author 3y + */ +@Slf4j +@Data +public abstract class Pending { + + /** + * 可使用的线程数 + */ + public static final int DEFAULT_THREAD_NUM = Runtime.getRuntime().availableProcessors(); + + /** + * 阻塞队列 实现类 + */ + protected BlockingQueue queue; + + /** + * 默认消费线程数 = 目前可使用的线程数 + */ + protected Integer threadNum = DEFAULT_THREAD_NUM; + + + /** + * 将元素放入阻塞队列中 + * + * @param t + */ + public void pending(T t) { + try { + queue.put(t); + } catch (InterruptedException e) { + log.error("Pending#pending error:{}", Throwables.getStackTraceAsString(e)); + } + } + + /** + * 消费阻塞队列元素时的方法 + * + * @param t + */ + public void handle(List t) { + if (t.isEmpty()) { + return; + } + try { + doHandle(t); + } catch (Exception e) { + log.error("Pending#handle failed:{}", Throwables.getStackTraceAsString(e)); + } + } + + + /** + * 初始化并启动 + * + * @param pendingParam 参数信息 + */ + public abstract void initAndStart(PendingParam pendingParam); + + + /** + * 处理阻塞队列的元素 真正方法 + * + * @param list + */ + public abstract void doHandle(List list); +} diff --git a/austin-support/src/main/java/com/java3y/austin/support/pending/PendingParam.java b/austin-support/src/main/java/com/java3y/austin/support/pending/PendingParam.java new file mode 100644 index 0000000..0e81128 --- /dev/null +++ b/austin-support/src/main/java/com/java3y/austin/support/pending/PendingParam.java @@ -0,0 +1,48 @@ +package com.java3y.austin.support.pending; + + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.experimental.Accessors; + +import java.util.concurrent.BlockingQueue; + + +/** + * @author 3y + * pending初始化参数类 + */ +@Data +@AllArgsConstructor +@NoArgsConstructor +@Builder +@Accessors(chain = true) +public class PendingParam { + + /** + * 阻塞队列实现类【必填】 + */ + private BlockingQueue queue; + + /** + * batch 触发执行的数量阈值【必填】 + */ + private Integer numThreshold; + + /** + * batch 触发执行的时间阈值,单位毫秒【必填】 + */ + private Long timeThreshold; + + /** + * pending具体实现对象 + */ + private Pending pending; + + /** + * 消费线程数【可选】 + */ + protected Integer threadNum; +}