diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/config/ThreadPoolConfig.java b/austin-handler/src/main/java/com/java3y/austin/handler/config/ThreadPoolConfig.java deleted file mode 100644 index 25cf7cb..0000000 --- a/austin-handler/src/main/java/com/java3y/austin/handler/config/ThreadPoolConfig.java +++ /dev/null @@ -1,34 +0,0 @@ -package com.java3y.austin.handler.config; - -import cn.hutool.core.thread.ExecutorBuilder; - -import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -/** - * 线程池配置 - * - * @author 3y - */ -public class ThreadPoolConfig { - - /** - * @param coreSize - * @param maxSize - * @param queueSize 阻塞队列满了,也不丢弃任务 CallerRunsPolicy 策略 - * @return - */ - public static ExecutorService getThreadPool(Integer coreSize, Integer maxSize, Integer queueSize) { - return ExecutorBuilder.create() - .setCorePoolSize(coreSize) - .setMaxPoolSize(maxSize) - .setKeepAliveTime(60, TimeUnit.SECONDS) - .setWorkQueue(new LinkedBlockingQueue<>(queueSize)) - .setHandler(new ThreadPoolExecutor.CallerRunsPolicy()) - .build(); - } - - -} diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/pending/TaskPendingHolder.java b/austin-handler/src/main/java/com/java3y/austin/handler/pending/TaskPendingHolder.java index f1e7a03..c62ab28 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/pending/TaskPendingHolder.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/pending/TaskPendingHolder.java @@ -1,6 +1,7 @@ package com.java3y.austin.handler.pending; import com.dtp.common.em.QueueTypeEnum; +import com.dtp.common.em.RejectedTypeEnum; import com.dtp.core.DtpRegistry; import com.dtp.core.thread.DtpExecutor; import com.dtp.core.thread.ThreadPoolBuilder; @@ -14,11 +15,11 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; /** * 存储 每种消息类型 与 TaskPending 的关系 + * * @author 3y */ @Component @@ -27,18 +28,12 @@ public class TaskPendingHolder { @Autowired private ThreadPoolExecutorShutdownDefinition threadPoolExecutorShutdownDefinition; - @Autowired - private SpringUtils springUtils; - - - @Autowired - private DtpRegistry dtpRegistry; /** - * 线程池的参数 + * 线程池的参数(初始化参数) */ - private Integer coreSize = 3; - private Integer maxSize = 3; + private Integer coreSize = 2; + private Integer maxSize = 2; private Integer queueSize = 100; private Map taskPendingHolder = new HashMap<>(32); @@ -46,30 +41,35 @@ public class TaskPendingHolder { * 获取得到所有的groupId */ private static List groupIds = GroupIdMappingUtils.getAllGroupIds(); + /** * 给每个渠道,每种消息类型初始化一个线程池 - * - * TODO 不同的 groupId 分配不同的线程和队列大小 - * */ @PostConstruct public void init() { + /** + * example ThreadPoolName:austin-im.notice + * + * 可以通过apollo配置:dynamic-tp-apollo-dtp.yml 动态修改线程池的信息 + */ for (String groupId : groupIds) { DtpExecutor dtpExecutor = ThreadPoolBuilder.newBuilder() .threadPoolName("austin-" + groupId) - .corePoolSize(10) - .maximumPoolSize(15) - .keepAliveTime(15000) - .timeUnit(TimeUnit.MILLISECONDS) - .workQueue(QueueTypeEnum.SYNCHRONOUS_QUEUE.getName(), null, false) + .corePoolSize(coreSize) + .maximumPoolSize(maxSize) + .workQueue(QueueTypeEnum.LINKED_BLOCKING_QUEUE.getName(), queueSize, false) + .rejectedExecutionHandler(RejectedTypeEnum.CALLER_RUNS_POLICY.getName()) .buildDynamic(); + DtpRegistry.register(dtpExecutor, "beanPostProcessor"); threadPoolExecutorShutdownDefinition.registryExecutor(dtpExecutor); taskPendingHolder.put(groupId, dtpExecutor); } } + /** * 得到对应的线程池 + * * @param groupId * @return */ diff --git a/austin-support/src/main/java/com/java3y/austin/support/config/ThreadPoolExecutorShutdownDefinition.java b/austin-support/src/main/java/com/java3y/austin/support/config/ThreadPoolExecutorShutdownDefinition.java index a6d852e..f3a2743 100644 --- a/austin-support/src/main/java/com/java3y/austin/support/config/ThreadPoolExecutorShutdownDefinition.java +++ b/austin-support/src/main/java/com/java3y/austin/support/config/ThreadPoolExecutorShutdownDefinition.java @@ -3,8 +3,6 @@ package com.java3y.austin.support.config; import lombok.extern.slf4j.Slf4j; import org.springframework.context.ApplicationListener; import org.springframework.context.event.ContextClosedEvent; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; -import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; @@ -28,7 +26,7 @@ public class ThreadPoolExecutorShutdownDefinition implements ApplicationListener /** * 线程中的任务在接收到应用关闭信号量后最多等待多久就强制终止,其实就是给剩余任务预留的时间, 到时间后线程池必须销毁 */ - private final long AWAIT_TERMINATION = 60; + private final long AWAIT_TERMINATION = 20; /** * awaitTermination的单位 @@ -36,14 +34,6 @@ public class ThreadPoolExecutorShutdownDefinition implements ApplicationListener private final TimeUnit TIME_UNIT = TimeUnit.SECONDS; - public void registryExecutor(ThreadPoolTaskExecutor threadPoolTaskExecutor) { - POOLS.add(threadPoolTaskExecutor.getThreadPoolExecutor()); - } - - public void registryExecutor(ThreadPoolTaskScheduler threadPoolTaskExecutor) { - POOLS.add(threadPoolTaskExecutor.getScheduledThreadPoolExecutor()); - } - public void registryExecutor(ExecutorService executor) { POOLS.add(executor); } diff --git a/austin-web/src/main/resources/application.properties b/austin-web/src/main/resources/application.properties index 03764d7..a1a4da6 100644 --- a/austin-web/src/main/resources/application.properties +++ b/austin-web/src/main/resources/application.properties @@ -80,7 +80,7 @@ xxl.job.accessToken= ##################### apollo ##################### app.id=austin apollo.bootstrap.enabled=true -apollo.bootstrap.namespaces=boss.austin +apollo.bootstrap.namespaces=boss.austin,dynamic-tp-apollo-dtp.yml ##################### httpUtils properties ##################### ok.http.connect-timeout=30