@ -2,9 +2,12 @@ package io.dynamic.threadpool.starter.core;
import com.alibaba.fastjson.JSON ;
import com.alibaba.fastjson.JSON ;
import io.dynamic.threadpool.common.model.PoolParameterInfo ;
import io.dynamic.threadpool.common.model.PoolParameterInfo ;
import io.dynamic.threadpool.starter.handler.ThreadPoolChangeHandler ;
import io.dynamic.threadpool.starter.toolkit.thread.QueueTypeEnum ;
import io.dynamic.threadpool.starter.toolkit.thread.RejectedTypeEnum ;
import io.dynamic.threadpool.starter.toolkit.thread.ResizableCapacityLinkedBlockIngQueue ;
import lombok.extern.slf4j.Slf4j ;
import lombok.extern.slf4j.Slf4j ;
import java.util.Objects ;
import java.util.concurrent.ThreadPoolExecutor ;
import java.util.concurrent.ThreadPoolExecutor ;
import java.util.concurrent.TimeUnit ;
import java.util.concurrent.TimeUnit ;
@ -22,23 +25,60 @@ public class ThreadPoolDynamicRefresh {
String tpId = parameter . getTpId ( ) ;
String tpId = parameter . getTpId ( ) ;
Integer coreSize = parameter . getCoreSize ( ) , maxSize = parameter . getMaxSize ( ) ,
Integer coreSize = parameter . getCoreSize ( ) , maxSize = parameter . getMaxSize ( ) ,
queueType = parameter . getQueueType ( ) , capacity = parameter . getCapacity ( ) ,
queueType = parameter . getQueueType ( ) , capacity = parameter . getCapacity ( ) ,
keepAliveTime = parameter . getKeepAliveTime ( ) ;
keepAliveTime = parameter . getKeepAliveTime ( ) , rejectedType = parameter . getRejectedType ( ) ;
refreshDynamicPool ( tpId , coreSize , maxSize , queueType , capacity , keepAliveTime );
refreshDynamicPool ( tpId , coreSize , maxSize , queueType , capacity , keepAliveTime , rejectedType );
}
}
public static void refreshDynamicPool ( String threadPoolId , Integer coreSize , Integer maxSize , Integer queueType , Integer capacity , Integer keepAliveTime ) {
public static void refreshDynamicPool ( String threadPoolId , Integer coreSize , Integer maxSize , Integer queueType , Integer capacity , Integer keepAliveTime , Integer rejectedType ) {
ThreadPoolExecutor executor = GlobalThreadPoolManage . getExecutorService ( threadPoolId ) . getPool ( ) ;
ThreadPoolExecutor executor = GlobalThreadPoolManage . getExecutorService ( threadPoolId ) . getPool ( ) ;
printLog ( "[🔥] Original thread pool. " ,
printLog ( "[🔥] Original thread pool. " ,
executor . getCorePoolSize ( ) , executor . getMaximumPoolSize ( ) , queueType , executor . getQueue ( ) . remainingCapacity ( ) , executor . getKeepAliveTime ( TimeUnit . MILLISECONDS ) ) ;
executor . getCorePoolSize ( ) ,
executor . getMaximumPoolSize ( ) ,
queueType ,
( executor . getQueue ( ) . remainingCapacity ( ) + executor . getQueue ( ) . size ( ) ) ,
executor . getKeepAliveTime ( TimeUnit . MILLISECONDS ) ,
rejectedType ) ;
ThreadPoolChangeHandler . changePool ( executor , coreSize , maxSize , queueType , capacity , keepAliveTime ) ;
changePoolInfo ( executor , coreSize , maxSize , queueType , capacity , keepAliveTim e, rejectedTyp e) ;
ThreadPoolExecutor afterExecutor = GlobalThreadPoolManage . getExecutorService ( threadPoolId ) . getPool ( ) ;
ThreadPoolExecutor afterExecutor = GlobalThreadPoolManage . getExecutorService ( threadPoolId ) . getPool ( ) ;
printLog ( "[🚀] Changed thread pool. " ,
printLog ( "[🚀] Changed thread pool. " ,
afterExecutor . getCorePoolSize ( ) , afterExecutor . getMaximumPoolSize ( ) , queueType , afterExecutor . getQueue ( ) . remainingCapacity ( ) , afterExecutor . getKeepAliveTime ( TimeUnit . MILLISECONDS ) ) ;
afterExecutor . getCorePoolSize ( ) ,
afterExecutor . getMaximumPoolSize ( ) ,
queueType ,
( afterExecutor . getQueue ( ) . remainingCapacity ( ) + afterExecutor . getQueue ( ) . size ( ) ) ,
afterExecutor . getKeepAliveTime ( TimeUnit . MILLISECONDS ) ,
rejectedType ) ;
}
}
private static void printLog ( String prefixMsg , Integer coreSize , Integer maxSize , Integer queueType , Integer capacity , Long keepAliveTime ) {
private static void printLog ( String prefixMsg , Integer coreSize , Integer maxSize , Integer queueType , Integer capacity , Long keepAliveTime , Integer rejectedType ) {
log . info ( "{} coreSize :: {}, maxSize :: {}, queueType :: {}, capacity :: {}, keepAliveTime :: {}" , prefixMsg , coreSize , maxSize , queueType , capacity , keepAliveTime ) ;
log . info ( "{} coreSize :: {}, maxSize :: {}, queueType :: {}, capacity :: {}, keepAliveTime :: {}, rejectedType:: {}" , prefixMsg , coreSize , maxSize , queueType , capacity , keepAliveTime , rejectedType ) ;
}
public static void changePoolInfo ( ThreadPoolExecutor executor , Integer coreSize , Integer maxSize , Integer queueType , Integer capacity , Integer keepAliveTime , Integer rejectedType ) {
if ( coreSize ! = null ) {
executor . setCorePoolSize ( coreSize ) ;
}
if ( maxSize ! = null ) {
executor . setMaximumPoolSize ( maxSize ) ;
}
if ( capacity ! = null & & Objects . equals ( QueueTypeEnum . RESIZABLE_LINKED_BLOCKING_QUEUE . type , queueType ) ) {
if ( executor . getQueue ( ) instanceof ResizableCapacityLinkedBlockIngQueue ) {
ResizableCapacityLinkedBlockIngQueue queue = ( ResizableCapacityLinkedBlockIngQueue ) executor . getQueue ( ) ;
queue . setCapacity ( capacity ) ;
} else {
log . warn ( "[Pool change] The queue length cannot be modified. Queue type mismatch. Current queue type :: {}" , executor . getQueue ( ) . getClass ( ) . getSimpleName ( ) ) ;
}
}
if ( keepAliveTime ! = null ) {
executor . setKeepAliveTime ( keepAliveTime , TimeUnit . SECONDS ) ;
}
if ( rejectedType ! = null ) {
executor . setRejectedExecutionHandler ( RejectedTypeEnum . createPolicy ( queueType ) ) ;
}
}
}
}
}