From aed2c736ba07d23bbfa507fe7f2b1db7627152dd Mon Sep 17 00:00:00 2001 From: mingri31164 <3116430062@qq.com> Date: Tue, 30 Sep 2025 23:15:18 +0800 Subject: [PATCH] Remove hardcoded queues --- .../core/ServerThreadPoolDynamicRefresh.java | 75 +++++-- .../starter/core/ThreadPoolRebuilder.java | 190 ++++++++++++++++++ 2 files changed, 250 insertions(+), 15 deletions(-) create mode 100644 starters/threadpool/server/src/main/java/cn/hippo4j/springboot/starter/core/ThreadPoolRebuilder.java diff --git a/starters/threadpool/server/src/main/java/cn/hippo4j/springboot/starter/core/ServerThreadPoolDynamicRefresh.java b/starters/threadpool/server/src/main/java/cn/hippo4j/springboot/starter/core/ServerThreadPoolDynamicRefresh.java index f4def9f4..9f923264 100644 --- a/starters/threadpool/server/src/main/java/cn/hippo4j/springboot/starter/core/ServerThreadPoolDynamicRefresh.java +++ b/starters/threadpool/server/src/main/java/cn/hippo4j/springboot/starter/core/ServerThreadPoolDynamicRefresh.java @@ -21,6 +21,7 @@ import cn.hippo4j.common.api.ThreadPoolConfigChange; import cn.hippo4j.common.executor.ThreadPoolExecutorRegistry; import cn.hippo4j.common.executor.support.BlockingQueueTypeEnum; import cn.hippo4j.common.executor.support.RejectedPolicyTypeEnum; +import cn.hippo4j.common.executor.support.BlockingQueueManager; import cn.hippo4j.common.executor.support.ResizableCapacityLinkedBlockingQueue; import cn.hippo4j.common.extension.enums.EnableEnum; import cn.hippo4j.common.model.ThreadPoolParameter; @@ -109,25 +110,27 @@ public class ServerThreadPoolDynamicRefresh implements ThreadPoolDynamicRefresh } private void changePoolInfo(ThreadPoolExecutor executor, ThreadPoolParameter parameter) { - if (parameter.getCoreSize() != null && parameter.getMaxSize() != null) { - ThreadPoolExecutorUtil.safeSetPoolSize(executor, parameter.getCoreSize(), parameter.getMaxSize()); + Integer desiredCore = null; + Integer desiredMax = null; + if (parameter instanceof ThreadPoolParameterInfo) { + ThreadPoolParameterInfo info = (ThreadPoolParameterInfo) parameter; + desiredCore = info.corePoolSizeAdapt(); + desiredMax = info.maximumPoolSizeAdapt(); } else { - if (parameter.getMaxSize() != null) { - executor.setMaximumPoolSize(parameter.getMaxSize()); - } - if (parameter.getCoreSize() != null) { - executor.setCorePoolSize(parameter.getCoreSize()); - } + desiredCore = parameter.getCoreSize(); + desiredMax = parameter.getMaxSize(); } - if (parameter.getCapacity() != null - && Objects.equals(BlockingQueueTypeEnum.RESIZABLE_LINKED_BLOCKING_QUEUE.getType(), parameter.getQueueType())) { - if (executor.getQueue() instanceof ResizableCapacityLinkedBlockingQueue) { - ResizableCapacityLinkedBlockingQueue queue = (ResizableCapacityLinkedBlockingQueue) executor.getQueue(); - queue.setCapacity(parameter.getCapacity()); - } else { - log.warn("The queue length cannot be modified. Queue type mismatch. Current queue type: {}", executor.getQueue().getClass().getSimpleName()); + if (desiredCore != null && desiredMax != null) { + ThreadPoolExecutorUtil.safeSetPoolSize(executor, desiredCore, desiredMax); + } else { + if (desiredMax != null) { + executor.setMaximumPoolSize(desiredMax); + } + if (desiredCore != null) { + executor.setCorePoolSize(desiredCore); } } + handleQueueChanges(executor, parameter); if (parameter.getKeepAliveTime() != null) { executor.setKeepAliveTime(parameter.getKeepAliveTime(), TimeUnit.SECONDS); } @@ -143,4 +146,46 @@ public class ServerThreadPoolDynamicRefresh implements ThreadPoolDynamicRefresh executor.allowCoreThreadTimeOut(EnableEnum.getBool(parameter.getAllowCoreThreadTimeOut())); } } + + /** + * Handle queue changes using the new SPI mechanism + * + * @param executor thread pool executor + * @param parameter thread pool parameter + */ + private void handleQueueChanges(ThreadPoolExecutor executor, ThreadPoolParameter parameter) { + if (parameter.getQueueType() == null && parameter.getCapacity() == null) { + return; + } + if (parameter.getQueueType() != null && !BlockingQueueManager.validateQueueConfig(parameter.getQueueType(), parameter.getCapacity())) { + log.warn("Invalid queue configuration - Type: {}, Capacity: {}", parameter.getQueueType(), parameter.getCapacity()); + return; + } + boolean queueTypeChanged = parameter.getQueueType() != null && + !Objects.equals(BlockingQueueManager.getQueueType(executor.getQueue()), parameter.getQueueType()); + + boolean capacityChanged = parameter.getCapacity() != null && + BlockingQueueManager.canChangeCapacity(executor.getQueue()); + if (queueTypeChanged) { + boolean ok = ThreadPoolRebuilder.rebuildAndSwitch(executor, parameter.getQueueType(), parameter.getCapacity(), parameter.getTpId()); + if (ok) { + log.info("Queue type rebuilt and switched to: {}", BlockingQueueTypeEnum.getBlockingQueueNameByType(parameter.getQueueType())); + } else { + log.warn("Queue type rebuild skipped or failed. Current: {}, Requested: {}", + BlockingQueueManager.getQueueName(executor.getQueue()), + BlockingQueueTypeEnum.getBlockingQueueNameByType(parameter.getQueueType())); + } + } else if (capacityChanged) { + // Only change capacity if queue type is the same or not specified + boolean success = BlockingQueueManager.changeQueueCapacity(executor.getQueue(), parameter.getCapacity()); + if (success) { + log.info("Queue capacity changed to: {}", parameter.getCapacity()); + } else { + log.warn("Failed to change queue capacity to: {}", parameter.getCapacity()); + } + } else if (parameter.getCapacity() != null) { + log.warn("Queue capacity cannot be changed for current queue type: {}", + BlockingQueueManager.getQueueName(executor.getQueue())); + } + } } diff --git a/starters/threadpool/server/src/main/java/cn/hippo4j/springboot/starter/core/ThreadPoolRebuilder.java b/starters/threadpool/server/src/main/java/cn/hippo4j/springboot/starter/core/ThreadPoolRebuilder.java new file mode 100644 index 00000000..a599e41d --- /dev/null +++ b/starters/threadpool/server/src/main/java/cn/hippo4j/springboot/starter/core/ThreadPoolRebuilder.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.springboot.starter.core; + +import cn.hippo4j.common.executor.ThreadPoolExecutorHolder; +import cn.hippo4j.common.executor.ThreadPoolExecutorRegistry; +import cn.hippo4j.common.executor.support.BlockingQueueTypeEnum; +import cn.hippo4j.core.executor.DynamicThreadPoolExecutor; +import lombok.extern.slf4j.Slf4j; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.*; +import java.util.concurrent.locks.ReentrantLock; + +/** + * Thread-pool rebuild and switch helper. + */ +@Slf4j +public final class ThreadPoolRebuilder { + + private static final Map REBUILD_LOCKS = new ConcurrentHashMap<>(); + + private ThreadPoolRebuilder() { + } + + public static boolean rebuildAndSwitch(ThreadPoolExecutor oldExecutor, + Integer newQueueType, + Integer capacity, + String threadPoolId) { + if (oldExecutor == null || newQueueType == null || threadPoolId == null) { + return false; + } + ReentrantLock lock = REBUILD_LOCKS.computeIfAbsent(threadPoolId, k -> new ReentrantLock()); + if (!lock.tryLock()) { + log.warn("Thread pool [{}] is already being rebuilt, skipping concurrent rebuild request.", threadPoolId); + return false; + } + try { + return doRebuildAndSwitch(oldExecutor, newQueueType, capacity, threadPoolId); + } finally { + lock.unlock(); + } + } + + private static boolean doRebuildAndSwitch(ThreadPoolExecutor oldExecutor, + Integer newQueueType, + Integer capacity, + String threadPoolId) { + BlockingQueue newQueue = BlockingQueueTypeEnum.createBlockingQueue(newQueueType, capacity); + if (newQueue == null) { + log.warn("Rebuild skipped. Unable to create queue by type: {}", newQueueType); + return false; + } + int core = oldExecutor.getCorePoolSize(); + int max = oldExecutor.getMaximumPoolSize(); + long keepAlive = oldExecutor.getKeepAliveTime(TimeUnit.SECONDS); + ThreadFactory factory = oldExecutor.getThreadFactory(); + RejectedExecutionHandler rejected = oldExecutor.getRejectedExecutionHandler(); + boolean allowCoreTimeout = oldExecutor.allowsCoreThreadTimeOut(); + ThreadPoolExecutor newExecutor; + if (oldExecutor instanceof DynamicThreadPoolExecutor) { + DynamicThreadPoolExecutor dynOld = (DynamicThreadPoolExecutor) oldExecutor; + Long executeTimeout = dynOld.getExecuteTimeOut(); + newExecutor = new DynamicThreadPoolExecutor( + core, + max, + keepAlive, + TimeUnit.SECONDS, + executeTimeout == null ? 0L : executeTimeout, + false, + 0L, + newQueue, + threadPoolId, + factory, + rejected); + } else { + newExecutor = new ThreadPoolExecutor(core, max, keepAlive, TimeUnit.SECONDS, newQueue, factory, rejected); + } + newExecutor.allowCoreThreadTimeOut(allowCoreTimeout); + try { + newExecutor.prestartAllCoreThreads(); + } catch (Throwable ignore) { + } + ThreadPoolExecutorHolder oldHolder = switchRegistry(threadPoolId, newExecutor); + boolean transferSuccess = false; + try { + transferQueuedTasks(oldExecutor, newExecutor); + transferSuccess = true; + } catch (Throwable ex) { + log.error("Queue transfer failed for thread pool [{}], attempting to rollback.", threadPoolId, ex); + } + if (!transferSuccess) { + if (oldHolder != null) { + ThreadPoolExecutorRegistry.putHolder(oldHolder); + log.info("Rolled back to old thread pool [{}]", threadPoolId); + } + safeShutdownNow(newExecutor); + return false; + } + oldExecutor.shutdown(); + awaitQuietly(oldExecutor, 500, TimeUnit.MILLISECONDS); + if (!oldExecutor.isTerminated()) { + log.warn("Old thread pool [{}] did not terminate within 500ms, {} tasks may still be running.", + threadPoolId, oldExecutor.getActiveCount()); + } + return true; + } + + private static void transferQueuedTasks(ThreadPoolExecutor from, ThreadPoolExecutor to) { + BlockingQueue fromQueue = from.getQueue(); + int transferredCount = 0; + int failedCount = 0; + try { + List batch = new ArrayList<>(); + fromQueue.drainTo(batch); + for (Runnable r : batch) { + try { + to.execute(r); + transferredCount++; + } catch (Throwable ex) { + failedCount++; + log.error("Failed to transfer task during queue switch, may cause task loss.", ex); + } + } + } catch (Throwable ex) { + log.error("Failed to drain tasks from old queue.", ex); + } + Runnable task; + while ((task = fromQueue.poll()) != null) { + try { + to.execute(task); + transferredCount++; + } catch (Throwable ex) { + failedCount++; + log.error("Failed to transfer remaining task during queue switch.", ex); + } + } + if (transferredCount > 0 || failedCount > 0) { + log.info("Task transfer completed: {} tasks transferred, {} tasks failed.", transferredCount, failedCount); + } + if (failedCount > 0) { + throw new RuntimeException("Task transfer failed: " + failedCount + " tasks could not be transferred."); + } + } + + private static void safeShutdownNow(ThreadPoolExecutor executor) { + try { + List dropped = executor.shutdownNow(); + if (!dropped.isEmpty()) { + log.warn("Dropped {} queued tasks during rollback.", dropped.size()); + } + } catch (Throwable ignore) { + } + } + + private static void awaitQuietly(ThreadPoolExecutor executor, long time, TimeUnit unit) { + try { + executor.awaitTermination(time, unit); + } catch (InterruptedException ignore) { + Thread.currentThread().interrupt(); + } catch (Throwable ignore) { + } + } + + private static ThreadPoolExecutorHolder switchRegistry(String threadPoolId, ThreadPoolExecutor newExecutor) { + Map map = ThreadPoolExecutorRegistry.getHolderMap(); + ThreadPoolExecutorHolder holder = map.get(threadPoolId); + ThreadPoolExecutorHolder newHolder = new ThreadPoolExecutorHolder(threadPoolId, newExecutor, + holder == null ? null : holder.getExecutorProperties()); + ThreadPoolExecutorRegistry.putHolder(newHolder); + return holder; + } +}