handler 线程池模块代码重构

pull/6/head
3y 3 years ago
parent 92cece87b1
commit 24d1bc621f

@ -25,7 +25,7 @@ public class ThreadPoolConstant {
public static final Integer COMMON_CORE_POOL_SIZE = 2; public static final Integer COMMON_CORE_POOL_SIZE = 2;
public static final Integer COMMON_MAX_POOL_SIZE = 2; public static final Integer COMMON_MAX_POOL_SIZE = 2;
public static final Integer COMMON_KEEP_LIVE_TIME = 60; public static final Integer COMMON_KEEP_LIVE_TIME = 60;
public static final Integer COMMON_QUEUE_SIZE = 20; public static final Integer COMMON_QUEUE_SIZE = 128;
/** /**

@ -1,5 +1,6 @@
package com.java3y.austin.cron.utils; package com.java3y.austin.cron.utils;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.io.FileUtil; import cn.hutool.core.io.FileUtil;
import cn.hutool.core.map.MapUtil; import cn.hutool.core.map.MapUtil;
import cn.hutool.core.text.csv.*; import cn.hutool.core.text.csv.*;
@ -107,7 +108,8 @@ public class ReadFileUtils {
for (int j = 1; j < headerInfo.size(); j++) { for (int j = 1; j < headerInfo.size(); j++) {
param.put(headerInfo.get(j), row.get(j)); param.put(headerInfo.get(j), row.get(j));
} }
result.add(CrowdInfoVo.builder().receiver(row.get(0)).params(param).build());
result.add(CrowdInfoVo.builder().receiver(CollUtil.getFirst(row.iterator())).params(param).build());
} }
} catch (Exception e) { } catch (Exception e) {

@ -0,0 +1,40 @@
package com.java3y.austin.handler.config;
import com.dtp.common.em.QueueTypeEnum;
import com.dtp.common.em.RejectedTypeEnum;
import com.dtp.core.thread.DtpExecutor;
import com.dtp.core.thread.ThreadPoolBuilder;
import com.java3y.austin.common.constant.ThreadPoolConstant;
import java.util.concurrent.TimeUnit;
/**
* handler 线
*
* @author 3y
*/
public class HandlerThreadPoolConfig {
private static final String PRE_FIX = "austin.";
/**
* 线
* 线keepAliveTime()
* 线Springtrue
*
* @return
*/
public static DtpExecutor getExecutor(String groupId) {
return ThreadPoolBuilder.newBuilder()
.threadPoolName(PRE_FIX + groupId)
.corePoolSize(ThreadPoolConstant.COMMON_CORE_POOL_SIZE)
.maximumPoolSize(ThreadPoolConstant.COMMON_MAX_POOL_SIZE)
.keepAliveTime(ThreadPoolConstant.COMMON_KEEP_LIVE_TIME)
.timeUnit(TimeUnit.SECONDS)
.rejectedExecutionHandler(RejectedTypeEnum.CALLER_RUNS_POLICY.getName())
.allowCoreThreadTimeOut(false)
.workQueue(QueueTypeEnum.VARIABLE_LINKED_BLOCKING_QUEUE.getName(), ThreadPoolConstant.COMMON_QUEUE_SIZE, false)
.buildDynamic();
}
}

@ -1,4 +1,4 @@
package com.java3y.austin.handler.handler; package com.java3y.austin.handler.handler.impl;
import cn.hutool.extra.mail.MailAccount; import cn.hutool.extra.mail.MailAccount;
@ -7,6 +7,8 @@ import com.google.common.base.Throwables;
import com.java3y.austin.common.domain.TaskInfo; import com.java3y.austin.common.domain.TaskInfo;
import com.java3y.austin.common.dto.EmailContentModel; import com.java3y.austin.common.dto.EmailContentModel;
import com.java3y.austin.common.enums.ChannelType; import com.java3y.austin.common.enums.ChannelType;
import com.java3y.austin.handler.handler.BaseHandler;
import com.java3y.austin.handler.handler.Handler;
import com.java3y.austin.support.utils.AccountUtils; import com.java3y.austin.support.utils.AccountUtils;
import com.sun.mail.util.MailSSLSocketFactory; import com.sun.mail.util.MailSSLSocketFactory;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;

@ -1,10 +1,12 @@
package com.java3y.austin.handler.handler; package com.java3y.austin.handler.handler.impl;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.java3y.austin.common.domain.TaskInfo; import com.java3y.austin.common.domain.TaskInfo;
import com.java3y.austin.common.dto.OfficialAccountsContentModel; import com.java3y.austin.common.dto.OfficialAccountsContentModel;
import com.java3y.austin.common.enums.ChannelType; import com.java3y.austin.common.enums.ChannelType;
import com.java3y.austin.handler.handler.BaseHandler;
import com.java3y.austin.handler.handler.Handler;
import com.java3y.austin.handler.script.OfficialAccountScript; import com.java3y.austin.handler.script.OfficialAccountScript;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import me.chanjar.weixin.mp.bean.template.WxMpTemplateData; import me.chanjar.weixin.mp.bean.template.WxMpTemplateData;

@ -1,4 +1,4 @@
package com.java3y.austin.handler.handler; package com.java3y.austin.handler.handler.impl;
import cn.hutool.core.collection.CollUtil; import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;
@ -8,6 +8,8 @@ import com.java3y.austin.common.domain.TaskInfo;
import com.java3y.austin.common.dto.SmsContentModel; import com.java3y.austin.common.dto.SmsContentModel;
import com.java3y.austin.common.enums.ChannelType; import com.java3y.austin.common.enums.ChannelType;
import com.java3y.austin.handler.domain.SmsParam; import com.java3y.austin.handler.domain.SmsParam;
import com.java3y.austin.handler.handler.BaseHandler;
import com.java3y.austin.handler.handler.Handler;
import com.java3y.austin.handler.script.SmsScript; import com.java3y.austin.handler.script.SmsScript;
import com.java3y.austin.support.dao.SmsRecordDao; import com.java3y.austin.support.dao.SmsRecordDao;
import com.java3y.austin.support.domain.SmsRecord; import com.java3y.austin.support.domain.SmsRecord;

@ -1,12 +1,9 @@
package com.java3y.austin.handler.pending; package com.java3y.austin.handler.pending;
import com.dtp.common.em.QueueTypeEnum;
import com.dtp.common.em.RejectedTypeEnum;
import com.dtp.core.DtpRegistry;
import com.dtp.core.thread.DtpExecutor; import com.dtp.core.thread.DtpExecutor;
import com.dtp.core.thread.ThreadPoolBuilder; import com.java3y.austin.handler.config.HandlerThreadPoolConfig;
import com.java3y.austin.handler.utils.GroupIdMappingUtils; import com.java3y.austin.handler.utils.GroupIdMappingUtils;
import com.java3y.austin.support.config.ThreadPoolExecutorShutdownDefinition; import com.java3y.austin.support.utils.ThreadPoolUtils;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@ -24,17 +21,9 @@ import java.util.concurrent.ExecutorService;
*/ */
@Component @Component
public class TaskPendingHolder { public class TaskPendingHolder {
@Autowired @Autowired
private ThreadPoolExecutorShutdownDefinition threadPoolExecutorShutdownDefinition; private ThreadPoolUtils threadPoolUtils;
/**
* 线()
*/
private Integer coreSize = 2;
private Integer maxSize = 2;
private Integer queueSize = 100;
private Map<String, ExecutorService> taskPendingHolder = new HashMap<>(32); private Map<String, ExecutorService> taskPendingHolder = new HashMap<>(32);
/** /**
@ -48,22 +37,15 @@ public class TaskPendingHolder {
@PostConstruct @PostConstruct
public void init() { public void init() {
/** /**
* example ThreadPoolName:austin-im.notice * example ThreadPoolName:austin.im.notice
* *
* apollodynamic-tp-apollo-dtp.yml 线 * apollodynamic-tp-apollo-dtp.yml 线
*/ */
for (String groupId : groupIds) { for (String groupId : groupIds) {
DtpExecutor dtpExecutor = ThreadPoolBuilder.newBuilder() DtpExecutor executor = HandlerThreadPoolConfig.getExecutor(groupId);
.threadPoolName("austin-" + groupId) threadPoolUtils.register(executor);
.corePoolSize(coreSize)
.maximumPoolSize(maxSize)
.workQueue(QueueTypeEnum.LINKED_BLOCKING_QUEUE.getName(), queueSize, false)
.rejectedExecutionHandler(RejectedTypeEnum.CALLER_RUNS_POLICY.getName())
.buildDynamic();
DtpRegistry.register(dtpExecutor, "beanPostProcessor"); taskPendingHolder.put(groupId, executor);
threadPoolExecutorShutdownDefinition.registryExecutor(dtpExecutor);
taskPendingHolder.put(groupId, dtpExecutor);
} }
} }

@ -1,5 +1,6 @@
package com.java3y.austin.handler.receiver; package com.java3y.austin.handler.receiver;
import cn.hutool.core.collection.CollUtil;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.java3y.austin.common.domain.AnchorInfo; import com.java3y.austin.common.domain.AnchorInfo;
import com.java3y.austin.common.domain.LogParam; import com.java3y.austin.common.domain.LogParam;
@ -47,7 +48,8 @@ public class Receiver {
if (kafkaMessage.isPresent()) { if (kafkaMessage.isPresent()) {
List<TaskInfo> taskInfoLists = JSON.parseArray(kafkaMessage.get(), TaskInfo.class); List<TaskInfo> taskInfoLists = JSON.parseArray(kafkaMessage.get(), TaskInfo.class);
String messageGroupId = GroupIdMappingUtils.getGroupIdByTaskInfo(taskInfoLists.get(0));
String messageGroupId = GroupIdMappingUtils.getGroupIdByTaskInfo(CollUtil.getFirst(taskInfoLists.iterator()));
/** /**
* *

@ -1,4 +1,4 @@
package com.java3y.austin.handler.script; package com.java3y.austin.handler.script.impl;
import cn.hutool.core.date.DatePattern; import cn.hutool.core.date.DatePattern;
import cn.hutool.core.date.DateUtil; import cn.hutool.core.date.DateUtil;
@ -7,6 +7,7 @@ import cn.hutool.core.util.IdUtil;
import com.java3y.austin.common.enums.SmsStatus; import com.java3y.austin.common.enums.SmsStatus;
import com.java3y.austin.handler.domain.SmsParam; import com.java3y.austin.handler.domain.SmsParam;
import com.java3y.austin.handler.domain.TencentSmsParam; import com.java3y.austin.handler.domain.TencentSmsParam;
import com.java3y.austin.handler.script.SmsScript;
import com.java3y.austin.support.domain.SmsRecord; import com.java3y.austin.support.domain.SmsRecord;
import com.java3y.austin.support.utils.AccountUtils; import com.java3y.austin.support.utils.AccountUtils;
import com.tencentcloudapi.common.Credential; import com.tencentcloudapi.common.Credential;

@ -1,5 +1,6 @@
package com.java3y.austin.handler.script; package com.java3y.austin.handler.script.impl;
import com.java3y.austin.handler.script.OfficialAccountScript;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import me.chanjar.weixin.mp.api.WxMpService; import me.chanjar.weixin.mp.api.WxMpService;
import me.chanjar.weixin.mp.api.impl.WxMpServiceImpl; import me.chanjar.weixin.mp.api.impl.WxMpServiceImpl;

@ -19,7 +19,7 @@ spring:
secret: SECb544445a6a34f0315d08b17de41 secret: SECb544445a6a34f0315d08b17de41
receivers: 18888888888 receivers: 18888888888
executors: executors:
- threadPoolName: austin-im.notice - threadPoolName: austin.im.notice
corePoolSize: 6 corePoolSize: 6
maximumPoolSize: 8 maximumPoolSize: 8
queueCapacity: 200 queueCapacity: 200

Loading…
Cancel
Save