1、crowdBatchTaskPending 线程池 设置核心线程空闲 回收策略以及时间

2、线程池配置优化
3、优雅关闭非Spring管理的线程池(停机减少数据丢失)
4、优化read.me
pull/6/head
3y 3 years ago
parent 30457baa3d
commit ef19e876bc

@ -8,10 +8,12 @@
<a href="https://github.com/ZhongFuCheng3y/austin"><img src="https://img.shields.io/github/stars/ZhongFuCheng3y/austin.svg?style=flat&label=GithubStars"></a>
<a href="https://github.com/ZhongFuCheng3y/austin-admin"><img src="https://img.shields.io/badge/austin前端-GitHub-green.svg" alt="作者"></a>
<a href="https://996.icu"><img src="https://img.shields.io/badge/link-996.icu-red.svg" alt="996.icu"></a>
<a href="#项目交流"><img src="https://img.shields.io/badge/项目-交流-orange.svg" alt="项目交流"></a>
<a href="#如何准备面试"><img src="https://img.shields.io/badge/如何准备-面试-yellow.svg" alt="对线面试官"></a>
</p>
## 项目介绍
austin项目**核心功能**:发送消息
@ -73,33 +75,49 @@ austin项目**核心流程**`austin-api`接收到发送消息请求,直接
**10**、调用http接口`com.java3y.austin.web.controller#send`给自己发一条短信或者邮件感受
```shell
curl -XPOST "127.0.0.1:8080/send" -H 'Content-Type: application/json' -d '"{"code":"send","messageParam":{"extra":null,"receiver":"13719333899","variables":{"content":"2222","url":"1111"}},"messageTemplateId":1}"'
curl -XPOST "127.0.0.1:8080/send" -H 'Content-Type: application/json' -d '{"code":"send","messageParam":{"extra":null,"receiver":"13719333899"},"messageTemplateId":1}'
```
**11**、austin前端管理系统部署一分钟即能打开戳[GitHub](https://github.com/ZhongFuCheng3y/austin-admin)或[Gitee](https://gitee.com/zhongfucheng/austin-admin)查看
![](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/4c818806aa7540f5afe72cfbdef2f7d7~tplv-k3u1fbpfcp-watermark.image?)
![](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/70b931917b5448d7be46daa384fd6220~tplv-k3u1fbpfcp-watermark.image?)
![](https://p9-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/1cb026b6d5c2458faacf26a0bd35dc01~tplv-k3u1fbpfcp-watermark.image?)
![](https://p9-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/eb4c7f61ddc64de5accd231bc7145010~tplv-k3u1fbpfcp-watermark.image?)
## 里程碑
第四季度:[三个月已完成的内容](https://mp.weixin.qq.com/s?__biz=MzI4Njg5MDA5NA==&mid=2247503562&idx=1&sn=99ba92325ae5e8f8054700e770f0898d&chksm=ebd48fcbdca306dd65f2a56c2febd0ef5e6227aa8166183b9fe7edc1ba09ef6066ef3d797af6&token=1246005878&lang=zh_CN#rd)
**11**、austin前端管理系统部署一分钟即能打开戳[GitHub](https://github.com/ZhongFuCheng3y/austin-admin)或[Gitee](https://gitee.com/zhongfucheng/austin-admin)查看
![](https://p6-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/4f99631fe25c42b39cbfb6e59cccec85~tplv-k3u1fbpfcp-watermark.image?)
![](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/a023d9082fa644bda9b50144e02985cb~tplv-k3u1fbpfcp-zoom-1.image)
![](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/7125184e9fbf4de8b522aecbd4e791df~tplv-k3u1fbpfcp-zoom-1.image)
短时间内规划:
- [x] 01、搭建消息推送Web后台管理页面进一步降低接入和使用门槛
- [x] 02、引入分布式定时任务框架实现定时推送消息提高运营侧下发营销消息的效率。
- [ ] 03、接入流式处理平台实时处理日志数据多维度聚合后产生全链路追踪数据供业务方自行排查问题减少排查问题的人效
- [ ] 04、持续提高消息推送系统的影响力让更多的业务方了解其功能进而挖掘更多拉新和唤醒用户的玩法提高站内的次留率和转化率
![](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/4adde725eeee443baf96f286f5429f05~tplv-k3u1fbpfcp-zoom-1.image)
![](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/01d028359e6e4229825a7fd8cc22c6c7~tplv-k3u1fbpfcp-zoom-1.image)
**近期更新时间**2022年2月22日
## 里程碑
**近期更新功能**接入flink流式处理平台编写实时相关数据多维度进redis
- [x] Maven+SpringBoot项目搭建
- [x] logback日志记录项目运行时信息引入common/guava/Hutool/Lombok/fastjson/OkHttp工具包
- [x] 接入腾讯云渠道发送一条短信
- [x] 使用SpringData JPA将短信发送记录存储至MySQL
- [x] 使用SpringBoot接入Kafka
- [x] 利用责任链完成对接入层的请求进行封装(责任链模式)
- [x] 消费层实现数据隔离(线程池:生产者与消费者模式)
- [x] 通用去重消息功能SpringBoot接入Redis并使用pipeline减少网络请求
- [x] 配置服务器和docker容器以及SpringBoot应用的监控prometheus+Grafana+auctuator
- [x] 接入分布式配置中心完成 丢失消息、白名单以及账号配置Apollo分布式配置中心
- [x] 邮件渠道接入
- [x] 日志链路数据追踪 + 注解式打印日志(优雅打印日志以及数据埋点)
- [x] 接入GrayLog分布式日志收集框架
- [x] 引入前端低代码平台AMIS搭建后台管理页面
- [x] 接入分布式定时任务框架定时发送任务xxl-job定时任务框架编写上传文件接口并使用LazyPending批处理人群文件数据
- [x] 接入实时流计算平台Flink实时日志数据根据用户维度和消息模板维度清洗至Redis
- [x] 通过AMIS低代码平台接入echarts图表展示实时聚合后的数据
- [ ] 优化现有的代码:优雅停机、动态配置等等
- [ ] 接入微信服务号渠道
- [ ] 接入微信小程序渠道
- [ ] 接入PUSH渠道
- [ ] 持续提高消息推送系统的影响力,让更多的业务方了解其功能,进而挖掘更多拉新和唤醒用户的玩法,提高站内的次留率和转化率
**近期更新时间**2022年3月3日
**近期更新功能**已接入Flink实时流前端图表展示用户和消息模板维度的下发数据
## 项目交流
@ -119,6 +137,4 @@ curl -XPOST "127.0.0.1:8080/send" -H 'Content-Type: application/json' -d '"{"c
**对线面试官**公众号持续更新**面试系列**文章对线面试官系列深受各大开发的好评已有不少的同学通过对线面试官系列得到BATTMD等一线大厂的的offer。一个**讲人话的面试系列**,八股文不再是背诵。
<img align="center" src='https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/f87f574e93964921a4d02146bf3ccdac~tplv-k3u1fbpfcp-zoom-1.image' width=300px height=300px />
<img align="center" src='https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/f87f574e93964921a4d02146bf3ccdac~tplv-k3u1fbpfcp-zoom-1.image' width=300px height=300px />

@ -27,7 +27,6 @@ public class AsyncConfiguration implements AsyncConfigurer {
@Bean("austinExecutor")
@Primary
public ThreadPoolTaskExecutor executor(AsyncExecutionProperties properties) {
log.info("funExecutor -- init ");
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(properties.getCoreSize());
executor.setMaxPoolSize(properties.getMaxSize());
@ -38,7 +37,6 @@ public class AsyncConfiguration implements AsyncConfigurer {
executor.setAllowCoreThreadTimeOut(properties.isAllowCoreThreadTimeout());
executor.setWaitForTasksToCompleteOnShutdown(properties.isWaitForTasksToCompleteOnShutDown());
executor.setAwaitTerminationSeconds(properties.getAwaitTerminationSeconds());
log.info("austinExecutor: {} ", executor);
executor.initialize();
return executor;
}

@ -84,7 +84,7 @@ public class AsyncExecutionProperties {
*/
ABORTPOLICY(new ThreadPoolExecutor.AbortPolicy()),
/**
* run_thread
* run_thread
*/
CALLRUNSPOLICY(new ThreadPoolExecutor.CallerRunsPolicy()),
/***

@ -30,6 +30,7 @@ public class PendingConstant {
*/
public static final Integer CORE_POOL_SIZE = 2;
public static final Integer MAX_POOL_SIZE = 2;
public static final Integer KEEP_LIVE_TIME = 20;
public static final BlockingQueue BLOCKING_QUEUE = new LinkedBlockingQueue<>(5);
}

@ -24,6 +24,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
*
@ -49,7 +50,10 @@ public class CrowdBatchTaskPending extends AbstractLazyPending<CrowdInfoVo> {
.setMaxPoolSize(PendingConstant.MAX_POOL_SIZE)
.setWorkQueue(PendingConstant.BLOCKING_QUEUE)
.setHandler(new ThreadPoolExecutor.CallerRunsPolicy())
.setAllowCoreThreadTimeOut(true)
.setKeepAliveTime(PendingConstant.KEEP_LIVE_TIME, TimeUnit.SECONDS)
.build());
this.pendingParam = pendingParam;
}

@ -2,6 +2,8 @@ package com.java3y.austin.handler.pending;
import com.java3y.austin.handler.config.ThreadPoolConfig;
import com.java3y.austin.handler.utils.GroupIdMappingUtils;
import com.java3y.austin.support.config.ThreadPoolExecutorShutdownDefinition;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
@ -17,6 +19,9 @@ import java.util.concurrent.ExecutorService;
*/
@Component
public class TaskPendingHolder {
@Autowired
private ThreadPoolExecutorShutdownDefinition threadPoolExecutorShutdownDefinition;
/**
* 线
*/
@ -38,7 +43,9 @@ public class TaskPendingHolder {
@PostConstruct
public void init() {
for (String groupId : groupIds) {
taskPendingHolder.put(groupId, ThreadPoolConfig.getThreadPool(coreSize, maxSize, queueSize));
ExecutorService threadPool = ThreadPoolConfig.getThreadPool(coreSize, maxSize, queueSize);
threadPoolExecutorShutdownDefinition.registryExecutor(threadPool);
taskPendingHolder.put(groupId, threadPool);
}
}
/**

@ -0,0 +1,78 @@
package com.java3y.austin.support.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
/**
* 线
*
* @author 3y
*/
@Component
@Slf4j
public class ThreadPoolExecutorShutdownDefinition implements ApplicationListener<ContextClosedEvent> {
private final List<ExecutorService> POOLS = Collections.synchronizedList(new ArrayList<>(12));
/**
* 线 线
*/
private final long AWAIT_TERMINATION = 60;
/**
* awaitTermination
*/
private final TimeUnit TIME_UNIT = TimeUnit.SECONDS;
public void registryExecutor(ThreadPoolTaskExecutor threadPoolTaskExecutor) {
POOLS.add(threadPoolTaskExecutor.getThreadPoolExecutor());
}
public void registryExecutor(ThreadPoolTaskScheduler threadPoolTaskExecutor) {
POOLS.add(threadPoolTaskExecutor.getScheduledThreadPoolExecutor());
}
public void registryExecutor(ExecutorService executor) {
POOLS.add(executor);
}
/**
* {@link org.springframework.scheduling.concurrent.ExecutorConfigurationSupport#shutdown()}
*
* @param event the event to respond to
*/
@Override
public void onApplicationEvent(ContextClosedEvent event) {
log.info("容器关闭前处理线程池优雅关闭开始, 当前要处理的线程池数量为: {} >>>>>>>>>>>>>>>>", POOLS.size());
if (CollectionUtils.isEmpty(POOLS)) {
return;
}
for (ExecutorService pool : POOLS) {
pool.shutdown();
try {
if (!pool.awaitTermination(AWAIT_TERMINATION, TIME_UNIT)) {
if (log.isWarnEnabled()) {
log.warn("Timed out while waiting for executor [{}] to terminate", pool);
}
}
} catch (InterruptedException ex) {
if (log.isWarnEnabled()) {
log.warn("Timed out while waiting for executor [{}] to terminate", pool);
}
Thread.currentThread().interrupt();
}
}
}
}

@ -143,8 +143,9 @@ public class DataServiceImpl implements DataService {
if (AustinConstant.BUSINESS_ID_LENGTH == businessId.length()) {
return businessId;
}
MessageTemplate messageTemplate = messageTemplateDao.findById(Long.valueOf(businessId)).get();
if (messageTemplate != null) {
Optional<MessageTemplate> optional = messageTemplateDao.findById(Long.valueOf(businessId));
if (optional.isPresent()) {
MessageTemplate messageTemplate = optional.get();
return String.valueOf(TaskInfoUtils.generateBusinessId(messageTemplate.getId(), messageTemplate.getTemplateType()));
}
return businessId;

@ -70,7 +70,7 @@ public class MessageTemplateServiceImpl implements MessageTemplateService {
Iterable<MessageTemplate> messageTemplates = messageTemplateDao.findAllById(ids);
messageTemplates.forEach(messageTemplate -> messageTemplate.setIsDeleted(AustinConstant.TRUE));
for (MessageTemplate messageTemplate : messageTemplates) {
if (messageTemplate.getCronTaskId() > 0) {
if (messageTemplate.getCronTaskId()!=null && messageTemplate.getCronTaskId() > 0) {
cronTaskService.deleteCronTask(messageTemplate.getCronTaskId());
}
}

@ -52,6 +52,17 @@ austin.business.log.topic.name=austinLog
austin.business.graylog.ip=${austin-grayLog-ip}
# TODO if windows os ,replace path !
austin.business.upload.crowd.path=/Users/3y/temp
##################### business cron async properties #####################
austin.async.task.thread-name-prefix=austinAsyncExecutor-
austin.async.task.max-size=2
austin.async.task.core-size=2
austin.async.task.queue-capacity=20
austin.async.task.keep-alive=60
austin.async.task.rejected-handler=callrunspolicy
austin.async.task.allow-core-thread-timeout=false
austin.async.task.await-termination-seconds=10
austin.async.task.wait-for-tasks-to-complete-on-shut-down=true
##################### xxl properties #####################
@ -90,4 +101,4 @@ management.metrics.export.prometheus.enabled=true
wx.mp.account.appid="appid"
wx.mp.account.secret="secret"
wx.mp.account.token="token"
wx.mp.account.aesKey="aesKey"
wx.mp.account.aesKey="aesKey"

@ -1,13 +0,0 @@
austin:
async:
task:
thread-name-prefix: "fun-task-" # task前缀名
max-size: 8 #最大线程数
core-size: 4 #核心线程数
queue-capacity: 100
keep-alive: 60
rejected-handler: callrunspolicy #拒绝策略,不能自定义
allow-core-thread-timeout: true # 是否允许核心线程超时,默认false
await-termination-seconds: 10 #
wait-for-tasks-to-complete-on-shut-down: true # 任务执行完,在关闭应用

@ -0,0 +1,12 @@
userId,content,url
13788881111,尊贵的B站用户,https://www.baidu.com/
13788881112,尊贵的鲁智深用户,https://www.baidu.com/
13788881113,尊贵的孙悟空用户,https://www.baidu.com/
13788881114,尊贵的沙和尚用户,https://www.baidu.com/
13788881115,尊贵的西施用户,https://www.baidu.com/
13788881116,尊贵的安琪拉用户,https://www.baidu.com/
13788881117,尊贵的小鱼人用户,https://www.baidu.com/
13788881118,尊贵的德玛西亚用户,https://www.baidu.com/
13788881119,尊贵的侠剑客用户,https://www.baidu.com/
13788881120,尊贵的梦幻西游用户,https://www.baidu.com/
13788881121,尊贵的网易云用户,https://www.baidu.com/
1 userId content url
2 13788881111 尊贵的B站用户 https://www.baidu.com/
3 13788881112 尊贵的鲁智深用户 https://www.baidu.com/
4 13788881113 尊贵的孙悟空用户 https://www.baidu.com/
5 13788881114 尊贵的沙和尚用户 https://www.baidu.com/
6 13788881115 尊贵的西施用户 https://www.baidu.com/
7 13788881116 尊贵的安琪拉用户 https://www.baidu.com/
8 13788881117 尊贵的小鱼人用户 https://www.baidu.com/
9 13788881118 尊贵的德玛西亚用户 https://www.baidu.com/
10 13788881119 尊贵的侠剑客用户 https://www.baidu.com/
11 13788881120 尊贵的梦幻西游用户 https://www.baidu.com/
12 13788881121 尊贵的网易云用户 https://www.baidu.com/

@ -58,9 +58,12 @@ CREATE TABLE `sms_record`
DEFAULT CHARSET = utf8mb4
COLLATE = utf8mb4_unicode_ci COMMENT ='短信记录信息';
-- 实时类型 短信(无占位符)
INSERT INTO austin.message_template (id, name, audit_status, flow_id, msg_status, cron_task_id, cron_crowd_path, expect_push_time, id_type, send_channel, template_type, msg_type, msg_content, send_account, creator, updator, auditor, team, proposer, is_deleted, created, updated) VALUES (1, '买一送十活动', 10, '', 10, null, '', '', 30, 30, 20, 20, '{"content":"6666","url":"","title":""}', 10, 'Java3y', 'Java3y', '3y', '公众号Java3y', '三歪', 0, 1646274112, 1646275242);
-- 短信测试
INSERT INTO austin.message_template (id, name, audit_status, flow_id, msg_status, cron_task_id, cron_crowd_path, expect_push_time, id_type, send_channel, template_type, msg_type, msg_content, send_account, creator, updator, auditor, team, proposer, is_deleted, created, updated) VALUES (1, '短信测试', 10, '', 10, null, '', '', 30, 30, 20, 20, '{"content":"{$content}","url":"{$url}","title":""}', 10, 'Java3y', 'Java3y', '3y', '公众号Java3y', '3y', 0, 1644387139, 1644387139);
-- 实时类型 邮件(无占位符)
INSERT INTO austin.message_template (id, name, audit_status, flow_id, msg_status, cron_task_id, cron_crowd_path, expect_push_time, id_type, send_channel, template_type, msg_type, msg_content, send_account, creator, updator, auditor, team, proposer, is_deleted, created, updated) VALUES (2, '校招信息', 10, '', 10, null, '', '', 50, 40, 20, 10, '{"content":"你已成功获取到offer","url":"","title":"招聘通知"}', 10, 'Java3y', 'Java3y', '3y', '公众号Java3y', '鸡蛋', 0, 1646274195, 1646274195);
-- 实时类型 短信有占位符占位符key 为 content
INSERT INTO austin.message_template (id, name, audit_status, flow_id, msg_status, cron_task_id, cron_crowd_path, expect_push_time, id_type, send_channel, template_type, msg_type, msg_content, send_account, creator, updator, auditor, team, proposer, is_deleted, created, updated) VALUES (4, '验证码通知', 10, '', 10, null, '', '', 30, 30, 20, 30, '{"content":"{$content}","url":"","title":""}', 10, 'Java3y', 'Java3y', '3y', '公众号Java3y', '孙悟空', 0, 1646275213, 1646275213);
-- 邮件测试
INSERT INTO austin.message_template (id, name, audit_status, flow_id, msg_status, cron_task_id, cron_crowd_path, expect_push_time, id_type, send_channel, template_type, msg_type, msg_content, send_account, creator, updator, auditor, team, proposer, is_deleted, created, updated) VALUES (2, '测试邮件', 10, '', 10, null, '', '', 50, 40, 20, 10, '{"content":"4344444444","url":"","title":"6666666"}', 10, 'Java3y', 'Java3y', '3y', '公众号Java3y', '3y', 0, 1644387638, 1644387638);
Loading…
Cancel
Save