Remove hardcoded queues

pull/1609/head
mingri31164 3 months ago
parent be900bca46
commit aed2c736ba

@ -21,6 +21,7 @@ import cn.hippo4j.common.api.ThreadPoolConfigChange;
import cn.hippo4j.common.executor.ThreadPoolExecutorRegistry;
import cn.hippo4j.common.executor.support.BlockingQueueTypeEnum;
import cn.hippo4j.common.executor.support.RejectedPolicyTypeEnum;
import cn.hippo4j.common.executor.support.BlockingQueueManager;
import cn.hippo4j.common.executor.support.ResizableCapacityLinkedBlockingQueue;
import cn.hippo4j.common.extension.enums.EnableEnum;
import cn.hippo4j.common.model.ThreadPoolParameter;
@ -109,25 +110,27 @@ public class ServerThreadPoolDynamicRefresh implements ThreadPoolDynamicRefresh
}
private void changePoolInfo(ThreadPoolExecutor executor, ThreadPoolParameter parameter) {
if (parameter.getCoreSize() != null && parameter.getMaxSize() != null) {
ThreadPoolExecutorUtil.safeSetPoolSize(executor, parameter.getCoreSize(), parameter.getMaxSize());
Integer desiredCore = null;
Integer desiredMax = null;
if (parameter instanceof ThreadPoolParameterInfo) {
ThreadPoolParameterInfo info = (ThreadPoolParameterInfo) parameter;
desiredCore = info.corePoolSizeAdapt();
desiredMax = info.maximumPoolSizeAdapt();
} else {
if (parameter.getMaxSize() != null) {
executor.setMaximumPoolSize(parameter.getMaxSize());
}
if (parameter.getCoreSize() != null) {
executor.setCorePoolSize(parameter.getCoreSize());
}
desiredCore = parameter.getCoreSize();
desiredMax = parameter.getMaxSize();
}
if (parameter.getCapacity() != null
&& Objects.equals(BlockingQueueTypeEnum.RESIZABLE_LINKED_BLOCKING_QUEUE.getType(), parameter.getQueueType())) {
if (executor.getQueue() instanceof ResizableCapacityLinkedBlockingQueue) {
ResizableCapacityLinkedBlockingQueue queue = (ResizableCapacityLinkedBlockingQueue) executor.getQueue();
queue.setCapacity(parameter.getCapacity());
} else {
log.warn("The queue length cannot be modified. Queue type mismatch. Current queue type: {}", executor.getQueue().getClass().getSimpleName());
if (desiredCore != null && desiredMax != null) {
ThreadPoolExecutorUtil.safeSetPoolSize(executor, desiredCore, desiredMax);
} else {
if (desiredMax != null) {
executor.setMaximumPoolSize(desiredMax);
}
if (desiredCore != null) {
executor.setCorePoolSize(desiredCore);
}
}
handleQueueChanges(executor, parameter);
if (parameter.getKeepAliveTime() != null) {
executor.setKeepAliveTime(parameter.getKeepAliveTime(), TimeUnit.SECONDS);
}
@ -143,4 +146,46 @@ public class ServerThreadPoolDynamicRefresh implements ThreadPoolDynamicRefresh
executor.allowCoreThreadTimeOut(EnableEnum.getBool(parameter.getAllowCoreThreadTimeOut()));
}
}
/**
* Handle queue changes using the new SPI mechanism
*
* @param executor thread pool executor
* @param parameter thread pool parameter
*/
private void handleQueueChanges(ThreadPoolExecutor executor, ThreadPoolParameter parameter) {
if (parameter.getQueueType() == null && parameter.getCapacity() == null) {
return;
}
if (parameter.getQueueType() != null && !BlockingQueueManager.validateQueueConfig(parameter.getQueueType(), parameter.getCapacity())) {
log.warn("Invalid queue configuration - Type: {}, Capacity: {}", parameter.getQueueType(), parameter.getCapacity());
return;
}
boolean queueTypeChanged = parameter.getQueueType() != null &&
!Objects.equals(BlockingQueueManager.getQueueType(executor.getQueue()), parameter.getQueueType());
boolean capacityChanged = parameter.getCapacity() != null &&
BlockingQueueManager.canChangeCapacity(executor.getQueue());
if (queueTypeChanged) {
boolean ok = ThreadPoolRebuilder.rebuildAndSwitch(executor, parameter.getQueueType(), parameter.getCapacity(), parameter.getTpId());
if (ok) {
log.info("Queue type rebuilt and switched to: {}", BlockingQueueTypeEnum.getBlockingQueueNameByType(parameter.getQueueType()));
} else {
log.warn("Queue type rebuild skipped or failed. Current: {}, Requested: {}",
BlockingQueueManager.getQueueName(executor.getQueue()),
BlockingQueueTypeEnum.getBlockingQueueNameByType(parameter.getQueueType()));
}
} else if (capacityChanged) {
// Only change capacity if queue type is the same or not specified
boolean success = BlockingQueueManager.changeQueueCapacity(executor.getQueue(), parameter.getCapacity());
if (success) {
log.info("Queue capacity changed to: {}", parameter.getCapacity());
} else {
log.warn("Failed to change queue capacity to: {}", parameter.getCapacity());
}
} else if (parameter.getCapacity() != null) {
log.warn("Queue capacity cannot be changed for current queue type: {}",
BlockingQueueManager.getQueueName(executor.getQueue()));
}
}
}

@ -0,0 +1,190 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.springboot.starter.core;
import cn.hippo4j.common.executor.ThreadPoolExecutorHolder;
import cn.hippo4j.common.executor.ThreadPoolExecutorRegistry;
import cn.hippo4j.common.executor.support.BlockingQueueTypeEnum;
import cn.hippo4j.core.executor.DynamicThreadPoolExecutor;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.locks.ReentrantLock;
/**
* Thread-pool rebuild and switch helper.
*/
@Slf4j
public final class ThreadPoolRebuilder {
private static final Map<String, ReentrantLock> REBUILD_LOCKS = new ConcurrentHashMap<>();
private ThreadPoolRebuilder() {
}
public static boolean rebuildAndSwitch(ThreadPoolExecutor oldExecutor,
Integer newQueueType,
Integer capacity,
String threadPoolId) {
if (oldExecutor == null || newQueueType == null || threadPoolId == null) {
return false;
}
ReentrantLock lock = REBUILD_LOCKS.computeIfAbsent(threadPoolId, k -> new ReentrantLock());
if (!lock.tryLock()) {
log.warn("Thread pool [{}] is already being rebuilt, skipping concurrent rebuild request.", threadPoolId);
return false;
}
try {
return doRebuildAndSwitch(oldExecutor, newQueueType, capacity, threadPoolId);
} finally {
lock.unlock();
}
}
private static boolean doRebuildAndSwitch(ThreadPoolExecutor oldExecutor,
Integer newQueueType,
Integer capacity,
String threadPoolId) {
BlockingQueue<Runnable> newQueue = BlockingQueueTypeEnum.createBlockingQueue(newQueueType, capacity);
if (newQueue == null) {
log.warn("Rebuild skipped. Unable to create queue by type: {}", newQueueType);
return false;
}
int core = oldExecutor.getCorePoolSize();
int max = oldExecutor.getMaximumPoolSize();
long keepAlive = oldExecutor.getKeepAliveTime(TimeUnit.SECONDS);
ThreadFactory factory = oldExecutor.getThreadFactory();
RejectedExecutionHandler rejected = oldExecutor.getRejectedExecutionHandler();
boolean allowCoreTimeout = oldExecutor.allowsCoreThreadTimeOut();
ThreadPoolExecutor newExecutor;
if (oldExecutor instanceof DynamicThreadPoolExecutor) {
DynamicThreadPoolExecutor dynOld = (DynamicThreadPoolExecutor) oldExecutor;
Long executeTimeout = dynOld.getExecuteTimeOut();
newExecutor = new DynamicThreadPoolExecutor(
core,
max,
keepAlive,
TimeUnit.SECONDS,
executeTimeout == null ? 0L : executeTimeout,
false,
0L,
newQueue,
threadPoolId,
factory,
rejected);
} else {
newExecutor = new ThreadPoolExecutor(core, max, keepAlive, TimeUnit.SECONDS, newQueue, factory, rejected);
}
newExecutor.allowCoreThreadTimeOut(allowCoreTimeout);
try {
newExecutor.prestartAllCoreThreads();
} catch (Throwable ignore) {
}
ThreadPoolExecutorHolder oldHolder = switchRegistry(threadPoolId, newExecutor);
boolean transferSuccess = false;
try {
transferQueuedTasks(oldExecutor, newExecutor);
transferSuccess = true;
} catch (Throwable ex) {
log.error("Queue transfer failed for thread pool [{}], attempting to rollback.", threadPoolId, ex);
}
if (!transferSuccess) {
if (oldHolder != null) {
ThreadPoolExecutorRegistry.putHolder(oldHolder);
log.info("Rolled back to old thread pool [{}]", threadPoolId);
}
safeShutdownNow(newExecutor);
return false;
}
oldExecutor.shutdown();
awaitQuietly(oldExecutor, 500, TimeUnit.MILLISECONDS);
if (!oldExecutor.isTerminated()) {
log.warn("Old thread pool [{}] did not terminate within 500ms, {} tasks may still be running.",
threadPoolId, oldExecutor.getActiveCount());
}
return true;
}
private static void transferQueuedTasks(ThreadPoolExecutor from, ThreadPoolExecutor to) {
BlockingQueue<Runnable> fromQueue = from.getQueue();
int transferredCount = 0;
int failedCount = 0;
try {
List<Runnable> batch = new ArrayList<>();
fromQueue.drainTo(batch);
for (Runnable r : batch) {
try {
to.execute(r);
transferredCount++;
} catch (Throwable ex) {
failedCount++;
log.error("Failed to transfer task during queue switch, may cause task loss.", ex);
}
}
} catch (Throwable ex) {
log.error("Failed to drain tasks from old queue.", ex);
}
Runnable task;
while ((task = fromQueue.poll()) != null) {
try {
to.execute(task);
transferredCount++;
} catch (Throwable ex) {
failedCount++;
log.error("Failed to transfer remaining task during queue switch.", ex);
}
}
if (transferredCount > 0 || failedCount > 0) {
log.info("Task transfer completed: {} tasks transferred, {} tasks failed.", transferredCount, failedCount);
}
if (failedCount > 0) {
throw new RuntimeException("Task transfer failed: " + failedCount + " tasks could not be transferred.");
}
}
private static void safeShutdownNow(ThreadPoolExecutor executor) {
try {
List<Runnable> dropped = executor.shutdownNow();
if (!dropped.isEmpty()) {
log.warn("Dropped {} queued tasks during rollback.", dropped.size());
}
} catch (Throwable ignore) {
}
}
private static void awaitQuietly(ThreadPoolExecutor executor, long time, TimeUnit unit) {
try {
executor.awaitTermination(time, unit);
} catch (InterruptedException ignore) {
Thread.currentThread().interrupt();
} catch (Throwable ignore) {
}
}
private static ThreadPoolExecutorHolder switchRegistry(String threadPoolId, ThreadPoolExecutor newExecutor) {
Map<String, ThreadPoolExecutorHolder> map = ThreadPoolExecutorRegistry.getHolderMap();
ThreadPoolExecutorHolder holder = map.get(threadPoolId);
ThreadPoolExecutorHolder newHolder = new ThreadPoolExecutorHolder(threadPoolId, newExecutor,
holder == null ? null : holder.getExecutorProperties());
ThreadPoolExecutorRegistry.putHolder(newHolder);
return holder;
}
}
Loading…
Cancel
Save