Merge remote-tracking branch 'upstream/develop' into develop

pull/998/head
weihu 3 years ago
commit 74351432a2

@ -20,9 +20,10 @@ coverage:
project:
default:
target: auto
# adjust accordingly based on how flaky your tests are
# this allows a 10% drop from the previous base commit coverage
threshold: 10%
patch:
target: auto
threshold: 10%
ignore:
- "hippo4j-example/.*"
- "docs/.*"

@ -61,11 +61,18 @@ sidebar_position: 2
<td align="center" >-</td>
<td align="center" >17855368071@163.com</td>
</tr>
<tr>
<td align="center"><a href="https://github.com/Createsequence"><img src="https://avatars.githubusercontent.com/u/49221670?v=4" width="64px;"/></a></td>
<td align="center">黄成兴</td>
<td align="center" ><a href="https://github.com/Createsequence">Createsequence</a></td>
<td align="center" ><a href="https://blog.xiajibagao.top">Createsequence's Blog</a></td>
<td align="center" >841396397@qq.com</td>
</tr>
</table>
## 成为核心开发者
持续对 Hippo-4J 进行贡献, 粗略评估,完成 10 次 PR 贡献即可成为核心开发者。 其中包括完成 2 个 [good pro issue](https://github.com/opengoofy/hippo4j/issues?q=is%3Aissue+is%3Aopen+label%3A%22good+pro+issue%22) 或以上,以及 6 个 [good first issue](https://github.com/opengoofy/hippo4j/issues?q=is%3Aissue+is%3Aopen+label%3A%22good+first+issue%22) 或以上。
持续对 Hippo-4J 进行贡献, 粗略评估,完成 10 次 PR 贡献即可成为核心开发者。 其中包括完成 2 个 [good pro issue](https://github.com/opengoofy/hippo4j/issues?q=is%3Aissue+is%3Aopen+label%3A%22good+pro+issue%22) 或以上,以及若干个 [good first issue](https://github.com/opengoofy/hippo4j/issues?q=is%3Aissue+is%3Aopen+label%3A%22good+first+issue%22)。
:::note
会根据 PR 质量提供个性化评估,有可能一个或两个质量较高 PR 即可成为核心开发者。参考:[重构 DynamicThreadPoolExecutor 功能扩展逻辑](https://github.com/opengoofy/hippo4j/pull/854)

@ -1,5 +1,5 @@
---
sidebar_position: 4
sidebar_position: 5
---
# 更新日志

@ -61,11 +61,18 @@ sidebar_position: 2
<td align="center" >-</td>
<td align="center" >17855368071@163.com</td>
</tr>
<tr>
<td align="center"><a href="https://github.com/Createsequence"><img src="https://avatars.githubusercontent.com/u/49221670?v=4" width="64px;"/></a></td>
<td align="center">黄成兴</td>
<td align="center" ><a href="https://github.com/Createsequence">Createsequence</a></td>
<td align="center" ><a href="https://blog.xiajibagao.top">Createsequence's Blog</a></td>
<td align="center" >841396397@qq.com</td>
</tr>
</table>
## 成为核心开发者
持续对 Hippo-4J 进行贡献, 粗略评估,完成 10 次 PR 贡献即可成为核心开发者。 其中包括完成 2 个 [good pro issue](https://github.com/opengoofy/hippo4j/issues?q=is%3Aissue+is%3Aopen+label%3A%22good+pro+issue%22) 或以上,以及 6 个 [good first issue](https://github.com/opengoofy/hippo4j/issues?q=is%3Aissue+is%3Aopen+label%3A%22good+first+issue%22) 或以上。
持续对 Hippo-4J 进行贡献, 粗略评估,完成 10 次 PR 贡献即可成为核心开发者。 其中包括完成 2 个 [good pro issue](https://github.com/opengoofy/hippo4j/issues?q=is%3Aissue+is%3Aopen+label%3A%22good+pro+issue%22) 或以上,以及若干个 [good first issue](https://github.com/opengoofy/hippo4j/issues?q=is%3Aissue+is%3Aopen+label%3A%22good+first+issue%22)。
:::note
会根据 PR 质量提供个性化评估,有可能一个或两个质量较高 PR 即可成为核心开发者。参考:[重构 DynamicThreadPoolExecutor 功能扩展逻辑](https://github.com/opengoofy/hippo4j/pull/854)

@ -61,11 +61,18 @@ sidebar_position: 2
<td align="center" >-</td>
<td align="center" >17855368071@163.com</td>
</tr>
<tr>
<td align="center"><a href="https://github.com/Createsequence"><img src="https://avatars.githubusercontent.com/u/49221670?v=4" width="64px;"/></a></td>
<td align="center">黄成兴</td>
<td align="center" ><a href="https://github.com/Createsequence">Createsequence</a></td>
<td align="center" ><a href="https://blog.xiajibagao.top">Createsequence's Blog</a></td>
<td align="center" >841396397@qq.com</td>
</tr>
</table>
## 成为核心开发者
持续对 Hippo-4J 进行贡献, 粗略评估,完成 10 次 PR 贡献即可成为核心开发者。 其中包括完成 2 个 [good pro issue](https://github.com/opengoofy/hippo4j/issues?q=is%3Aissue+is%3Aopen+label%3A%22good+pro+issue%22) 或以上,以及 6 个 [good first issue](https://github.com/opengoofy/hippo4j/issues?q=is%3Aissue+is%3Aopen+label%3A%22good+first+issue%22) 或以上。
持续对 Hippo-4J 进行贡献, 粗略评估,完成 10 次 PR 贡献即可成为核心开发者。 其中包括完成 2 个 [good pro issue](https://github.com/opengoofy/hippo4j/issues?q=is%3Aissue+is%3Aopen+label%3A%22good+pro+issue%22) 或以上,以及若干个 [good first issue](https://github.com/opengoofy/hippo4j/issues?q=is%3Aissue+is%3Aopen+label%3A%22good+first+issue%22)。
:::note
会根据 PR 质量提供个性化评估,有可能一个或两个质量较高 PR 即可成为核心开发者。参考:[重构 DynamicThreadPoolExecutor 功能扩展逻辑](https://github.com/opengoofy/hippo4j/pull/854)

@ -61,11 +61,18 @@ sidebar_position: 2
<td align="center" >-</td>
<td align="center" >17855368071@163.com</td>
</tr>
<tr>
<td align="center"><a href="https://github.com/Createsequence"><img src="https://avatars.githubusercontent.com/u/49221670?v=4" width="64px;"/></a></td>
<td align="center">黄成兴</td>
<td align="center" ><a href="https://github.com/Createsequence">Createsequence</a></td>
<td align="center" ><a href="https://blog.xiajibagao.top">Createsequence's Blog</a></td>
<td align="center" >841396397@qq.com</td>
</tr>
</table>
## 成为核心开发者
持续对 Hippo-4J 进行贡献, 粗略评估,完成 10 次 PR 贡献即可成为核心开发者。 其中包括完成 2 个 [good pro issue](https://github.com/opengoofy/hippo4j/issues?q=is%3Aissue+is%3Aopen+label%3A%22good+pro+issue%22) 或以上,以及 6 个 [good first issue](https://github.com/opengoofy/hippo4j/issues?q=is%3Aissue+is%3Aopen+label%3A%22good+first+issue%22) 或以上。
持续对 Hippo-4J 进行贡献, 粗略评估,完成 10 次 PR 贡献即可成为核心开发者。 其中包括完成 2 个 [good pro issue](https://github.com/opengoofy/hippo4j/issues?q=is%3Aissue+is%3Aopen+label%3A%22good+pro+issue%22) 或以上,以及若干个 [good first issue](https://github.com/opengoofy/hippo4j/issues?q=is%3Aissue+is%3Aopen+label%3A%22good+first+issue%22)。
:::note
会根据 PR 质量提供个性化评估,有可能一个或两个质量较高 PR 即可成为核心开发者。参考:[重构 DynamicThreadPoolExecutor 功能扩展逻辑](https://github.com/opengoofy/hippo4j/pull/854)

@ -69,7 +69,7 @@ public class DubboThreadPoolAdapter implements ThreadPoolAdapter, ApplicationLis
@Override
public List<ThreadPoolAdapterState> getThreadPoolStates() {
List<ThreadPoolAdapterState> threadPoolAdapterStates = new ArrayList<>();
DUBBO_PROTOCOL_EXECUTOR.forEach((kel, val) -> threadPoolAdapterStates.add(getThreadPoolState(String.valueOf(val))));
DUBBO_PROTOCOL_EXECUTOR.forEach((key, val) -> threadPoolAdapterStates.add(getThreadPoolState(String.valueOf(key))));
return threadPoolAdapterStates;
}

@ -17,7 +17,10 @@
package cn.hippo4j.common.model;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import java.io.Serializable;
@ -27,6 +30,9 @@ import java.io.Serializable;
*/
@Getter
@Setter
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class ThreadPoolRunStateInfo extends ThreadPoolBaseInfo implements Serializable {
/**

@ -21,6 +21,8 @@ import cn.hippo4j.common.constant.Constants;
import cn.hippo4j.common.model.ThreadPoolParameter;
import cn.hippo4j.common.model.ThreadPoolParameterInfo;
import java.util.Objects;
/**
* Content util.
*/

@ -26,7 +26,7 @@ import java.lang.management.MemoryUsage;
/**
* memory util<br>
* the obtained information is not invalid, after a long wait, obtain it again
* the obtained information is not real time effective, after a long wait, please get it again
*
* @author liuwenhao
*/

@ -250,4 +250,18 @@ public class ReflectUtil {
throw new IllegalException(e);
}
}
/**
* get instance
*
* @param cls the class
* @return new Instance
*/
public static Object createInstance(Class<?> cls) {
try {
return cls.newInstance();
} catch (InstantiationException | IllegalAccessException e) {
throw new IllegalException(e);
}
}
}

@ -24,7 +24,29 @@ import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* Dynamic thread pool.
* An annotation that enhances the functionality of the jdk acoustic thread pool,
* with the following list of enhancements.
* <ul>
* <li>Dynamic change at runtime</li>
* <li>Determine whether an alarm is required at runtime</li>
* <li>Provide observable monitoring indicators</li>
* <li>......</li>
* </ur>
*
* <p>If you use Server mode, you can view the thread pool operation in the built-in console.
* <p>If you use Config mode, you can observe with Prometheus and Grafana.
*
* <p>The annotation is normally marked on the
* spring bean defined by {@link java.util.concurrent.ThreadPoolExecutor}.
*
* <p>Can also be marked on the following types:
*
* @see java.util.concurrent.Executor
* @see java.util.concurrent.ExecutorService
* @see org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
* @see com.alibaba.ttl.threadpool.ExecutorTtlWrapper
* @see com.alibaba.ttl.threadpool.ExecutorServiceTtlWrapper
* @since 1.0
*/
@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)

@ -45,7 +45,7 @@ import java.util.concurrent.atomic.AtomicLong;
public class DynamicThreadPoolExecutor extends ExtensibleThreadPoolExecutor implements DisposableBean {
/**
* wait for tasks to complete on shutdown
* Wait for tasks to complete on shutdown
*/
@Getter
@Setter
@ -54,23 +54,23 @@ public class DynamicThreadPoolExecutor extends ExtensibleThreadPoolExecutor impl
/**
* Creates a new {@code DynamicThreadPoolExecutor} with the given initial parameters.
*
* @param threadPoolId thread-pool id
* @param executeTimeOut execute time out
* @param threadPoolId thread-pool id
* @param executeTimeOut execute time out
* @param waitForTasksToCompleteOnShutdown wait for tasks to complete on shutdown
* @param awaitTerminationMillis await termination millis
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param blockingQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor creates a new thread
* @param rejectedExecutionHandler the handler to use when execution is blocked because the thread bounds and queue capacities are reached
* @param awaitTerminationMillis await termination millis
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param blockingQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor creates a new thread
* @param rejectedExecutionHandler the handler to use when execution is blocked because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
@ -80,27 +80,26 @@ public class DynamicThreadPoolExecutor extends ExtensibleThreadPoolExecutor impl
* or {@code threadFactory} or {@code handler} is null
*/
public DynamicThreadPoolExecutor(
int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
long executeTimeOut, boolean waitForTasksToCompleteOnShutdown, long awaitTerminationMillis,
@NonNull BlockingQueue<Runnable> blockingQueue,
@NonNull String threadPoolId,
@NonNull ThreadFactory threadFactory,
@NonNull RejectedExecutionHandler rejectedExecutionHandler) {
int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
long executeTimeOut, boolean waitForTasksToCompleteOnShutdown, long awaitTerminationMillis,
@NonNull BlockingQueue<Runnable> blockingQueue,
@NonNull String threadPoolId,
@NonNull ThreadFactory threadFactory,
@NonNull RejectedExecutionHandler rejectedExecutionHandler) {
super(
threadPoolId, new DefaultThreadPoolPluginManager(),
corePoolSize, maximumPoolSize, keepAliveTime, unit,
blockingQueue, threadFactory, rejectedExecutionHandler);
log.info("Initializing ExecutorService {}", threadPoolId);
this.waitForTasksToCompleteOnShutdown = waitForTasksToCompleteOnShutdown;
// init default plugins
// Init default plugins.
new DefaultThreadPoolPluginRegistrar(executeTimeOut, awaitTerminationMillis)
.doRegister(this);
}
/**
* Invoked by the containing {@code BeanFactory} on destruction of a bean.
*
*/
@Override
public void destroy() {
@ -128,7 +127,7 @@ public class DynamicThreadPoolExecutor extends ExtensibleThreadPoolExecutor impl
/**
* Set support param.
*
* @param awaitTerminationMillis await termination millis
* @param awaitTerminationMillis await termination millis
* @param waitForTasksToCompleteOnShutdown wait for tasks to complete on shutdown
* @deprecated use {@link ThreadPoolExecutorShutdownPlugin}
*/
@ -238,5 +237,4 @@ public class DynamicThreadPoolExecutor extends ExtensibleThreadPoolExecutor impl
public void setRedundancyHandler(RejectedExecutionHandler handler) {
setRejectedExecutionHandler(handler);
}
}

@ -17,7 +17,7 @@
package cn.hippo4j.core.executor;
import cn.hippo4j.core.executor.support.CommonDynamicThreadPool;
import cn.hippo4j.core.provider.CommonDynamicThreadPoolProviderFactory;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
@ -63,7 +63,7 @@ public class DynamicThreadPoolWrapper implements DisposableBean {
private ThreadPoolExecutor executor;
public DynamicThreadPoolWrapper(String threadPoolId) {
this(threadPoolId, CommonDynamicThreadPool.getInstance(threadPoolId));
this(threadPoolId, CommonDynamicThreadPoolProviderFactory.getInstance(threadPoolId));
}
public DynamicThreadPoolWrapper(String threadPoolId, ThreadPoolExecutor threadPoolExecutor) {

@ -43,43 +43,42 @@ import java.util.concurrent.*;
public class ExtensibleThreadPoolExecutor extends ThreadPoolExecutor implements ThreadPoolPluginSupport {
/**
* thread pool id
* Thread pool id
*/
@Getter
@NonNull
private final String threadPoolId;
/**
* action aware registry
* Action aware registry
*/
@Getter
private final ThreadPoolPluginManager threadPoolPluginManager;
/**
* handler wrapper, any changes to the current instance {@link RejectedExecutionHandler} should be made through this wrapper
* Handler wrapper, any changes to the current instance {@link RejectedExecutionHandler} should be made through this wrapper
*/
private final RejectedAwareHandlerWrapper handlerWrapper;
/**
* Creates a new {@code ExtensibleThreadPoolExecutor} with the given initial parameters.
*
* @param threadPoolId thread-pool id
* @param threadPoolPluginManager action aware registry
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @param threadPoolId thread-pool id
* @param threadPoolPluginManager action aware registry
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
@ -89,20 +88,18 @@ public class ExtensibleThreadPoolExecutor extends ThreadPoolExecutor implements
* or {@code threadFactory} or {@code handler} is null
*/
public ExtensibleThreadPoolExecutor(
@NonNull String threadPoolId,
@NonNull ThreadPoolPluginManager threadPoolPluginManager,
int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
@NonNull BlockingQueue<Runnable> workQueue,
@NonNull ThreadFactory threadFactory,
@NonNull RejectedExecutionHandler handler) {
@NonNull String threadPoolId,
@NonNull ThreadPoolPluginManager threadPoolPluginManager,
int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
@NonNull BlockingQueue<Runnable> workQueue,
@NonNull ThreadFactory threadFactory,
@NonNull RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
// pool extended info
// Pool extended info.
this.threadPoolId = threadPoolId;
this.threadPoolPluginManager = threadPoolPluginManager;
// proxy handler to support Aware callback
// Proxy handler to support Aware callback.
while (handler instanceof RejectedAwareHandlerWrapper) {
handler = ((RejectedAwareHandlerWrapper) handler).getHandler();
}
@ -115,7 +112,7 @@ public class ExtensibleThreadPoolExecutor extends ThreadPoolExecutor implements
*
* <p><b>Before calling the parent class method, {@link ExecuteAwarePlugin#beforeExecute} will be called first.
*
* @param thread the thread that will run task {@code r}
* @param thread the thread that will run task {@code r}
* @param runnable the task that will be executed
*/
@Override
@ -145,9 +142,9 @@ public class ExtensibleThreadPoolExecutor extends ThreadPoolExecutor implements
*
* <p><b>After calling the superclass method, {@link ExecuteAwarePlugin#afterExecute} will be called last.
*
* @param runnable the runnable that has completed
* @param runnable the runnable that has completed
* @param throwable the exception that caused termination, or null if
* execution completed normally
* execution completed normally
*/
@Override
protected void afterExecute(Runnable runnable, Throwable throwable) {
@ -191,7 +188,7 @@ public class ExtensibleThreadPoolExecutor extends ThreadPoolExecutor implements
}
/**
* {@inheritDoc}.
* {@inheritDoc}
*
* <p><b>Before calling the superclass method, {@link ShutdownAwarePlugin#afterTerminated} will be called first.
*/
@ -291,22 +288,21 @@ public class ExtensibleThreadPoolExecutor extends ThreadPoolExecutor implements
private static class RejectedAwareHandlerWrapper implements RejectedExecutionHandler {
/**
* thread-pool action aware registry
* Thread-pool action aware registry
*/
private final ThreadPoolPluginManager registry;
/**
* original target
* Original target
*/
@NonNull
@Setter
@Getter
private RejectedExecutionHandler handler;
/**
* Call {@link RejectedAwarePlugin#beforeRejectedExecution}, then reject the task
* Call {@link RejectedAwarePlugin#beforeRejectedExecution}, then reject the task.
*
* @param r the runnable task requested to be executed
* @param r the runnable task requested to be executed
* @param executor the executor attempting to execute this task
*/
@Override
@ -315,6 +311,5 @@ public class ExtensibleThreadPoolExecutor extends ThreadPoolExecutor implements
rejectedAwarePluginList.forEach(aware -> aware.beforeRejectedExecution(r, executor));
handler.rejectedExecution(r, executor);
}
}
}

@ -29,7 +29,6 @@ import cn.hippo4j.message.request.AlarmNotifyRequest;
import cn.hippo4j.message.request.ChangeParameterNotifyRequest;
import cn.hippo4j.message.service.Hippo4jSendMessageService;
import cn.hippo4j.message.service.ThreadPoolNotifyAlarm;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
@ -37,7 +36,14 @@ import org.springframework.boot.CommandLineRunner;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* Thread-pool notify alarm handler.
@ -46,7 +52,6 @@ import java.util.concurrent.*;
@RequiredArgsConstructor
public class ThreadPoolNotifyAlarmHandler implements Runnable, CommandLineRunner {
@NonNull
private final Hippo4jSendMessageService hippo4jSendMessageService;
@Value("${spring.profiles.active:UNKNOWN}")
@ -96,8 +101,8 @@ public class ThreadPoolNotifyAlarmHandler implements Runnable, CommandLineRunner
/**
* Check thread pool capacity alarm.
*
* @param threadPoolId
* @param threadPoolExecutor
* @param threadPoolId thread-pool id
* @param threadPoolExecutor thread-pool executor
*/
public void checkPoolCapacityAlarm(String threadPoolId, ThreadPoolExecutor threadPoolExecutor) {
ThreadPoolNotifyAlarm alarmConfig = GlobalNotifyAlarmManage.get(threadPoolId);
@ -119,12 +124,12 @@ public class ThreadPoolNotifyAlarmHandler implements Runnable, CommandLineRunner
/**
* Check thread pool activity alarm.
*
* @param threadPoolId
* @param threadPoolExecutor
* @param threadPoolId thread-pool id
* @param threadPoolExecutor thread-pool executor
*/
public void checkPoolActivityAlarm(String threadPoolId, ThreadPoolExecutor threadPoolExecutor) {
ThreadPoolNotifyAlarm alarmConfig = GlobalNotifyAlarmManage.get(threadPoolId);
if (Objects.isNull(alarmConfig) || !alarmConfig.getAlarm() || alarmConfig.getCapacityAlarm() <= 0) {
if (Objects.isNull(alarmConfig) || !alarmConfig.getAlarm() || alarmConfig.getActiveAlarm() <= 0) {
return;
}
int activeCount = threadPoolExecutor.getActiveCount();
@ -141,7 +146,7 @@ public class ThreadPoolNotifyAlarmHandler implements Runnable, CommandLineRunner
/**
* Async send rejected alarm.
*
* @param threadPoolId
* @param threadPoolId thread-pool id
*/
public void asyncSendRejectedAlarm(String threadPoolId) {
Runnable checkPoolRejectedAlarmTask = () -> {
@ -162,10 +167,10 @@ public class ThreadPoolNotifyAlarmHandler implements Runnable, CommandLineRunner
/**
* Async send execute time out alarm.
*
* @param threadPoolId
* @param executeTime
* @param executeTimeOut
* @param threadPoolExecutor
* @param threadPoolId thread-pool id
* @param executeTime execute time
* @param executeTimeOut execute time-out
* @param threadPoolExecutor thread-pool executor
*/
public void asyncSendExecuteTimeOutAlarm(String threadPoolId, long executeTime, long executeTimeOut, ThreadPoolExecutor threadPoolExecutor) {
ThreadPoolNotifyAlarm alarmConfig = GlobalNotifyAlarmManage.get(threadPoolId);
@ -193,7 +198,7 @@ public class ThreadPoolNotifyAlarmHandler implements Runnable, CommandLineRunner
/**
* Send pool config change.
*
* @param request
* @param request change parameter notify request
*/
public void sendPoolConfigChange(ChangeParameterNotifyRequest request) {
request.setActive(active.toUpperCase());
@ -206,7 +211,7 @@ public class ThreadPoolNotifyAlarmHandler implements Runnable, CommandLineRunner
/**
* Build alarm notify request.
*
* @param threadPoolExecutor
* @param threadPoolExecutor thread-pool executor
* @return
*/
public AlarmNotifyRequest buildAlarmNotifyRequest(ThreadPoolExecutor threadPoolExecutor) {

@ -35,18 +35,18 @@ import java.util.concurrent.ThreadPoolExecutor;
public abstract class AbstractThreadPoolRuntime {
/**
* Supplement.
* Supplemental thread pool runtime information.
*
* @param threadPoolRunStateInfo
* @return
* @param threadPoolRunStateInfo thread-pool run state info
* @return thread-pool run state info
*/
public abstract ThreadPoolRunStateInfo supplement(ThreadPoolRunStateInfo threadPoolRunStateInfo);
/**
* Get pool run state.
*
* @param threadPoolId
* @return
* @param threadPoolId thread-pool id
* @return thread-pool run state info
*/
public ThreadPoolRunStateInfo getPoolRunState(String threadPoolId) {
DynamicThreadPoolWrapper executorService = GlobalThreadPoolManage.getExecutorService(threadPoolId);
@ -57,56 +57,34 @@ public abstract class AbstractThreadPoolRuntime {
/**
* Get pool run state.
*
* @param threadPoolId
* @param executor
* @return
* @param threadPoolId thread-pool id
* @param executor executor
* @return thread-pool run state info
*/
public ThreadPoolRunStateInfo getPoolRunState(String threadPoolId, Executor executor) {
ThreadPoolRunStateInfo stateInfo = new ThreadPoolRunStateInfo();
ThreadPoolExecutor pool = (ThreadPoolExecutor) executor;
// 核心线程数
int corePoolSize = pool.getCorePoolSize();
// 最大线程数
int maximumPoolSize = pool.getMaximumPoolSize();
// 线程池当前线程数 (有锁)
int poolSize = pool.getPoolSize();
// 活跃线程数 (有锁)
int activeCount = pool.getActiveCount();
// 同时进入池中的最大线程数 (有锁)
int largestPoolSize = pool.getLargestPoolSize();
// 线程池中执行任务总数量 (有锁)
long completedTaskCount = pool.getCompletedTaskCount();
// 当前负载
String currentLoad = CalculateUtil.divide(activeCount, maximumPoolSize) + "";
// 峰值负载
String peakLoad = CalculateUtil.divide(largestPoolSize, maximumPoolSize) + "";
BlockingQueue<Runnable> queue = pool.getQueue();
// 队列元素个数
int queueSize = queue.size();
// 队列类型
String queueType = queue.getClass().getSimpleName();
// 队列剩余容量
int remainingCapacity = queue.remainingCapacity();
// 队列容量
int queueCapacity = queueSize + remainingCapacity;
stateInfo.setCoreSize(corePoolSize);
stateInfo.setTpId(threadPoolId);
stateInfo.setPoolSize(poolSize);
stateInfo.setMaximumSize(maximumPoolSize);
stateInfo.setActiveSize(activeCount);
stateInfo.setCurrentLoad(currentLoad);
stateInfo.setPeakLoad(peakLoad);
stateInfo.setQueueType(queueType);
stateInfo.setQueueSize(queueSize);
stateInfo.setQueueCapacity(queueCapacity);
stateInfo.setQueueRemainingCapacity(remainingCapacity);
stateInfo.setLargestPoolSize(largestPoolSize);
stateInfo.setCompletedTaskCount(completedTaskCount);
long rejectCount =
pool instanceof DynamicThreadPoolExecutor ? ((DynamicThreadPoolExecutor) pool).getRejectCountNum() : -1L;
stateInfo.setRejectCount(rejectCount);
stateInfo.setClientLastRefreshTime(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
stateInfo.setTimestamp(System.currentTimeMillis());
ThreadPoolExecutor actualExecutor = (ThreadPoolExecutor) executor;
int activeCount = actualExecutor.getActiveCount();
int largestPoolSize = actualExecutor.getLargestPoolSize();
BlockingQueue<Runnable> blockingQueue = actualExecutor.getQueue();
long rejectCount = actualExecutor instanceof DynamicThreadPoolExecutor ? ((DynamicThreadPoolExecutor) actualExecutor).getRejectCountNum() : -1L;
ThreadPoolRunStateInfo stateInfo = ThreadPoolRunStateInfo.builder()
.tpId(threadPoolId)
.activeSize(activeCount)
.poolSize(actualExecutor.getPoolSize())
.completedTaskCount(actualExecutor.getCompletedTaskCount())
.largestPoolSize(largestPoolSize)
.currentLoad(CalculateUtil.divide(activeCount, actualExecutor.getMaximumPoolSize()) + "")
.clientLastRefreshTime(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")))
.peakLoad(CalculateUtil.divide(largestPoolSize, actualExecutor.getMaximumPoolSize()) + "")
.queueSize(blockingQueue.size())
.queueRemainingCapacity(blockingQueue.remainingCapacity())
.rejectCount(rejectCount)
.timestamp(System.currentTimeMillis())
.build();
stateInfo.setCoreSize(actualExecutor.getCorePoolSize());
stateInfo.setMaximumSize(actualExecutor.getMaximumPoolSize());
stateInfo.setQueueType(blockingQueue.getClass().getSimpleName());
stateInfo.setQueueCapacity(blockingQueue.size() + blockingQueue.remainingCapacity());
return supplement(stateInfo);
}
}

@ -50,9 +50,9 @@ public class ThreadPoolRunStateHandler extends AbstractThreadPoolRuntime {
long used = MemoryUtil.heapMemoryUsed();
long max = MemoryUtil.heapMemoryMax();
String memoryProportion = StringUtil.newBuilder(
"已分配: ",
"Allocation: ",
ByteConvertUtil.getPrintSize(used),
" / 最大可用: ",
" / Maximum available: ",
ByteConvertUtil.getPrintSize(max));
poolRunStateInfo.setCurrentLoad(poolRunStateInfo.getCurrentLoad() + "%");
poolRunStateInfo.setPeakLoad(poolRunStateInfo.getPeakLoad() + "%");

@ -40,10 +40,10 @@ public class ThreadPoolStatusHandler {
private static final AtomicBoolean EXCEPTION_FLAG = new AtomicBoolean(Boolean.TRUE);
/**
* Get thread pool state.
* Get thread-pool state.
*
* @param executor
* @return
* @param executor executor
* @return thread-pool state
*/
public static String getThreadPoolState(ThreadPoolExecutor executor) {
if (EXCEPTION_FLAG.get()) {

@ -34,19 +34,30 @@ import java.util.concurrent.*;
public class AbstractBuildThreadPoolTemplate {
/**
* Thread pool construction initialization parameters.
* Thread-pool construction initialization parameters.
*
* @return
* @return thread-pool init param
*/
protected static ThreadPoolInitParam initParam() {
throw new UnsupportedOperationException();
}
/**
* Build pool.
*
* @return thread-pool executor
*/
public static ThreadPoolExecutor buildPool() {
ThreadPoolInitParam initParam = initParam();
return buildPool(initParam);
}
/**
* Build pool.
*
* @param initParam init param
* @return thread-pool executor
*/
public static ThreadPoolExecutor buildPool(ThreadPoolInitParam initParam) {
Assert.notNull(initParam);
ThreadPoolExecutor executorService;
@ -65,11 +76,22 @@ public class AbstractBuildThreadPoolTemplate {
return executorService;
}
/**
* Build a fast-consuming task thread pool.
*
* @return fast thread-pool executor
*/
public static ThreadPoolExecutor buildFastPool() {
ThreadPoolInitParam initParam = initParam();
return buildFastPool(initParam);
}
/**
* Build a fast-consuming task thread pool.
*
* @param initParam init param
* @return fast thread-pool executor
*/
public static ThreadPoolExecutor buildFastPool(ThreadPoolInitParam initParam) {
TaskQueue<Runnable> taskQueue = new TaskQueue(initParam.getCapacity());
FastThreadPoolExecutor fastThreadPoolExecutor;
@ -89,6 +111,12 @@ public class AbstractBuildThreadPoolTemplate {
return fastThreadPoolExecutor;
}
/**
* Build a dynamic monitor thread-pool.
*
* @param initParam init param
* @return dynamic monitor thread-pool
*/
public static DynamicThreadPoolExecutor buildDynamicPool(ThreadPoolInitParam initParam) {
Assert.notNull(initParam);
DynamicThreadPoolExecutor dynamicThreadPoolExecutor;
@ -113,6 +141,9 @@ public class AbstractBuildThreadPoolTemplate {
return dynamicThreadPoolExecutor;
}
/**
* Thread-pool init param.
*/
@Data
@Accessors(chain = true)
public static class ThreadPoolInitParam {

@ -41,8 +41,16 @@ public class FastThreadPoolExecutor extends ThreadPoolExecutorTemplate {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
/**
* Statistics on the number of tasks submitted by the fast consumption thread pool
*/
private final AtomicInteger submittedTaskCount = new AtomicInteger(0);
/**
* Get submitted task count.
*
* @return submitted task count
*/
public int getSubmittedTaskCount() {
return submittedTaskCount.get();
}

@ -17,6 +17,8 @@
package cn.hippo4j.core.executor.support;
import lombok.Setter;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
@ -28,16 +30,13 @@ public class TaskQueue<R extends Runnable> extends LinkedBlockingQueue<Runnable>
private static final long serialVersionUID = -2635853580887179627L;
@Setter
private FastThreadPoolExecutor executor;
public TaskQueue(int capacity) {
super(capacity);
}
public void setExecutor(FastThreadPoolExecutor exec) {
executor = exec;
}
@Override
public boolean offer(Runnable runnable) {
int currentPoolThreadSize = executor.getPoolSize();
@ -54,10 +53,21 @@ public class TaskQueue<R extends Runnable> extends LinkedBlockingQueue<Runnable>
return super.offer(runnable);
}
public boolean retryOffer(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
/**
* Retry offer.
*
* @param runnable submit thread pool task
* @param timeout how long to wait before giving up, in units of
* {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the
* {@code timeout} parameter
* @return
* @throws InterruptedException
*/
public boolean retryOffer(Runnable runnable, long timeout, TimeUnit unit) throws InterruptedException {
if (executor.isShutdown()) {
throw new RejectedExecutionException("Actuator closed!");
}
return super.offer(o, timeout, unit);
return super.offer(runnable, timeout, unit);
}
}

@ -17,6 +17,11 @@
package cn.hippo4j.core.executor.support;
import cn.hippo4j.common.design.builder.Builder;
import cn.hippo4j.common.executor.support.BlockingQueueTypeEnum;
import cn.hippo4j.common.toolkit.Assert;
import org.springframework.core.task.TaskDecorator;
import java.math.BigDecimal;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
@ -25,12 +30,6 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import cn.hippo4j.common.design.builder.Builder;
import cn.hippo4j.common.executor.support.BlockingQueueTypeEnum;
import cn.hippo4j.common.toolkit.Assert;
import org.springframework.core.task.TaskDecorator;
/**
* Thread-pool builder.
*/
@ -74,42 +73,89 @@ public class ThreadPoolBuilder implements Builder<ThreadPoolExecutor> {
private Boolean allowCoreThreadTimeOut = false;
/**
* Calculate core num.
*
* @return core num
*/
private Integer calculateCoreNum() {
int cpuCoreNum = Runtime.getRuntime().availableProcessors();
return new BigDecimal(cpuCoreNum).divide(new BigDecimal("0.2")).intValue();
}
/**
* Is fast pool.
*
* @param isFastPool is fast pool
* @return thread-pool builder
*/
public ThreadPoolBuilder isFastPool(Boolean isFastPool) {
this.isFastPool = isFastPool;
return this;
}
/**
* Dynamic pool.
*
* @return thread-pool builder
*/
public ThreadPoolBuilder dynamicPool() {
this.isDynamicPool = true;
return this;
}
/**
* Thread factory.
*
* @param threadNamePrefix thread name prefix
* @return thread-pool builder
*/
public ThreadPoolBuilder threadFactory(String threadNamePrefix) {
this.threadNamePrefix = threadNamePrefix;
return this;
}
/**
* Thread factory.
*
* @param threadFactory thread factory
* @return thread-pool builder
*/
public ThreadPoolBuilder threadFactory(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
return this;
}
/**
* Thread factory.
*
* @param threadNamePrefix thread name prefix
* @param isDaemon is daemon
* @return thread-pool builder
*/
public ThreadPoolBuilder threadFactory(String threadNamePrefix, Boolean isDaemon) {
this.threadNamePrefix = threadNamePrefix;
this.isDaemon = isDaemon;
return this;
}
/**
* Core pool size.
*
* @param corePoolSize core pool size
* @return thread-pool builder
*/
public ThreadPoolBuilder corePoolSize(int corePoolSize) {
this.corePoolSize = corePoolSize;
return this;
}
/**
* Max pool num.
*
* @param maxPoolSize max pool num
* @return thread-pool builder
*/
public ThreadPoolBuilder maxPoolNum(int maxPoolSize) {
this.maxPoolSize = maxPoolSize;
if (maxPoolSize < this.corePoolSize) {
@ -118,6 +164,11 @@ public class ThreadPoolBuilder implements Builder<ThreadPoolExecutor> {
return this;
}
/**
* Single pool.
*
* @return thread-pool builder
*/
public ThreadPoolBuilder singlePool() {
int singleNum = 1;
this.corePoolSize = singleNum;
@ -125,6 +176,12 @@ public class ThreadPoolBuilder implements Builder<ThreadPoolExecutor> {
return this;
}
/**
* Single pool.
*
* @param threadNamePrefix thread name prefix
* @return thread-pool builder
*/
public ThreadPoolBuilder singlePool(String threadNamePrefix) {
int singleNum = 1;
this.corePoolSize = singleNum;
@ -133,128 +190,245 @@ public class ThreadPoolBuilder implements Builder<ThreadPoolExecutor> {
return this;
}
/**
* Pool thread size.
*
* @param corePoolSize core pool size
* @param maxPoolSize max pool size
* @return thread-pool builder
*/
public ThreadPoolBuilder poolThreadSize(int corePoolSize, int maxPoolSize) {
this.corePoolSize = corePoolSize;
this.maxPoolSize = maxPoolSize;
return this;
}
/**
* Keep alive time.
*
* @param keepAliveTime keep alive time
* @return thread-pool builder
*/
public ThreadPoolBuilder keepAliveTime(long keepAliveTime) {
this.keepAliveTime = keepAliveTime;
return this;
}
/**
* Time unit.
*
* @param timeUnit time unit
* @return thread-pool builder
*/
public ThreadPoolBuilder timeUnit(TimeUnit timeUnit) {
this.timeUnit = timeUnit;
return this;
}
/**
* Execute time-out.
*
* @param executeTimeOut execute time-out
* @return thread-pool builder
*/
public ThreadPoolBuilder executeTimeOut(long executeTimeOut) {
this.executeTimeOut = executeTimeOut;
return this;
}
/**
* Keep alive time.
*
* @param keepAliveTime keep alive time
* @param timeUnit time unit
* @return thread-pool builder
*/
public ThreadPoolBuilder keepAliveTime(long keepAliveTime, TimeUnit timeUnit) {
this.keepAliveTime = keepAliveTime;
this.timeUnit = timeUnit;
return this;
}
/**
* Capacity.
*
* @param capacity capacity
* @return thread-pool builder
*/
public ThreadPoolBuilder capacity(int capacity) {
this.capacity = capacity;
return this;
}
/**
* Work queue.
*
* @param queueType queue type
* @param capacity capacity
* @return thread-pool builder
*/
public ThreadPoolBuilder workQueue(BlockingQueueTypeEnum queueType, int capacity) {
this.blockingQueueType = queueType;
this.capacity = capacity;
return this;
}
/**
* Rejected.
*
* @param rejectedExecutionHandler rejected execution handler
* @return thread-pool builder
*/
public ThreadPoolBuilder rejected(RejectedExecutionHandler rejectedExecutionHandler) {
this.rejectedExecutionHandler = rejectedExecutionHandler;
return this;
}
/**
* Work queue.
*
* @param blockingQueueType blocking queue type
* @return thread-pool builder
*/
public ThreadPoolBuilder workQueue(BlockingQueueTypeEnum blockingQueueType) {
this.blockingQueueType = blockingQueueType;
return this;
}
/**
* Work queue.
*
* @param workQueue work queue
* @return thread-pool builder
*/
public ThreadPoolBuilder workQueue(BlockingQueue workQueue) {
this.workQueue = workQueue;
return this;
}
/**
* Thread-pool id.
*
* @param threadPoolId thread-pool id
* @return thread-pool builder
*/
public ThreadPoolBuilder threadPoolId(String threadPoolId) {
this.threadPoolId = threadPoolId;
return this;
}
/**
* Task decorator.
*
* @param taskDecorator task decorator
* @return thread-pool builder
*/
public ThreadPoolBuilder taskDecorator(TaskDecorator taskDecorator) {
this.taskDecorator = taskDecorator;
return this;
}
/**
* Await termination millis.
*
* @param awaitTerminationMillis await termination millis
* @return thread-pool builder
*/
public ThreadPoolBuilder awaitTerminationMillis(long awaitTerminationMillis) {
this.awaitTerminationMillis = awaitTerminationMillis;
return this;
}
/**
* Wait for tasks to complete on shutdown.
*
* @param waitForTasksToCompleteOnShutdown wait for tasks to complete on shutdown
* @return thread-pool builder
*/
public ThreadPoolBuilder waitForTasksToCompleteOnShutdown(boolean waitForTasksToCompleteOnShutdown) {
this.waitForTasksToCompleteOnShutdown = waitForTasksToCompleteOnShutdown;
return this;
}
/**
* Dynamic support.
*
* @param waitForTasksToCompleteOnShutdown wait for tasks to complete on shutdown
* @param awaitTerminationMillis await termination millis
* @return thread-pool builder
*/
public ThreadPoolBuilder dynamicSupport(boolean waitForTasksToCompleteOnShutdown, long awaitTerminationMillis) {
this.awaitTerminationMillis = awaitTerminationMillis;
this.waitForTasksToCompleteOnShutdown = waitForTasksToCompleteOnShutdown;
return this;
}
/**
* Allow core thread time-out.
*
* @param allowCoreThreadTimeOut core thread time-out
* @return thread-pool builder
*/
public ThreadPoolBuilder allowCoreThreadTimeOut(boolean allowCoreThreadTimeOut) {
this.allowCoreThreadTimeOut = allowCoreThreadTimeOut;
return this;
}
@Override
public ThreadPoolExecutor build() {
if (isDynamicPool) {
return buildDynamicPool(this);
}
return isFastPool ? buildFastPool(this) : buildPool(this);
}
/**
* Builder design pattern implementation.
*
* @return thread-pool builder
*/
public static ThreadPoolBuilder builder() {
return new ThreadPoolBuilder();
}
/**
* Create dynamic thread pool by thread pool id
* Create dynamic thread pool by thread pool id.
*
* @param threadPoolId threadPoolId
* @return ThreadPoolExecutor
* @param threadPoolId thread-pool id
* @return dynamic thread-pool executor
*/
public static ThreadPoolExecutor buildDynamicPoolById(String threadPoolId) {
return ThreadPoolBuilder.builder()
.threadFactory(threadPoolId)
.threadPoolId(threadPoolId)
.dynamicPool()
.build();
return ThreadPoolBuilder.builder().threadFactory(threadPoolId).threadPoolId(threadPoolId).dynamicPool().build();
}
/**
* Build a normal thread-pool with {@code builder}.
*
* @param builder thread-pool builder
* @return normal thread-pool
*/
private static ThreadPoolExecutor buildPool(ThreadPoolBuilder builder) {
return AbstractBuildThreadPoolTemplate.buildPool(buildInitParam(builder));
}
/**
* Build a fast thread-pool with {@code builder}.
*
* @param builder thread-pool builder
* @return fast thread-pool executor
*/
private static ThreadPoolExecutor buildFastPool(ThreadPoolBuilder builder) {
return AbstractBuildThreadPoolTemplate.buildFastPool(buildInitParam(builder));
}
/**
* Build a dynamic thread-pool with {@code builder}.
*
* @param builder thread-pool builder
* @return dynamic thread-pool executor
*/
private static ThreadPoolExecutor buildDynamicPool(ThreadPoolBuilder builder) {
return AbstractBuildThreadPoolTemplate.buildDynamicPool(buildInitParam(builder));
}
/**
* Build thread-pool initialization parameters via {@code builder}.
*
* @param builder thread-pool builder
* @return thread-pool init param
*/
private static AbstractBuildThreadPoolTemplate.ThreadPoolInitParam buildInitParam(ThreadPoolBuilder builder) {
AbstractBuildThreadPoolTemplate.ThreadPoolInitParam initParam;
if (builder.threadFactory == null) {
@ -289,4 +463,12 @@ public class ThreadPoolBuilder implements Builder<ThreadPoolExecutor> {
}
return initParam;
}
@Override
public ThreadPoolExecutor build() {
if (isDynamicPool) {
return buildDynamicPool(this);
}
return isFastPool ? buildFastPool(this) : buildPool(this);
}
}

@ -36,10 +36,6 @@ public class ThreadPoolExecutorTemplate extends ThreadPoolExecutor {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
private Exception clientTrace() {
return new Exception("Tread task root stack trace.");
}
@Override
public void execute(final Runnable command) {
super.execute(wrap(command, clientTrace()));
@ -55,6 +51,22 @@ public class ThreadPoolExecutorTemplate extends ThreadPoolExecutor {
return super.submit(wrap(task, clientTrace()));
}
/**
* Client trace.
*
* @return exception
*/
private Exception clientTrace() {
return new Exception("Tread task root stack trace.");
}
/**
* Wrapping thread pool tasks.
*
* @param task task
* @param clientStack client stack
* @return wrapped runnable
*/
private Runnable wrap(final Runnable task, final Exception clientStack) {
return () -> {
try {
@ -66,6 +78,14 @@ public class ThreadPoolExecutorTemplate extends ThreadPoolExecutor {
};
}
/**
* Wrapping thread pool tasks.
*
* @param task task
* @param clientStack client stack
* @param <T> computed result
* @return wrapped runnable
*/
private <T> Callable<T> wrap(final Callable<T> task, final Exception clientStack) {
return () -> {
try {

@ -27,26 +27,30 @@ import java.util.concurrent.Executor;
public interface DynamicThreadPoolAdapter {
/**
* Match.
* Check if the object contains thread pool information.
*
* @param executor
* @return
* @param executor objects where there may be instances
* of dynamic thread pools
* @return matching results
*/
boolean match(Object executor);
/**
* Unwrap.
* Get the dynamic thread pool reference in the object.
*
* @param executor
* @return
* @param executor objects where there may be instances
* of dynamic thread pools
* @return get the real dynamic thread pool instance
*/
DynamicThreadPoolExecutor unwrap(Object executor);
/**
* Replace.
* If the {@link DynamicThreadPoolAdapter#match(Object)} conditions are met,
* the thread pool is replaced with a dynamic thread pool.
*
* @param executor
* @param dynamicThreadPoolExecutor
* @param executor objects where there may be instances
* of dynamic thread pools
* @param dynamicThreadPoolExecutor dynamic thread-pool executor
*/
void replace(Object executor, Executor dynamicThreadPoolExecutor);
}

@ -38,20 +38,22 @@ public class DynamicThreadPoolAdapterChoose {
}
/**
* Match.
* Check if the object contains thread pool information.
*
* @param executor
* @return
* @param executor objects where there may be instances
* of dynamic thread pools
* @return matching results
*/
public static boolean match(Object executor) {
return DYNAMIC_THREAD_POOL_ADAPTERS.stream().anyMatch(each -> each.match(executor));
}
/**
* Unwrap.
* Get the dynamic thread pool reference in the object.
*
* @param executor
* @return
* @param executor objects where there may be instances
* of dynamic thread pools
* @return get the real dynamic thread pool instance
*/
public static DynamicThreadPoolExecutor unwrap(Object executor) {
Optional<DynamicThreadPoolAdapter> dynamicThreadPoolAdapterOptional = DYNAMIC_THREAD_POOL_ADAPTERS.stream().filter(each -> each.match(executor)).findFirst();
@ -59,9 +61,12 @@ public class DynamicThreadPoolAdapterChoose {
}
/**
* Replace.
* If the {@link DynamicThreadPoolAdapter#match(Object)} conditions are met,
* the thread pool is replaced with a dynamic thread pool.
*
* @param executor
* @param executor objects where there may be instances
* of dynamic thread pools
* @param dynamicThreadPoolExecutor dynamic thread-pool executor
*/
public static void replace(Object executor, Executor dynamicThreadPoolExecutor) {
Optional<DynamicThreadPoolAdapter> dynamicThreadPoolAdapterOptional = DYNAMIC_THREAD_POOL_ADAPTERS.stream().filter(each -> each.match(executor)).findFirst();

@ -33,8 +33,8 @@ public abstract class AbstractDynamicThreadPoolService implements DynamicThreadP
/**
* Build dynamic thread-pool executor.
*
* @param registerParameter
* @return
* @param registerParameter register parameter
* @return dynamic thread-pool executor
*/
public ThreadPoolExecutor buildDynamicThreadPoolExecutor(DynamicThreadPoolRegisterParameter registerParameter) {
ThreadPoolExecutor dynamicThreadPoolExecutor = ThreadPoolBuilder.builder()

@ -29,8 +29,8 @@ public interface DynamicThreadPoolService {
/**
* Registering dynamic thread pools at runtime.
*
* @param registerWrapper
* @return
* @param registerWrapper register wrapper
* @return dynamic thread-pool executor
*/
ThreadPoolExecutor registerDynamicThreadPool(DynamicThreadPoolRegisterWrapper registerWrapper);
}

@ -47,6 +47,9 @@ public class DynamicThreadPoolBannerHandler implements InitializingBean {
printBanner();
}
/**
* Print banner.
*/
private void printBanner() {
String banner = " __ __ ___ ___ __ \n" +
" | |--.|__|.-----..-----..-----.| | | |__|\n" +
@ -67,6 +70,11 @@ public class DynamicThreadPoolBannerHandler implements InitializingBean {
}
}
/**
* Get version.
*
* @return hippo4j version
*/
public static String getVersion() {
final Package pkg = DynamicThreadPoolBannerHandler.class.getPackage();
return pkg != null ? pkg.getImplementationVersion() : "";

@ -27,7 +27,7 @@ public interface ExecuteAwarePlugin extends ThreadPoolPlugin {
/**
* Callback before task execution.
*
* @param thread thread of executing task
* @param thread thread of executing task
* @param runnable task
* @see ExtensibleThreadPoolExecutor#beforeExecute
*/
@ -37,12 +37,10 @@ public interface ExecuteAwarePlugin extends ThreadPoolPlugin {
/**
* Callback after task execution.
*
* @param runnable runnable
* @param runnable runnable
* @param throwable exception thrown during execution
* @see ExtensibleThreadPoolExecutor#afterExecute
*/
default void afterExecute(Runnable runnable, Throwable throwable) {
// do nothing
}
}

@ -31,19 +31,19 @@ import java.util.List;
public class PluginRuntime {
/**
* plugin id
* Plugin id
*/
private final String pluginId;
/**
* runtime info
* Runtime info
*/
private final List<Info> infoList = new ArrayList<>();
/**
* Add a runtime info item.
*
* @param name name
* @param name name
* @param value value
* @return runtime info item
*/
@ -52,12 +52,21 @@ public class PluginRuntime {
return this;
}
/**
* Plugin runtime info.
*/
@Getter
@RequiredArgsConstructor
public static class Info {
/**
* Name
*/
private final String name;
/**
* Value
*/
private final Object value;
}
}

@ -31,7 +31,5 @@ public interface RejectedAwarePlugin extends ThreadPoolPlugin {
* @param executor executor
*/
default void beforeRejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
// do nothing
}
}

@ -35,19 +35,17 @@ public interface ShutdownAwarePlugin extends ThreadPoolPlugin {
* @see ThreadPoolExecutor#shutdownNow()
*/
default void beforeShutdown(ThreadPoolExecutor executor) {
// do nothing
}
/**
* Callback after pool shutdown.
*
* @param executor executor
* @param executor executor
* @param remainingTasks remainingTasks, or empty if no tasks left or {@link ThreadPoolExecutor#shutdown()} called
* @see ThreadPoolExecutor#shutdown()
* @see ThreadPoolExecutor#shutdownNow()
*/
default void afterShutdown(ThreadPoolExecutor executor, List<Runnable> remainingTasks) {
// do nothing
}
/**
@ -57,7 +55,5 @@ public interface ShutdownAwarePlugin extends ThreadPoolPlugin {
* @see ThreadPoolExecutor#terminated()
*/
default void afterTerminated(ExtensibleThreadPoolExecutor executor) {
// do nothing
}
}

@ -43,7 +43,7 @@ public interface TaskAwarePlugin extends ThreadPoolPlugin {
* Callback during the {@link java.util.concurrent.RunnableFuture} task create in thread-pool.
*
* @param executor executor
* @param future original task
* @param future original task
* @return Tasks that really need to be performed
* @see ThreadPoolExecutor#newTaskFor(Callable)
*/
@ -61,5 +61,4 @@ public interface TaskAwarePlugin extends ThreadPoolPlugin {
default Runnable beforeTaskExecute(Runnable runnable) {
return runnable;
}
}

@ -57,7 +57,6 @@ public interface ThreadPoolPlugin {
* @see ThreadPoolPluginManager#register
*/
default void start() {
// do nothing
}
/**
@ -67,7 +66,6 @@ public interface ThreadPoolPlugin {
* @see ThreadPoolPluginManager#clear
*/
default void stop() {
// do nothing
}
/**
@ -78,5 +76,4 @@ public interface ThreadPoolPlugin {
default PluginRuntime getPluginRuntime() {
return new PluginRuntime(getId());
}
}

@ -36,7 +36,7 @@ import java.util.Optional;
public abstract class AbstractTaskTimerPlugin implements ExecuteAwarePlugin {
/**
* start times of executed tasks
* Start times of executed tasks
*/
private final ThreadLocal<Long> startTimes = new ThreadLocal<>();
@ -55,7 +55,7 @@ public abstract class AbstractTaskTimerPlugin implements ExecuteAwarePlugin {
/**
* Record the total time for the worker thread to complete the task, and update the time record.
*
* @param runnable runnable
* @param runnable runnable
* @param throwable exception thrown during execution
*/
@Override
@ -84,5 +84,4 @@ public abstract class AbstractTaskTimerPlugin implements ExecuteAwarePlugin {
* @param taskExecuteTime execute time of task
*/
protected abstract void processTaskTime(long taskExecuteTime);
}

@ -45,7 +45,7 @@ public class TaskDecoratorPlugin implements TaskAwarePlugin {
}
/**
* decorators
* Decorators
*/
@Getter
private final List<TaskDecorator> decorators = new ArrayList<>();
@ -77,7 +77,7 @@ public class TaskDecoratorPlugin implements TaskAwarePlugin {
}
/**
* Add a decorator
* Add a decorator.
*
* @param decorator decorator
*/
@ -87,19 +87,16 @@ public class TaskDecoratorPlugin implements TaskAwarePlugin {
}
/**
* Clear all decorators
*
* Clear all decorators.
*/
public void clearDecorators() {
decorators.clear();
}
/**
* Remove decorators
*
* Remove decorators.
*/
public void removeDecorator(TaskDecorator decorator) {
decorators.remove(decorator);
}
}

@ -43,7 +43,7 @@ public class TaskRejectCountRecordPlugin implements RejectedAwarePlugin {
}
/**
* rejection count
* Rejection count
*/
@Setter
@Getter
@ -72,12 +72,11 @@ public class TaskRejectCountRecordPlugin implements RejectedAwarePlugin {
}
/**
* Get reject count num
* Get reject count num.
*
* @return reject count num
*/
public Long getRejectCountNum() {
return rejectCount.get();
}
}

@ -34,27 +34,27 @@ public class TaskTimeRecordPlugin extends AbstractTaskTimerPlugin {
public static final String PLUGIN_NAME = "task-time-record-plugin";
/**
* Lock instance.
* Lock instance
*/
private final ReadWriteLock lock = new ReentrantReadWriteLock();
/**
* Total execution milli time of all tasks.
* Total execution milli time of all tasks
*/
private long totalTaskTimeMillis = 0L;
/**
* Maximum task milli execution time, default -1.
* Maximum task milli execution time, default -1
*/
private long maxTaskTimeMillis = -1L;
/**
* Minimal task milli execution time, default -1.
* Minimal task milli execution time, default -1
*/
private long minTaskTimeMillis = -1L;
/**
* Count of completed task.
* Count of completed task
*/
private long taskCount = 0L;
@ -137,27 +137,27 @@ public class TaskTimeRecordPlugin extends AbstractTaskTimerPlugin {
public static class Summary {
/**
* Total execution nano time of all tasks.
* Total execution nano time of all tasks
*/
private final long totalTaskTimeMillis;
/**
* Maximum task nano execution time.
* Maximum task nano execution time
*/
private final long maxTaskTimeMillis;
/**
* Minimal task nano execution time.
* Minimal task nano execution time
*/
private final long minTaskTimeMillis;
/**
* Count of completed task.
* Count of completed task
*/
private final long taskCount;
/**
* Get the avg task time in milliseconds.
* Get the avg task time in milliseconds
*
* @return avg task time
*/
@ -165,7 +165,5 @@ public class TaskTimeRecordPlugin extends AbstractTaskTimerPlugin {
long totalTaskCount = getTaskCount();
return totalTaskCount > 0L ? getTotalTaskTimeMillis() / totalTaskCount : -1;
}
}
}

@ -35,16 +35,19 @@ public class TaskTimeoutNotifyAlarmPlugin extends AbstractTaskTimerPlugin {
public static final String PLUGIN_NAME = "task-timeout-notify-alarm-plugin";
/**
* threadPoolId
* Thread-pool id
*/
private final String threadPoolId;
/**
* Execute time-out
*/
@Getter
@Setter
private Long executeTimeOut;
/**
* thread-pool
* Thread-pool executor
*/
private final ThreadPoolExecutor threadPoolExecutor;
@ -74,5 +77,4 @@ public class TaskTimeoutNotifyAlarmPlugin extends AbstractTaskTimerPlugin {
.ifPresent(handler -> handler.asyncSendExecuteTimeOutAlarm(
threadPoolId, taskExecuteTime, executeTimeOut, threadPoolExecutor));
}
}

@ -54,7 +54,7 @@ public class ThreadPoolExecutorShutdownPlugin implements ShutdownAwarePlugin {
}
/**
* await termination millis
* Await termination millis
*/
@Setter
public long awaitTerminationMillis;
@ -80,7 +80,7 @@ public class ThreadPoolExecutorShutdownPlugin implements ShutdownAwarePlugin {
* cancel the remaining tasks,
* then wait for pool to terminate according {@link #awaitTerminationMillis} if necessary.
*
* @param executor executor
* @param executor executor
* @param remainingTasks remainingTasks
*/
@Override
@ -141,5 +141,4 @@ public class ThreadPoolExecutorShutdownPlugin implements ShutdownAwarePlugin {
Thread.currentThread().interrupt();
}
}
}

@ -52,32 +52,32 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
public class DefaultThreadPoolPluginManager implements ThreadPoolPluginManager {
/**
* lock of this instance
* Lock of this instance
*/
private final ReadWriteLock instanceLock = new ReentrantReadWriteLock();
/**
* Registered {@link ThreadPoolPlugin}.
* Registered {@link ThreadPoolPlugin}
*/
private final Map<String, ThreadPoolPlugin> registeredPlugins = new ConcurrentHashMap<>(16);
/**
* Registered {@link TaskAwarePlugin}.
* Registered {@link TaskAwarePlugin}
*/
private final List<TaskAwarePlugin> taskAwarePluginList = new CopyOnWriteArrayList<>();
/**
* Registered {@link ExecuteAwarePlugin}.
* Registered {@link ExecuteAwarePlugin}
*/
private final List<ExecuteAwarePlugin> executeAwarePluginList = new CopyOnWriteArrayList<>();
/**
* Registered {@link RejectedAwarePlugin}.
* Registered {@link RejectedAwarePlugin}
*/
private final List<RejectedAwarePlugin> rejectedAwarePluginList = new CopyOnWriteArrayList<>();
/**
* Registered {@link ShutdownAwarePlugin}.
* Registered {@link ShutdownAwarePlugin}
*/
private final List<ShutdownAwarePlugin> shutdownAwarePluginList = new CopyOnWriteArrayList<>();
@ -115,10 +115,7 @@ public class DefaultThreadPoolPluginManager implements ThreadPoolPluginManager {
try {
String id = plugin.getId();
Assert.isTrue(!isRegistered(id), "The plugin with id [" + id + "] has been registered");
// register plugin
registeredPlugins.put(id, plugin);
// quick index
if (plugin instanceof TaskAwarePlugin) {
taskAwarePluginList.add((TaskAwarePlugin) plugin);
}
@ -171,7 +168,6 @@ public class DefaultThreadPoolPluginManager implements ThreadPoolPluginManager {
Optional.ofNullable(pluginId)
.map(registeredPlugins::remove)
.ifPresent(plugin -> {
// remove quick index if necessary
if (plugin instanceof TaskAwarePlugin) {
taskAwarePluginList.remove(plugin);
}
@ -227,10 +223,10 @@ public class DefaultThreadPoolPluginManager implements ThreadPoolPluginManager {
}
/**
* Get {@link ThreadPoolPlugin}
* Get {@link ThreadPoolPlugin}.
*
* @param pluginId plugin id
* @param <A> plugin type
* @param <A> plugin type
* @return {@link ThreadPoolPlugin}, null if unregister
*/
@Override
@ -314,5 +310,4 @@ public class DefaultThreadPoolPluginManager implements ThreadPoolPluginManager {
readLock.unlock();
}
}
}

@ -38,12 +38,12 @@ public class DefaultThreadPoolPluginRegistrar implements ThreadPoolPluginRegistr
public static final String REGISTRAR_NAME = "DefaultThreadPoolPluginRegistrar";
/**
* execute time out
* Execute time out
*/
private long executeTimeOut;
/**
* await termination millis
* Await termination millis
*/
private long awaitTerminationMillis;
@ -64,14 +64,10 @@ public class DefaultThreadPoolPluginRegistrar implements ThreadPoolPluginRegistr
*/
@Override
public void doRegister(ThreadPoolPluginSupport support) {
// callback when task execute
support.register(new TaskDecoratorPlugin());
support.register(new TaskTimeoutNotifyAlarmPlugin(support.getThreadPoolId(), executeTimeOut, support.getThreadPoolExecutor()));
// callback when task rejected
support.register(new TaskRejectCountRecordPlugin());
support.register(new TaskRejectNotifyAlarmPlugin());
// callback when pool shutdown
support.register(new ThreadPoolExecutorShutdownPlugin(awaitTerminationMillis));
}
}

@ -32,7 +32,7 @@ import java.util.Optional;
public class EmptyThreadPoolPluginManager implements ThreadPoolPluginManager {
/**
* default instance
* Default instance
*/
public static final EmptyThreadPoolPluginManager INSTANCE = new EmptyThreadPoolPluginManager();
@ -41,7 +41,6 @@ public class EmptyThreadPoolPluginManager implements ThreadPoolPluginManager {
*/
@Override
public void clear() {
// do nothing
}
/**
@ -64,7 +63,6 @@ public class EmptyThreadPoolPluginManager implements ThreadPoolPluginManager {
*/
@Override
public void register(ThreadPoolPlugin plugin) {
// do nothing
}
/**
@ -90,17 +88,16 @@ public class EmptyThreadPoolPluginManager implements ThreadPoolPluginManager {
}
/**
* Unregister {@link ThreadPoolPlugin}
* Unregister {@link ThreadPoolPlugin}.
*
* @param pluginId plugin id
*/
@Override
public void unregister(String pluginId) {
// do nothing
}
/**
* Get {@link ThreadPoolPlugin}
* Get {@link ThreadPoolPlugin}.
*
* @param pluginId plugin id
* @return {@link ThreadPoolPlugin}
@ -150,5 +147,4 @@ public class EmptyThreadPoolPluginManager implements ThreadPoolPluginManager {
public Collection<TaskAwarePlugin> getTaskAwarePluginList() {
return Collections.emptyList();
}
}

@ -54,7 +54,7 @@ public interface ThreadPoolPluginManager {
Collection<ThreadPoolPlugin> getAllPlugins();
/**
* Register a {@link ThreadPoolPlugin}
* Register a {@link ThreadPoolPlugin}.
*
* @param plugin plugin
* @throws IllegalArgumentException thrown when a plugin with the same {@link ThreadPoolPlugin#getId()}
@ -80,17 +80,17 @@ public interface ThreadPoolPluginManager {
boolean isRegistered(String pluginId);
/**
* Unregister {@link ThreadPoolPlugin}
* Unregister {@link ThreadPoolPlugin}.
*
* @param pluginId plugin id
*/
void unregister(String pluginId);
/**
* Get {@link ThreadPoolPlugin}
* Get {@link ThreadPoolPlugin}.
*
* @param pluginId plugin id
* @param <A> target aware type
* @param pluginId plugin id
* @param <A> target aware type
* @return {@link ThreadPoolPlugin}
* @throws ClassCastException thrown when the object obtained by name cannot be converted to target type
*/
@ -129,7 +129,7 @@ public interface ThreadPoolPluginManager {
/**
* Get plugin of type.
*
* @param pluginId plugin id
* @param pluginId plugin id
* @param pluginType plugin type
* @return target plugin
*/
@ -172,5 +172,4 @@ public interface ThreadPoolPluginManager {
return getPlugin(pluginId)
.map(ThreadPoolPlugin::getPluginRuntime);
}
}

@ -38,5 +38,4 @@ public interface ThreadPoolPluginRegistrar {
* @param support thread pool plugin manager delegate
*/
void doRegister(ThreadPoolPluginSupport support);
}

@ -94,7 +94,7 @@ public interface ThreadPoolPluginSupport extends ThreadPoolPluginManager {
}
/**
* Unregister {@link ThreadPoolPlugin}
* Unregister {@link ThreadPoolPlugin}.
*
* @param pluginId name
*/
@ -114,7 +114,7 @@ public interface ThreadPoolPluginSupport extends ThreadPoolPluginManager {
}
/**
* Get {@link ThreadPoolPlugin}
* Get {@link ThreadPoolPlugin}.
*
* @param pluginId target name
* @return {@link ThreadPoolPlugin}, null if unregister
@ -164,5 +164,4 @@ public interface ThreadPoolPluginSupport extends ThreadPoolPluginManager {
default Collection<TaskAwarePlugin> getTaskAwarePluginList() {
return getThreadPoolPluginManager().getTaskAwarePluginList();
}
}

@ -15,18 +15,25 @@
* limitations under the License.
*/
package cn.hippo4j.core.executor.support;
package cn.hippo4j.core.provider;
import cn.hippo4j.common.executor.support.BlockingQueueTypeEnum;
import cn.hippo4j.core.executor.DynamicThreadPoolExecutor;
import cn.hippo4j.core.executor.support.ThreadPoolBuilder;
import java.util.concurrent.TimeUnit;
/**
* Common dynamic thread-pool.
* Common dynamic thread-pool provider factory.
*/
public class CommonDynamicThreadPool {
public class CommonDynamicThreadPoolProviderFactory {
/**
* Get the public dynamic thread pool instance.
*
* @param threadPoolId thread-pool id
* @return dynamic thread-pool executor
*/
public static DynamicThreadPoolExecutor getInstance(String threadPoolId) {
DynamicThreadPoolExecutor dynamicThreadPoolExecutor = (DynamicThreadPoolExecutor) ThreadPoolBuilder.builder()
.dynamicPool()

@ -34,10 +34,19 @@ import java.util.concurrent.atomic.AtomicLong;
@AllArgsConstructor
public class RejectedProxyInvocationHandler implements InvocationHandler {
/**
* Target object
*/
private final Object target;
/**
* Thread-pool id
*/
private final String threadPoolId;
/**
* Reject count
*/
private final AtomicLong rejectCount;
@Override

@ -33,9 +33,9 @@ public class RejectedProxyUtil {
/**
* Proxy rejected execution.
*
* @param rejectedExecutionHandler
* @param threadPoolId
* @param rejectedNum
* @param rejectedExecutionHandler rejected execution handler
* @param threadPoolId thread-pool id
* @param rejectedNum rejected num
* @return
*/
public static RejectedExecutionHandler createProxy(RejectedExecutionHandler rejectedExecutionHandler, String threadPoolId, AtomicLong rejectedNum) {

@ -34,7 +34,7 @@ public class ExecutorTraceContextUtil {
/**
* Get and remove.
*
* @return
* @return timeout trace
*/
public static String getAndRemoveTimeoutTrace() {
String val = MDC.get(EXECUTE_TIMEOUT_TRACE_KEY);
@ -45,7 +45,7 @@ public class ExecutorTraceContextUtil {
/**
* Put execute timeout trace.
*
* @param trace
* @param trace trace
*/
public static void putExecuteTimeoutTrace(String trace) {
MDC.put(EXECUTE_TIMEOUT_TRACE, trace);
@ -54,7 +54,7 @@ public class ExecutorTraceContextUtil {
/**
* Set execute timeout trace key.
*
* @param key
* @param key trace key
*/
public static void setExecuteTimeoutTraceKey(String key) {
EXECUTE_TIMEOUT_TRACE_KEY = key;

@ -38,14 +38,20 @@ import static cn.hippo4j.common.constant.Constants.IDENTIFY_SLICER_SYMBOL;
*/
public class IdentifyUtil {
private static String IDENTIFY;
public static final String CLIENT_IDENTIFICATION_VALUE = IdUtil.simpleUUID();
static {
DynamicThreadPoolServiceLoader.register(ClientNetworkService.class);
}
/**
* Identify
*/
private static String IDENTIFY;
/**
* Client identification value
*/
public static final String CLIENT_IDENTIFICATION_VALUE = IdUtil.simpleUUID();
/**
* Generate identify.
*

@ -27,14 +27,29 @@ import java.util.concurrent.atomic.AtomicLong;
*/
public class SystemClock {
/**
* Period
*/
private final int period;
/**
* Now
*/
private final AtomicLong now;
/**
* Thread name
*/
private static final String THREAD_NAME = "system.clock";
/**
* Instance holder.
*/
private static class InstanceHolder {
/**
* System clock instance
*/
private static final SystemClock INSTANCE = new SystemClock(1);
}
@ -44,10 +59,18 @@ public class SystemClock {
scheduleClockUpdating();
}
/**
* Instance.
*
* @return System clock instance
*/
private static SystemClock instance() {
return InstanceHolder.INSTANCE;
}
/**
* Schedule clock updating.
*/
private void scheduleClockUpdating() {
ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1, runnable -> {
Thread thread = new Thread(runnable, THREAD_NAME);
@ -57,10 +80,20 @@ public class SystemClock {
scheduler.scheduleAtFixedRate(() -> now.set(System.currentTimeMillis()), period, period, TimeUnit.MILLISECONDS);
}
/**
* Current time millis.
*
* @return current time millis
*/
private long currentTimeMillis() {
return now.get();
}
/**
* Now.
*
* @return current time millis
*/
public static long now() {
return instance().currentTimeMillis();
}

@ -75,7 +75,7 @@ public class InetUtils implements Closeable {
try {
int lowest = Integer.MAX_VALUE;
for (Enumeration<NetworkInterface> nics = NetworkInterface
.getNetworkInterfaces(); nics.hasMoreElements();) {
.getNetworkInterfaces(); nics.hasMoreElements(); ) {
NetworkInterface ifc = nics.nextElement();
if (ifc.isUp()) {
this.log.trace("Testing interface: " + ifc.getDisplayName());
@ -84,7 +84,6 @@ public class InetUtils implements Closeable {
} else {
continue;
}
// @formatter:off
if (!ignoreInterface(ifc.getDisplayName())) {
for (Enumeration<InetAddress> addrs = ifc
.getInetAddresses(); addrs.hasMoreElements(); ) {
@ -98,7 +97,6 @@ public class InetUtils implements Closeable {
}
}
}
// @formatter:on
}
}
} catch (IOException ex) {
@ -150,7 +148,6 @@ public class InetUtils implements Closeable {
public HostInfo convertAddress(final InetAddress address) {
HostInfo hostInfo = new HostInfo();
Future<String> result = this.executorService.submit(address::getHostName);
String hostname;
try {
hostname = result.get(this.properties.getTimeoutSeconds(), TimeUnit.SECONDS);

@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.plugin;
import org.junit.Assert;
import org.junit.Test;
/**
* test for {@link PluginRuntime}
*/
public class PluginRuntimeTest {
@Test
public void test() {
PluginRuntime runtime = new PluginRuntime("test");
Assert.assertEquals("test", runtime.getPluginId());
Assert.assertTrue(runtime.getInfoList().isEmpty());
runtime.addInfo("item", "item");
PluginRuntime.Info info = runtime.getInfoList().get(0);
Assert.assertEquals("item", info.getName());
Assert.assertEquals("item", info.getValue());
}
}

@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.plugin;
import cn.hippo4j.common.toolkit.ThreadUtil;
import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor;
import cn.hippo4j.core.plugin.manager.DefaultThreadPoolPluginManager;
import lombok.Getter;
import org.junit.Assert;
import org.junit.Test;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* test for default method of {@link ThreadPoolPlugin} and it's subclass
*/
public class ThreadPoolPluginTest {
@Test
public void testDefaultMethod() {
ExtensibleThreadPoolExecutor executor = new ExtensibleThreadPoolExecutor(
"test", new DefaultThreadPoolPluginManager(),
1, 1, 1000L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(1), Thread::new, new ThreadPoolExecutor.DiscardPolicy());
executor.register(new TestTaskAwarePlugin());
executor.register(new TestExecuteAwarePlugin());
executor.register(new TestRejectedAwarePlugin());
executor.register(new TestShutdownAwarePlugin());
AtomicInteger count = new AtomicInteger(0);
executor.submit(() -> {
ThreadUtil.sleep(100L);
return count.incrementAndGet();
});
executor.submit(() -> {
ThreadUtil.sleep(100L);
count.incrementAndGet();
});
executor.submit(count::incrementAndGet, 2);
// waiting for shutdown
executor.shutdown();
while (!executor.isTerminated()) {
}
Assert.assertEquals(2, count.get());
}
@Getter
private final static class TestTaskAwarePlugin implements TaskAwarePlugin {
private final String id = this.getClass().getSimpleName();
}
@Getter
private final static class TestExecuteAwarePlugin implements ExecuteAwarePlugin {
private final String id = this.getClass().getSimpleName();
}
@Getter
private final static class TestRejectedAwarePlugin implements RejectedAwarePlugin {
private final String id = this.getClass().getSimpleName();
}
@Getter
private final static class TestShutdownAwarePlugin implements ShutdownAwarePlugin {
private final String id = this.getClass().getSimpleName();
}
}

@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.plugin.impl;
import cn.hippo4j.common.toolkit.ThreadUtil;
import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor;
import cn.hippo4j.core.plugin.PluginRuntime;
import cn.hippo4j.core.plugin.ThreadPoolPlugin;
import cn.hippo4j.core.plugin.manager.DefaultThreadPoolPluginManager;
import org.junit.Assert;
import org.junit.Test;
import org.springframework.core.task.TaskDecorator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* test for {@link TaskDecoratorPlugin}
*/
public class TaskDecoratorPluginTest {
private final AtomicInteger taskExecuteCount = new AtomicInteger(0);
@Test
public void testGetId() {
Assert.assertEquals(TaskDecoratorPlugin.PLUGIN_NAME, new TaskDecoratorPlugin().getId());
}
@Test
public void testGetRuntime() {
ThreadPoolPlugin plugin = new TaskDecoratorPlugin();
PluginRuntime runtime = new TaskDecoratorPlugin().getPluginRuntime();
Assert.assertNotNull(runtime);
Assert.assertEquals(plugin.getId(), runtime.getPluginId());
}
@Test
public void testAddDecorator() {
TaskDecoratorPlugin plugin = new TaskDecoratorPlugin();
plugin.addDecorator(runnable -> runnable);
plugin.addDecorator(runnable -> runnable);
Assert.assertEquals(2, plugin.getDecorators().size());
}
@Test
public void testRemoveDecorator() {
TaskDecoratorPlugin plugin = new TaskDecoratorPlugin();
TaskDecorator decorator = runnable -> runnable;
plugin.addDecorator(decorator);
plugin.removeDecorator(decorator);
Assert.assertTrue(plugin.getDecorators().isEmpty());
}
@Test
public void testClear() {
TaskDecoratorPlugin plugin = new TaskDecoratorPlugin();
TaskDecorator decorator = runnable -> runnable;
plugin.addDecorator(decorator);
plugin.addDecorator(decorator);
plugin.clearDecorators();
Assert.assertTrue(plugin.getDecorators().isEmpty());
}
@Test
public void testBeforeTaskExecute() {
ExtensibleThreadPoolExecutor executor = new ExtensibleThreadPoolExecutor(
"test", new DefaultThreadPoolPluginManager(),
5, 5, 1000L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(1), Thread::new, new ThreadPoolExecutor.DiscardPolicy());
TaskDecoratorPlugin plugin = new TaskDecoratorPlugin();
plugin.addDecorator(runnable -> () -> {
taskExecuteCount.incrementAndGet();
runnable.run();
});
plugin.addDecorator(runnable -> () -> {
taskExecuteCount.incrementAndGet();
runnable.run();
});
executor.register(plugin);
executor.execute(() -> {
});
ThreadUtil.sleep(500L);
Assert.assertEquals(2, taskExecuteCount.get());
}
}

@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.plugin.impl;
import cn.hippo4j.common.toolkit.ThreadUtil;
import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor;
import cn.hippo4j.core.plugin.manager.DefaultThreadPoolPluginManager;
import org.junit.Assert;
import org.junit.Test;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/**
* test for {@link TaskRejectCountRecordPlugin}
*/
public class TaskRejectCountRecordPluginTest {
@Test
public void testGetId() {
Assert.assertEquals(TaskRejectCountRecordPlugin.PLUGIN_NAME, new TaskRejectCountRecordPlugin().getId());
}
@Test
public void testGetRuntime() {
Assert.assertNotNull(new TaskRejectCountRecordPlugin().getPluginRuntime());
}
@Test
public void testGetRejectCountNum() {
TaskRejectCountRecordPlugin plugin = new TaskRejectCountRecordPlugin();
Assert.assertEquals((Long) 0L, plugin.getRejectCountNum());
}
@Test
public void testGetRejectCount() {
TaskRejectCountRecordPlugin plugin = new TaskRejectCountRecordPlugin();
Assert.assertEquals(0L, plugin.getRejectCount().get());
}
@Test
public void testSetRejectCount() {
TaskRejectCountRecordPlugin plugin = new TaskRejectCountRecordPlugin();
AtomicLong atomicLong = new AtomicLong(0);
plugin.setRejectCount(atomicLong);
Assert.assertSame(atomicLong, plugin.getRejectCount());
}
@Test
public void testBeforeRejectedExecution() {
ExtensibleThreadPoolExecutor executor = new ExtensibleThreadPoolExecutor(
"test", new DefaultThreadPoolPluginManager(),
1, 1, 1000L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(1), Thread::new, new ThreadPoolExecutor.DiscardPolicy());
TaskRejectCountRecordPlugin plugin = new TaskRejectCountRecordPlugin();
executor.register(plugin);
executor.submit(() -> ThreadUtil.sleep(500L));
executor.submit(() -> ThreadUtil.sleep(500L));
executor.submit(() -> ThreadUtil.sleep(500L));
ThreadUtil.sleep(500L);
Assert.assertEquals((Long) 1L, plugin.getRejectCountNum());
}
}

@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.plugin.impl;
import cn.hippo4j.common.toolkit.ThreadUtil;
import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor;
import cn.hippo4j.core.plugin.manager.DefaultThreadPoolPluginManager;
import lombok.RequiredArgsConstructor;
import org.junit.Assert;
import org.junit.Test;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* test for {@link TaskRejectNotifyAlarmPlugin}
*/
public class TaskRejectNotifyAlarmPluginTest {
@Test
public void testGetId() {
Assert.assertEquals(TaskRejectNotifyAlarmPlugin.PLUGIN_NAME, new TaskRejectNotifyAlarmPlugin().getId());
}
@Test
public void testGetRuntime() {
Assert.assertNotNull(new TaskRejectNotifyAlarmPlugin().getPluginRuntime());
}
@Test
public void testBeforeRejectedExecution() {
ExtensibleThreadPoolExecutor executor = new ExtensibleThreadPoolExecutor(
"test", new DefaultThreadPoolPluginManager(),
1, 1, 1000L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(1), Thread::new, new ThreadPoolExecutor.DiscardPolicy());
AtomicInteger rejectCount = new AtomicInteger(0);
executor.register(new TestPlugin(rejectCount, executor));
executor.submit(() -> ThreadUtil.sleep(200L));
executor.submit(() -> ThreadUtil.sleep(200L));
executor.submit(() -> ThreadUtil.sleep(200L));
// waiting for shutdown
executor.shutdown();
while (!executor.isTerminated()) {
}
Assert.assertEquals(1, rejectCount.get());
}
@RequiredArgsConstructor
private static class TestPlugin extends TaskRejectNotifyAlarmPlugin {
private final AtomicInteger count;
private final ThreadPoolExecutor targetExecutor;
/**
* Callback before task is rejected.
*
* @param runnable task
* @param executor executor
*/
@Override
public void beforeRejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
count.incrementAndGet();
Assert.assertEquals(targetExecutor, executor);
super.beforeRejectedExecution(runnable, executor);
}
}
}

@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.plugin.impl;
import cn.hippo4j.common.toolkit.ThreadUtil;
import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor;
import cn.hippo4j.core.plugin.manager.DefaultThreadPoolPluginManager;
import org.junit.Assert;
import org.junit.Test;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* test for {@link TaskTimeRecordPlugin}
*/
public class TaskTimeRecordPluginTest {
@Test
public void testGetId() {
Assert.assertEquals(TaskTimeRecordPlugin.PLUGIN_NAME, new TaskTimeRecordPlugin().getId());
}
@Test
public void testGetRuntime() {
Assert.assertNotNull(new TaskTimeRecordPlugin().getPluginRuntime());
}
@Test
public void testSummarize() {
ExtensibleThreadPoolExecutor executor = new ExtensibleThreadPoolExecutor(
"test", new DefaultThreadPoolPluginManager(),
3, 3, 1000L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(1), Thread::new, new ThreadPoolExecutor.DiscardPolicy());
TaskTimeRecordPlugin plugin = new TaskTimeRecordPlugin();
executor.register(plugin);
executor.submit(() -> ThreadUtil.sleep(1000L));
executor.submit(() -> ThreadUtil.sleep(3000L));
executor.submit(() -> ThreadUtil.sleep(2000L));
// waiting for shutdown
executor.shutdown();
while (!executor.isTerminated()) {
}
TaskTimeRecordPlugin.Summary summary = plugin.summarize();
Assert.assertEquals(1, summary.getMinTaskTimeMillis() / 1000L);
Assert.assertEquals(3, summary.getMaxTaskTimeMillis() / 1000L);
Assert.assertEquals(2, summary.getAvgTaskTimeMillis() / 1000L);
Assert.assertEquals(6, summary.getTotalTaskTimeMillis() / 1000L);
}
}

@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.plugin.impl;
import cn.hippo4j.common.toolkit.ThreadUtil;
import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor;
import cn.hippo4j.core.plugin.manager.DefaultThreadPoolPluginManager;
import org.junit.Assert;
import org.junit.Test;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* test for {@link TaskTimeoutNotifyAlarmPlugin}
*/
public class TaskTimeoutNotifyAlarmPluginTest {
private final ExtensibleThreadPoolExecutor executor = new ExtensibleThreadPoolExecutor(
"test", new DefaultThreadPoolPluginManager(),
5, 5, 1000L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(1), Thread::new, new ThreadPoolExecutor.AbortPolicy());
private final TaskTimeoutNotifyAlarmPlugin plugin = new TaskTimeoutNotifyAlarmPlugin(
executor.getThreadPoolId(), 100L, executor);
@Test
public void testGetId() {
Assert.assertEquals(TaskTimeoutNotifyAlarmPlugin.PLUGIN_NAME, plugin.getId());
}
@Test
public void testGetRuntime() {
Assert.assertNotNull(plugin.getPluginRuntime());
}
@Test
public void testGetExecuteTimeOut() {
Assert.assertEquals(100L, plugin.getExecuteTimeOut().longValue());
}
@Test
public void testSetExecuteTimeOut() {
plugin.setExecuteTimeOut(200L);
Assert.assertEquals(200L, plugin.getExecuteTimeOut().longValue());
}
@Test
public void testProcessTaskTime() {
executor.register(plugin);
AtomicInteger count = new AtomicInteger(0);
executor.submit(() -> {
count.incrementAndGet();
ThreadUtil.sleep(100L);
});
executor.submit(() -> {
count.incrementAndGet();
ThreadUtil.sleep(300L);
});
// waiting for shutdown
executor.shutdown();
while (!executor.isTerminated()) {
}
Assert.assertEquals(2, count.get());
}
}

@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.plugin.impl;
import cn.hippo4j.common.toolkit.ThreadUtil;
import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor;
import cn.hippo4j.core.plugin.ThreadPoolPlugin;
import cn.hippo4j.core.plugin.manager.DefaultThreadPoolPluginManager;
import org.junit.Assert;
import org.junit.Test;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* test for {@link ThreadPoolExecutorShutdownPlugin}
*/
public class ThreadPoolExecutorShutdownPluginTest {
@Test
public void testGetId() {
Assert.assertEquals(ThreadPoolExecutorShutdownPlugin.PLUGIN_NAME, new ThreadPoolExecutorShutdownPlugin(1000L).getId());
}
@Test
public void testGetRuntime() {
Assert.assertNotNull(new ThreadPoolExecutorShutdownPlugin(1000L).getPluginRuntime());
}
@Test
public void testGetAwaitTerminationMillis() {
ThreadPoolExecutorShutdownPlugin plugin = new ThreadPoolExecutorShutdownPlugin(1000L);
Assert.assertEquals(1000L, plugin.getAwaitTerminationMillis());
}
@Test
public void testSetAwaitTerminationMillis() {
ThreadPoolExecutorShutdownPlugin plugin = new ThreadPoolExecutorShutdownPlugin(1000L);
plugin.setAwaitTerminationMillis(5000L);
Assert.assertEquals(5000L, plugin.getAwaitTerminationMillis());
}
public ExtensibleThreadPoolExecutor getExecutor(ThreadPoolPlugin plugin) {
ExtensibleThreadPoolExecutor executor = new ExtensibleThreadPoolExecutor(
"test", new DefaultThreadPoolPluginManager(),
2, 2, 1000L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(1), Thread::new, new ThreadPoolExecutor.DiscardPolicy());
executor.register(plugin);
return executor;
}
private static Callable<Integer> getCallable(AtomicInteger completedCount) {
return () -> {
ThreadUtil.sleep(1000L);
return completedCount.incrementAndGet();
};
}
@Test
public void testShutdown() {
ExtensibleThreadPoolExecutor executor = getExecutor(
new ThreadPoolExecutorShutdownPlugin(2000L));
AtomicInteger completedCount = new AtomicInteger(0);
executor.submit(getCallable(completedCount));
executor.submit(getCallable(completedCount));
executor.submit(getCallable(completedCount));
executor.shutdownNow();
Assert.assertEquals(2, completedCount.get());
}
}

@ -0,0 +1,163 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.plugin.manager;
import cn.hippo4j.core.plugin.*;
import lombok.Getter;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
/**
* test for {@link DefaultThreadPoolPluginManager}
*/
public class DefaultThreadPoolPluginManagerTest {
private DefaultThreadPoolPluginManager manager;
@Before
public void initRegistry() {
manager = new DefaultThreadPoolPluginManager();
}
@Test
public void testRegister() {
manager.register(new TestShutdownAwarePlugin());
Assert.assertEquals(1, manager.getAllPlugins().size());
}
@Test
public void testGetAllPlugins() {
manager.register(new TestExecuteAwarePlugin());
manager.register(new TestRejectedAwarePlugin());
Assert.assertEquals(2, manager.getAllPlugins().size());
}
@Test
public void testClear() {
manager.register(new TestExecuteAwarePlugin());
manager.clear();
Assert.assertTrue(manager.getAllPlugins().isEmpty());
}
@Test
public void testTryRegister() {
Assert.assertTrue(manager.tryRegister(new TestExecuteAwarePlugin()));
Assert.assertFalse(manager.tryRegister(new TestExecuteAwarePlugin()));
}
@Test
public void testIsRegistered() {
Assert.assertFalse(manager.isRegistered(TestExecuteAwarePlugin.class.getSimpleName()));
manager.register(new TestExecuteAwarePlugin());
Assert.assertTrue(manager.isRegistered(TestExecuteAwarePlugin.class.getSimpleName()));
}
@Test
public void testUnregister() {
manager.register(new TestExecuteAwarePlugin());
manager.unregister(TestExecuteAwarePlugin.class.getSimpleName());
Assert.assertFalse(manager.isRegistered(TestExecuteAwarePlugin.class.getSimpleName()));
}
@Test
public void testGetPlugin() {
ThreadPoolPlugin plugin = new TestExecuteAwarePlugin();
manager.register(plugin);
Assert.assertSame(plugin, manager.getPlugin(plugin.getId()).orElse(null));
}
@Test
public void testGetRejectedAwarePluginList() {
manager.register(new TestRejectedAwarePlugin());
Assert.assertEquals(1, manager.getRejectedAwarePluginList().size());
}
@Test
public void testGetShutdownAwarePluginList() {
manager.register(new TestShutdownAwarePlugin());
Assert.assertEquals(1, manager.getShutdownAwarePluginList().size());
}
@Test
public void testGetTaskAwarePluginList() {
manager.register(new TestTaskAwarePlugin());
Assert.assertEquals(1, manager.getTaskAwarePluginList().size());
}
@Test
public void testGetExecuteAwarePluginList() {
manager.register(new TestExecuteAwarePlugin());
Assert.assertEquals(1, manager.getExecuteAwarePluginList().size());
}
@Test
public void testGetAllPluginsOfType() {
manager.register(new TestExecuteAwarePlugin());
manager.register(new TestRejectedAwarePlugin());
Assert.assertEquals(1, manager.getAllPluginsOfType(TestExecuteAwarePlugin.class).size());
Assert.assertEquals(1, manager.getAllPluginsOfType(TestRejectedAwarePlugin.class).size());
Assert.assertEquals(2, manager.getAllPluginsOfType(ThreadPoolPlugin.class).size());
}
@Test
public void testGetAllPluginRuntimes() {
manager.register(new TestExecuteAwarePlugin());
manager.register(new TestRejectedAwarePlugin());
Assert.assertEquals(2, manager.getAllPluginRuntimes().size());
}
@Test
public void testGetPluginRuntime() {
manager.register(new TestExecuteAwarePlugin());
Assert.assertTrue(manager.getRuntime(TestExecuteAwarePlugin.class.getSimpleName()).isPresent());
}
@Test
public void testGetPluginOfType() {
manager.register(new TestExecuteAwarePlugin());
Assert.assertTrue(manager.getPluginOfType(TestExecuteAwarePlugin.class.getSimpleName(), TestExecuteAwarePlugin.class).isPresent());
Assert.assertTrue(manager.getPluginOfType(TestExecuteAwarePlugin.class.getSimpleName(), ThreadPoolPlugin.class).isPresent());
Assert.assertFalse(manager.getPluginOfType(TestExecuteAwarePlugin.class.getSimpleName(), RejectedAwarePlugin.class).isPresent());
}
@Getter
private final static class TestTaskAwarePlugin implements TaskAwarePlugin {
private final String id = this.getClass().getSimpleName();
}
@Getter
private final static class TestExecuteAwarePlugin implements ExecuteAwarePlugin {
private final String id = this.getClass().getSimpleName();
}
@Getter
private final static class TestRejectedAwarePlugin implements RejectedAwarePlugin {
private final String id = this.getClass().getSimpleName();
}
@Getter
private final static class TestShutdownAwarePlugin implements ShutdownAwarePlugin {
private final String id = this.getClass().getSimpleName();
}
}

@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.plugin.manager;
import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor;
import cn.hippo4j.core.plugin.impl.*;
import org.junit.Assert;
import org.junit.Test;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* test for {@link DefaultThreadPoolPluginRegistrar}
*/
public class DefaultThreadPoolPluginRegistrarTest {
@Test
public void testGetId() {
ThreadPoolPluginRegistrar registrar = new DefaultThreadPoolPluginRegistrar();
Assert.assertEquals(registrar.getClass().getSimpleName(), registrar.getId());
}
@Test
public void testDoRegister() {
ThreadPoolPluginRegistrar registrar = new DefaultThreadPoolPluginRegistrar(100L, 100L);
ThreadPoolPluginManager manager = new DefaultThreadPoolPluginManager();
ExtensibleThreadPoolExecutor executor = new ExtensibleThreadPoolExecutor(
"test", manager,
5, 5, 1000L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(1), Thread::new, new ThreadPoolExecutor.AbortPolicy());
registrar.doRegister(executor);
Assert.assertTrue(manager.getPlugin(TaskDecoratorPlugin.PLUGIN_NAME).isPresent());
Assert.assertTrue(manager.getPlugin(TaskTimeoutNotifyAlarmPlugin.PLUGIN_NAME).isPresent());
Assert.assertTrue(manager.getPlugin(TaskRejectCountRecordPlugin.PLUGIN_NAME).isPresent());
Assert.assertTrue(manager.getPlugin(TaskRejectNotifyAlarmPlugin.PLUGIN_NAME).isPresent());
Assert.assertTrue(manager.getPlugin(ThreadPoolExecutorShutdownPlugin.PLUGIN_NAME).isPresent());
}
}

@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.plugin.manager;
import cn.hippo4j.core.plugin.ThreadPoolPlugin;
import lombok.Getter;
import org.junit.Assert;
import org.junit.Test;
import java.util.Collections;
import java.util.Optional;
/**
* test for {@link cn.hippo4j.core.plugin.manager.EmptyThreadPoolPluginManager}
*/
public class EmptyThreadPoolPluginManagerTest {
private final ThreadPoolPluginManager manager = EmptyThreadPoolPluginManager.INSTANCE;
@Test
public void testEmpty() {
Assert.assertSame(manager, ThreadPoolPluginManager.empty());
}
@Test
public void testGetAllPlugins() {
Assert.assertEquals(Collections.emptyList(), manager.getAllPluginRuntimes());
}
@Test
public void testClear() {
manager.clear();
Assert.assertTrue(isEmpty(manager));
}
@Test
public void testRegister() {
manager.register(new TestPlugin());
Assert.assertTrue(isEmpty(manager));
}
@Test
public void testTryRegister() {
Assert.assertFalse(manager.tryRegister(new TestPlugin()));
}
@Test
public void testIsRegistered() {
manager.register(new TestPlugin());
Assert.assertFalse(manager.isRegistered(TestPlugin.class.getSimpleName()));
}
@Test
public void testUnregister() {
manager.register(new TestPlugin());
manager.unregister(TestPlugin.class.getSimpleName());
Assert.assertTrue(isEmpty(manager));
}
@Test
public void testGetPlugin() {
Assert.assertSame(Optional.empty(), manager.getPlugin(""));
}
@Test
public void testGetRejectedAwarePluginList() {
Assert.assertEquals(Collections.emptyList(), manager.getRejectedAwarePluginList());
}
@Test
public void testGetShutdownAwarePluginList() {
Assert.assertEquals(Collections.emptyList(), manager.getShutdownAwarePluginList());
}
@Test
public void testGetTaskAwarePluginList() {
Assert.assertEquals(Collections.emptyList(), manager.getTaskAwarePluginList());
}
@Test
public void testGetExecuteAwarePluginList() {
Assert.assertEquals(Collections.emptyList(), manager.getExecuteAwarePluginList());
}
private static boolean isEmpty(ThreadPoolPluginManager manager) {
return manager.getAllPlugins().isEmpty();
}
@Getter
private static class TestPlugin implements ThreadPoolPlugin {
private final String id = TestPlugin.class.getSimpleName();
}
}

@ -0,0 +1,189 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.plugin.manager;
import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor;
import cn.hippo4j.core.plugin.*;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.junit.Assert;
import org.junit.Test;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* test for default method of {@link ThreadPoolPluginSupport}
*/
public class ThreadPoolPluginSupportTest {
private final ThreadPoolPluginManager manager = new DefaultThreadPoolPluginManager();
private final ExtensibleThreadPoolExecutor executor = new ExtensibleThreadPoolExecutor(
"test", manager,
5, 5, 1000L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(1), Thread::new, new ThreadPoolExecutor.AbortPolicy());
private final ThreadPoolPluginSupport support = new TestSupport(executor.getThreadPoolId(), executor, manager);
@Test
public void testGetThreadPoolId() {
Assert.assertEquals(executor.getThreadPoolId(), support.getThreadPoolId());
}
@Test
public void testGetThreadPoolPluginManager() {
Assert.assertEquals(manager, support.getThreadPoolPluginManager());
}
@Getter
@RequiredArgsConstructor
private static class TestSupport implements ThreadPoolPluginSupport {
private final String threadPoolId;
private final ExtensibleThreadPoolExecutor threadPoolExecutor;
private final ThreadPoolPluginManager threadPoolPluginManager;
}
// ================ default delegate method ================
@Test
public void testRegister() {
support.register(new TestShutdownAwarePlugin());
Assert.assertEquals(1, support.getAllPlugins().size());
}
@Test
public void testGetAllPlugins() {
support.register(new TestExecuteAwarePlugin());
support.register(new TestRejectedAwarePlugin());
Assert.assertEquals(2, support.getAllPlugins().size());
}
@Test
public void testClear() {
support.register(new TestExecuteAwarePlugin());
support.clear();
Assert.assertTrue(support.getAllPlugins().isEmpty());
}
@Test
public void testTryRegister() {
Assert.assertTrue(support.tryRegister(new TestExecuteAwarePlugin()));
Assert.assertFalse(support.tryRegister(new TestExecuteAwarePlugin()));
}
@Test
public void testIsRegistered() {
Assert.assertFalse(support.isRegistered(TestExecuteAwarePlugin.class.getSimpleName()));
support.register(new TestExecuteAwarePlugin());
Assert.assertTrue(support.isRegistered(TestExecuteAwarePlugin.class.getSimpleName()));
}
@Test
public void testUnregister() {
support.register(new TestExecuteAwarePlugin());
support.unregister(TestExecuteAwarePlugin.class.getSimpleName());
Assert.assertFalse(support.isRegistered(TestExecuteAwarePlugin.class.getSimpleName()));
}
@Test
public void testGetPlugin() {
ThreadPoolPlugin plugin = new TestExecuteAwarePlugin();
support.register(plugin);
Assert.assertSame(plugin, support.getPlugin(plugin.getId()).orElse(null));
}
@Test
public void testGetRejectedAwarePluginList() {
support.register(new TestRejectedAwarePlugin());
Assert.assertEquals(1, support.getRejectedAwarePluginList().size());
}
@Test
public void testGetShutdownAwarePluginList() {
support.register(new TestShutdownAwarePlugin());
Assert.assertEquals(1, support.getShutdownAwarePluginList().size());
}
@Test
public void testGetTaskAwarePluginList() {
support.register(new TestTaskAwarePlugin());
Assert.assertEquals(1, support.getTaskAwarePluginList().size());
}
@Test
public void testGetExecuteAwarePluginList() {
support.register(new TestExecuteAwarePlugin());
Assert.assertEquals(1, support.getExecuteAwarePluginList().size());
}
@Test
public void testGetAllPluginsOfType() {
support.register(new TestExecuteAwarePlugin());
support.register(new TestRejectedAwarePlugin());
Assert.assertEquals(1, support.getAllPluginsOfType(TestExecuteAwarePlugin.class).size());
Assert.assertEquals(1, support.getAllPluginsOfType(TestRejectedAwarePlugin.class).size());
Assert.assertEquals(2, support.getAllPluginsOfType(ThreadPoolPlugin.class).size());
}
@Test
public void testGetAllPluginRuntimes() {
support.register(new TestExecuteAwarePlugin());
support.register(new TestRejectedAwarePlugin());
Assert.assertEquals(2, support.getAllPluginRuntimes().size());
}
@Test
public void testGetPluginRuntime() {
support.register(new TestExecuteAwarePlugin());
Assert.assertTrue(support.getRuntime(TestExecuteAwarePlugin.class.getSimpleName()).isPresent());
}
@Test
public void testGetPluginOfType() {
support.register(new TestExecuteAwarePlugin());
Assert.assertTrue(support.getPluginOfType(TestExecuteAwarePlugin.class.getSimpleName(), TestExecuteAwarePlugin.class).isPresent());
Assert.assertTrue(support.getPluginOfType(TestExecuteAwarePlugin.class.getSimpleName(), ThreadPoolPlugin.class).isPresent());
Assert.assertFalse(support.getPluginOfType(TestExecuteAwarePlugin.class.getSimpleName(), RejectedAwarePlugin.class).isPresent());
}
@Getter
private final static class TestTaskAwarePlugin implements TaskAwarePlugin {
private final String id = this.getClass().getSimpleName();
}
@Getter
private final static class TestExecuteAwarePlugin implements ExecuteAwarePlugin {
private final String id = this.getClass().getSimpleName();
}
@Getter
private final static class TestRejectedAwarePlugin implements RejectedAwarePlugin {
private final String id = this.getClass().getSimpleName();
}
@Getter
private final static class TestShutdownAwarePlugin implements ShutdownAwarePlugin {
private final String id = this.getClass().getSimpleName();
}
}

@ -17,9 +17,9 @@
package cn.hippo4j.message.service;
import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.common.toolkit.CollectionUtil;
import cn.hippo4j.message.api.NotifyConfigBuilder;
import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.message.dto.AlarmControlDTO;
import cn.hippo4j.message.dto.NotifyConfigDTO;
import cn.hippo4j.message.enums.NotifyTypeEnum;
@ -28,7 +28,7 @@ import cn.hippo4j.message.request.ChangeParameterNotifyRequest;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.boot.CommandLineRunner;
import java.util.HashMap;
import java.util.List;
@ -39,7 +39,7 @@ import java.util.Map;
*/
@Slf4j
@RequiredArgsConstructor
public class Hippo4jBaseSendMessageService implements Hippo4jSendMessageService, InitializingBean {
public class Hippo4jBaseSendMessageService implements Hippo4jSendMessageService, CommandLineRunner {
private final NotifyConfigBuilder notifyConfigBuilder;
@ -133,7 +133,7 @@ public class Hippo4jBaseSendMessageService implements Hippo4jSendMessageService,
}
@Override
public void afterPropertiesSet() throws Exception {
public void run(String... args) throws Exception {
Map<String, SendMessageHandler> sendMessageHandlerMap =
ApplicationContextHolder.getBeansOfType(SendMessageHandler.class);
sendMessageHandlerMap.values().forEach(each -> sendMessageHandlers.put(each.getType(), each));

@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.config.rpc.client;
import cn.hippo4j.config.rpc.request.Request;
import cn.hippo4j.config.rpc.response.Response;
import java.io.Closeable;
/**
* the client for RPC, Explain the role of the client in the request
*/
public interface Client extends Closeable {
/**
* Start the client and try to send and receive data
*/
Response connection(Request request);
}

@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.config.rpc.client;
import cn.hippo4j.config.rpc.handler.Connection;
import cn.hippo4j.config.rpc.request.Request;
import cn.hippo4j.config.rpc.response.Response;
/**
* Applicable to client connections
*/
public interface ClientConnection extends Connection {
/**
* Establish a connection and process
*
* @param request Request information
*/
Response connect(Request request);
/**
* Get timeout, ms
*/
long timeout();
/**
* SET timeout, ms
*/
void setTimeout(long timeout);
}

@ -0,0 +1,132 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.config.rpc.client;
import cn.hippo4j.config.rpc.exception.TimeOutException;
import cn.hippo4j.config.rpc.process.ActivePostProcess;
import cn.hippo4j.config.rpc.request.Request;
import cn.hippo4j.config.rpc.response.Response;
import cn.hippo4j.config.rpc.support.NettyConnectPool;
import cn.hippo4j.config.rpc.support.NettyConnectPoolHolder;
import cn.hippo4j.config.rpc.support.ResultHolder;
import cn.hippo4j.common.toolkit.Assert;
import cn.hippo4j.common.web.exception.IllegalException;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import lombok.extern.slf4j.Slf4j;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.locks.LockSupport;
/**
* Client implemented using netty
*/
@Slf4j
public class NettyClientConnection implements ClientConnection {
String host;
Integer port;
// Obtain the connection timeout period. The default value is 30s
long timeout = 30000L;
Channel channel;
EventLoopGroup worker = new NioEventLoopGroup();
List<ActivePostProcess> activeProcesses;
ChannelFuture future;
NettyConnectPool connectionPool;
public NettyClientConnection(String host, int port,
List<ActivePostProcess> activeProcesses) {
Assert.notNull(worker);
this.host = host;
this.port = port;
this.activeProcesses = activeProcesses;
this.connectionPool = NettyConnectPoolHolder.getPool(host, port, timeout, worker);
}
public NettyClientConnection(String host, int port) {
this(host, port, new LinkedList<>());
}
@Override
public Response connect(Request request) {
preHandlers(request);
this.channel = connectionPool.acquire(timeout);
try {
String key = request.getKey();
this.future = channel.writeAndFlush(request);
log.info("Call successful, target address is {}:{}, request key is {}", host, port, key);
// Wait for execution to complete
ResultHolder.put(key, Thread.currentThread());
LockSupport.parkNanos(timeout() * 1000000);
Response response = ResultHolder.get(key);
if (response == null) {
throw new TimeOutException("Timeout waiting for server-side response");
}
postHandlers(request, response);
log.info("The response from {}:{} was received successfully with the response key {}.", host, port, key);
return response;
} catch (Exception ex) {
afterCompletions(request, null, ex);
throw new IllegalException(ex);
} finally {
connectionPool.release(this.channel);
}
}
@Override
public long timeout() {
return timeout;
}
@Override
public void setTimeout(long timeout) {
this.timeout = timeout;
}
@Override
public void close() {
if (this.channel == null) {
return;
}
worker.shutdownGracefully();
this.future.channel().close();
this.channel.close();
}
private void preHandlers(Request request) {
for (ActivePostProcess process : activeProcesses) {
process.preHandler(request);
}
}
private void postHandlers(Request request, Response response) {
for (ActivePostProcess process : activeProcesses) {
process.postHandler(request, response);
}
}
private void afterCompletions(Request request, Response response, Exception e) {
for (ActivePostProcess process : activeProcesses) {
process.afterCompletion(request, response, e);
}
}
}

@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.config.rpc.client;
import cn.hippo4j.config.rpc.request.Request;
import cn.hippo4j.config.rpc.response.Response;
import java.io.IOException;
/**
* The client, which provides a closing mechanism, maintains a persistent connection if not closed
*/
public class RPCClient implements Client {
ClientConnection clientConnection;
public RPCClient(ClientConnection clientConnection) {
this.clientConnection = clientConnection;
}
@Override
public Response connection(Request request) {
return clientConnection.connect(request);
}
/**
* Close the client and release all connections.
*
* @throws IOException exception
*/
@Override
public void close() throws IOException {
clientConnection.close();
}
}

@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.config.rpc.coder;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.OutputStream;
/**
* object OutputStream
*/
public class CompactObjectOutputStream extends ObjectOutputStream {
static final int TYPE_FAT_DESCRIPTOR = 0;
static final int TYPE_THIN_DESCRIPTOR = 1;
public CompactObjectOutputStream(OutputStream out) throws IOException {
super(out);
}
@Override
protected void writeStreamHeader() throws IOException {
writeByte(STREAM_VERSION);
}
@Override
protected void writeClassDescriptor(ObjectStreamClass desc) throws IOException {
Class<?> clazz = desc.forClass();
if (clazz.isPrimitive() || clazz.isArray() || clazz.isInterface() || desc.getSerialVersionUID() == 0) {
write(TYPE_FAT_DESCRIPTOR);
super.writeClassDescriptor(desc);
} else {
write(TYPE_THIN_DESCRIPTOR);
writeUTF(desc.getName());
}
}
}

@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.config.rpc.coder;
import cn.hippo4j.config.rpc.exception.CoderException;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.serialization.ClassResolver;
import io.netty.handler.codec.serialization.ObjectDecoder;
/**
* According to the decoder for java objects implemented by ObjectDecoder,
* it is necessary to ensure that the transmitted objects can be serialized
*/
public class NettyDecoder extends ObjectDecoder {
public NettyDecoder(ClassResolver classResolver) {
super(classResolver);
}
@Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) {
ByteBuf byteBuf = in.retainedDuplicate();
try {
Object o = super.decode(ctx, in);
if (o == null) {
return byteBuf;
} else {
return o;
}
} catch (Exception e) {
throw new CoderException("The encoding is abnormal, which may be caused by the failure of the transfer object to be deserialized");
}
}
}

@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.config.rpc.coder;
import cn.hippo4j.config.rpc.exception.CoderException;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufOutputStream;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import java.io.ObjectOutputStream;
import java.io.Serializable;
/**
* this is a encoder, For custom gluing and unpacking<br>
* {@link io.netty.handler.codec.serialization.ObjectEncoder}
*/
public class NettyEncoder extends MessageToByteEncoder<Serializable> {
private static final byte[] BYTE = new byte[4];
@Override
protected void encode(ChannelHandlerContext ctx, Serializable msg, ByteBuf out) throws Exception {
int startIndex = out.writerIndex();
try (ByteBufOutputStream outPut = new ByteBufOutputStream(out)) {
outPut.write(BYTE);
try (ObjectOutputStream outputStream = new CompactObjectOutputStream(outPut)) {
outputStream.writeObject(msg);
outputStream.flush();
}
} catch (Exception e) {
throw new CoderException("The encoding is abnormal, which may be caused by the transfer object being unable to be serialized");
}
int endIndex = out.writerIndex();
out.setInt(startIndex, endIndex - startIndex - 4);
}
}

@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.config.rpc.discovery;
import java.net.InetSocketAddress;
/**
* The adaptation layer of different service centers is used to know
* the host of different services through the registration center
*/
public interface DiscoveryAdapter {
/**
* get InetSocketAddress served in the registry
*
* @param name server name
* @return InetSocketAddress
*/
InetSocketAddress getSocketAddress(String name);
}

@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.config.rpc.exception;
/**
* During decoding and encoding, if an exception occurs, an exception of type {@link CoderException} is thrown,
* which is not different from a {@link RuntimeException}, but is more explicit about the type of exception
*/
public class CoderException extends RuntimeException {
private static final long serialVersionUID = 8247610319171014183L;
public CoderException() {
super();
}
public CoderException(String message) {
super(message);
}
public CoderException(Throwable e) {
super(e.getMessage(), e);
}
public CoderException(String message, Throwable throwable) {
super(message, throwable);
}
public CoderException(String message, Throwable throwable, boolean enableSuppression, boolean writableStackTrace) {
super(message, throwable, enableSuppression, writableStackTrace);
}
}

@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.config.rpc.exception;
/**
* If an exception occurs during the connection between the server and the client, an exception of type
* {@link ConnectionException} is thrown, which is not different from {@link RuntimeException}, but is more explicit
* about the type of exception
*/
public class ConnectionException extends RuntimeException {
private static final long serialVersionUID = 8247610319171014183L;
public ConnectionException() {
super();
}
public ConnectionException(String message) {
super(message);
}
public ConnectionException(Throwable e) {
super(e.getMessage(), e);
}
public ConnectionException(String message, Throwable throwable) {
super(message, throwable);
}
public ConnectionException(String message, Throwable throwable, boolean enableSuppression, boolean writableStackTrace) {
super(message, throwable, enableSuppression, writableStackTrace);
}
}

@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.config.rpc.exception;
/**
* If there is a timeout between the server and the client, you will get a {@link TimeOutException},
* which is not different from {@link RuntimeException}, but it will be more explicit about the type of exception, right
*/
public class TimeOutException extends RuntimeException {
private static final long serialVersionUID = 8247610319171014183L;
public TimeOutException() {
super();
}
public TimeOutException(String message) {
super(message);
}
public TimeOutException(Throwable e) {
super(e.getMessage(), e);
}
public TimeOutException(String message, Throwable throwable) {
super(message, throwable);
}
public TimeOutException(String message, Throwable throwable, boolean enableSuppression, boolean writableStackTrace) {
super(message, throwable, enableSuppression, writableStackTrace);
}
}

@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.config.rpc.handler;
import cn.hippo4j.config.rpc.request.Request;
import cn.hippo4j.config.rpc.response.Response;
/**
* The handler in each connection, where the specific behavior of the connection
* must be specified, such as serialization and parsing, requesting and receiving
* requests, and so on
*/
public interface ConnectHandler {
/**
* Processing after receiving the request
*
* @param request request
*/
default Response handler(Request request) {
return null;
}
/**
* Processing after receiving Response
*
* @param response response
*/
default void handler(Response response) {
//
}
}

@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.config.rpc.handler;
import java.io.Closeable;
/**
* Represents a network request connection and provides IO layer support
*/
public interface Connection extends Closeable {
}

@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.config.rpc.handler;
import cn.hippo4j.config.rpc.coder.NettyDecoder;
import cn.hippo4j.config.rpc.coder.NettyEncoder;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.pool.ChannelPoolHandler;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import lombok.extern.slf4j.Slf4j;
/**
* Processing by the client connection pool handler to clean the buffer and define new connection properties
*/
@Slf4j
public class NettyClientPoolHandler implements ChannelPoolHandler {
@Override
public void channelReleased(Channel ch) {
ch.writeAndFlush(Unpooled.EMPTY_BUFFER);
log.info("The connection buffer has been emptied of data");
}
@Override
public void channelAcquired(Channel ch) {
// NO SOMETHING
}
@Override
public void channelCreated(Channel ch) {
NioSocketChannel channel = (NioSocketChannel) ch;
channel.config()
.setTcpNoDelay(false);
ch.pipeline().addLast(new NettyDecoder(ClassResolvers.cacheDisabled(null)));
ch.pipeline().addLast(new NettyEncoder());
ch.pipeline().addLast(new NettyClientTakeHandler());
}
}

@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.config.rpc.handler;
import cn.hippo4j.config.rpc.exception.ConnectionException;
import cn.hippo4j.config.rpc.support.ResultHolder;
import cn.hippo4j.config.rpc.response.Response;
import cn.hippo4j.common.web.exception.IllegalException;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/**
* Interconnect with the netty mediation layer
*/
public class NettyClientTakeHandler extends ChannelInboundHandlerAdapter implements ConnectHandler {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
try {
Response response = (Response) msg;
handler(response);
ctx.flush();
} catch (Exception e) {
ctx.close();
throw new IllegalException(e);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
Channel channel = ctx.channel();
if (channel.isActive()) {
ctx.close();
} else {
throw new ConnectionException(cause);
}
}
@Override
public void handler(Response response) {
ResultHolder.put(response.getKey(), response);
ResultHolder.wake(response.getKey());
}
}

@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.config.rpc.handler;
import cn.hippo4j.config.rpc.exception.ConnectionException;
import cn.hippo4j.config.rpc.process.ActivePostProcess;
import cn.hippo4j.config.rpc.response.DefaultResponse;
import cn.hippo4j.config.rpc.support.ClassRegistry;
import cn.hippo4j.config.rpc.support.Instance;
import cn.hippo4j.config.rpc.request.Request;
import cn.hippo4j.config.rpc.response.Response;
import cn.hippo4j.common.toolkit.Assert;
import cn.hippo4j.common.toolkit.ReflectUtil;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.lang.reflect.Method;
import java.util.LinkedList;
import java.util.List;
/**
* netty adaptation layer
*/
public class NettyServerTakeHandler extends ChannelInboundHandlerAdapter implements ConnectHandler {
List<ActivePostProcess> processes;
Instance instance;
public NettyServerTakeHandler(List<ActivePostProcess> processes, Instance instance) {
this.processes = processes;
this.instance = instance;
}
public NettyServerTakeHandler(Instance instance) {
this(new LinkedList<>(), instance);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (!(msg instanceof Request)) {
return;
}
Request request = (Request) msg;
Response response = handler(request);
ctx.writeAndFlush(response);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
Channel channel = ctx.channel();
if (channel.isActive()) {
ctx.close();
} else {
throw new ConnectionException(cause);
}
}
@Override
public Response handler(Request request) {
if (!preHandlers(request)) {
return null;
}
try {
Class<?> cls = ClassRegistry.get(request.getClassName());
Method method = ReflectUtil.getMethodByName(cls, request.getMethodName(), request.getParameterTypes());
Assert.notNull(method);
Object invoke = ReflectUtil.invoke(instance.getInstance(cls), method, request.getParameters());
Response response = new DefaultResponse(request.getKey(), invoke.getClass(), invoke);
postHandlers(request, response);
return response;
} catch (Exception e) {
Response response = new DefaultResponse(request.getKey(), e, e.getMessage());
afterCompletions(request, response, e);
return response;
}
}
private boolean preHandlers(Request request) {
for (ActivePostProcess process : processes) {
if (!process.preHandler(request)) {
return false;
}
}
return true;
}
private void postHandlers(Request request, Response response) {
for (ActivePostProcess process : processes) {
process.postHandler(request, response);
}
}
private void afterCompletions(Request request, Response response, Exception e) {
for (ActivePostProcess process : processes) {
process.afterCompletion(request, response, e);
}
}
}

@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.config.rpc.process;
import cn.hippo4j.config.rpc.request.Request;
import cn.hippo4j.config.rpc.response.Response;
/**
* Callback while the connection is in progress
*/
public interface ActivePostProcess {
/**
* Client: After establishing a connection and before passing parameters<br>
* Server: Receives parameters and performs pre-call operations<br>
*
* @param request request
* @return Whether to continue the execution. If it is a client, the returned value does not affect subsequent execution
*/
default boolean preHandler(Request request) {
return true;
}
/**
* Client: Action after receiving a response<br>
* Server: performs the operation after the call<br>
*
* @param request request
* @param response response
*/
default void postHandler(Request request, Response response) {
// NO SOMETHING
}
/**
* Called when an exception or resource is cleaned
*
* @param request request
* @param response response
* @param e Exception
*/
default void afterCompletion(Request request, Response response, Exception e) {
// NO SOMETHING
}
}

@ -0,0 +1,121 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.config.rpc.request;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Objects;
/**
* default request<br>
* Use the fully qualified name key of the interface and override equals and hashCode
*/
public final class DefaultRequest implements Request {
String key;
String className;
String methodName;
Class<?>[] parameterTypes;
transient Object[] parameters;
public DefaultRequest(String key, String className, String methodName, Class<?>[] parameterTypes, Object[] parameters) {
this.key = key;
this.className = className;
this.methodName = methodName;
this.parameterTypes = parameterTypes;
this.parameters = parameters;
}
@Override
public String getKey() {
return key;
}
@Override
public String getClassName() {
return className;
}
@Override
public String getMethodName() {
return methodName;
}
@Override
public Class<?>[] getParameterTypes() {
return parameterTypes;
}
@Override
public Object[] getParameters() {
return parameters;
}
@Override
public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
DefaultRequest that = (DefaultRequest) o;
return Objects.equals(key, that.key)
&& Objects.equals(className, that.className)
&& Objects.equals(methodName, that.methodName);
}
@Override
public int hashCode() {
return Objects.hash(key, className, methodName);
}
/**
* Redefine the behavior of serialization, that is, re-acquire the initially serialized
* data from the stream and re-serialize it. Simple serialization will result in the
* loss of the field identified by transient.
*/
private void writeObject(ObjectOutputStream s) throws IOException {
s.defaultWriteObject();
if (parameters == null) {
return;
}
// 序列化属性 parameters
for (Object parameter : parameters) {
s.writeObject(parameter);
}
}
/**
* Redefine the deserialization behavior, and sequentially deserialize the data specified during
* serialization, because there is data that is not deserialized during initial deserialization,
* such as fields defined by transient
*/
private void readObject(ObjectInputStream s) throws IOException, ClassNotFoundException {
s.defaultReadObject();
if (parameterTypes == null) {
return;
}
// 反序列化属性 parameters
int length = parameterTypes.length;
Object[] a = new Object[length];
for (int i = 0; i < length; i++) {
a[i] = s.readObject();
}
this.parameters = a;
}
}

@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.config.rpc.request;
import java.io.Serializable;
/**
* request
*/
public interface Request extends Serializable {
/**
* The unique identity of the current request
*/
String getKey();
/**
* The Class name of the current request
*/
String getClassName();
/**
* The Method name of the current request
*/
String getMethodName();
/**
* The parameter type of the current request
*/
Class<?>[] getParameterTypes();
/**
* The parameters of the current request
*/
Object[] getParameters();
}

@ -0,0 +1,122 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.config.rpc.response;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Objects;
/**
* default request<br>
* Use the fully qualified name key of the interface and override equals and hashCode
*/
public class DefaultResponse implements Response {
String key;
Class<?> cls;
transient Object obj;
Throwable throwable;
String errMsg;
public DefaultResponse(String key, Class<?> cls, Object obj, Throwable throwable, String errMsg) {
this.key = key;
this.cls = cls;
this.obj = obj;
this.throwable = throwable;
this.errMsg = errMsg;
}
public DefaultResponse(String key, Throwable throwable, String errMsg) {
this(key, null, null, throwable, errMsg);
}
public DefaultResponse(String key, Class<?> cls, Object obj) {
this(key, cls, obj, null, null);
}
@Override
public String getKey() {
return key;
}
@Override
public Class<?> getCls() {
return cls;
}
@Override
public Object getObj() {
return obj;
}
@Override
public Throwable getThrowable() {
return throwable;
}
@Override
public String getErrMsg() {
return errMsg;
}
@Override
public boolean isErr() {
return throwable != null || errMsg != null;
}
@Override
public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
DefaultResponse that = (DefaultResponse) o;
return Objects.equals(key, that.key) && Objects.equals(cls, that.cls);
}
@Override
public int hashCode() {
return Objects.hash(key, cls);
}
/**
* Redefine the behavior of serialization, that is, re-acquire the initially serialized
* data from the stream and re-serialize it. Simple serialization will result in the
* loss of the field identified by transient.
*/
private void writeObject(ObjectOutputStream s) throws IOException {
s.defaultWriteObject();
if (obj == null) {
return;
}
// 序列化属性 obj
s.writeObject(this.obj);
}
/**
* Redefine the deserialization behavior, and sequentially deserialize the data specified during
* serialization, because there is data that is not deserialized during initial deserialization,
* such as fields defined by transient
*/
private void readObject(ObjectInputStream s) throws IOException, ClassNotFoundException {
s.defaultReadObject();
// 反序列化属性 obj
this.obj = s.readObject();
}
}

@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.config.rpc.response;
import java.io.Serializable;
/**
* Response
*/
public interface Response extends Serializable {
/**
* The unique identity of the current Response
*/
String getKey();
/**
* The class of the current Response, The target of deserialization
*/
Class<?> getCls();
/**
* The results of this request can be obtained, The source of deserialization
*/
Object getObj();
/**
* The Throwable of the current Response
*/
Throwable getThrowable();
/**
* the error message
*/
String getErrMsg();
/**
* Whether the current request has an error
*/
boolean isErr();
}

@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.config.rpc.server;
import cn.hippo4j.config.rpc.coder.NettyDecoder;
import cn.hippo4j.config.rpc.coder.NettyEncoder;
import cn.hippo4j.config.rpc.handler.NettyServerTakeHandler;
import cn.hippo4j.config.rpc.process.ActivePostProcess;
import cn.hippo4j.config.rpc.support.Instance;
import cn.hippo4j.common.toolkit.Assert;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import lombok.extern.slf4j.Slf4j;
import java.util.LinkedList;
import java.util.List;
/**
* adapter to the netty server
*/
@Slf4j
public class NettyServerConnection implements ServerConnection {
Integer port;
EventLoopGroup leader;
EventLoopGroup worker;
Class<? extends ServerChannel> socketChannelCls = NioServerSocketChannel.class;
List<ActivePostProcess> processes;
Instance instance;
ChannelFuture future;
public NettyServerConnection(EventLoopGroup leader, EventLoopGroup worker, List<ActivePostProcess> processes, Instance instance) {
Assert.notNull(processes);
Assert.notNull(instance);
Assert.notNull(leader);
Assert.notNull(worker);
this.leader = leader;
this.worker = worker;
this.processes = processes;
this.instance = instance;
}
public NettyServerConnection(EventLoopGroup leader, EventLoopGroup worker, Instance instance) {
this(leader, worker, new LinkedList<>(), instance);
}
public NettyServerConnection(List<ActivePostProcess> processes, Instance instance) {
this(new NioEventLoopGroup(), new NioEventLoopGroup(), processes, instance);
}
public NettyServerConnection(Instance instance) {
this(new NioEventLoopGroup(), new NioEventLoopGroup(), new LinkedList<>(), instance);
}
@Override
public void bind(int port) {
ServerBootstrap server = new ServerBootstrap();
server.group(leader, worker)
.channel(socketChannelCls)
.childOption(ChannelOption.TCP_NODELAY, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new NettyDecoder(ClassResolvers.cacheDisabled(null)));
ch.pipeline().addLast(new NettyEncoder());
ch.pipeline().addLast(new NettyServerTakeHandler(processes, instance));
}
});
try {
this.future = server.bind(port);
log.info("The server is started and can receive requests. The listening port is {}", port);
this.port = port;
this.future.channel().closeFuture().sync();
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
@Override
public void close() {
if (port == null) {
return;
}
leader.shutdownGracefully();
worker.shutdownGracefully();
this.future.channel().close();
log.info("The server is shut down and no more requests are received. The release port is {}", port);
}
}

@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.config.rpc.server;
import java.io.IOException;
/**
* Server Implementation
*/
public class RPCServer implements Server {
int port;
ServerConnection serverConnection;
public RPCServer(int port, ServerConnection serverConnection) {
this.port = port;
this.serverConnection = serverConnection;
}
@Override
public void bind() {
serverConnection.bind(port);
}
/**
* Shut down the server and release the port
*/
@Override
public void close() throws IOException {
serverConnection.close();
}
}

@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.config.rpc.server;
import java.io.Closeable;
/**
* the service for RPC, Explain the role of the service in the request
*/
public interface Server extends Closeable {
/**
* Start the server. Attempt to listen on the port and receive the request.<br>
* If the port being processed is already bound, an exception is thrown
*/
void bind();
}

@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.config.rpc.server;
import cn.hippo4j.config.rpc.handler.Connection;
/**
* This applies to server-side connections
*/
public interface ServerConnection extends Connection {
/**
* Bind ports and process them
*/
void bind(int port);
}

@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.config.rpc.support;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* the registration center for Client and Server
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class ClassRegistry {
private static final Map<String, Class<?>> serverRegister = new ConcurrentHashMap<>();
/**
* get a Obj in Registry center <br>
*
* @param s key
* @return t element
*/
public static Class<?> get(String s) {
return serverRegister.get(s);
}
/**
* add the element to Registry Table <br>
* if the key already exists, failure, and return before the value of the key. <br>
* if success return the element
*
* @param s key
* @param cls element
* @return final mapped value
*/
public static Class<?> set(String s, Class<?> cls) {
return serverRegister.putIfAbsent(s, cls);
}
/**
* add the element to Registry Table <br>
* if the key already exists, failure, replace it
*
* @param s key
* @param cls element
*/
public static Class<?> put(String s, Class<?> cls) {
return serverRegister.put(s, cls);
}
/**
* clear
*/
public static void clear() {
serverRegister.clear();
}
}

@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.config.rpc.support;
import cn.hippo4j.common.toolkit.ReflectUtil;
import cn.hippo4j.common.web.exception.IllegalException;
/**
* Simply creating an instance of a class by its name and its specific type,
* and then throwing an exception if it is an interface, is not elegant
*/
public class DefaultInstance implements Instance {
@Override
public Object getInstance(Class<?> cls) {
return ReflectUtil.createInstance(cls);
}
@Override
public Object getInstance(String name) {
try {
Class<?> cls = Class.forName(name);
return getInstance(cls);
} catch (ClassNotFoundException e) {
throw new IllegalException(e);
}
}
}

@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.config.rpc.support;
/**
* Instance interface to get an instance
*/
public interface Instance {
/**
* get a instance
*
* @param cls Class object
* @return Information about instances created or found
*/
Object getInstance(Class<?> cls);
/**
* Gets an instance of a class with a recognizable identity,
* which can be the fully qualified name of class. It can also be a unique name in a container
*
* @param name Identifying name
* @return Information about instances created or found
*/
Object getInstance(String name);
}

@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.config.rpc.support;
import cn.hippo4j.config.rpc.exception.ConnectionException;
import cn.hippo4j.config.rpc.handler.NettyClientPoolHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.pool.ChannelHealthChecker;
import io.netty.channel.pool.ChannelPool;
import io.netty.channel.pool.ChannelPoolHandler;
import io.netty.channel.pool.FixedChannelPool;
import io.netty.util.concurrent.Future;
import lombok.extern.slf4j.Slf4j;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
/**
* This parameter applies only to the connection pool of netty
*/
@Slf4j
public class NettyConnectPool {
ChannelHealthChecker healthCheck = ChannelHealthChecker.ACTIVE;
FixedChannelPool.AcquireTimeoutAction acquireTimeoutAction = FixedChannelPool.AcquireTimeoutAction.NEW;
int maxPendingAcquires = Integer.MAX_VALUE;
ChannelPoolHandler handler = new NettyClientPoolHandler();
ChannelPool pool;
String host;
int port;
public NettyConnectPool(String host, int port, int maxConnect,
long timeout, EventLoopGroup worker,
Class<? extends Channel> socketChannelCls) {
InetSocketAddress socketAddress = InetSocketAddress.createUnresolved(host, port);
Bootstrap bootstrap = new Bootstrap()
.group(worker)
.channel(socketChannelCls)
.remoteAddress(socketAddress);
this.host = host;
this.port = port;
this.pool = new FixedChannelPool(bootstrap, handler, healthCheck, acquireTimeoutAction,
timeout, maxConnect, maxPendingAcquires, true, true);
log.info("The connection pool is established with the connection target {}:{}", host, port);
NettyConnectPoolHolder.createPool(host, port, this);
}
public Channel acquire(long timeoutMillis) {
try {
Future<Channel> fch = pool.acquire();
return fch.get(timeoutMillis, TimeUnit.MILLISECONDS);
} catch (Exception e) {
throw new ConnectionException("Failed to get the connection", e);
}
}
public Future<Channel> acquire() {
try {
return pool.acquire();
} catch (Exception e) {
throw new ConnectionException("Failed to get the connection", e);
}
}
public void release(Channel channel) {
try {
if (channel != null) {
pool.release(channel);
}
} catch (Exception e) {
throw new ConnectionException("Failed to release the connection", e);
}
}
public void close() {
try {
pool.close();
NettyConnectPoolHolder.remove(host, port);
} catch (Exception e) {
throw new ConnectionException("Failed to close the connection pool", e);
}
}
}

@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.config.rpc.support;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.EventExecutorGroup;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* To avoid creating multiple connection pools for the same host:port, save all connection pools of the client
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class NettyConnectPoolHolder {
static int maxConnect = 64;
static Map<String, NettyConnectPool> connectPoolMap = new ConcurrentHashMap<>();
private static NettyConnectPool initPool(String host, int port,
long timeout, EventLoopGroup worker) {
return new NettyConnectPool(
host, port, maxConnect,
timeout, worker,
NioSocketChannel.class);
}
private static String getKey(String host, int port) {
return host + ":" + port;
}
/**
* The connection pool connectPoolMapping may already exist before the connection pool
* connectPoolMapping is established. In this case, the connection pool is directly overwritten
*
* @param host the host
* @param port the port
* @param pool This parameter applies only to the connection pool of netty
*/
public static void createPool(String host, int port, NettyConnectPool pool) {
connectPoolMap.put(getKey(host, port), pool);
}
/**
* Gets a connection pool, or null if there is no corresponding connectPoolMapping
*
* @param host the host
* @param port the port
* @return Map to the connection pool
*/
public static NettyConnectPool getPool(String host, int port) {
return connectPoolMap.get(getKey(host, port));
}
/**
* Gets a connection pool, and if there is no connectPoolMapping, creates one with the values provided and joins the connectPoolMapping
*
* @param host the host
* @param port the port
* @param timeout timeout
* @param worker Special {@link EventExecutorGroup} which allows registering {@link Channel}s
* that get processed for later selection during the event loop.
* @return Map to the connection pool
*/
public static synchronized NettyConnectPool getPool(String host, int port,
long timeout, EventLoopGroup worker) {
/*
* this cannot use the computeIfAbsent method directly here because put is already used in init.
* Details refer to https://bugs.openjdk.java.net/browse/JDK-8062841
*/
NettyConnectPool pool = getPool(host, port);
return pool == null ? initPool(host, port, timeout, worker) : pool;
}
/**
* Disconnect a connection connectPoolMapping. This must take effect at the same time as the connection pool is closed
*
* @param host host
* @param port port
*/
public static void remove(String host, int port) {
connectPoolMap.remove(getKey(host, port));
}
/**
* clear
*/
public static void clear() {
connectPoolMap.clear();
}
}

Some files were not shown because too many files have changed in this diff Show More

Loading…
Cancel
Save