Remove ThreadPoolRebuilder runtime queue switching

pull/1612/head
mingri31164 6 months ago
parent 1c01362410
commit 2efaa3a56d

@ -61,38 +61,27 @@ boolean valid = BlockingQueueManager.validateQueueConfig(queueType, capacity);
boolean ok = BlockingQueueManager.changeQueueCapacity(executor.getQueue(), newCapacity);
```
### 3.2 队列类型切换
### 3.2 队列类型何时生效
当需要切换队列类型时,使用 `ThreadPoolRebuilder.rebuildAndSwitch` 方法,该方法会创建新的线程池实例并安全地迁移任务:
**重要说明**队列类型queueType的变更需要客户端应用重启后生效。
```java
boolean ok = ThreadPoolRebuilder.rebuildAndSwitch(
executor, // 当前线程池
newQueueType, // 新队列类型
capacity, // 队列容量
threadPoolId // 线程池ID
);
```
- **配置模板**:在线程池管理页面编辑队列类型,会保存到数据库,但不会推送到运行中的客户端。
- **生效时机**:客户端应用重启时,会从服务端读取最新配置,并使用反射替换线程池的 `workQueue` 字段。
- **运行时调整**:运行时仅支持队列容量的动态调整(仅限 `ResizableCapacityLinkedBlockingQueue`),不支持队列类型切换。
服务端动态刷新处的实现:
```java
// ServerThreadPoolDynamicRefresh#handleQueueChanges
boolean queueTypeChanged = parameter.getQueueType() != null &&
!Objects.equals(BlockingQueueManager.getQueueType(executor.getQueue()), parameter.getQueueType());
if (queueTypeChanged) {
// 使用安全的重建方式切换队列
boolean ok = ThreadPoolRebuilder.rebuildAndSwitch(
executor,
parameter.getQueueType(),
parameter.getCapacity(),
threadPoolId
);
if (ok) {
log.info("Queue type rebuilt and switched to: {}",
BlockingQueueTypeEnum.getBlockingQueueNameByType(parameter.getQueueType()));
// 仅支持容量调整,不支持队列类型切换
if (parameter.getCapacity() != null) {
if (BlockingQueueManager.canChangeCapacity(executor.getQueue())) {
boolean success = BlockingQueueManager.changeQueueCapacity(
executor.getQueue(), parameter.getCapacity());
if (success) {
log.info("Queue capacity changed to: {}", parameter.getCapacity());
}
}
}
```

@ -61,38 +61,27 @@ boolean valid = BlockingQueueManager.validateQueueConfig(queueType, capacity);
boolean ok = BlockingQueueManager.changeQueueCapacity(executor.getQueue(), newCapacity);
```
### 3.2 队列类型切换
### 3.2 队列类型何时生效
当需要切换队列类型时,使用 `ThreadPoolRebuilder.rebuildAndSwitch` 方法,该方法会创建新的线程池实例并安全地迁移任务:
**重要说明**队列类型queueType的变更需要客户端应用重启后生效。
```java
boolean ok = ThreadPoolRebuilder.rebuildAndSwitch(
executor, // 当前线程池
newQueueType, // 新队列类型
capacity, // 队列容量
threadPoolId // 线程池ID
);
```
- **配置模板**:在线程池管理页面编辑队列类型,会保存到数据库,但不会推送到运行中的客户端。
- **生效时机**:客户端应用重启时,会从服务端读取最新配置,并使用反射替换线程池的 `workQueue` 字段。
- **运行时调整**:运行时仅支持队列容量的动态调整(仅限 `ResizableCapacityLinkedBlockingQueue`),不支持队列类型切换。
服务端动态刷新处的实现:
```java
// ServerThreadPoolDynamicRefresh#handleQueueChanges
boolean queueTypeChanged = parameter.getQueueType() != null &&
!Objects.equals(BlockingQueueManager.getQueueType(executor.getQueue()), parameter.getQueueType());
if (queueTypeChanged) {
// 使用安全的重建方式切换队列
boolean ok = ThreadPoolRebuilder.rebuildAndSwitch(
executor,
parameter.getQueueType(),
parameter.getCapacity(),
threadPoolId
);
if (ok) {
log.info("Queue type rebuilt and switched to: {}",
BlockingQueueTypeEnum.getBlockingQueueNameByType(parameter.getQueueType()));
// 仅支持容量调整,不支持队列类型切换
if (parameter.getCapacity() != null) {
if (BlockingQueueManager.canChangeCapacity(executor.getQueue())) {
boolean success = BlockingQueueManager.changeQueueCapacity(
executor.getQueue(), parameter.getCapacity());
if (success) {
log.info("Queue capacity changed to: {}", parameter.getCapacity());
}
}
}
```

@ -26,8 +26,8 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
/**
* Blocking queue manager for dynamic queue type switching.
* Supports SPI extension and dynamic queue replacement.
* Blocking queue manager for queue operations.
* Supports SPI extension, queue creation, capacity management, and type recognition.
*/
@Slf4j
public class BlockingQueueManager {
@ -66,27 +66,6 @@ public class BlockingQueueManager {
return BlockingQueueTypeEnum.createBlockingQueue(queueName, capacity);
}
/**
* Check if queue type can be dynamically changed
*
* @param currentQueue current queue instance
* @param newQueueType new queue type
* @return true if can be changed
*/
public static boolean canChangeQueueType(BlockingQueue<?> currentQueue, Integer newQueueType) {
if (currentQueue == null || newQueueType == null) {
return true;
}
// Special case: ResizableCapacityLinkedBlockingQueue can only change capacity
if (currentQueue instanceof ResizableCapacityLinkedBlockingQueue) {
return Objects.equals(BlockingQueueTypeEnum.RESIZABLE_LINKED_BLOCKING_QUEUE.getType(), newQueueType);
}
// For other queue types, check if they are the same type
String currentQueueName = currentQueue.getClass().getSimpleName();
String newQueueName = BlockingQueueTypeEnum.getBlockingQueueNameByType(newQueueType);
return !Objects.equals(currentQueueName, newQueueName);
}
/**
* Check if queue capacity can be dynamically changed
*

@ -30,10 +30,10 @@ import java.util.concurrent.LinkedBlockingQueue;
/**
* Queue SPI Integration Test: Verifies the full flow from parameters to queue creation
* This test simulates a real scenario:
* 1. Config center pushes a queue type change (queueType)
* 2. ServerThreadPoolDynamicRefresh.handleQueueChanges() detects the change
* 3. ThreadPoolRebuilder.rebuildAndSwitch() creates a new queue (via SPI)
* 4. BlockingQueueTypeEnum.createBlockingQueue() calls SPI loader
* 1. Config center pushes queue configuration (queueType, capacity)
* 2. Thread pool is created with the specified queue type
* 3. BlockingQueueTypeEnum.createBlockingQueue() calls SPI loader
* 4. Custom queues can be used via SPI mechanism
*/
public class QueueSpiIntegrationTest {
@ -149,8 +149,8 @@ public class QueueSpiIntegrationTest {
Assert.assertEquals("New queue capacity should be 512", 512, newQueue.remainingCapacity());
System.out.println("Step 4: Verified new queue - Type: ArrayBlockingQueue, Capacity: 512");
System.out.println("Complete queue switch flow verified");
System.out.println("Proves: ServerThreadPoolDynamicRefresh → ThreadPoolRebuilder → BlockingQueueTypeEnum → SPI");
System.out.println("Complete queue creation flow verified");
System.out.println("Proves: Config → BlockingQueueTypeEnum → SPI → Custom Queue");
}
/**

@ -1,184 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.common.executor.support;
import org.junit.Assert;
import org.junit.Test;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* ThreadPoolRebuilder Concurrency Test
* Verifies concurrency control and task migration safety during thread pool rebuilding
*/
public class ThreadPoolRebuilderConcurrencyTest {
/**
* Scenario 1: Concurrent rebuild requests should be serialized
* Verification: when multiple threads attempt to rebuild the same thread pool at the same time,
* only one succeeds while others are blocked
*/
@Test
public void testConcurrentRebuildSerialize() throws InterruptedException {
System.out.println("\n========== Scenario 1: Concurrent rebuild serialization ==========");
// Note: This test requires the actual ThreadPoolRebuilder class, here it is only a design validation
// Actual testing should be executed in the starters/threadpool/server module
System.out.println("Design validation: ReentrantLock ensures rebuild operations for the same threadPoolId are serialized");
System.out.println(" - tryLock() returns immediately to avoid blocking");
System.out.println(" - Subsequent requests receive warning logs and return false");
}
/**
* Scenario 2: Task migration tracking and error handling
* Verification: migration process records success/failure counts,
* failures throw exceptions that trigger rollback
*/
@Test
public void testTaskTransferTracking() {
System.out.println("\n========== Scenario 2: Task migration tracking ==========");
ThreadPoolExecutor fromPool = new ThreadPoolExecutor(
2, 4, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(100));
ThreadPoolExecutor toPool = new ThreadPoolExecutor(
2, 4, 60, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100));
// Submit 10 tasks to source pool
for (int i = 0; i < 10; i++) {
fromPool.execute(() -> {
try {
Thread.sleep(5000); // Simulate long running task
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
// Verify tasks in queue
int queuedTasks = fromPool.getQueue().size();
System.out.println("Tasks in source pool queue: " + queuedTasks);
Assert.assertTrue("Queue should contain tasks", queuedTasks > 0);
System.out.println("Design validation: migration process records transferredCount and failedCount");
System.out.println(" - Successful transfers logged with count");
System.out.println(" - Failures throw exceptions and trigger rollback");
fromPool.shutdownNow();
toPool.shutdownNow();
}
/**
* Scenario 3: Rollback old pool state check
* Verification: after rollback, old pool termination status is checked
* and warnings are logged for unfinished tasks
*/
@Test
public void testRollbackAndOldPoolStatus() {
System.out.println("\n========== Scenario 3: Rollback state check ==========");
System.out.println("Design validation: rollback checks old pool state");
System.out.println(" - Call oldExecutor.isTerminated() to verify termination");
System.out.println(" - If not terminated, log warnings with activeCount");
System.out.println(" - Prevent resource leaks from unfinished tasks");
}
/**
* Scenario 4: Atomicity of registry switch
* Verification: switchRegistry operation runs under lock protection
* to prevent concurrent overwrites
*/
@Test
public void testRegistrySwitchAtomicity() {
System.out.println("\n========== Scenario 4: Registry switch atomicity ==========");
System.out.println("Design validation: registry switch runs under ReentrantLock protection");
System.out.println(" - switchRegistry invoked inside doRebuildAndSwitch");
System.out.println(" - Entire doRebuildAndSwitch runs under lock");
System.out.println(" - Prevent inconsistent registry state due to concurrent rebuilds");
}
/**
* Scenario 5: New task submission during migration
* Verification: after registry switch, new tasks are routed to the new pool
*/
@Test
public void testNewTasksDuringMigration() throws InterruptedException {
System.out.println("\n========== Scenario 5: New task routing during migration ==========");
ThreadPoolExecutor oldPool = new ThreadPoolExecutor(
2, 4, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(10));
ThreadPoolExecutor newPool = new ThreadPoolExecutor(
2, 4, 60, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(10));
CountDownLatch latch = new CountDownLatch(5);
AtomicInteger oldPoolTasks = new AtomicInteger(0);
AtomicInteger newPoolTasks = new AtomicInteger(0);
// Submit tasks to old pool
for (int i = 0; i < 3; i++) {
oldPool.execute(() -> {
oldPoolTasks.incrementAndGet();
latch.countDown();
});
}
// Simulate registry switch: new tasks submitted to new pool
for (int i = 0; i < 2; i++) {
newPool.execute(() -> {
newPoolTasks.incrementAndGet();
latch.countDown();
});
}
latch.await(2, TimeUnit.SECONDS);
System.out.println("Tasks executed by old pool: " + oldPoolTasks.get());
System.out.println("Tasks executed by new pool: " + newPoolTasks.get());
Assert.assertEquals("Old pool should execute 3 tasks", 3, oldPoolTasks.get());
Assert.assertEquals("New pool should execute 2 tasks", 2, newPoolTasks.get());
System.out.println("Test passed: after registry switch, new tasks are routed to the new pool");
oldPool.shutdownNow();
newPool.shutdownNow();
}
/**
* Scenario 6: Rollback protection on migration failure
* Verification: when migration fails, registry is rolled back and new pool shut down
*/
@Test
public void testRollbackOnTransferFailure() {
System.out.println("\n========== Scenario 6: Migration failure rollback ==========");
System.out.println("Design validation: rollback logic on migration failure");
System.out.println(" - transferSuccess flag controls rollback");
System.out.println(" - On failure: ThreadPoolExecutorRegistry.putHolder(oldHolder)");
System.out.println(" - On failure: safeShutdownNow(newExecutor)");
System.out.println(" - Return false to notify caller of rebuild failure");
System.out.println(" - Old pool continues serving without business interruption");
}
}

@ -147,43 +147,26 @@ public class ServerThreadPoolDynamicRefresh implements ThreadPoolDynamicRefresh
}
/**
* Handle queue changes using the new SPI mechanism
* Handle queue capacity changes
*
* @param executor thread pool executor
* @param parameter thread pool parameter
*/
private void handleQueueChanges(ThreadPoolExecutor executor, ThreadPoolParameter parameter) {
if (parameter.getQueueType() == null && parameter.getCapacity() == null) {
if (parameter.getCapacity() == null) {
return;
}
if (parameter.getQueueType() != null && !BlockingQueueManager.validateQueueConfig(parameter.getQueueType(), parameter.getCapacity())) {
log.warn("Invalid queue configuration - Type: {}, Capacity: {}", parameter.getQueueType(), parameter.getCapacity());
return;
}
boolean queueTypeChanged = parameter.getQueueType() != null &&
!Objects.equals(BlockingQueueManager.getQueueType(executor.getQueue()), parameter.getQueueType());
boolean capacityChanged = parameter.getCapacity() != null &&
BlockingQueueManager.canChangeCapacity(executor.getQueue());
if (queueTypeChanged) {
boolean ok = ThreadPoolRebuilder.rebuildAndSwitch(executor, parameter.getQueueType(), parameter.getCapacity(), parameter.getTpId());
if (ok) {
log.info("Queue type rebuilt and switched to: {}", BlockingQueueTypeEnum.getBlockingQueueNameByType(parameter.getQueueType()));
} else {
log.warn("Queue type rebuild skipped or failed. Current: {}, Requested: {}",
BlockingQueueManager.getQueueName(executor.getQueue()),
BlockingQueueTypeEnum.getBlockingQueueNameByType(parameter.getQueueType()));
}
} else if (capacityChanged) {
// Only change capacity if queue type is the same or not specified
// Only support capacity adjustment for queues that support it
if (BlockingQueueManager.canChangeCapacity(executor.getQueue())) {
boolean success = BlockingQueueManager.changeQueueCapacity(executor.getQueue(), parameter.getCapacity());
if (success) {
log.info("Queue capacity changed to: {}", parameter.getCapacity());
} else {
log.warn("Failed to change queue capacity to: {}", parameter.getCapacity());
}
} else if (parameter.getCapacity() != null) {
log.warn("Queue capacity cannot be changed for current queue type: {}",
} else {
log.warn("Queue capacity cannot be changed for current queue type: {}. " +
"Only ResizableCapacityLinkedBlockingQueue supports dynamic capacity changes.",
BlockingQueueManager.getQueueName(executor.getQueue()));
}
}

@ -1,204 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.springboot.starter.core;
import cn.hippo4j.common.executor.ThreadPoolExecutorHolder;
import cn.hippo4j.common.executor.ThreadPoolExecutorRegistry;
import cn.hippo4j.common.executor.support.BlockingQueueTypeEnum;
import cn.hippo4j.core.executor.DynamicThreadPoolExecutor;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.locks.ReentrantLock;
/**
* Thread-pool rebuild and switch helper.
*/
@Slf4j
public final class ThreadPoolRebuilder {
private static final Map<String, ReentrantLock> REBUILD_LOCKS = new ConcurrentHashMap<>();
private ThreadPoolRebuilder() {
}
public static boolean rebuildAndSwitch(ThreadPoolExecutor oldExecutor,
Integer newQueueType,
Integer capacity,
String threadPoolId) {
if (oldExecutor == null || newQueueType == null || threadPoolId == null) {
return false;
}
ReentrantLock lock = REBUILD_LOCKS.computeIfAbsent(threadPoolId, k -> new ReentrantLock());
if (!lock.tryLock()) {
log.warn("Thread pool [{}] is already being rebuilt, skipping concurrent rebuild request.", threadPoolId);
return false;
}
try {
return doRebuildAndSwitch(oldExecutor, newQueueType, capacity, threadPoolId);
} finally {
try {
lock.unlock();
} finally {
REBUILD_LOCKS.remove(threadPoolId, lock);
}
}
}
private static boolean doRebuildAndSwitch(ThreadPoolExecutor oldExecutor,
Integer newQueueType,
Integer capacity,
String threadPoolId) {
BlockingQueue<Runnable> newQueue = BlockingQueueTypeEnum.createBlockingQueue(newQueueType, capacity);
if (newQueue == null) {
log.warn("Rebuild skipped. Unable to create queue by type: {}", newQueueType);
return false;
}
int core = oldExecutor.getCorePoolSize();
int max = oldExecutor.getMaximumPoolSize();
long keepAlive = oldExecutor.getKeepAliveTime(TimeUnit.SECONDS);
ThreadFactory factory = oldExecutor.getThreadFactory();
RejectedExecutionHandler rejected = oldExecutor.getRejectedExecutionHandler();
boolean allowCoreTimeout = oldExecutor.allowsCoreThreadTimeOut();
ThreadPoolExecutor newExecutor;
if (oldExecutor instanceof DynamicThreadPoolExecutor) {
DynamicThreadPoolExecutor dynOld = (DynamicThreadPoolExecutor) oldExecutor;
Long executeTimeout = dynOld.getExecuteTimeOut();
newExecutor = new DynamicThreadPoolExecutor(
core,
max,
keepAlive,
TimeUnit.SECONDS,
executeTimeout == null ? 0L : executeTimeout,
false,
0L,
newQueue,
threadPoolId,
factory,
rejected);
} else {
newExecutor = new ThreadPoolExecutor(core, max, keepAlive, TimeUnit.SECONDS, newQueue, factory, rejected);
}
newExecutor.allowCoreThreadTimeOut(allowCoreTimeout);
try {
newExecutor.prestartAllCoreThreads();
} catch (Throwable ignore) {
}
boolean transferSuccess = false;
try {
transferQueuedTasks(oldExecutor, newExecutor);
transferSuccess = true;
} catch (Throwable ex) {
log.error("Queue transfer failed for thread pool [{}], keeping old executor unchanged.", threadPoolId, ex);
}
if (!transferSuccess) {
safeShutdownNow(newExecutor);
return false;
}
ThreadPoolExecutorHolder oldHolder = switchRegistry(threadPoolId, newExecutor);
oldExecutor.shutdown();
awaitQuietly(oldExecutor, 500, TimeUnit.MILLISECONDS);
if (!oldExecutor.isTerminated()) {
log.warn("Old thread pool [{}] did not terminate within 500ms, {} tasks may still be running.",
threadPoolId, oldExecutor.getActiveCount());
}
return true;
}
private static void transferQueuedTasks(ThreadPoolExecutor from, ThreadPoolExecutor to) {
BlockingQueue<Runnable> fromQueue = from.getQueue();
int transferredCount = 0;
int failedCount = 0;
try {
List<Runnable> batch = new ArrayList<>();
fromQueue.drainTo(batch);
for (Runnable r : batch) {
try {
to.execute(r);
transferredCount++;
} catch (Throwable ex) {
failedCount++;
try {
from.execute(r);
log.warn("Failed to transfer task to new executor, restored to old executor.", ex);
} catch (Throwable reEnqueueEx) {
if (!fromQueue.offer(r)) {
log.error("Failed to restore task to old executor, task may be lost.", reEnqueueEx);
}
}
}
}
} catch (Throwable ex) {
log.error("Failed to drain tasks from old queue.", ex);
}
Runnable task;
while ((task = fromQueue.poll()) != null) {
try {
to.execute(task);
transferredCount++;
} catch (Throwable ex) {
failedCount++;
try {
from.execute(task);
log.warn("Failed to transfer remaining task to new executor, restored to old executor.", ex);
} catch (Throwable reEnqueueEx) {
if (!fromQueue.offer(task)) {
log.error("Failed to restore remaining task to old executor, task may be lost.", reEnqueueEx);
}
}
}
}
if (transferredCount > 0 || failedCount > 0) {
log.info("Task transfer completed: {} tasks transferred, {} tasks failed (restored to old executor).", transferredCount, failedCount);
}
if (failedCount > 0) {
throw new RuntimeException("Task transfer failed: " + failedCount + " tasks could not be transferred.");
}
}
private static void safeShutdownNow(ThreadPoolExecutor executor) {
try {
List<Runnable> dropped = executor.shutdownNow();
if (!dropped.isEmpty()) {
log.warn("Dropped {} queued tasks during rollback.", dropped.size());
}
} catch (Throwable ignore) {
}
}
private static void awaitQuietly(ThreadPoolExecutor executor, long time, TimeUnit unit) {
try {
executor.awaitTermination(time, unit);
} catch (InterruptedException ignore) {
Thread.currentThread().interrupt();
} catch (Throwable ignore) {
}
}
private static ThreadPoolExecutorHolder switchRegistry(String threadPoolId, ThreadPoolExecutor newExecutor) {
Map<String, ThreadPoolExecutorHolder> map = ThreadPoolExecutorRegistry.getHolderMap();
ThreadPoolExecutorHolder holder = map.get(threadPoolId);
ThreadPoolExecutorHolder newHolder = new ThreadPoolExecutorHolder(threadPoolId, newExecutor,
holder == null ? null : holder.getExecutorProperties());
ThreadPoolExecutorRegistry.putHolder(newHolder);
return holder;
}
}
Loading…
Cancel
Save