Supplementary Methods Notes (#891)

* Add method and field annotations

* Supplementary Methods Notes

* Change the thread pool running data assignment
pull/892/head
马称 Ma Chen 2 years ago committed by GitHub
parent 563d509787
commit b6eda7b93e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -17,7 +17,10 @@
package cn.hippo4j.common.model;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import java.io.Serializable;
@ -27,6 +30,9 @@ import java.io.Serializable;
*/
@Getter
@Setter
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class ThreadPoolRunStateInfo extends ThreadPoolBaseInfo implements Serializable {
/**

@ -17,7 +17,7 @@
package cn.hippo4j.core.executor;
import cn.hippo4j.core.executor.support.CommonDynamicThreadPool;
import cn.hippo4j.core.provider.CommonDynamicThreadPoolProviderFactory;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
@ -63,7 +63,7 @@ public class DynamicThreadPoolWrapper implements DisposableBean {
private ThreadPoolExecutor executor;
public DynamicThreadPoolWrapper(String threadPoolId) {
this(threadPoolId, CommonDynamicThreadPool.getInstance(threadPoolId));
this(threadPoolId, CommonDynamicThreadPoolProviderFactory.getInstance(threadPoolId));
}
public DynamicThreadPoolWrapper(String threadPoolId, ThreadPoolExecutor threadPoolExecutor) {

@ -35,18 +35,18 @@ import java.util.concurrent.ThreadPoolExecutor;
public abstract class AbstractThreadPoolRuntime {
/**
* Supplement.
* Supplemental thread pool runtime information.
*
* @param threadPoolRunStateInfo
* @return
* @param threadPoolRunStateInfo thread-pool run state info
* @return thread-pool run state info
*/
public abstract ThreadPoolRunStateInfo supplement(ThreadPoolRunStateInfo threadPoolRunStateInfo);
/**
* Get pool run state.
*
* @param threadPoolId
* @return
* @param threadPoolId thread-pool id
* @return thread-pool run state info
*/
public ThreadPoolRunStateInfo getPoolRunState(String threadPoolId) {
DynamicThreadPoolWrapper executorService = GlobalThreadPoolManage.getExecutorService(threadPoolId);
@ -57,56 +57,34 @@ public abstract class AbstractThreadPoolRuntime {
/**
* Get pool run state.
*
* @param threadPoolId
* @param executor
* @return
* @param threadPoolId thread-pool id
* @param executor executor
* @return thread-pool run state info
*/
public ThreadPoolRunStateInfo getPoolRunState(String threadPoolId, Executor executor) {
ThreadPoolRunStateInfo stateInfo = new ThreadPoolRunStateInfo();
ThreadPoolExecutor pool = (ThreadPoolExecutor) executor;
// 核心线程数
int corePoolSize = pool.getCorePoolSize();
// 最大线程数
int maximumPoolSize = pool.getMaximumPoolSize();
// 线程池当前线程数 (有锁)
int poolSize = pool.getPoolSize();
// 活跃线程数 (有锁)
int activeCount = pool.getActiveCount();
// 同时进入池中的最大线程数 (有锁)
int largestPoolSize = pool.getLargestPoolSize();
// 线程池中执行任务总数量 (有锁)
long completedTaskCount = pool.getCompletedTaskCount();
// 当前负载
String currentLoad = CalculateUtil.divide(activeCount, maximumPoolSize) + "";
// 峰值负载
String peakLoad = CalculateUtil.divide(largestPoolSize, maximumPoolSize) + "";
BlockingQueue<Runnable> queue = pool.getQueue();
// 队列元素个数
int queueSize = queue.size();
// 队列类型
String queueType = queue.getClass().getSimpleName();
// 队列剩余容量
int remainingCapacity = queue.remainingCapacity();
// 队列容量
int queueCapacity = queueSize + remainingCapacity;
stateInfo.setCoreSize(corePoolSize);
stateInfo.setTpId(threadPoolId);
stateInfo.setPoolSize(poolSize);
stateInfo.setMaximumSize(maximumPoolSize);
stateInfo.setActiveSize(activeCount);
stateInfo.setCurrentLoad(currentLoad);
stateInfo.setPeakLoad(peakLoad);
stateInfo.setQueueType(queueType);
stateInfo.setQueueSize(queueSize);
stateInfo.setQueueCapacity(queueCapacity);
stateInfo.setQueueRemainingCapacity(remainingCapacity);
stateInfo.setLargestPoolSize(largestPoolSize);
stateInfo.setCompletedTaskCount(completedTaskCount);
long rejectCount =
pool instanceof DynamicThreadPoolExecutor ? ((DynamicThreadPoolExecutor) pool).getRejectCountNum() : -1L;
stateInfo.setRejectCount(rejectCount);
stateInfo.setClientLastRefreshTime(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
stateInfo.setTimestamp(System.currentTimeMillis());
ThreadPoolExecutor actualExecutor = (ThreadPoolExecutor) executor;
int activeCount = actualExecutor.getActiveCount();
int largestPoolSize = actualExecutor.getLargestPoolSize();
BlockingQueue<Runnable> blockingQueue = actualExecutor.getQueue();
long rejectCount = actualExecutor instanceof DynamicThreadPoolExecutor ? ((DynamicThreadPoolExecutor) actualExecutor).getRejectCountNum() : -1L;
ThreadPoolRunStateInfo stateInfo = ThreadPoolRunStateInfo.builder()
.tpId(threadPoolId)
.activeSize(activeCount)
.poolSize(actualExecutor.getPoolSize())
.completedTaskCount(actualExecutor.getCompletedTaskCount())
.largestPoolSize(largestPoolSize)
.currentLoad(CalculateUtil.divide(activeCount, actualExecutor.getMaximumPoolSize()) + "")
.clientLastRefreshTime(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")))
.peakLoad(CalculateUtil.divide(largestPoolSize, actualExecutor.getMaximumPoolSize()) + "")
.queueSize(blockingQueue.size())
.queueRemainingCapacity(blockingQueue.remainingCapacity())
.rejectCount(rejectCount)
.timestamp(System.currentTimeMillis())
.build();
stateInfo.setCoreSize(actualExecutor.getCorePoolSize());
stateInfo.setMaximumSize(actualExecutor.getMaximumPoolSize());
stateInfo.setQueueType(blockingQueue.getClass().getSimpleName());
stateInfo.setQueueCapacity(blockingQueue.size() + blockingQueue.remainingCapacity());
return supplement(stateInfo);
}
}

@ -50,9 +50,9 @@ public class ThreadPoolRunStateHandler extends AbstractThreadPoolRuntime {
long used = MemoryUtil.heapMemoryUsed();
long max = MemoryUtil.heapMemoryMax();
String memoryProportion = StringUtil.newBuilder(
"已分配: ",
"Allocation: ",
ByteConvertUtil.getPrintSize(used),
" / 最大可用: ",
" / Maximum available: ",
ByteConvertUtil.getPrintSize(max));
poolRunStateInfo.setCurrentLoad(poolRunStateInfo.getCurrentLoad() + "%");
poolRunStateInfo.setPeakLoad(poolRunStateInfo.getPeakLoad() + "%");

@ -40,10 +40,10 @@ public class ThreadPoolStatusHandler {
private static final AtomicBoolean EXCEPTION_FLAG = new AtomicBoolean(Boolean.TRUE);
/**
* Get thread pool state.
* Get thread-pool state.
*
* @param executor
* @return
* @param executor executor
* @return thread-pool state
*/
public static String getThreadPoolState(ThreadPoolExecutor executor) {
if (EXCEPTION_FLAG.get()) {

@ -34,19 +34,30 @@ import java.util.concurrent.*;
public class AbstractBuildThreadPoolTemplate {
/**
* Thread pool construction initialization parameters.
* Thread-pool construction initialization parameters.
*
* @return
* @return thread-pool init param
*/
protected static ThreadPoolInitParam initParam() {
throw new UnsupportedOperationException();
}
/**
* Build pool.
*
* @return thread-pool executor
*/
public static ThreadPoolExecutor buildPool() {
ThreadPoolInitParam initParam = initParam();
return buildPool(initParam);
}
/**
* Build pool.
*
* @param initParam init param
* @return thread-pool executor
*/
public static ThreadPoolExecutor buildPool(ThreadPoolInitParam initParam) {
Assert.notNull(initParam);
ThreadPoolExecutor executorService;
@ -65,11 +76,22 @@ public class AbstractBuildThreadPoolTemplate {
return executorService;
}
/**
* Build a fast-consuming task thread pool.
*
* @return fast thread-pool executor
*/
public static ThreadPoolExecutor buildFastPool() {
ThreadPoolInitParam initParam = initParam();
return buildFastPool(initParam);
}
/**
* Build a fast-consuming task thread pool.
*
* @param initParam init param
* @return fast thread-pool executor
*/
public static ThreadPoolExecutor buildFastPool(ThreadPoolInitParam initParam) {
TaskQueue<Runnable> taskQueue = new TaskQueue(initParam.getCapacity());
FastThreadPoolExecutor fastThreadPoolExecutor;
@ -89,6 +111,12 @@ public class AbstractBuildThreadPoolTemplate {
return fastThreadPoolExecutor;
}
/**
* Build a dynamic monitor thread-pool.
*
* @param initParam init param
* @return dynamic monitor thread-pool
*/
public static DynamicThreadPoolExecutor buildDynamicPool(ThreadPoolInitParam initParam) {
Assert.notNull(initParam);
DynamicThreadPoolExecutor dynamicThreadPoolExecutor;
@ -113,6 +141,9 @@ public class AbstractBuildThreadPoolTemplate {
return dynamicThreadPoolExecutor;
}
/**
* Thread-pool init param.
*/
@Data
@Accessors(chain = true)
public static class ThreadPoolInitParam {

@ -27,26 +27,30 @@ import java.util.concurrent.Executor;
public interface DynamicThreadPoolAdapter {
/**
* Match.
* Check if the object contains thread pool information.
*
* @param executor
* @return
* @param executor objects where there may be instances
* of dynamic thread pools
* @return matching results
*/
boolean match(Object executor);
/**
* Unwrap.
* Get the dynamic thread pool reference in the object.
*
* @param executor
* @return
* @param executor objects where there may be instances
* of dynamic thread pools
* @return get the real dynamic thread pool instance
*/
DynamicThreadPoolExecutor unwrap(Object executor);
/**
* Replace.
* If the {@link DynamicThreadPoolAdapter#match(Object)} conditions are met,
* the thread pool is replaced with a dynamic thread pool.
*
* @param executor
* @param dynamicThreadPoolExecutor
* @param executor objects where there may be instances
* of dynamic thread pools
* @param dynamicThreadPoolExecutor dynamic thread-pool executor
*/
void replace(Object executor, Executor dynamicThreadPoolExecutor);
}

@ -38,20 +38,22 @@ public class DynamicThreadPoolAdapterChoose {
}
/**
* Match.
* Check if the object contains thread pool information.
*
* @param executor
* @return
* @param executor objects where there may be instances
* of dynamic thread pools
* @return matching results
*/
public static boolean match(Object executor) {
return DYNAMIC_THREAD_POOL_ADAPTERS.stream().anyMatch(each -> each.match(executor));
}
/**
* Unwrap.
* Get the dynamic thread pool reference in the object.
*
* @param executor
* @return
* @param executor objects where there may be instances
* of dynamic thread pools
* @return get the real dynamic thread pool instance
*/
public static DynamicThreadPoolExecutor unwrap(Object executor) {
Optional<DynamicThreadPoolAdapter> dynamicThreadPoolAdapterOptional = DYNAMIC_THREAD_POOL_ADAPTERS.stream().filter(each -> each.match(executor)).findFirst();
@ -59,9 +61,12 @@ public class DynamicThreadPoolAdapterChoose {
}
/**
* Replace.
* If the {@link DynamicThreadPoolAdapter#match(Object)} conditions are met,
* the thread pool is replaced with a dynamic thread pool.
*
* @param executor
* @param executor objects where there may be instances
* of dynamic thread pools
* @param dynamicThreadPoolExecutor dynamic thread-pool executor
*/
public static void replace(Object executor, Executor dynamicThreadPoolExecutor) {
Optional<DynamicThreadPoolAdapter> dynamicThreadPoolAdapterOptional = DYNAMIC_THREAD_POOL_ADAPTERS.stream().filter(each -> each.match(executor)).findFirst();

@ -33,8 +33,8 @@ public abstract class AbstractDynamicThreadPoolService implements DynamicThreadP
/**
* Build dynamic thread-pool executor.
*
* @param registerParameter
* @return
* @param registerParameter register parameter
* @return dynamic thread-pool executor
*/
public ThreadPoolExecutor buildDynamicThreadPoolExecutor(DynamicThreadPoolRegisterParameter registerParameter) {
ThreadPoolExecutor dynamicThreadPoolExecutor = ThreadPoolBuilder.builder()

@ -29,8 +29,8 @@ public interface DynamicThreadPoolService {
/**
* Registering dynamic thread pools at runtime.
*
* @param registerWrapper
* @return
* @param registerWrapper register wrapper
* @return dynamic thread-pool executor
*/
ThreadPoolExecutor registerDynamicThreadPool(DynamicThreadPoolRegisterWrapper registerWrapper);
}

@ -15,18 +15,25 @@
* limitations under the License.
*/
package cn.hippo4j.core.executor.support;
package cn.hippo4j.core.provider;
import cn.hippo4j.common.executor.support.BlockingQueueTypeEnum;
import cn.hippo4j.core.executor.DynamicThreadPoolExecutor;
import cn.hippo4j.core.executor.support.ThreadPoolBuilder;
import java.util.concurrent.TimeUnit;
/**
* Common dynamic thread-pool.
* Common dynamic thread-pool provider factory.
*/
public class CommonDynamicThreadPool {
public class CommonDynamicThreadPoolProviderFactory {
/**
* Get the public dynamic thread pool instance.
*
* @param threadPoolId thread-pool id
* @return dynamic thread-pool executor
*/
public static DynamicThreadPoolExecutor getInstance(String threadPoolId) {
DynamicThreadPoolExecutor dynamicThreadPoolExecutor = (DynamicThreadPoolExecutor) ThreadPoolBuilder.builder()
.dynamicPool()

@ -29,9 +29,9 @@ import cn.hippo4j.core.executor.DynamicThreadPoolExecutor;
import cn.hippo4j.core.executor.DynamicThreadPoolWrapper;
import cn.hippo4j.core.executor.manage.GlobalNotifyAlarmManage;
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.core.executor.support.CommonDynamicThreadPool;
import cn.hippo4j.core.executor.support.ThreadPoolBuilder;
import cn.hippo4j.core.executor.support.adpter.DynamicThreadPoolAdapterChoose;
import cn.hippo4j.core.provider.CommonDynamicThreadPoolProviderFactory;
import cn.hippo4j.core.toolkit.DynamicThreadPoolAnnotationUtil;
import cn.hippo4j.message.service.ThreadPoolNotifyAlarm;
import lombok.AllArgsConstructor;
@ -114,7 +114,7 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
log.error("Failed to initialize thread pool configuration. error: {}", ex);
} finally {
if (Objects.isNull(dynamicThreadPoolWrapper.getExecutor())) {
dynamicThreadPoolWrapper.setExecutor(CommonDynamicThreadPool.getInstance(threadPoolId));
dynamicThreadPoolWrapper.setExecutor(CommonDynamicThreadPoolProviderFactory.getInstance(threadPoolId));
}
dynamicThreadPoolWrapper.setInitFlag(Boolean.TRUE);
}

@ -33,9 +33,9 @@ import cn.hippo4j.core.executor.DynamicThreadPoolExecutor;
import cn.hippo4j.core.executor.DynamicThreadPoolWrapper;
import cn.hippo4j.core.executor.manage.GlobalNotifyAlarmManage;
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.core.executor.support.CommonDynamicThreadPool;
import cn.hippo4j.core.executor.support.ThreadPoolBuilder;
import cn.hippo4j.core.executor.support.adpter.DynamicThreadPoolAdapterChoose;
import cn.hippo4j.core.provider.CommonDynamicThreadPoolProviderFactory;
import cn.hippo4j.core.toolkit.DynamicThreadPoolAnnotationUtil;
import cn.hippo4j.message.service.ThreadPoolNotifyAlarm;
import cn.hippo4j.springboot.starter.config.BootstrapProperties;
@ -55,7 +55,9 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import static cn.hippo4j.common.constant.Constants.*;
import static cn.hippo4j.common.constant.Constants.ITEM_ID;
import static cn.hippo4j.common.constant.Constants.NAMESPACE;
import static cn.hippo4j.common.constant.Constants.TP_ID;
/**
* Dynamic thread-pool post processor.
@ -189,12 +191,12 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
GlobalThreadPoolManage.dynamicRegister(registerWrapper);
}
} catch (Exception ex) {
newDynamicThreadPoolExecutor = executor != null ? executor : CommonDynamicThreadPool.getInstance(threadPoolId);
newDynamicThreadPoolExecutor = executor != null ? executor : CommonDynamicThreadPoolProviderFactory.getInstance(threadPoolId);
dynamicThreadPoolWrapper.setExecutor(newDynamicThreadPoolExecutor);
log.error("Failed to initialize thread pool configuration. error message: {}", ex.getMessage());
} finally {
if (Objects.isNull(executor)) {
dynamicThreadPoolWrapper.setExecutor(CommonDynamicThreadPool.getInstance(threadPoolId));
dynamicThreadPoolWrapper.setExecutor(CommonDynamicThreadPoolProviderFactory.getInstance(threadPoolId));
}
}
GlobalThreadPoolManage.register(dynamicThreadPoolWrapper.getThreadPoolId(), threadPoolParameterInfo, dynamicThreadPoolWrapper);

Loading…
Cancel
Save