@ -2,13 +2,13 @@
sidebar_position: 4
---
# Custom Blocking Queue
# 阻塞队列自定义
Hippo4j extends blocking queues through SPI, allowing users to implement custom blocking queue types in Hippo4j.
Hippo4j 通过 SPI 的方式对拒绝策略进行扩展,可以让用户在 Hippo4j 中完成自定义阻塞队列实现。
## 1. Define Custom Queue Class
## 1. 定义自定义队列类
Implement the interface `cn.hippo4j.common.executor.support.CustomBlockingQueue<T>` :
实现接口 `cn.hippo4j.common.executor.support.CustomBlockingQueue<T>` :
```java
public class MyArrayBlockingQueue implements CustomBlockingQueue< Runnable > {
@ -30,51 +30,51 @@ public class MyArrayBlockingQueue implements CustomBlockingQueue<Runnable> {
}
```
## 2. Declare SPI File
## 2. 声明 SPI 文件
Create a file in the `src/main/resources/META-INF/services/` directory:
在 `src/main/resources/META-INF/services/` 目录下新增文件:
```
cn.hippo4j.common.executor.support.CustomBlockingQueue
```
File content (single line):
文件内容仅一行:
```
com.example.queue.MyArrayBlockingQueue
```
## 3. Server-side Activation
## 3. 服务端生效方式
When the `queueType` and `capacity` delivered by the server match the custom type, the framework will automatically create the queue through SPI.
当服务端下发的 `queueType` 与 `capacity` 命中自定义类型时,框架会通过 SPI 自动创建队列。
### 3.1 Queue Creation and Validation
### 3.1 队列创建与验证
```java
// Create queue
// 创建队列
BlockingQueue< T > q = BlockingQueueManager.createQueue(queueType, capacity);
// Validate queue configuration
// 验证队列配置
boolean valid = BlockingQueueManager.validateQueueConfig(queueType, capacity);
// Dynamic capacity adjustment (only supported by ResizableCapacityLinkedBlockingQueue)
// 动态调整容量(仅 ResizableCapacityLinkedBlockingQueue 支持)
boolean ok = BlockingQueueManager.changeQueueCapacity(executor.getQueue(), newCapacity);
```
### 3.2 Queue Type Switching
### 3.2 队列类型切换
When you need to switch queue types, use the `ThreadPoolRebuilder.rebuildAndSwitch` method, which creates a new thread pool instance and safely migrates tasks:
当需要切换队列类型时,使用 `ThreadPoolRebuilder.rebuildAndSwitch` 方法,该方法会创建新的线程池实例并安全地迁移任务:
```java
boolean ok = ThreadPoolRebuilder.rebuildAndSwitch(
executor, // Current thread pool
newQueueType, // New queue type
capacity, // Queue capacity
threadPoolId // Thread pool ID
executor, // 当前线程池
newQueueType, // 新队列类型
capacity, // 队列容量
threadPoolId // 线程池 ID
);
```
Server-side dynamic refresh implementation:
服务端动态刷新处的实现:
```java
// ServerThreadPoolDynamicRefresh#handleQueueChanges
@ -82,7 +82,7 @@ boolean queueTypeChanged = parameter.getQueueType() != null &&
!Objects.equals(BlockingQueueManager.getQueueType(executor.getQueue()), parameter.getQueueType());
if (queueTypeChanged) {
// Use safe rebuild approach for queue switching
// 使用安全的重建方式切换队列
boolean ok = ThreadPoolRebuilder.rebuildAndSwitch(
executor,
parameter.getQueueType(),
@ -92,7 +92,7 @@ if (queueTypeChanged) {
if (ok) {
log.info("Queue type rebuilt and switched to: {}",
BlockingQueueTypeEnum.getBlockingQueueNameByType(parameter.getQueueType()));
}
}
}
```