Refactoring blocking queues and refusing policy-related packet adjustments

1.4.0
chen.ma 2 years ago
parent 2947127f32
commit 7f823ea6d0

@ -34,9 +34,9 @@ public class ErrorLogRejectedExecutionHandler implements CustomRejectedExecution
} }
``` ```
创建 `src/main/resources/META-INF/services` 目录,创建 SPI 自定义拒绝策略文件 `cn.hippo4j.core.spi.CustomRejectedExecutionHandler`。 创建 `src/main/resources/META-INF/services` 目录,创建 SPI 自定义拒绝策略文件 `cn.hippo4j.common.executor.support.CustomRejectedExecutionHandler`。
`cn.hippo4j.core.spi.CustomRejectedExecutionHandler` 文件内仅放一行自定义拒绝策略全限定名即可,示例: `cn.hippo4j.common.executor.support.CustomRejectedExecutionHandler` 文件内仅放一行自定义拒绝策略全限定名即可,示例:
```text ```text
cn.hippo4j.example.core.handler.ErrorLogRejectedExecutionHandler cn.hippo4j.example.core.handler.ErrorLogRejectedExecutionHandler

@ -15,7 +15,7 @@
* limitations under the License. * limitations under the License.
*/ */
package cn.hippo4j.core.spi; package cn.hippo4j.common.executor.support;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;

@ -15,7 +15,7 @@
* limitations under the License. * limitations under the License.
*/ */
package cn.hippo4j.core.spi; package cn.hippo4j.common.executor.support;
import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.RejectedExecutionHandler;

@ -15,10 +15,9 @@
* limitations under the License. * limitations under the License.
*/ */
package cn.hippo4j.core.executor.support; package cn.hippo4j.common.executor.support;
import cn.hippo4j.core.spi.CustomBlockingQueue; import cn.hippo4j.common.spi.DynamicThreadPoolServiceLoader;
import cn.hippo4j.core.spi.DynamicThreadPoolServiceLoader;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;

@ -15,10 +15,9 @@
* limitations under the License. * limitations under the License.
*/ */
package cn.hippo4j.core.executor.support; package cn.hippo4j.common.executor.support;
import cn.hippo4j.core.spi.CustomRejectedExecutionHandler; import cn.hippo4j.common.spi.DynamicThreadPoolServiceLoader;
import cn.hippo4j.core.spi.DynamicThreadPoolServiceLoader;
import java.util.Collection; import java.util.Collection;
import java.util.Objects; import java.util.Objects;

@ -15,7 +15,7 @@
* limitations under the License. * limitations under the License.
*/ */
package cn.hippo4j.core.executor.support; package cn.hippo4j.common.executor.support;
import cn.hutool.core.util.ReflectUtil; import cn.hutool.core.util.ReflectUtil;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;

@ -15,7 +15,7 @@
* limitations under the License. * limitations under the License.
*/ */
package cn.hippo4j.core.executor.support; package cn.hippo4j.common.executor.support;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.RejectedExecutionHandler;

@ -15,7 +15,7 @@
* limitations under the License. * limitations under the License.
*/ */
package cn.hippo4j.core.executor.support; package cn.hippo4j.common.executor.support;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;

