From 24d1bc621f12680d3df9c7d922649a011d54dc2d Mon Sep 17 00:00:00 2001 From: 3y Date: Fri, 11 Mar 2022 22:37:32 +0800 Subject: [PATCH] =?UTF-8?q?handler=20=E7=BA=BF=E7=A8=8B=E6=B1=A0=E6=A8=A1?= =?UTF-8?q?=E5=9D=97=E4=BB=A3=E7=A0=81=E9=87=8D=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../common/constant/ThreadPoolConstant.java | 2 +- .../austin/cron/utils/ReadFileUtils.java | 4 +- .../config/HandlerThreadPoolConfig.java | 40 +++++++++++++++++++ .../handler/{ => impl}/EmailHandler.java | 4 +- .../{ => impl}/OfficialAccountHandler.java | 4 +- .../handler/{ => impl}/SmsHandler.java | 4 +- .../handler/pending/TaskPendingHolder.java | 32 ++++----------- .../austin/handler/receiver/Receiver.java | 4 +- .../script/{ => impl}/TencentSmsScript.java | 3 +- .../script/{ => impl}/WxMpTemplateScript.java | 3 +- .../main/resources/dynamic-tp-apollo-dtp.yml | 2 +- 11 files changed, 68 insertions(+), 34 deletions(-) create mode 100644 austin-handler/src/main/java/com/java3y/austin/handler/config/HandlerThreadPoolConfig.java rename austin-handler/src/main/java/com/java3y/austin/handler/handler/{ => impl}/EmailHandler.java (93%) rename austin-handler/src/main/java/com/java3y/austin/handler/handler/{ => impl}/OfficialAccountHandler.java (95%) rename austin-handler/src/main/java/com/java3y/austin/handler/handler/{ => impl}/SmsHandler.java (94%) rename austin-handler/src/main/java/com/java3y/austin/handler/script/{ => impl}/TencentSmsScript.java (98%) rename austin-handler/src/main/java/com/java3y/austin/handler/script/{ => impl}/WxMpTemplateScript.java (93%) diff --git a/austin-common/src/main/java/com/java3y/austin/common/constant/ThreadPoolConstant.java b/austin-common/src/main/java/com/java3y/austin/common/constant/ThreadPoolConstant.java index e413860..73f40de 100644 --- a/austin-common/src/main/java/com/java3y/austin/common/constant/ThreadPoolConstant.java +++ b/austin-common/src/main/java/com/java3y/austin/common/constant/ThreadPoolConstant.java @@ -25,7 +25,7 @@ public class ThreadPoolConstant { public static final Integer COMMON_CORE_POOL_SIZE = 2; public static final Integer COMMON_MAX_POOL_SIZE = 2; public static final Integer COMMON_KEEP_LIVE_TIME = 60; - public static final Integer COMMON_QUEUE_SIZE = 20; + public static final Integer COMMON_QUEUE_SIZE = 128; /** diff --git a/austin-cron/src/main/java/com/java3y/austin/cron/utils/ReadFileUtils.java b/austin-cron/src/main/java/com/java3y/austin/cron/utils/ReadFileUtils.java index 39d21f8..c1de5aa 100644 --- a/austin-cron/src/main/java/com/java3y/austin/cron/utils/ReadFileUtils.java +++ b/austin-cron/src/main/java/com/java3y/austin/cron/utils/ReadFileUtils.java @@ -1,5 +1,6 @@ package com.java3y.austin.cron.utils; +import cn.hutool.core.collection.CollUtil; import cn.hutool.core.io.FileUtil; import cn.hutool.core.map.MapUtil; import cn.hutool.core.text.csv.*; @@ -107,7 +108,8 @@ public class ReadFileUtils { for (int j = 1; j < headerInfo.size(); j++) { param.put(headerInfo.get(j), row.get(j)); } - result.add(CrowdInfoVo.builder().receiver(row.get(0)).params(param).build()); + + result.add(CrowdInfoVo.builder().receiver(CollUtil.getFirst(row.iterator())).params(param).build()); } } catch (Exception e) { diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/config/HandlerThreadPoolConfig.java b/austin-handler/src/main/java/com/java3y/austin/handler/config/HandlerThreadPoolConfig.java new file mode 100644 index 0000000..62aff41 --- /dev/null +++ b/austin-handler/src/main/java/com/java3y/austin/handler/config/HandlerThreadPoolConfig.java @@ -0,0 +1,40 @@ +package com.java3y.austin.handler.config; + +import com.dtp.common.em.QueueTypeEnum; +import com.dtp.common.em.RejectedTypeEnum; +import com.dtp.core.thread.DtpExecutor; +import com.dtp.core.thread.ThreadPoolBuilder; +import com.java3y.austin.common.constant.ThreadPoolConstant; + +import java.util.concurrent.TimeUnit; + +/** + * handler模块 线程池的配置 + * + * @author 3y + */ +public class HandlerThreadPoolConfig { + + private static final String PRE_FIX = "austin."; + /** + * 业务:处理某个渠道的某种类型消息的线程池 + * 配置:不丢弃消息,核心线程数不会随着keepAliveTime而减少(不会被回收) + * 动态线程池且被Spring管理:true + * + * @return + */ + public static DtpExecutor getExecutor(String groupId) { + return ThreadPoolBuilder.newBuilder() + .threadPoolName(PRE_FIX + groupId) + .corePoolSize(ThreadPoolConstant.COMMON_CORE_POOL_SIZE) + .maximumPoolSize(ThreadPoolConstant.COMMON_MAX_POOL_SIZE) + .keepAliveTime(ThreadPoolConstant.COMMON_KEEP_LIVE_TIME) + .timeUnit(TimeUnit.SECONDS) + .rejectedExecutionHandler(RejectedTypeEnum.CALLER_RUNS_POLICY.getName()) + .allowCoreThreadTimeOut(false) + .workQueue(QueueTypeEnum.VARIABLE_LINKED_BLOCKING_QUEUE.getName(), ThreadPoolConstant.COMMON_QUEUE_SIZE, false) + .buildDynamic(); + } + + +} diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/handler/EmailHandler.java b/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/EmailHandler.java similarity index 93% rename from austin-handler/src/main/java/com/java3y/austin/handler/handler/EmailHandler.java rename to austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/EmailHandler.java index b0c2886..993cf89 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/handler/EmailHandler.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/EmailHandler.java @@ -1,4 +1,4 @@ -package com.java3y.austin.handler.handler; +package com.java3y.austin.handler.handler.impl; import cn.hutool.extra.mail.MailAccount; @@ -7,6 +7,8 @@ import com.google.common.base.Throwables; import com.java3y.austin.common.domain.TaskInfo; import com.java3y.austin.common.dto.EmailContentModel; import com.java3y.austin.common.enums.ChannelType; +import com.java3y.austin.handler.handler.BaseHandler; +import com.java3y.austin.handler.handler.Handler; import com.java3y.austin.support.utils.AccountUtils; import com.sun.mail.util.MailSSLSocketFactory; import lombok.extern.slf4j.Slf4j; diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/handler/OfficialAccountHandler.java b/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/OfficialAccountHandler.java similarity index 95% rename from austin-handler/src/main/java/com/java3y/austin/handler/handler/OfficialAccountHandler.java rename to austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/OfficialAccountHandler.java index 9207852..3554c46 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/handler/OfficialAccountHandler.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/OfficialAccountHandler.java @@ -1,10 +1,12 @@ -package com.java3y.austin.handler.handler; +package com.java3y.austin.handler.handler.impl; import com.alibaba.fastjson.JSON; import com.google.common.base.Throwables; import com.java3y.austin.common.domain.TaskInfo; import com.java3y.austin.common.dto.OfficialAccountsContentModel; import com.java3y.austin.common.enums.ChannelType; +import com.java3y.austin.handler.handler.BaseHandler; +import com.java3y.austin.handler.handler.Handler; import com.java3y.austin.handler.script.OfficialAccountScript; import lombok.extern.slf4j.Slf4j; import me.chanjar.weixin.mp.bean.template.WxMpTemplateData; diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/handler/SmsHandler.java b/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/SmsHandler.java similarity index 94% rename from austin-handler/src/main/java/com/java3y/austin/handler/handler/SmsHandler.java rename to austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/SmsHandler.java index 10bbb7f..c9ee59e 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/handler/SmsHandler.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/SmsHandler.java @@ -1,4 +1,4 @@ -package com.java3y.austin.handler.handler; +package com.java3y.austin.handler.handler.impl; import cn.hutool.core.collection.CollUtil; import cn.hutool.core.util.StrUtil; @@ -8,6 +8,8 @@ import com.java3y.austin.common.domain.TaskInfo; import com.java3y.austin.common.dto.SmsContentModel; import com.java3y.austin.common.enums.ChannelType; import com.java3y.austin.handler.domain.SmsParam; +import com.java3y.austin.handler.handler.BaseHandler; +import com.java3y.austin.handler.handler.Handler; import com.java3y.austin.handler.script.SmsScript; import com.java3y.austin.support.dao.SmsRecordDao; import com.java3y.austin.support.domain.SmsRecord; diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/pending/TaskPendingHolder.java b/austin-handler/src/main/java/com/java3y/austin/handler/pending/TaskPendingHolder.java index c62ab28..32dbd08 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/pending/TaskPendingHolder.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/pending/TaskPendingHolder.java @@ -1,12 +1,9 @@ package com.java3y.austin.handler.pending; -import com.dtp.common.em.QueueTypeEnum; -import com.dtp.common.em.RejectedTypeEnum; -import com.dtp.core.DtpRegistry; import com.dtp.core.thread.DtpExecutor; -import com.dtp.core.thread.ThreadPoolBuilder; +import com.java3y.austin.handler.config.HandlerThreadPoolConfig; import com.java3y.austin.handler.utils.GroupIdMappingUtils; -import com.java3y.austin.support.config.ThreadPoolExecutorShutdownDefinition; +import com.java3y.austin.support.utils.ThreadPoolUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @@ -24,17 +21,9 @@ import java.util.concurrent.ExecutorService; */ @Component public class TaskPendingHolder { - @Autowired - private ThreadPoolExecutorShutdownDefinition threadPoolExecutorShutdownDefinition; - + private ThreadPoolUtils threadPoolUtils; - /** - * 线程池的参数(初始化参数) - */ - private Integer coreSize = 2; - private Integer maxSize = 2; - private Integer queueSize = 100; private Map taskPendingHolder = new HashMap<>(32); /** @@ -48,22 +37,15 @@ public class TaskPendingHolder { @PostConstruct public void init() { /** - * example ThreadPoolName:austin-im.notice + * example ThreadPoolName:austin.im.notice * * 可以通过apollo配置:dynamic-tp-apollo-dtp.yml 动态修改线程池的信息 */ for (String groupId : groupIds) { - DtpExecutor dtpExecutor = ThreadPoolBuilder.newBuilder() - .threadPoolName("austin-" + groupId) - .corePoolSize(coreSize) - .maximumPoolSize(maxSize) - .workQueue(QueueTypeEnum.LINKED_BLOCKING_QUEUE.getName(), queueSize, false) - .rejectedExecutionHandler(RejectedTypeEnum.CALLER_RUNS_POLICY.getName()) - .buildDynamic(); + DtpExecutor executor = HandlerThreadPoolConfig.getExecutor(groupId); + threadPoolUtils.register(executor); - DtpRegistry.register(dtpExecutor, "beanPostProcessor"); - threadPoolExecutorShutdownDefinition.registryExecutor(dtpExecutor); - taskPendingHolder.put(groupId, dtpExecutor); + taskPendingHolder.put(groupId, executor); } } diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/receiver/Receiver.java b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/Receiver.java index 1eb9b0d..305c0e5 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/receiver/Receiver.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/Receiver.java @@ -1,5 +1,6 @@ package com.java3y.austin.handler.receiver; +import cn.hutool.core.collection.CollUtil; import com.alibaba.fastjson.JSON; import com.java3y.austin.common.domain.AnchorInfo; import com.java3y.austin.common.domain.LogParam; @@ -47,7 +48,8 @@ public class Receiver { if (kafkaMessage.isPresent()) { List taskInfoLists = JSON.parseArray(kafkaMessage.get(), TaskInfo.class); - String messageGroupId = GroupIdMappingUtils.getGroupIdByTaskInfo(taskInfoLists.get(0)); + + String messageGroupId = GroupIdMappingUtils.getGroupIdByTaskInfo(CollUtil.getFirst(taskInfoLists.iterator())); /** * 每个消费者组 只消费 他们自身关心的消息 diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/script/TencentSmsScript.java b/austin-handler/src/main/java/com/java3y/austin/handler/script/impl/TencentSmsScript.java similarity index 98% rename from austin-handler/src/main/java/com/java3y/austin/handler/script/TencentSmsScript.java rename to austin-handler/src/main/java/com/java3y/austin/handler/script/impl/TencentSmsScript.java index 6d16c2d..acbcb54 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/script/TencentSmsScript.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/script/impl/TencentSmsScript.java @@ -1,4 +1,4 @@ -package com.java3y.austin.handler.script; +package com.java3y.austin.handler.script.impl; import cn.hutool.core.date.DatePattern; import cn.hutool.core.date.DateUtil; @@ -7,6 +7,7 @@ import cn.hutool.core.util.IdUtil; import com.java3y.austin.common.enums.SmsStatus; import com.java3y.austin.handler.domain.SmsParam; import com.java3y.austin.handler.domain.TencentSmsParam; +import com.java3y.austin.handler.script.SmsScript; import com.java3y.austin.support.domain.SmsRecord; import com.java3y.austin.support.utils.AccountUtils; import com.tencentcloudapi.common.Credential; diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/script/WxMpTemplateScript.java b/austin-handler/src/main/java/com/java3y/austin/handler/script/impl/WxMpTemplateScript.java similarity index 93% rename from austin-handler/src/main/java/com/java3y/austin/handler/script/WxMpTemplateScript.java rename to austin-handler/src/main/java/com/java3y/austin/handler/script/impl/WxMpTemplateScript.java index 2790bd1..5a86122 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/script/WxMpTemplateScript.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/script/impl/WxMpTemplateScript.java @@ -1,5 +1,6 @@ -package com.java3y.austin.handler.script; +package com.java3y.austin.handler.script.impl; +import com.java3y.austin.handler.script.OfficialAccountScript; import lombok.extern.slf4j.Slf4j; import me.chanjar.weixin.mp.api.WxMpService; import me.chanjar.weixin.mp.api.impl.WxMpServiceImpl; diff --git a/austin-web/src/main/resources/dynamic-tp-apollo-dtp.yml b/austin-web/src/main/resources/dynamic-tp-apollo-dtp.yml index d48e744..87f8c91 100644 --- a/austin-web/src/main/resources/dynamic-tp-apollo-dtp.yml +++ b/austin-web/src/main/resources/dynamic-tp-apollo-dtp.yml @@ -19,7 +19,7 @@ spring: secret: SECb544445a6a34f0315d08b17de41 receivers: 18888888888 executors: - - threadPoolName: austin-im.notice + - threadPoolName: austin.im.notice corePoolSize: 6 maximumPoolSize: 8 queueCapacity: 200