@ -15,22 +15,22 @@
* limitations under the License .
* /
package cn.hippo4j. message.service ;
package cn.hippo4j. threadpool.alarm.handler ;
import cn.hippo4j.core.api.ThreadPoolCheckAlarm ;
import cn.hippo4j.common.executor.ThreadPoolExecutorHolder ;
import cn.hippo4j.common.executor.ThreadPoolExecutorRegistry ;
import cn.hippo4j.common.toolkit.CalculateUtil ;
import cn.hippo4j.common.toolkit.ReflectUtil ;
import cn.hippo4j.common.toolkit.StringUtil ;
import cn.hippo4j.core.executor.DynamicThreadPoolExecutor ;
import cn.hippo4j.core.executor.DynamicThreadPoolWrapper ;
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage ;
import cn.hippo4j.core.executor.support.ThreadPoolBuilder ;
import cn.hippo4j.core.toolkit.ExecutorTraceContextUtil ;
import cn.hippo4j.core.toolkit.IdentifyUtil ;
import cn.hippo4j.message.enums.NotifyTypeEnum ;
import cn.hippo4j.message.request.AlarmNotifyRequest ;
import cn.hippo4j.threadpool.alarm.api.ThreadPoolCheckAlarm ;
import cn.hippo4j.threadpool.alarm.toolkit.ExecutorTraceContextUtil ;
import cn.hippo4j.threadpool.message.core.request.AlarmNotifyRequest ;
import cn.hippo4j.threadpool.message.api.NotifyTypeEnum ;
import cn.hippo4j.threadpool.message.core.service.GlobalNotifyAlarmManage ;
import cn.hippo4j.threadpool.message.core.service.ThreadPoolNotifyAlarm ;
import cn.hippo4j.threadpool.message.core.service.ThreadPoolSendMessageService ;
import lombok.RequiredArgsConstructor ;
import lombok.extern.slf4j.Slf4j ;
import org.springframework.beans.factory.annotation.Value ;
import java.util.List ;
import java.util.Objects ;
@ -40,8 +40,16 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler ;
import java.util.concurrent.ScheduledExecutorService ;
import java.util.concurrent.ScheduledThreadPoolExecutor ;
import java.util.concurrent.ThreadFactory ;
import java.util.concurrent.ThreadPoolExecutor ;
import java.util.concurrent.TimeUnit ;
import java.util.concurrent.atomic.AtomicInteger ;
import static cn.hippo4j.common.propertie.EnvironmentProperties.active ;
import static cn.hippo4j.common.propertie.EnvironmentProperties.applicationName ;
import static cn.hippo4j.common.propertie.EnvironmentProperties.checkStateInterval ;
import static cn.hippo4j.common.propertie.EnvironmentProperties.itemId ;
import static cn.hippo4j.common.propertie.IdentifyProperties.IDENTIFY ;
/ * *
* Default thread - pool check alarm handler .
@ -50,46 +58,42 @@ import java.util.concurrent.TimeUnit;
@RequiredArgsConstructor
public class DefaultThreadPoolCheckAlarmHandler implements Runnable , ThreadPoolCheckAlarm {
private final Hippo4jSendMessageService hippo4jSendMessageService ;
@Value ( "${spring.profiles.active:UNKNOWN}" )
private String active ;
@Value ( "${spring.dynamic.thread-pool.item-id:}" )
private String itemId ;
@Value ( "${spring.application.name:UNKNOWN}" )
private String applicationName ;
@Value ( "${spring.dynamic.thread-pool.check-state-interval:5}" )
private Integer checkStateInterval ;
private final ThreadPoolSendMessageService threadPoolSendMessageService ;
private final ScheduledExecutorService alarmNotifyExecutor = new ScheduledThreadPoolExecutor (
1 ,
r - > new Thread ( r , "client.alarm.notify" ) ) ;
private final ExecutorService asyncAlarmNotifyExecutor = ThreadPoolBuilder . builder ( )
. poolThreadSize ( 2 , 4 )
. threadFactory ( "client.execute.timeout.alarm" )
. allowCoreThreadTimeOut ( true )
. keepAliveTime ( 60L , TimeUnit . SECONDS )
. workQueue ( new LinkedBlockingQueue ( 4096 ) )
. rejected ( new ThreadPoolExecutor . AbortPolicy ( ) )
. build ( ) ;
private final ExecutorService asyncAlarmNotifyExecutor = new ThreadPoolExecutor (
2 ,
4 ,
60L ,
TimeUnit . SECONDS ,
new LinkedBlockingQueue < > ( 4096 ) ,
new ThreadFactory ( ) {
private final AtomicInteger count = new AtomicInteger ( ) ;
@Override
public Thread newThread ( Runnable r ) {
return new Thread ( "client.execute.timeout.alarm_" + count . incrementAndGet ( ) ) ;
}
} ,
new ThreadPoolExecutor . AbortPolicy ( ) ) ;
@Override
public void run ( String . . . args ) throws Exception {
public void scheduleExecute( ) {
alarmNotifyExecutor . scheduleWithFixedDelay ( this , 0 , checkStateInterval , TimeUnit . SECONDS ) ;
}
@Override
public void run ( ) {
List < String > listThreadPoolId = GlobalThreadPoolManage. listThreadPool Id( ) ;
List < String > listThreadPoolId = ThreadPoolExecutorRegistry. listThreadPoolExecutor Id( ) ;
listThreadPoolId . forEach ( threadPoolId - > {
ThreadPoolNotifyAlarm threadPoolNotifyAlarm = GlobalNotifyAlarmManage . get ( threadPoolId ) ;
if ( threadPoolNotifyAlarm ! = null & & threadPoolNotifyAlarm . getAlarm ( ) ) {
DynamicThreadPoolWrapper wrapper = GlobalThreadPoolManage . getExecutorService ( threadPoolId ) ;
ThreadPoolExecutor executor = wrapp er. getExecutor ( ) ;
ThreadPoolExecutorHolder executorHolder = ThreadPoolExecutorRegistry . getHolder ( threadPoolId ) ;
ThreadPoolExecutor executor = executorHold er. getExecutor ( ) ;
checkPoolCapacityAlarm ( threadPoolId , executor ) ;
checkPoolActivityAlarm ( threadPoolId , executor ) ;
}
@ -116,7 +120,7 @@ public class DefaultThreadPoolCheckAlarmHandler implements Runnable, ThreadPoolC
if ( isSend ) {
AlarmNotifyRequest alarmNotifyRequest = buildAlarmNotifyRequest ( threadPoolExecutor ) ;
alarmNotifyRequest . setThreadPoolId ( threadPoolId ) ;
hippo4j SendMessageService. sendAlarmMessage ( NotifyTypeEnum . CAPACITY , alarmNotifyRequest ) ;
threadPool SendMessageService. sendAlarmMessage ( NotifyTypeEnum . CAPACITY , alarmNotifyRequest ) ;
}
}
@ -139,7 +143,7 @@ public class DefaultThreadPoolCheckAlarmHandler implements Runnable, ThreadPoolC
if ( isSend ) {
AlarmNotifyRequest alarmNotifyRequest = buildAlarmNotifyRequest ( threadPoolExecutor ) ;
alarmNotifyRequest . setThreadPoolId ( threadPoolId ) ;
hippo4j SendMessageService. sendAlarmMessage ( NotifyTypeEnum . ACTIVITY , alarmNotifyRequest ) ;
threadPool SendMessageService. sendAlarmMessage ( NotifyTypeEnum . ACTIVITY , alarmNotifyRequest ) ;
}
}
@ -155,11 +159,11 @@ public class DefaultThreadPoolCheckAlarmHandler implements Runnable, ThreadPoolC
if ( Objects . isNull ( alarmConfig ) | | ! alarmConfig . getAlarm ( ) ) {
return ;
}
ThreadPoolExecutor threadPoolExecutor = GlobalThreadPoolManage. getExecutorService ( threadPoolId ) . getExecutor ( ) ;
if ( threadPoolExecutor instanceof DynamicThreadPoolExecutor ) {
ThreadPoolExecutor threadPoolExecutor = ThreadPoolExecutorRegistry. getHolder ( threadPoolId ) . getExecutor ( ) ;
if ( Objects. equals ( threadPoolExecutor . getClass ( ) . getName ( ) , "cn.hippo4j.core.executor.DynamicThreadPoolExecutor" ) ) {
AlarmNotifyRequest alarmNotifyRequest = buildAlarmNotifyRequest ( threadPoolExecutor ) ;
alarmNotifyRequest . setThreadPoolId ( threadPoolId ) ;
hippo4j SendMessageService. sendAlarmMessage ( NotifyTypeEnum . REJECT , alarmNotifyRequest ) ;
threadPool SendMessageService. sendAlarmMessage ( NotifyTypeEnum . REJECT , alarmNotifyRequest ) ;
}
} ;
asyncAlarmNotifyExecutor . execute ( checkPoolRejectedAlarmTask ) ;
@ -179,21 +183,19 @@ public class DefaultThreadPoolCheckAlarmHandler implements Runnable, ThreadPoolC
if ( Objects . isNull ( alarmConfig ) | | ! alarmConfig . getAlarm ( ) ) {
return ;
}
if ( threadPoolExecutor instanceof DynamicThreadPoolExecutor ) {
try {
AlarmNotifyRequest alarmNotifyRequest = buildAlarmNotifyRequest ( threadPoolExecutor ) ;
alarmNotifyRequest . setThreadPoolId ( threadPoolId ) ;
alarmNotifyRequest . setExecuteTime ( executeTime ) ;
alarmNotifyRequest . setExecuteTimeOut ( executeTimeOut ) ;
String executeTimeoutTrace = ExecutorTraceContextUtil . getAndRemoveTimeoutTrace ( ) ;
if ( StringUtil . isNotBlank ( executeTimeoutTrace ) ) {
alarmNotifyRequest . setExecuteTimeoutTrace ( executeTimeoutTrace ) ;
}
Runnable task = ( ) - > hippo4jSendMessageService . sendAlarmMessage ( NotifyTypeEnum . TIMEOUT , alarmNotifyRequest ) ;
asyncAlarmNotifyExecutor . execute ( task ) ;
} catch ( Throwable ex ) {
log . error ( "Send thread pool execution timeout alarm error." , ex ) ;
try {
AlarmNotifyRequest alarmNotifyRequest = buildAlarmNotifyRequest ( threadPoolExecutor ) ;
alarmNotifyRequest . setThreadPoolId ( threadPoolId ) ;
alarmNotifyRequest . setExecuteTime ( executeTime ) ;
alarmNotifyRequest . setExecuteTimeOut ( executeTimeOut ) ;
String executeTimeoutTrace = ExecutorTraceContextUtil . getAndRemoveTimeoutTrace ( ) ;
if ( StringUtil . isNotBlank ( executeTimeoutTrace ) ) {
alarmNotifyRequest . setExecuteTimeoutTrace ( executeTimeoutTrace ) ;
}
Runnable task = ( ) - > threadPoolSendMessageService . sendAlarmMessage ( NotifyTypeEnum . TIMEOUT , alarmNotifyRequest ) ;
asyncAlarmNotifyExecutor . execute ( task ) ;
} catch ( Throwable ex ) {
log . error ( "Send thread pool execution timeout alarm error." , ex ) ;
}
}
@ -206,13 +208,17 @@ public class DefaultThreadPoolCheckAlarmHandler implements Runnable, ThreadPoolC
public AlarmNotifyRequest buildAlarmNotifyRequest ( ThreadPoolExecutor threadPoolExecutor ) {
BlockingQueue < Runnable > blockingQueue = threadPoolExecutor . getQueue ( ) ;
RejectedExecutionHandler rejectedExecutionHandler = threadPoolExecutor . getRejectedExecutionHandler ( ) ;
long rejectCount = threadPoolExecutor instanceof DynamicThreadPoolExecutor
? ( ( DynamicThreadPoolExecutor ) threadPoolExecutor ) . getRejectCountNum ( )
: - 1L ;
long rejectCount = - 1L ;
if ( Objects . equals ( threadPoolExecutor . getClass ( ) . getName ( ) , "cn.hippo4j.core.executor.DynamicThreadPoolExecutor" ) ) {
Object actualRejectCountNum = ReflectUtil . invoke ( threadPoolExecutor , "getRejectCountNum" ) ;
if ( actualRejectCountNum ! = null ) {
rejectCount = ( long ) actualRejectCountNum ;
}
}
return AlarmNotifyRequest . builder ( )
. appName ( StringUtil . isBlank ( itemId ) ? applicationName : itemId )
. active ( active . toUpperCase ( ) )
. identify ( IdentifyUtil . getIdentify ( ) )
. identify ( I DENTIFY )
. corePoolSize ( threadPoolExecutor . getCorePoolSize ( ) )
. maximumPoolSize ( threadPoolExecutor . getMaximumPoolSize ( ) )
. poolSize ( threadPoolExecutor . getPoolSize ( ) )