@ -21,18 +21,16 @@ import cn.hippo4j.agent.core.registry.AgentThreadPoolExecutorHolder;
import cn.hippo4j.agent.core.registry.AgentThreadPoolInstanceRegistry ;
import cn.hippo4j.agent.core.util.ThreadPoolPropertyKey ;
import cn.hippo4j.agent.plugin.spring.common.conf.SpringBootConfig ;
import cn.hippo4j.common.model.executor.ExecutorProperties ;
import cn.hippo4j.common.executor.support.BlockingQueueTypeEnum ;
import cn.hippo4j.common.executor.support.RejectedPolicyTypeEnum ;
import cn.hippo4j.common.executor.support.ResizableCapacityLinkedBlockingQueue ;
import cn.hippo4j.common.model.executor.ExecutorProperties ;
import cn.hippo4j.common.toolkit.CollectionUtil ;
import cn.hippo4j.common.toolkit.ThreadPoolExecutorUtil ;
import cn.hippo4j.config.springboot.starter.config.BootstrapConfigProperties ;
import cn.hippo4j.config.springboot.starter.parser.ConfigFileTypeEnum ;
import cn.hippo4j.config.springboot.starter.parser.ConfigParserHandler ;
import cn.hippo4j.core.executor.DynamicThreadPoolExecutor ;
import cn.hippo4j.message.request.ChangeParameterNotifyRequest ;
import cn.hippo4j.threadpool.dynamic.api.ThreadPoolDynamicRefresh ;
import cn.hippo4j.threadpool.dynamic.mode.config.parser.ConfigFileTypeEnum ;
import cn.hippo4j.threadpool.dynamic.mode.config.parser.ConfigParserHandler ;
import com.ctrip.framework.apollo.ConfigChangeListener ;
import com.ctrip.framework.apollo.ConfigFile ;
import com.ctrip.framework.apollo.ConfigService ;
@ -43,19 +41,12 @@ import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext ;
import org.springframework.context.ConfigurableApplicationContext ;
import java.util.HashMap ;
import java.util.List ;
import java.util.Map ;
import java.util.Objects ;
import java.util.Optional ;
import java.util.Properties ;
import java.util.* ;
import java.util.concurrent.RejectedExecutionHandler ;
import java.util.concurrent.ThreadPoolExecutor ;
import java.util.concurrent.TimeUnit ;
import static cn.hippo4j.agent.core.conf.Constants.SPRING_BOOT_CONFIG_PREFIX ;
import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_DELIMITER ;
import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_THREAD_POOL_TEXT ;
/ * *
* Abstract dynamic thread poo change handler spring
@ -93,8 +84,6 @@ public abstract class AbstractDynamicThreadPoolChangeHandlerSpring implements Th
LOGGER . info ( "[Hippo4j-Agent] Dynamic thread pool refresher, add apollo listener success. namespace: {}" , namespace ) ;
}
protected abstract BootstrapConfigProperties bindProperties ( Map < Object , Object > configInfo , ApplicationContext applicationContext ) ;
private void dynamicRefresh ( String configContent , Map < String , Object > newValueChangeMap , ApplicationContext context ) {
try {
String configFileType = SpringBootConfig . Spring . Dynamic . Thread_Pool . CONFIG_FILE_TYPE ;
@ -104,34 +93,24 @@ public abstract class AbstractDynamicThreadPoolChangeHandlerSpring implements Th
if ( CollectionUtil . isNotEmpty ( newValueChangeMap ) ) {
Optional . ofNullable ( afterConfigMap ) . ifPresent ( each - > each . putAll ( newValueChangeMap ) ) ;
}
BootstrapConfigProperties afterConfigProperties = bindProperties ( afterConfigMap , context ) ;
List < ExecutorProperties > executors = afterConfigProperties . getExecutors ( ) ;
for ( ExecutorProperties afterProperties : executors ) {
String threadPoolId = afterProperties . getThreadPoolId ( ) ;
AgentThreadPoolExecutorHolder holder = AgentThreadPoolInstanceRegistry . getInstance ( ) . getHolder ( threadPoolId ) ;
if ( holder . isEmpty ( ) | | holder . getExecutor ( ) = = null ) {
continue ;
}
ExecutorProperties beforeProperties = convert ( holder . getProperties ( ) ) ;
if ( ! checkConsistency ( threadPoolId , beforeProperties , afterProperties ) ) {
continue ;
}
dynamicRefreshPool ( beforeProperties , afterProperties ) ;
holder . setProperties ( failDefaultExecutorProperties ( beforeProperties , afterProperties ) ) ; // do refresh.
ChangeParameterNotifyRequest changeRequest = buildChangeRequest ( beforeProperties , afterProperties ) ;
LOGGER . info ( CHANGE_THREAD_POOL_TEXT ,
threadPoolId ,
String . format ( CHANGE_DELIMITER , beforeProperties . getCorePoolSize ( ) , changeRequest . getNowCorePoolSize ( ) ) ,
String . format ( CHANGE_DELIMITER , beforeProperties . getMaximumPoolSize ( ) , changeRequest . getNowMaximumPoolSize ( ) ) ,
String . format ( CHANGE_DELIMITER , beforeProperties . getQueueCapacity ( ) , changeRequest . getNowQueueCapacity ( ) ) ,
String . format ( CHANGE_DELIMITER , beforeProperties . getKeepAliveTime ( ) , changeRequest . getNowKeepAliveTime ( ) ) ,
String . format ( CHANGE_DELIMITER , beforeProperties . getExecuteTimeOut ( ) , changeRequest . getNowExecuteTimeOut ( ) ) ,
String . format ( CHANGE_DELIMITER , beforeProperties . getRejectedHandler ( ) , changeRequest . getNowRejectedName ( ) ) ,
String . format ( CHANGE_DELIMITER , beforeProperties . getAllowCoreThreadTimeOut ( ) , changeRequest . getNowAllowsCoreThreadTimeOut ( ) ) ) ;
}
// TODO
/ *
* BootstrapConfigProperties afterConfigProperties = bindProperties ( afterConfigMap , context ) ;
*
* List < ExecutorProperties > executors = afterConfigProperties . getExecutors ( ) ; for ( ExecutorProperties afterProperties : executors ) { String threadPoolId =
* afterProperties . getThreadPoolId ( ) ; AgentThreadPoolExecutorHolder holder = AgentThreadPoolInstanceRegistry . getInstance ( ) . getHolder ( threadPoolId ) ; if ( holder . isEmpty ( ) | |
* holder . getExecutor ( ) = = null ) { continue ; } ExecutorProperties beforeProperties = convert ( holder . getProperties ( ) ) ;
*
* if ( ! checkConsistency ( threadPoolId , beforeProperties , afterProperties ) ) { continue ; }
*
* dynamicRefreshPool ( beforeProperties , afterProperties ) ; holder . setProperties ( failDefaultExecutorProperties ( beforeProperties , afterProperties ) ) ; // do refresh.
* ChangeParameterNotifyRequest changeRequest = buildChangeRequest ( beforeProperties , afterProperties ) ; LOGGER . info ( CHANGE_THREAD_POOL_TEXT , threadPoolId , String . format ( CHANGE_DELIMITER ,
* beforeProperties . getCorePoolSize ( ) , changeRequest . getNowCorePoolSize ( ) ) , String . format ( CHANGE_DELIMITER , beforeProperties . getMaximumPoolSize ( ) , changeRequest . getNowMaximumPoolSize ( ) ) ,
* String . format ( CHANGE_DELIMITER , beforeProperties . getQueueCapacity ( ) , changeRequest . getNowQueueCapacity ( ) ) , String . format ( CHANGE_DELIMITER , beforeProperties . getKeepAliveTime ( ) ,
* changeRequest . getNowKeepAliveTime ( ) ) , String . format ( CHANGE_DELIMITER , beforeProperties . getExecuteTimeOut ( ) , changeRequest . getNowExecuteTimeOut ( ) ) , String . format ( CHANGE_DELIMITER ,
* beforeProperties . getRejectedHandler ( ) , changeRequest . getNowRejectedName ( ) ) , String . format ( CHANGE_DELIMITER , beforeProperties . getAllowCoreThreadTimeOut ( ) ,
* changeRequest . getNowAllowsCoreThreadTimeOut ( ) ) ) ; }
* /
} catch ( Exception ex ) {
LOGGER . error ( "[Hippo4j-Agent] config mode dynamic refresh failed." , ex ) ;
}
@ -183,7 +162,7 @@ public abstract class AbstractDynamicThreadPoolChangeHandlerSpring implements Th
* Fail default executor properties .
*
* @param beforeProperties old properties
* @param afterProperties new properties
* @param afterProperties new properties
* @return executor properties
* /
private Properties failDefaultExecutorProperties ( ExecutorProperties beforeProperties , ExecutorProperties afterProperties ) {
@ -236,27 +215,20 @@ public abstract class AbstractDynamicThreadPoolChangeHandlerSpring implements Th
* @param afterProperties new properties
* @return instance
* /
private ChangeParameterNotifyRequest buildChangeRequest ( ExecutorProperties beforeProperties , ExecutorProperties afterProperties ) {
ChangeParameterNotifyRequest changeParameterNotifyRequest = ChangeParameterNotifyRequest . builder ( )
. beforeCorePoolSize ( beforeProperties . getCorePoolSize ( ) )
. beforeMaximumPoolSize ( beforeProperties . getMaximumPoolSize ( ) )
. beforeAllowsCoreThreadTimeOut ( beforeProperties . getAllowCoreThreadTimeOut ( ) )
. beforeKeepAliveTime ( beforeProperties . getKeepAliveTime ( ) )
. beforeQueueCapacity ( beforeProperties . getQueueCapacity ( ) )
. beforeRejectedName ( beforeProperties . getRejectedHandler ( ) )
. beforeExecuteTimeOut ( beforeProperties . getExecuteTimeOut ( ) )
. blockingQueueName ( afterProperties . getBlockingQueue ( ) )
. nowCorePoolSize ( Optional . ofNullable ( afterProperties . getCorePoolSize ( ) ) . orElse ( beforeProperties . getCorePoolSize ( ) ) )
. nowMaximumPoolSize ( Optional . ofNullable ( afterProperties . getMaximumPoolSize ( ) ) . orElse ( beforeProperties . getMaximumPoolSize ( ) ) )
. nowAllowsCoreThreadTimeOut ( Optional . ofNullable ( afterProperties . getAllowCoreThreadTimeOut ( ) ) . orElse ( beforeProperties . getAllowCoreThreadTimeOut ( ) ) )
. nowKeepAliveTime ( Optional . ofNullable ( afterProperties . getKeepAliveTime ( ) ) . orElse ( beforeProperties . getKeepAliveTime ( ) ) )
. nowQueueCapacity ( Optional . ofNullable ( afterProperties . getQueueCapacity ( ) ) . orElse ( beforeProperties . getQueueCapacity ( ) ) )
. nowRejectedName ( Optional . ofNullable ( afterProperties . getRejectedHandler ( ) ) . orElse ( beforeProperties . getRejectedHandler ( ) ) )
. nowExecuteTimeOut ( Optional . ofNullable ( afterProperties . getExecuteTimeOut ( ) ) . orElse ( beforeProperties . getExecuteTimeOut ( ) ) )
. build ( ) ;
changeParameterNotifyRequest . setThreadPoolId ( beforeProperties . getThreadPoolId ( ) ) ;
return changeParameterNotifyRequest ;
}
/ *
* private ChangeParameterNotifyRequest buildChangeRequest ( ExecutorProperties beforeProperties , ExecutorProperties afterProperties ) { ChangeParameterNotifyRequest changeParameterNotifyRequest =
* ChangeParameterNotifyRequest . builder ( ) . beforeCorePoolSize ( beforeProperties . getCorePoolSize ( ) ) . beforeMaximumPoolSize ( beforeProperties . getMaximumPoolSize ( ) )
* . beforeAllowsCoreThreadTimeOut ( beforeProperties . getAllowCoreThreadTimeOut ( ) ) . beforeKeepAliveTime ( beforeProperties . getKeepAliveTime ( ) ) . beforeQueueCapacity ( beforeProperties . getQueueCapacity ( ) )
* . beforeRejectedName ( beforeProperties . getRejectedHandler ( ) ) . beforeExecuteTimeOut ( beforeProperties . getExecuteTimeOut ( ) ) . blockingQueueName ( afterProperties . getBlockingQueue ( ) )
* . nowCorePoolSize ( Optional . ofNullable ( afterProperties . getCorePoolSize ( ) ) . orElse ( beforeProperties . getCorePoolSize ( ) ) )
* . nowMaximumPoolSize ( Optional . ofNullable ( afterProperties . getMaximumPoolSize ( ) ) . orElse ( beforeProperties . getMaximumPoolSize ( ) ) )
* . nowAllowsCoreThreadTimeOut ( Optional . ofNullable ( afterProperties . getAllowCoreThreadTimeOut ( ) ) . orElse ( beforeProperties . getAllowCoreThreadTimeOut ( ) ) )
* . nowKeepAliveTime ( Optional . ofNullable ( afterProperties . getKeepAliveTime ( ) ) . orElse ( beforeProperties . getKeepAliveTime ( ) ) )
* . nowQueueCapacity ( Optional . ofNullable ( afterProperties . getQueueCapacity ( ) ) . orElse ( beforeProperties . getQueueCapacity ( ) ) )
* . nowRejectedName ( Optional . ofNullable ( afterProperties . getRejectedHandler ( ) ) . orElse ( beforeProperties . getRejectedHandler ( ) ) )
* . nowExecuteTimeOut ( Optional . ofNullable ( afterProperties . getExecuteTimeOut ( ) ) . orElse ( beforeProperties . getExecuteTimeOut ( ) ) ) . build ( ) ;
* changeParameterNotifyRequest . setThreadPoolId ( beforeProperties . getThreadPoolId ( ) ) ; return changeParameterNotifyRequest ; }
* /
/ * *
* Check consistency .