From a8471718951f7c1000c515f8c74f43581d604471 Mon Sep 17 00:00:00 2001
From: huangchengxing <841396397@qq.com>
Date: Tue, 1 Nov 2022 17:19:22 +0800
Subject: [PATCH] test: Add unit test for DynamicThreadPoolExecutor and
ExtensibleThreadPoolExecutor
---
hippo4j-core/pom.xml | 5 +
.../DynamicThreadPoolExecutorTest.java | 158 +++++++++++++
.../ExtensibleThreadPoolExecutorTest.java | 219 ++++++++++++++++++
3 files changed, 382 insertions(+)
create mode 100644 hippo4j-core/src/test/java/cn/hippo4j/core/executor/DynamicThreadPoolExecutorTest.java
create mode 100644 hippo4j-core/src/test/java/cn/hippo4j/core/executor/ExtensibleThreadPoolExecutorTest.java
diff --git a/hippo4j-core/pom.xml b/hippo4j-core/pom.xml
index bd891a49..e9e05f10 100644
--- a/hippo4j-core/pom.xml
+++ b/hippo4j-core/pom.xml
@@ -10,6 +10,11 @@
hippo4j-core
+
+ org.springframework.boot
+ spring-boot-starter-test
+ test
+
org.projectlombok
lombok
diff --git a/hippo4j-core/src/test/java/cn/hippo4j/core/executor/DynamicThreadPoolExecutorTest.java b/hippo4j-core/src/test/java/cn/hippo4j/core/executor/DynamicThreadPoolExecutorTest.java
new file mode 100644
index 00000000..cfa344a0
--- /dev/null
+++ b/hippo4j-core/src/test/java/cn/hippo4j/core/executor/DynamicThreadPoolExecutorTest.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cn.hippo4j.core.executor;
+
+import cn.hippo4j.common.toolkit.ThreadUtil;
+import org.junit.Assert;
+import org.junit.Test;
+import org.springframework.core.task.TaskDecorator;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * test for {@link DynamicThreadPoolExecutor}
+ */
+public class DynamicThreadPoolExecutorTest {
+
+ @Test
+ public void testRedundancyHandler() {
+ RejectedExecutionHandler handler = new ThreadPoolExecutor.DiscardOldestPolicy();
+
+ DynamicThreadPoolExecutor executor = new DynamicThreadPoolExecutor(
+ 1, 1, 1000L, TimeUnit.MILLISECONDS,
+ 1000L, true, 1000L,
+ new ArrayBlockingQueue<>(1), "test", Thread::new, handler);
+
+ Assert.assertEquals(handler, executor.getRedundancyHandler());
+ handler = new ThreadPoolExecutor.AbortPolicy();
+ executor.setRedundancyHandler(handler);
+ Assert.assertEquals(handler, executor.getRedundancyHandler());
+ }
+
+ @Test
+ public void testTaskDecorator() {
+ DynamicThreadPoolExecutor executor = new DynamicThreadPoolExecutor(
+ 1, 1, 1000L, TimeUnit.MILLISECONDS,
+ 1000L, true, 1000L,
+ new ArrayBlockingQueue<>(1), "test", Thread::new, new ThreadPoolExecutor.DiscardOldestPolicy());
+
+ Assert.assertNull(executor.getTaskDecorator());
+ TaskDecorator decorator = runnable -> runnable;
+ executor.setTaskDecorator(decorator);
+ Assert.assertEquals(decorator, executor.getTaskDecorator());
+
+ decorator = runnable -> runnable;
+ executor.setTaskDecorator(decorator);
+ Assert.assertEquals(decorator, executor.getTaskDecorator());
+ }
+
+ @Test
+ public void testExecuteTimeOut() {
+ DynamicThreadPoolExecutor executor = new DynamicThreadPoolExecutor(
+ 1, 1, 1000L, TimeUnit.MILLISECONDS,
+ 1000L, true, 1000L,
+ new ArrayBlockingQueue<>(1), "test", Thread::new, new ThreadPoolExecutor.DiscardOldestPolicy());
+
+ Assert.assertEquals(1000L, executor.getExecuteTimeOut().longValue());
+ executor.setExecuteTimeOut(500L);
+ Assert.assertEquals(500L, executor.getExecuteTimeOut().longValue());
+ }
+
+ @Test
+ public void testDestroyWhenWaitForTask() {
+ DynamicThreadPoolExecutor executor = new DynamicThreadPoolExecutor(
+ 1, 1, 1000L, TimeUnit.MILLISECONDS,
+ 1000L, true, 1000L,
+ new ArrayBlockingQueue<>(1), "test", Thread::new, new ThreadPoolExecutor.DiscardOldestPolicy());
+ AtomicInteger count = new AtomicInteger(0);
+
+ executor.execute(() -> {
+ ThreadUtil.sleep(500L);
+ count.incrementAndGet();
+ });
+ executor.execute(() -> {
+ ThreadUtil.sleep(500L);
+ count.incrementAndGet();
+ });
+ executor.destroy();
+
+ // waitting for terminated
+ while (!executor.isTerminated()){};
+ Assert.assertEquals(2, count.get());
+ }
+
+ @Test
+ public void testDestroyWhenNotWaitForTask() {
+ DynamicThreadPoolExecutor executor = new DynamicThreadPoolExecutor(
+ 1, 1, 1000L, TimeUnit.MILLISECONDS,
+ 1000L, false, 1000L,
+ new ArrayBlockingQueue<>(1), "test", Thread::new, new ThreadPoolExecutor.DiscardOldestPolicy());
+ AtomicInteger count = new AtomicInteger(0);
+
+ executor.execute(() -> {
+ ThreadUtil.sleep(500L);
+ count.incrementAndGet();
+ });
+ executor.execute(() -> {
+ ThreadUtil.sleep(500L);
+ count.incrementAndGet();
+ });
+ executor.destroy();
+
+ // waitting for terminated
+ while (!executor.isTerminated()){};
+ Assert.assertEquals(1, count.get());
+ }
+
+ @Test
+ public void testRejectCount() {
+ DynamicThreadPoolExecutor executor = new DynamicThreadPoolExecutor(
+ 1, 1, 1000L, TimeUnit.MILLISECONDS,
+ 1000L, true, 1000L,
+ new ArrayBlockingQueue<>(1), "test", Thread::new, new ThreadPoolExecutor.DiscardOldestPolicy());
+
+ Assert.assertEquals(0L, executor.getRejectCountNum().longValue());
+ Assert.assertEquals(0L, executor.getRejectCount().get());
+
+ executor.submit(() -> ThreadUtil.sleep(100L));
+ executor.submit(() -> ThreadUtil.sleep(100L));
+ executor.submit(() -> ThreadUtil.sleep(100L));
+ ThreadUtil.sleep(200L);
+ Assert.assertEquals(1L, executor.getRejectCountNum().longValue());
+ Assert.assertEquals(1L, executor.getRejectCount().get());
+ }
+
+ @Test
+ public void testSupportParam() {
+ DynamicThreadPoolExecutor executor = new DynamicThreadPoolExecutor(
+ 1, 1, 1000L, TimeUnit.MILLISECONDS,
+ 1000L, true, 1000L,
+ new ArrayBlockingQueue<>(1), "test", Thread::new, new ThreadPoolExecutor.DiscardOldestPolicy());
+ Assert.assertEquals(1000L, executor.getAwaitTerminationMillis());
+ Assert.assertTrue(executor.isWaitForTasksToCompleteOnShutdown());
+
+ executor.setSupportParam(500L, false);
+ Assert.assertEquals(500L, executor.getAwaitTerminationMillis());
+ Assert.assertFalse(executor.isWaitForTasksToCompleteOnShutdown());
+ }
+
+}
diff --git a/hippo4j-core/src/test/java/cn/hippo4j/core/executor/ExtensibleThreadPoolExecutorTest.java b/hippo4j-core/src/test/java/cn/hippo4j/core/executor/ExtensibleThreadPoolExecutorTest.java
new file mode 100644
index 00000000..7642adc6
--- /dev/null
+++ b/hippo4j-core/src/test/java/cn/hippo4j/core/executor/ExtensibleThreadPoolExecutorTest.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cn.hippo4j.core.executor;
+
+import cn.hippo4j.common.toolkit.ThreadUtil;
+import cn.hippo4j.core.plugin.ExecuteAwarePlugin;
+import cn.hippo4j.core.plugin.RejectedAwarePlugin;
+import cn.hippo4j.core.plugin.ShutdownAwarePlugin;
+import cn.hippo4j.core.plugin.TaskAwarePlugin;
+import cn.hippo4j.core.plugin.manager.DefaultThreadPoolPluginManager;
+import cn.hippo4j.core.plugin.manager.ThreadPoolPluginManager;
+import lombok.Getter;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * test for {@link ExtensibleThreadPoolExecutor}
+ */
+public class ExtensibleThreadPoolExecutorTest {
+
+ private final RejectedExecutionHandler originalHandler = new ThreadPoolExecutor.DiscardPolicy();
+
+ private ExtensibleThreadPoolExecutor executor;
+
+ private ThreadPoolPluginManager manager;
+
+ @Before
+ public void initExecutor() {
+ manager = new DefaultThreadPoolPluginManager();
+ executor = new ExtensibleThreadPoolExecutor(
+ "test", manager,
+ 5, 5, 1000L, TimeUnit.MILLISECONDS,
+ new ArrayBlockingQueue<>(1), Thread::new, originalHandler);
+ }
+
+ @Test
+ public void testGetThreadPoolId() {
+ Assert.assertEquals("test", executor.getThreadPoolId());
+ }
+
+ @Test
+ public void testGetThreadPoolExecutor() {
+ Assert.assertSame(executor, executor.getThreadPoolExecutor());
+ }
+
+ @Test
+ public void testGetThreadPoolPluginManager() {
+ Assert.assertSame(manager, executor.getThreadPoolPluginManager());
+ }
+
+ @Test
+ public void testGetOrSetRejectedHandler() {
+ RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy();
+ executor.setRejectedExecutionHandler(handler);
+ Assert.assertSame(handler, executor.getRejectedExecutionHandler());
+ }
+
+ @Test
+ public void testInvokeTaskAwarePlugin() {
+ TestTaskAwarePlugin plugin = new TestTaskAwarePlugin();
+ executor.register(plugin);
+ executor.submit(() -> {
+ });
+ executor.submit(() -> true);
+ executor.submit(() -> {
+ }, false);
+ executor.execute(() -> {
+ });
+ Assert.assertEquals(7, plugin.getInvokeCount().get());
+ }
+
+ @Test
+ public void testInvokeExecuteAwarePlugin() {
+ TestExecuteAwarePlugin plugin = new TestExecuteAwarePlugin();
+ executor.register(plugin);
+ executor.execute(() -> {
+ });
+ ThreadUtil.sleep(500L);
+ Assert.assertEquals(2, plugin.getInvokeCount().get());
+ }
+
+ @Test
+ public void testInvokeRejectedAwarePlugin() {
+ executor.setCorePoolSize(1);
+ executor.setMaximumPoolSize(1);
+
+ TestRejectedAwarePlugin plugin = new TestRejectedAwarePlugin();
+ executor.register(plugin);
+ // blocking pool and queue
+ executor.submit(() -> ThreadUtil.sleep(500L));
+ executor.submit(() -> ThreadUtil.sleep(500L));
+ // reject 3 tasks
+ executor.submit(() -> {
+ });
+ executor.submit(() -> {
+ });
+ executor.submit(() -> {
+ });
+
+ ThreadUtil.sleep(500L);
+ Assert.assertEquals(3, plugin.getInvokeCount().get());
+ }
+
+ @Test
+ public void testInvokeTestShutdownAwarePluginWhenShutdown() throws InterruptedException {
+ TestShutdownAwarePlugin plugin = new TestShutdownAwarePlugin();
+ executor.register(plugin);
+ executor.shutdown();
+ executor.submit(() -> {
+ throw new IllegalArgumentException("???");
+ });
+ if (executor.awaitTermination(500L, TimeUnit.MILLISECONDS)) {
+ Assert.assertEquals(3, plugin.getInvokeCount().get());
+ }
+ }
+
+ @Test
+ public void testInvokeTestShutdownAwarePluginWhenShutdownNow() throws InterruptedException {
+ TestShutdownAwarePlugin plugin = new TestShutdownAwarePlugin();
+ executor.register(plugin);
+ executor.shutdownNow();
+ if (executor.awaitTermination(500L, TimeUnit.MILLISECONDS)) {
+ Assert.assertEquals(3, plugin.getInvokeCount().get());
+ }
+ }
+
+ @Getter
+ private final static class TestTaskAwarePlugin implements TaskAwarePlugin {
+
+ private final AtomicInteger invokeCount = new AtomicInteger(0);
+ private final String id = "TestTaskAwarePlugin";
+ @Override
+ public Runnable beforeTaskCreate(ThreadPoolExecutor executor, Runnable runnable, V value) {
+ invokeCount.incrementAndGet();
+ return TaskAwarePlugin.super.beforeTaskCreate(executor, runnable, value);
+ }
+ @Override
+ public Callable beforeTaskCreate(ThreadPoolExecutor executor, Callable future) {
+ invokeCount.incrementAndGet();
+ return TaskAwarePlugin.super.beforeTaskCreate(executor, future);
+ }
+ @Override
+ public Runnable beforeTaskExecute(Runnable runnable) {
+ invokeCount.incrementAndGet();
+ return TaskAwarePlugin.super.beforeTaskExecute(runnable);
+ }
+ }
+
+ @Getter
+ private final static class TestExecuteAwarePlugin implements ExecuteAwarePlugin {
+
+ private final AtomicInteger invokeCount = new AtomicInteger(0);
+ private final String id = "TestExecuteAwarePlugin";
+ @Override
+ public void beforeExecute(Thread thread, Runnable runnable) {
+ invokeCount.incrementAndGet();
+ ExecuteAwarePlugin.super.beforeExecute(thread, runnable);
+ }
+ @Override
+ public void afterExecute(Runnable runnable, Throwable throwable) {
+ invokeCount.incrementAndGet();
+ ExecuteAwarePlugin.super.afterExecute(runnable, throwable);
+ }
+ }
+
+ @Getter
+ private final static class TestRejectedAwarePlugin implements RejectedAwarePlugin {
+
+ private final AtomicInteger invokeCount = new AtomicInteger(0);
+ private final String id = "TestRejectedAwarePlugin";
+ @Override
+ public void beforeRejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
+ invokeCount.incrementAndGet();
+ }
+ }
+
+ @Getter
+ private final static class TestShutdownAwarePlugin implements ShutdownAwarePlugin {
+
+ private final AtomicInteger invokeCount = new AtomicInteger(0);
+ private final String id = "TestShutdownAwarePlugin";
+ @Override
+ public void beforeShutdown(ThreadPoolExecutor executor) {
+ invokeCount.incrementAndGet();
+ ShutdownAwarePlugin.super.beforeShutdown(executor);
+ }
+ @Override
+ public void afterShutdown(ThreadPoolExecutor executor, List remainingTasks) {
+ invokeCount.incrementAndGet();
+ ShutdownAwarePlugin.super.afterShutdown(executor, remainingTasks);
+ }
+ @Override
+ public void afterTerminated(ExtensibleThreadPoolExecutor executor) {
+ invokeCount.incrementAndGet();
+ ShutdownAwarePlugin.super.afterTerminated(executor);
+ }
+ }
+
+}