@ -17,6 +17,8 @@
package cn.hippo4j.common.model.register; package cn.hippo4j.common.model.register;
import cn.hippo4j.common.executor.support.QueueTypeEnum;
import cn.hippo4j.common.executor.support.RejectedTypeEnum;
import com.fasterxml.jackson.annotation.JsonAlias; import com.fasterxml.jackson.annotation.JsonAlias;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.Builder; import lombok.Builder;
@ -56,7 +58,7 @@ public class DynamicThreadPoolRegisterParameter {
/** /**
* Queue type * Queue type
*/ */
private Integer queueType; private QueueTypeEnum queueType;
/** /**
* Capacity * Capacity
@ -71,7 +73,7 @@ public class DynamicThreadPoolRegisterParameter {
/** /**
* Rejected type * Rejected type
*/ */
private Integer rejectedType; private RejectedTypeEnum rejectedType;
/** /**
* Is alarm * Is alarm

@ -15,7 +15,7 @@
* limitations under the License. * limitations under the License.
*/ */
package cn.hippo4j.core.spi; package cn.hippo4j.common.spi;
import java.util.*; import java.util.*;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;

@ -15,7 +15,7 @@
* limitations under the License. * limitations under the License.
*/ */
package cn.hippo4j.core.spi; package cn.hippo4j.common.spi;
/** /**
* Service loader instantiation exception. * Service loader instantiation exception.

@ -181,6 +181,8 @@ public class ConfigServiceImpl implements ConfigService {
configAllInfo.setItemId(registerWrapper.getItemId()); configAllInfo.setItemId(registerWrapper.getItemId());
configAllInfo.setTpId(registerParameter.getThreadPoolId()); configAllInfo.setTpId(registerParameter.getThreadPoolId());
configAllInfo.setLivenessAlarm(registerParameter.getActiveAlarm()); configAllInfo.setLivenessAlarm(registerParameter.getActiveAlarm());
configAllInfo.setQueueType(registerParameter.getQueueType().type);
configAllInfo.setRejectedType(registerParameter.getRejectedType().type);
configAllInfo.setAllowCoreThreadTimeOut(registerParameter.getAllowCoreThreadTimeOut()); configAllInfo.setAllowCoreThreadTimeOut(registerParameter.getAllowCoreThreadTimeOut());
return configAllInfo; return configAllInfo;
} }

@ -17,6 +17,7 @@
package cn.hippo4j.core.executor.support; package cn.hippo4j.core.executor.support;
import cn.hippo4j.common.executor.support.QueueTypeEnum;
import cn.hippo4j.core.executor.DynamicThreadPoolExecutor; import cn.hippo4j.core.executor.DynamicThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;

@ -18,6 +18,7 @@
package cn.hippo4j.core.executor.support; package cn.hippo4j.core.executor.support;
import cn.hippo4j.common.design.builder.Builder; import cn.hippo4j.common.design.builder.Builder;
import cn.hippo4j.common.executor.support.QueueTypeEnum;
import cn.hippo4j.common.toolkit.Assert; import cn.hippo4j.common.toolkit.Assert;
import org.springframework.core.task.TaskDecorator; import org.springframework.core.task.TaskDecorator;

@ -18,8 +18,8 @@
package cn.hippo4j.core.executor.support.service; package cn.hippo4j.core.executor.support.service;
import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterParameter; import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterParameter;
import cn.hippo4j.core.executor.support.QueueTypeEnum; import cn.hippo4j.common.executor.support.QueueTypeEnum;
import cn.hippo4j.core.executor.support.RejectedTypeEnum; import cn.hippo4j.common.executor.support.RejectedTypeEnum;
import cn.hippo4j.core.executor.support.ThreadPoolBuilder; import cn.hippo4j.core.executor.support.ThreadPoolBuilder;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
@ -41,11 +41,11 @@ public abstract class AbstractDynamicThreadPoolService implements DynamicThreadP
.threadPoolId(registerParameter.getThreadPoolId()) .threadPoolId(registerParameter.getThreadPoolId())
.corePoolSize(registerParameter.getCorePoolSize()) .corePoolSize(registerParameter.getCorePoolSize())
.maxPoolNum(registerParameter.getMaximumPoolSize()) .maxPoolNum(registerParameter.getMaximumPoolSize())
.workQueue(QueueTypeEnum.createBlockingQueue(registerParameter.getQueueType(), registerParameter.getCapacity())) .workQueue(QueueTypeEnum.createBlockingQueue(registerParameter.getQueueType().type, registerParameter.getCapacity()))
.threadFactory(registerParameter.getThreadNamePrefix()) .threadFactory(registerParameter.getThreadNamePrefix())
.keepAliveTime(registerParameter.getKeepAliveTime(), TimeUnit.SECONDS) .keepAliveTime(registerParameter.getKeepAliveTime(), TimeUnit.SECONDS)
.executeTimeOut(registerParameter.getExecuteTimeOut()) .executeTimeOut(registerParameter.getExecuteTimeOut())
.rejected(RejectedTypeEnum.createPolicy(registerParameter.getRejectedType())) .rejected(RejectedTypeEnum.createPolicy(registerParameter.getRejectedType().type))
.dynamicPool() .dynamicPool()
.build(); .build();
return dynamicThreadPoolExecutor; return dynamicThreadPoolExecutor;

@ -17,7 +17,7 @@
package cn.hippo4j.example.core.handler; package cn.hippo4j.example.core.handler;
import cn.hippo4j.core.spi.CustomRejectedExecutionHandler; import cn.hippo4j.common.executor.support.CustomRejectedExecutionHandler;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;

@ -23,8 +23,8 @@ import cn.hippo4j.common.model.register.notify.DynamicThreadPoolRegisterCoreNoti
import cn.hippo4j.common.model.register.notify.DynamicThreadPoolRegisterServerNotifyParameter; import cn.hippo4j.common.model.register.notify.DynamicThreadPoolRegisterServerNotifyParameter;
import cn.hippo4j.common.toolkit.JSONUtil; import cn.hippo4j.common.toolkit.JSONUtil;
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage; import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.core.executor.support.QueueTypeEnum; import cn.hippo4j.common.executor.support.QueueTypeEnum;
import cn.hippo4j.core.executor.support.RejectedTypeEnum; import cn.hippo4j.common.executor.support.RejectedTypeEnum;
import cn.hippo4j.message.enums.NotifyPlatformEnum; import cn.hippo4j.message.enums.NotifyPlatformEnum;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -40,13 +40,13 @@ public class RegisterDynamicThreadPoolTest {
DynamicThreadPoolRegisterParameter parameterInfo = DynamicThreadPoolRegisterParameter.builder() DynamicThreadPoolRegisterParameter parameterInfo = DynamicThreadPoolRegisterParameter.builder()
.corePoolSize(1) .corePoolSize(1)
.maximumPoolSize(2) .maximumPoolSize(2)
.queueType(QueueTypeEnum.LINKED_BLOCKING_QUEUE.type) .queueType(QueueTypeEnum.LINKED_BLOCKING_QUEUE)
.capacity(1024) .capacity(1024)
// TimeUnit.SECONDS // TimeUnit.SECONDS
.keepAliveTime(1024L) .keepAliveTime(1024L)
// TimeUnit.MILLISECONDS // TimeUnit.MILLISECONDS
.executeTimeOut(1024L) .executeTimeOut(1024L)
.rejectedType(RejectedTypeEnum.DISCARD_POLICY.type) .rejectedType(RejectedTypeEnum.DISCARD_POLICY)
.isAlarm(true) .isAlarm(true)
.allowCoreThreadTimeOut(false) .allowCoreThreadTimeOut(false)
.capacityAlarm(90) .capacityAlarm(90)

@ -20,7 +20,7 @@ package cn.hippo4j.core.springboot.starter.monitor;
import cn.hippo4j.common.config.ApplicationContextHolder; import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.common.toolkit.StringUtil; import cn.hippo4j.common.toolkit.StringUtil;
import cn.hippo4j.core.executor.support.ThreadFactoryBuilder; import cn.hippo4j.core.executor.support.ThreadFactoryBuilder;
import cn.hippo4j.core.spi.DynamicThreadPoolServiceLoader; import cn.hippo4j.common.spi.DynamicThreadPoolServiceLoader;
import cn.hippo4j.core.springboot.starter.config.BootstrapCoreProperties; import cn.hippo4j.core.springboot.starter.config.BootstrapCoreProperties;
import cn.hippo4j.monitor.base.DynamicThreadPoolMonitor; import cn.hippo4j.monitor.base.DynamicThreadPoolMonitor;
import cn.hippo4j.monitor.base.ThreadPoolMonitor; import cn.hippo4j.monitor.base.ThreadPoolMonitor;

@ -23,9 +23,9 @@ import cn.hippo4j.core.executor.ThreadPoolNotifyAlarmHandler;
import cn.hippo4j.core.executor.manage.GlobalNotifyAlarmManage; import cn.hippo4j.core.executor.manage.GlobalNotifyAlarmManage;
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage; import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.core.executor.support.AbstractDynamicExecutorSupport; import cn.hippo4j.core.executor.support.AbstractDynamicExecutorSupport;
import cn.hippo4j.core.executor.support.QueueTypeEnum; import cn.hippo4j.common.executor.support.QueueTypeEnum;
import cn.hippo4j.core.executor.support.RejectedTypeEnum; import cn.hippo4j.common.executor.support.RejectedTypeEnum;
import cn.hippo4j.core.executor.support.ResizableCapacityLinkedBlockingQueue; import cn.hippo4j.common.executor.support.ResizableCapacityLinkedBlockingQueue;
import cn.hippo4j.core.proxy.RejectedProxyUtil; import cn.hippo4j.core.proxy.RejectedProxyUtil;
import cn.hippo4j.core.springboot.starter.config.BootstrapCoreProperties; import cn.hippo4j.core.springboot.starter.config.BootstrapCoreProperties;
import cn.hippo4j.core.springboot.starter.config.ExecutorProperties; import cn.hippo4j.core.springboot.starter.config.ExecutorProperties;

@ -24,8 +24,8 @@ import cn.hippo4j.common.toolkit.BooleanUtil;
import cn.hippo4j.core.executor.DynamicThreadPoolWrapper; import cn.hippo4j.core.executor.DynamicThreadPoolWrapper;
import cn.hippo4j.core.executor.manage.GlobalNotifyAlarmManage; import cn.hippo4j.core.executor.manage.GlobalNotifyAlarmManage;
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage; import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.core.executor.support.QueueTypeEnum; import cn.hippo4j.common.executor.support.QueueTypeEnum;
import cn.hippo4j.core.executor.support.RejectedTypeEnum; import cn.hippo4j.common.executor.support.RejectedTypeEnum;
import cn.hippo4j.core.executor.support.service.AbstractDynamicThreadPoolService; import cn.hippo4j.core.executor.support.service.AbstractDynamicThreadPoolService;
import cn.hippo4j.core.springboot.starter.config.ExecutorProperties; import cn.hippo4j.core.springboot.starter.config.ExecutorProperties;
import cn.hippo4j.message.service.ThreadPoolNotifyAlarm; import cn.hippo4j.message.service.ThreadPoolNotifyAlarm;
@ -67,9 +67,10 @@ public class DynamicThreadPoolConfigService extends AbstractDynamicThreadPoolSer
.maximumPoolSize(registerParameter.getMaximumPoolSize()) .maximumPoolSize(registerParameter.getMaximumPoolSize())
.allowCoreThreadTimeOut(BooleanUtil.toBoolean(String.valueOf(registerParameter.getAllowCoreThreadTimeOut()))) .allowCoreThreadTimeOut(BooleanUtil.toBoolean(String.valueOf(registerParameter.getAllowCoreThreadTimeOut())))
.keepAliveTime(registerParameter.getKeepAliveTime()) .keepAliveTime(registerParameter.getKeepAliveTime())
.blockingQueue(QueueTypeEnum.getBlockingQueueNameByType(registerParameter.getQueueType())) .blockingQueue(QueueTypeEnum.getBlockingQueueNameByType(registerParameter.getQueueType().type))
.capacityAlarm(registerParameter.getCapacity())
.threadNamePrefix(registerParameter.getThreadNamePrefix()) .threadNamePrefix(registerParameter.getThreadNamePrefix())
.rejectedHandler(RejectedTypeEnum.getRejectedNameByType(registerParameter.getRejectedType())) .rejectedHandler(RejectedTypeEnum.getRejectedNameByType(registerParameter.getRejectedType().type))
.executeTimeOut(registerParameter.getExecuteTimeOut()) .executeTimeOut(registerParameter.getExecuteTimeOut())
.threadPoolId(registerParameter.getThreadPoolId()) .threadPoolId(registerParameter.getThreadPoolId())
.build(); .build();

@ -18,6 +18,8 @@
package cn.hippo4j.core.springboot.starter.support; package cn.hippo4j.core.springboot.starter.support;
import cn.hippo4j.common.config.ApplicationContextHolder; import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.common.executor.support.QueueTypeEnum;
import cn.hippo4j.common.executor.support.RejectedTypeEnum;
import cn.hippo4j.common.toolkit.StringUtil; import cn.hippo4j.common.toolkit.StringUtil;
import cn.hippo4j.core.executor.DynamicThreadPool; import cn.hippo4j.core.executor.DynamicThreadPool;
import cn.hippo4j.core.executor.DynamicThreadPoolExecutor; import cn.hippo4j.core.executor.DynamicThreadPoolExecutor;

@ -18,7 +18,7 @@
package cn.hippo4j.springboot.starter.core; package cn.hippo4j.springboot.starter.core;
import cn.hippo4j.common.api.ThreadPoolDynamicRefresh; import cn.hippo4j.common.api.ThreadPoolDynamicRefresh;
import cn.hippo4j.core.executor.support.QueueTypeEnum; import cn.hippo4j.common.executor.support.QueueTypeEnum;
import cn.hippo4j.core.executor.support.ThreadPoolBuilder; import cn.hippo4j.core.executor.support.ThreadPoolBuilder;
import cn.hippo4j.springboot.starter.config.BootstrapProperties; import cn.hippo4j.springboot.starter.config.BootstrapProperties;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;

@ -26,9 +26,9 @@ import cn.hippo4j.core.executor.DynamicThreadPoolExecutor;
import cn.hippo4j.core.executor.ThreadPoolNotifyAlarmHandler; import cn.hippo4j.core.executor.ThreadPoolNotifyAlarmHandler;
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage; import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.core.executor.support.AbstractDynamicExecutorSupport; import cn.hippo4j.core.executor.support.AbstractDynamicExecutorSupport;
import cn.hippo4j.core.executor.support.QueueTypeEnum; import cn.hippo4j.common.executor.support.QueueTypeEnum;
import cn.hippo4j.core.executor.support.RejectedTypeEnum; import cn.hippo4j.common.executor.support.RejectedTypeEnum;
import cn.hippo4j.core.executor.support.ResizableCapacityLinkedBlockingQueue; import cn.hippo4j.common.executor.support.ResizableCapacityLinkedBlockingQueue;
import cn.hippo4j.core.proxy.RejectedProxyUtil; import cn.hippo4j.core.proxy.RejectedProxyUtil;
import cn.hippo4j.common.api.ThreadPoolDynamicRefresh; import cn.hippo4j.common.api.ThreadPoolDynamicRefresh;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;

@ -23,7 +23,7 @@ import cn.hippo4j.common.toolkit.CollectionUtil;
import cn.hippo4j.common.toolkit.ThreadUtil; import cn.hippo4j.common.toolkit.ThreadUtil;
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage; import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.core.executor.support.ThreadFactoryBuilder; import cn.hippo4j.core.executor.support.ThreadFactoryBuilder;
import cn.hippo4j.core.spi.DynamicThreadPoolServiceLoader; import cn.hippo4j.common.spi.DynamicThreadPoolServiceLoader;
import cn.hippo4j.springboot.starter.config.BootstrapProperties; import cn.hippo4j.springboot.starter.config.BootstrapProperties;
import cn.hippo4j.springboot.starter.monitor.collect.Collector; import cn.hippo4j.springboot.starter.monitor.collect.Collector;
import cn.hippo4j.springboot.starter.monitor.send.MessageSender; import cn.hippo4j.springboot.starter.monitor.send.MessageSender;

@ -20,6 +20,8 @@ package cn.hippo4j.springboot.starter.support;
import cn.hippo4j.common.config.ApplicationContextHolder; import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.common.constant.Constants; import cn.hippo4j.common.constant.Constants;
import cn.hippo4j.common.enums.EnableEnum; import cn.hippo4j.common.enums.EnableEnum;
import cn.hippo4j.common.executor.support.QueueTypeEnum;
import cn.hippo4j.common.executor.support.RejectedTypeEnum;
import cn.hippo4j.common.model.ThreadPoolParameterInfo; import cn.hippo4j.common.model.ThreadPoolParameterInfo;
import cn.hippo4j.common.toolkit.BooleanUtil; import cn.hippo4j.common.toolkit.BooleanUtil;
import cn.hippo4j.common.toolkit.JSONUtil; import cn.hippo4j.common.toolkit.JSONUtil;

Loading…
Cancel
Save