org.projectlombok
lombok
diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPoolWrapper.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPoolWrapper.java
index e5dab547..8f4d2941 100644
--- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPoolWrapper.java
+++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPoolWrapper.java
@@ -25,7 +25,9 @@ import lombok.NoArgsConstructor;
import org.springframework.beans.factory.DisposableBean;
import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
/**
@@ -37,10 +39,27 @@ import java.util.concurrent.ThreadPoolExecutor;
@AllArgsConstructor
public class DynamicThreadPoolWrapper implements DisposableBean {
+ /**
+ * Determine the unique identifier of the thread pool in the project.
+ *
+ * @param tenantId tenant id
+ * @param itemId project id in the team
+ * @param threadPoolId Thread pool identifier under the project
+ */
private String tenantId, itemId, threadPoolId;
+ /**
+ * Whether the thread pool has completed initialization,
+ * and whether to subscribe to server-side configuration change events.
+ *
+ * @param subscribeFlag subscription server configuration id
+ * @param initFlag initial configuration complete flag
+ */
private boolean subscribeFlag, initFlag;
+ /**
+ * Thread pool executor.
+ */
private ThreadPoolExecutor executor;
public DynamicThreadPoolWrapper(String threadPoolId) {
@@ -53,14 +72,62 @@ public class DynamicThreadPoolWrapper implements DisposableBean {
this.subscribeFlag = true;
}
+ /**
+ * Executes the given task sometime in the future. The task
+ * may execute in a new thread or in an existing pooled thread.
+ *
+ * If the task cannot be submitted for execution, either because this
+ * executor has been shutdown or because its capacity has been reached,
+ * the task is handled by the current {@code RejectedExecutionHandler}.
+ *
+ * @param command the task to execute
+ * @throws RejectedExecutionException at discretion of
+ * {@code RejectedExecutionHandler}, if the task
+ * cannot be accepted for execution
+ * @throws NullPointerException if {@code command} is null
+ */
public void execute(Runnable command) {
executor.execute(command);
}
+ /**
+ * Submits a Runnable task for execution and returns a Future
+ * representing that task. The Future's {@code get} method will
+ * return {@code null} upon successful completion.
+ *
+ * @param task the task to submit
+ * @return a Future representing pending completion of the task
+ * @throws RejectedExecutionException if the task cannot be
+ * scheduled for execution
+ * @throws NullPointerException if the task is null
+ */
public Future> submit(Runnable task) {
return executor.submit(task);
}
+ /**
+ * Submits a value-returning task for execution and returns a
+ * Future representing the pending results of the task. The
+ * Future's {@code get} method will return the task's result upon
+ * successful completion.
+ *
+ *
+ * If you would like to immediately block waiting
+ * for a task, you can use constructions of the form
+ * {@code result = exec.submit(aCallable).get();}
+ *
+ *
Note: The {@link Executors} class includes a set of methods
+ * that can convert some other common closure-like objects,
+ * for example, {@link java.security.PrivilegedAction} to
+ * {@link Callable} form so they can be submitted.
+ *
+ * @param task the task to submit
+ * @param the type of the task's result
+ * @return a Future representing pending completion of the task
+ * @throws RejectedExecutionException if the task cannot be
+ * scheduled for execution
+ * @throws NullPointerException if the task is null
+ */
public Future submit(Callable task) {
return executor.submit(task);
}
diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/AbstractTaskTimerPlugin.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/AbstractTaskTimerPlugin.java
new file mode 100644
index 00000000..4a42fef3
--- /dev/null
+++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/AbstractTaskTimerPlugin.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cn.hippo4j.core.plugin.impl;
+
+import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor;
+import cn.hippo4j.core.plugin.ExecuteAwarePlugin;
+import cn.hippo4j.core.toolkit.SystemClock;
+
+import java.util.Optional;
+
+/**
+ * An abstract task execution time recording plugin
+ * for thread-safe statistics the execution time of tasks.
+ *
+ *
Must override {@link #processTaskTime} to define the processing logic for task execution time.
+ * Default time precision is milliseconds, may override {@link #currentTime} to redefine the time precision.
+ *
+ * @see TaskTimeRecordPlugin
+ * @see TaskTimeoutNotifyAlarmPlugin
+ */
+public abstract class AbstractTaskTimerPlugin implements ExecuteAwarePlugin {
+
+ /**
+ * start times of executed tasks
+ */
+ private final ThreadLocal startTimes = new ThreadLocal<>();
+
+ /**
+ * Record the time when the worker thread starts executing the task.
+ *
+ * @param thread thread of executing task
+ * @param runnable task
+ * @see ExtensibleThreadPoolExecutor#beforeExecute
+ */
+ @Override
+ public final void beforeExecute(Thread thread, Runnable runnable) {
+ startTimes.set(currentTime());
+ }
+
+ /**
+ * Record the total time for the worker thread to complete the task, and update the time record.
+ *
+ * @param runnable runnable
+ * @param throwable exception thrown during execution
+ */
+ @Override
+ public final void afterExecute(Runnable runnable, Throwable throwable) {
+ try {
+ Optional.ofNullable(startTimes.get())
+ .map(startTime -> currentTime() - startTime)
+ .ifPresent(this::processTaskTime);
+ } finally {
+ startTimes.remove();
+ }
+ }
+
+ /**
+ * Get the current time.
+ *
+ * @return current time
+ */
+ protected long currentTime() {
+ return SystemClock.now();
+ }
+
+ /**
+ * Processing the execution time of the task.
+ *
+ * @param taskExecuteTime execute time of task
+ */
+ protected abstract void processTaskTime(long taskExecuteTime);
+
+}
diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskTimeRecordPlugin.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskTimeRecordPlugin.java
index 3ce5c5ad..0bb029c2 100644
--- a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskTimeRecordPlugin.java
+++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskTimeRecordPlugin.java
@@ -17,25 +17,19 @@
package cn.hippo4j.core.plugin.impl;
-import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor;
-import cn.hippo4j.core.plugin.ExecuteAwarePlugin;
import cn.hippo4j.core.plugin.PluginRuntime;
-import cn.hippo4j.core.toolkit.SystemClock;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
-import java.util.Objects;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* Record task execution time indicator.
- *
- * @see TaskTimeoutNotifyAlarmPlugin
*/
@RequiredArgsConstructor
-public class TaskTimeRecordPlugin implements ExecuteAwarePlugin {
+public class TaskTimeRecordPlugin extends AbstractTaskTimerPlugin {
public static final String PLUGIN_NAME = "task-time-record-plugin";
@@ -74,23 +68,6 @@ public class TaskTimeRecordPlugin implements ExecuteAwarePlugin {
return PLUGIN_NAME;
}
- /**
- * start times of executed tasks
- */
- private final ThreadLocal startTimes = new ThreadLocal<>();
-
- /**
- * Record the time when the worker thread starts executing the task.
- *
- * @param thread thread of executing task
- * @param runnable task
- * @see ExtensibleThreadPoolExecutor#beforeExecute
- */
- @Override
- public void beforeExecute(Thread thread, Runnable runnable) {
- startTimes.set(SystemClock.now());
- }
-
/**
* Get plugin runtime info.
*
@@ -107,44 +84,25 @@ public class TaskTimeRecordPlugin implements ExecuteAwarePlugin {
.addInfo("avgTaskTime", summary.getAvgTaskTimeMillis() + "ms");
}
- /**
- * Record the total time for the worker thread to complete the task, and update the time record.
- *
- * @param runnable runnable
- * @param throwable exception thrown during execution
- */
- @Override
- public void afterExecute(Runnable runnable, Throwable throwable) {
- try {
- Long startTime = startTimes.get();
- if (Objects.isNull(startTime)) {
- return;
- }
- long executeTime = SystemClock.now() - startTime;
- recordTaskTime(executeTime);
- } finally {
- startTimes.remove();
- }
- }
-
/**
* Refresh time indicators of the current instance.
*
- * @param taskExecutionTime millisecond
+ * @param taskExecuteTime execute time of task
*/
- protected void recordTaskTime(long taskExecutionTime) {
+ @Override
+ protected void processTaskTime(long taskExecuteTime) {
Lock writeLock = lock.writeLock();
writeLock.lock();
try {
if (taskCount == 0) {
- maxTaskTimeMillis = taskExecutionTime;
- minTaskTimeMillis = taskExecutionTime;
+ maxTaskTimeMillis = taskExecuteTime;
+ minTaskTimeMillis = taskExecuteTime;
} else {
- maxTaskTimeMillis = Math.max(taskExecutionTime, maxTaskTimeMillis);
- minTaskTimeMillis = Math.min(taskExecutionTime, minTaskTimeMillis);
+ maxTaskTimeMillis = Math.max(taskExecuteTime, maxTaskTimeMillis);
+ minTaskTimeMillis = Math.min(taskExecuteTime, minTaskTimeMillis);
}
taskCount = taskCount + 1;
- totalTaskTimeMillis += taskExecutionTime;
+ totalTaskTimeMillis += taskExecuteTime;
} finally {
writeLock.unlock();
}
diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskTimeoutNotifyAlarmPlugin.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskTimeoutNotifyAlarmPlugin.java
index 12e522db..4a566f17 100644
--- a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskTimeoutNotifyAlarmPlugin.java
+++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskTimeoutNotifyAlarmPlugin.java
@@ -27,11 +27,10 @@ import java.util.Optional;
import java.util.concurrent.ThreadPoolExecutor;
/**
- * Record task execution time indicator,
- * and send alarm notification when the execution time exceeds the threshold.
+ * Send alarm notification when the execution time exceeds the threshold.
*/
@AllArgsConstructor
-public class TaskTimeoutNotifyAlarmPlugin extends TaskTimeRecordPlugin {
+public class TaskTimeoutNotifyAlarmPlugin extends AbstractTaskTimerPlugin {
public static final String PLUGIN_NAME = "task-timeout-notify-alarm-plugin";
@@ -40,6 +39,15 @@ public class TaskTimeoutNotifyAlarmPlugin extends TaskTimeRecordPlugin {
*/
private final String threadPoolId;
+ @Getter
+ @Setter
+ private Long executeTimeOut;
+
+ /**
+ * thread-pool
+ */
+ private final ThreadPoolExecutor threadPoolExecutor;
+
/**
* Get id.
*
@@ -50,30 +58,21 @@ public class TaskTimeoutNotifyAlarmPlugin extends TaskTimeRecordPlugin {
return PLUGIN_NAME;
}
- @Getter
- @Setter
- private Long executeTimeOut;
-
- /**
- * thread-pool
- */
- private final ThreadPoolExecutor threadPoolExecutor;
-
/**
* Check whether the task execution time exceeds {@link #executeTimeOut},
* if it exceeds this time, send an alarm notification.
*
- * @param executeTime executeTime in nanosecond
+ * @param taskExecuteTime execute time of task
*/
@Override
- protected void recordTaskTime(long executeTime) {
- super.recordTaskTime(executeTime);
- if (executeTime <= executeTimeOut) {
+ protected void processTaskTime(long taskExecuteTime) {
+ if (taskExecuteTime <= executeTimeOut) {
return;
}
Optional.ofNullable(ApplicationContextHolder.getInstance())
.map(context -> context.getBean(ThreadPoolNotifyAlarmHandler.class))
.ifPresent(handler -> handler.asyncSendExecuteTimeOutAlarm(
- threadPoolId, executeTime, executeTimeOut, threadPoolExecutor));
+ threadPoolId, taskExecuteTime, executeTimeOut, threadPoolExecutor));
}
+
}
diff --git a/hippo4j-core/src/test/java/cn/hippo4j/core/executor/DynamicThreadPoolExecutorTest.java b/hippo4j-core/src/test/java/cn/hippo4j/core/executor/DynamicThreadPoolExecutorTest.java
new file mode 100644
index 00000000..63d362af
--- /dev/null
+++ b/hippo4j-core/src/test/java/cn/hippo4j/core/executor/DynamicThreadPoolExecutorTest.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cn.hippo4j.core.executor;
+
+import cn.hippo4j.common.toolkit.ThreadUtil;
+import org.junit.Assert;
+import org.junit.Test;
+import org.springframework.core.task.TaskDecorator;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * test for {@link DynamicThreadPoolExecutor}
+ */
+public class DynamicThreadPoolExecutorTest {
+
+ @Test
+ public void testRedundancyHandler() {
+ RejectedExecutionHandler handler = new ThreadPoolExecutor.DiscardOldestPolicy();
+
+ DynamicThreadPoolExecutor executor = new DynamicThreadPoolExecutor(
+ 1, 1, 1000L, TimeUnit.MILLISECONDS,
+ 1000L, true, 1000L,
+ new ArrayBlockingQueue<>(1), "test", Thread::new, handler);
+
+ Assert.assertEquals(handler, executor.getRedundancyHandler());
+ handler = new ThreadPoolExecutor.AbortPolicy();
+ executor.setRedundancyHandler(handler);
+ Assert.assertEquals(handler, executor.getRedundancyHandler());
+ }
+
+ @Test
+ public void testTaskDecorator() {
+ DynamicThreadPoolExecutor executor = new DynamicThreadPoolExecutor(
+ 1, 1, 1000L, TimeUnit.MILLISECONDS,
+ 1000L, true, 1000L,
+ new ArrayBlockingQueue<>(1), "test", Thread::new, new ThreadPoolExecutor.DiscardOldestPolicy());
+
+ Assert.assertNull(executor.getTaskDecorator());
+ TaskDecorator decorator = runnable -> runnable;
+ executor.setTaskDecorator(decorator);
+ Assert.assertEquals(decorator, executor.getTaskDecorator());
+
+ decorator = runnable -> runnable;
+ executor.setTaskDecorator(decorator);
+ Assert.assertEquals(decorator, executor.getTaskDecorator());
+ }
+
+ @Test
+ public void testExecuteTimeOut() {
+ DynamicThreadPoolExecutor executor = new DynamicThreadPoolExecutor(
+ 1, 1, 1000L, TimeUnit.MILLISECONDS,
+ 1000L, true, 1000L,
+ new ArrayBlockingQueue<>(1), "test", Thread::new, new ThreadPoolExecutor.DiscardOldestPolicy());
+
+ Assert.assertEquals(1000L, executor.getExecuteTimeOut().longValue());
+ executor.setExecuteTimeOut(500L);
+ Assert.assertEquals(500L, executor.getExecuteTimeOut().longValue());
+ }
+
+ @Test
+ public void testDestroyWhenWaitForTask() {
+ DynamicThreadPoolExecutor executor = new DynamicThreadPoolExecutor(
+ 1, 1, 1000L, TimeUnit.MILLISECONDS,
+ 1000L, true, 1000L,
+ new ArrayBlockingQueue<>(1), "test", Thread::new, new ThreadPoolExecutor.DiscardOldestPolicy());
+ AtomicInteger count = new AtomicInteger(0);
+
+ executor.execute(() -> {
+ ThreadUtil.sleep(500L);
+ count.incrementAndGet();
+ });
+ executor.execute(() -> {
+ ThreadUtil.sleep(500L);
+ count.incrementAndGet();
+ });
+ executor.destroy();
+
+ // waitting for terminated
+ while (!executor.isTerminated()) {
+ } ;
+ Assert.assertEquals(2, count.get());
+ }
+
+ @Test
+ public void testDestroyWhenNotWaitForTask() {
+ DynamicThreadPoolExecutor executor = new DynamicThreadPoolExecutor(
+ 1, 1, 1000L, TimeUnit.MILLISECONDS,
+ 1000L, false, 1000L,
+ new ArrayBlockingQueue<>(1), "test", Thread::new, new ThreadPoolExecutor.DiscardOldestPolicy());
+ AtomicInteger count = new AtomicInteger(0);
+
+ executor.execute(() -> {
+ ThreadUtil.sleep(500L);
+ count.incrementAndGet();
+ });
+ executor.execute(() -> {
+ ThreadUtil.sleep(500L);
+ count.incrementAndGet();
+ });
+ executor.destroy();
+
+ // waitting for terminated
+ while (!executor.isTerminated()) {
+ } ;
+ Assert.assertEquals(1, count.get());
+ }
+
+ @Test
+ public void testRejectCount() {
+ DynamicThreadPoolExecutor executor = new DynamicThreadPoolExecutor(
+ 1, 1, 1000L, TimeUnit.MILLISECONDS,
+ 1000L, true, 1000L,
+ new ArrayBlockingQueue<>(1), "test", Thread::new, new ThreadPoolExecutor.DiscardOldestPolicy());
+
+ Assert.assertEquals(0L, executor.getRejectCountNum().longValue());
+ Assert.assertEquals(0L, executor.getRejectCount().get());
+
+ executor.submit(() -> ThreadUtil.sleep(100L));
+ executor.submit(() -> ThreadUtil.sleep(100L));
+ executor.submit(() -> ThreadUtil.sleep(100L));
+ ThreadUtil.sleep(200L);
+ Assert.assertEquals(1L, executor.getRejectCountNum().longValue());
+ Assert.assertEquals(1L, executor.getRejectCount().get());
+ }
+
+ @Test
+ public void testSupportParam() {
+ DynamicThreadPoolExecutor executor = new DynamicThreadPoolExecutor(
+ 1, 1, 1000L, TimeUnit.MILLISECONDS,
+ 1000L, true, 1000L,
+ new ArrayBlockingQueue<>(1), "test", Thread::new, new ThreadPoolExecutor.DiscardOldestPolicy());
+ Assert.assertEquals(1000L, executor.getAwaitTerminationMillis());
+ Assert.assertTrue(executor.isWaitForTasksToCompleteOnShutdown());
+
+ executor.setSupportParam(500L, false);
+ Assert.assertEquals(500L, executor.getAwaitTerminationMillis());
+ Assert.assertFalse(executor.isWaitForTasksToCompleteOnShutdown());
+ }
+
+}
diff --git a/hippo4j-core/src/test/java/cn/hippo4j/core/executor/ExtensibleThreadPoolExecutorTest.java b/hippo4j-core/src/test/java/cn/hippo4j/core/executor/ExtensibleThreadPoolExecutorTest.java
new file mode 100644
index 00000000..7642adc6
--- /dev/null
+++ b/hippo4j-core/src/test/java/cn/hippo4j/core/executor/ExtensibleThreadPoolExecutorTest.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cn.hippo4j.core.executor;
+
+import cn.hippo4j.common.toolkit.ThreadUtil;
+import cn.hippo4j.core.plugin.ExecuteAwarePlugin;
+import cn.hippo4j.core.plugin.RejectedAwarePlugin;
+import cn.hippo4j.core.plugin.ShutdownAwarePlugin;
+import cn.hippo4j.core.plugin.TaskAwarePlugin;
+import cn.hippo4j.core.plugin.manager.DefaultThreadPoolPluginManager;
+import cn.hippo4j.core.plugin.manager.ThreadPoolPluginManager;
+import lombok.Getter;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * test for {@link ExtensibleThreadPoolExecutor}
+ */
+public class ExtensibleThreadPoolExecutorTest {
+
+ private final RejectedExecutionHandler originalHandler = new ThreadPoolExecutor.DiscardPolicy();
+
+ private ExtensibleThreadPoolExecutor executor;
+
+ private ThreadPoolPluginManager manager;
+
+ @Before
+ public void initExecutor() {
+ manager = new DefaultThreadPoolPluginManager();
+ executor = new ExtensibleThreadPoolExecutor(
+ "test", manager,
+ 5, 5, 1000L, TimeUnit.MILLISECONDS,
+ new ArrayBlockingQueue<>(1), Thread::new, originalHandler);
+ }
+
+ @Test
+ public void testGetThreadPoolId() {
+ Assert.assertEquals("test", executor.getThreadPoolId());
+ }
+
+ @Test
+ public void testGetThreadPoolExecutor() {
+ Assert.assertSame(executor, executor.getThreadPoolExecutor());
+ }
+
+ @Test
+ public void testGetThreadPoolPluginManager() {
+ Assert.assertSame(manager, executor.getThreadPoolPluginManager());
+ }
+
+ @Test
+ public void testGetOrSetRejectedHandler() {
+ RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy();
+ executor.setRejectedExecutionHandler(handler);
+ Assert.assertSame(handler, executor.getRejectedExecutionHandler());
+ }
+
+ @Test
+ public void testInvokeTaskAwarePlugin() {
+ TestTaskAwarePlugin plugin = new TestTaskAwarePlugin();
+ executor.register(plugin);
+ executor.submit(() -> {
+ });
+ executor.submit(() -> true);
+ executor.submit(() -> {
+ }, false);
+ executor.execute(() -> {
+ });
+ Assert.assertEquals(7, plugin.getInvokeCount().get());
+ }
+
+ @Test
+ public void testInvokeExecuteAwarePlugin() {
+ TestExecuteAwarePlugin plugin = new TestExecuteAwarePlugin();
+ executor.register(plugin);
+ executor.execute(() -> {
+ });
+ ThreadUtil.sleep(500L);
+ Assert.assertEquals(2, plugin.getInvokeCount().get());
+ }
+
+ @Test
+ public void testInvokeRejectedAwarePlugin() {
+ executor.setCorePoolSize(1);
+ executor.setMaximumPoolSize(1);
+
+ TestRejectedAwarePlugin plugin = new TestRejectedAwarePlugin();
+ executor.register(plugin);
+ // blocking pool and queue
+ executor.submit(() -> ThreadUtil.sleep(500L));
+ executor.submit(() -> ThreadUtil.sleep(500L));
+ // reject 3 tasks
+ executor.submit(() -> {
+ });
+ executor.submit(() -> {
+ });
+ executor.submit(() -> {
+ });
+
+ ThreadUtil.sleep(500L);
+ Assert.assertEquals(3, plugin.getInvokeCount().get());
+ }
+
+ @Test
+ public void testInvokeTestShutdownAwarePluginWhenShutdown() throws InterruptedException {
+ TestShutdownAwarePlugin plugin = new TestShutdownAwarePlugin();
+ executor.register(plugin);
+ executor.shutdown();
+ executor.submit(() -> {
+ throw new IllegalArgumentException("???");
+ });
+ if (executor.awaitTermination(500L, TimeUnit.MILLISECONDS)) {
+ Assert.assertEquals(3, plugin.getInvokeCount().get());
+ }
+ }
+
+ @Test
+ public void testInvokeTestShutdownAwarePluginWhenShutdownNow() throws InterruptedException {
+ TestShutdownAwarePlugin plugin = new TestShutdownAwarePlugin();
+ executor.register(plugin);
+ executor.shutdownNow();
+ if (executor.awaitTermination(500L, TimeUnit.MILLISECONDS)) {
+ Assert.assertEquals(3, plugin.getInvokeCount().get());
+ }
+ }
+
+ @Getter
+ private final static class TestTaskAwarePlugin implements TaskAwarePlugin {
+
+ private final AtomicInteger invokeCount = new AtomicInteger(0);
+ private final String id = "TestTaskAwarePlugin";
+ @Override
+ public Runnable beforeTaskCreate(ThreadPoolExecutor executor, Runnable runnable, V value) {
+ invokeCount.incrementAndGet();
+ return TaskAwarePlugin.super.beforeTaskCreate(executor, runnable, value);
+ }
+ @Override
+ public Callable beforeTaskCreate(ThreadPoolExecutor executor, Callable future) {
+ invokeCount.incrementAndGet();
+ return TaskAwarePlugin.super.beforeTaskCreate(executor, future);
+ }
+ @Override
+ public Runnable beforeTaskExecute(Runnable runnable) {
+ invokeCount.incrementAndGet();
+ return TaskAwarePlugin.super.beforeTaskExecute(runnable);
+ }
+ }
+
+ @Getter
+ private final static class TestExecuteAwarePlugin implements ExecuteAwarePlugin {
+
+ private final AtomicInteger invokeCount = new AtomicInteger(0);
+ private final String id = "TestExecuteAwarePlugin";
+ @Override
+ public void beforeExecute(Thread thread, Runnable runnable) {
+ invokeCount.incrementAndGet();
+ ExecuteAwarePlugin.super.beforeExecute(thread, runnable);
+ }
+ @Override
+ public void afterExecute(Runnable runnable, Throwable throwable) {
+ invokeCount.incrementAndGet();
+ ExecuteAwarePlugin.super.afterExecute(runnable, throwable);
+ }
+ }
+
+ @Getter
+ private final static class TestRejectedAwarePlugin implements RejectedAwarePlugin {
+
+ private final AtomicInteger invokeCount = new AtomicInteger(0);
+ private final String id = "TestRejectedAwarePlugin";
+ @Override
+ public void beforeRejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
+ invokeCount.incrementAndGet();
+ }
+ }
+
+ @Getter
+ private final static class TestShutdownAwarePlugin implements ShutdownAwarePlugin {
+
+ private final AtomicInteger invokeCount = new AtomicInteger(0);
+ private final String id = "TestShutdownAwarePlugin";
+ @Override
+ public void beforeShutdown(ThreadPoolExecutor executor) {
+ invokeCount.incrementAndGet();
+ ShutdownAwarePlugin.super.beforeShutdown(executor);
+ }
+ @Override
+ public void afterShutdown(ThreadPoolExecutor executor, List remainingTasks) {
+ invokeCount.incrementAndGet();
+ ShutdownAwarePlugin.super.afterShutdown(executor, remainingTasks);
+ }
+ @Override
+ public void afterTerminated(ExtensibleThreadPoolExecutor executor) {
+ invokeCount.incrementAndGet();
+ ShutdownAwarePlugin.super.afterTerminated(executor);
+ }
+ }
+
+}
diff --git a/hippo4j-example/hippo4j-example-core/pom.xml b/hippo4j-example/hippo4j-example-core/pom.xml
index 50ed2a3d..2628dbd6 100644
--- a/hippo4j-example/hippo4j-example-core/pom.xml
+++ b/hippo4j-example/hippo4j-example-core/pom.xml
@@ -34,5 +34,20 @@
com.alibaba
transmittable-thread-local
+
+