diff --git a/hippo4j-core/pom.xml b/hippo4j-core/pom.xml index bd891a49..e9e05f10 100644 --- a/hippo4j-core/pom.xml +++ b/hippo4j-core/pom.xml @@ -10,6 +10,11 @@ hippo4j-core + + org.springframework.boot + spring-boot-starter-test + test + org.projectlombok lombok diff --git a/hippo4j-core/src/test/java/cn/hippo4j/core/executor/DynamicThreadPoolExecutorTest.java b/hippo4j-core/src/test/java/cn/hippo4j/core/executor/DynamicThreadPoolExecutorTest.java new file mode 100644 index 00000000..cfa344a0 --- /dev/null +++ b/hippo4j-core/src/test/java/cn/hippo4j/core/executor/DynamicThreadPoolExecutorTest.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.core.executor; + +import cn.hippo4j.common.toolkit.ThreadUtil; +import org.junit.Assert; +import org.junit.Test; +import org.springframework.core.task.TaskDecorator; + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * test for {@link DynamicThreadPoolExecutor} + */ +public class DynamicThreadPoolExecutorTest { + + @Test + public void testRedundancyHandler() { + RejectedExecutionHandler handler = new ThreadPoolExecutor.DiscardOldestPolicy(); + + DynamicThreadPoolExecutor executor = new DynamicThreadPoolExecutor( + 1, 1, 1000L, TimeUnit.MILLISECONDS, + 1000L, true, 1000L, + new ArrayBlockingQueue<>(1), "test", Thread::new, handler); + + Assert.assertEquals(handler, executor.getRedundancyHandler()); + handler = new ThreadPoolExecutor.AbortPolicy(); + executor.setRedundancyHandler(handler); + Assert.assertEquals(handler, executor.getRedundancyHandler()); + } + + @Test + public void testTaskDecorator() { + DynamicThreadPoolExecutor executor = new DynamicThreadPoolExecutor( + 1, 1, 1000L, TimeUnit.MILLISECONDS, + 1000L, true, 1000L, + new ArrayBlockingQueue<>(1), "test", Thread::new, new ThreadPoolExecutor.DiscardOldestPolicy()); + + Assert.assertNull(executor.getTaskDecorator()); + TaskDecorator decorator = runnable -> runnable; + executor.setTaskDecorator(decorator); + Assert.assertEquals(decorator, executor.getTaskDecorator()); + + decorator = runnable -> runnable; + executor.setTaskDecorator(decorator); + Assert.assertEquals(decorator, executor.getTaskDecorator()); + } + + @Test + public void testExecuteTimeOut() { + DynamicThreadPoolExecutor executor = new DynamicThreadPoolExecutor( + 1, 1, 1000L, TimeUnit.MILLISECONDS, + 1000L, true, 1000L, + new ArrayBlockingQueue<>(1), "test", Thread::new, new ThreadPoolExecutor.DiscardOldestPolicy()); + + Assert.assertEquals(1000L, executor.getExecuteTimeOut().longValue()); + executor.setExecuteTimeOut(500L); + Assert.assertEquals(500L, executor.getExecuteTimeOut().longValue()); + } + + @Test + public void testDestroyWhenWaitForTask() { + DynamicThreadPoolExecutor executor = new DynamicThreadPoolExecutor( + 1, 1, 1000L, TimeUnit.MILLISECONDS, + 1000L, true, 1000L, + new ArrayBlockingQueue<>(1), "test", Thread::new, new ThreadPoolExecutor.DiscardOldestPolicy()); + AtomicInteger count = new AtomicInteger(0); + + executor.execute(() -> { + ThreadUtil.sleep(500L); + count.incrementAndGet(); + }); + executor.execute(() -> { + ThreadUtil.sleep(500L); + count.incrementAndGet(); + }); + executor.destroy(); + + // waitting for terminated + while (!executor.isTerminated()){}; + Assert.assertEquals(2, count.get()); + } + + @Test + public void testDestroyWhenNotWaitForTask() { + DynamicThreadPoolExecutor executor = new DynamicThreadPoolExecutor( + 1, 1, 1000L, TimeUnit.MILLISECONDS, + 1000L, false, 1000L, + new ArrayBlockingQueue<>(1), "test", Thread::new, new ThreadPoolExecutor.DiscardOldestPolicy()); + AtomicInteger count = new AtomicInteger(0); + + executor.execute(() -> { + ThreadUtil.sleep(500L); + count.incrementAndGet(); + }); + executor.execute(() -> { + ThreadUtil.sleep(500L); + count.incrementAndGet(); + }); + executor.destroy(); + + // waitting for terminated + while (!executor.isTerminated()){}; + Assert.assertEquals(1, count.get()); + } + + @Test + public void testRejectCount() { + DynamicThreadPoolExecutor executor = new DynamicThreadPoolExecutor( + 1, 1, 1000L, TimeUnit.MILLISECONDS, + 1000L, true, 1000L, + new ArrayBlockingQueue<>(1), "test", Thread::new, new ThreadPoolExecutor.DiscardOldestPolicy()); + + Assert.assertEquals(0L, executor.getRejectCountNum().longValue()); + Assert.assertEquals(0L, executor.getRejectCount().get()); + + executor.submit(() -> ThreadUtil.sleep(100L)); + executor.submit(() -> ThreadUtil.sleep(100L)); + executor.submit(() -> ThreadUtil.sleep(100L)); + ThreadUtil.sleep(200L); + Assert.assertEquals(1L, executor.getRejectCountNum().longValue()); + Assert.assertEquals(1L, executor.getRejectCount().get()); + } + + @Test + public void testSupportParam() { + DynamicThreadPoolExecutor executor = new DynamicThreadPoolExecutor( + 1, 1, 1000L, TimeUnit.MILLISECONDS, + 1000L, true, 1000L, + new ArrayBlockingQueue<>(1), "test", Thread::new, new ThreadPoolExecutor.DiscardOldestPolicy()); + Assert.assertEquals(1000L, executor.getAwaitTerminationMillis()); + Assert.assertTrue(executor.isWaitForTasksToCompleteOnShutdown()); + + executor.setSupportParam(500L, false); + Assert.assertEquals(500L, executor.getAwaitTerminationMillis()); + Assert.assertFalse(executor.isWaitForTasksToCompleteOnShutdown()); + } + +} diff --git a/hippo4j-core/src/test/java/cn/hippo4j/core/executor/ExtensibleThreadPoolExecutorTest.java b/hippo4j-core/src/test/java/cn/hippo4j/core/executor/ExtensibleThreadPoolExecutorTest.java new file mode 100644 index 00000000..7642adc6 --- /dev/null +++ b/hippo4j-core/src/test/java/cn/hippo4j/core/executor/ExtensibleThreadPoolExecutorTest.java @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.core.executor; + +import cn.hippo4j.common.toolkit.ThreadUtil; +import cn.hippo4j.core.plugin.ExecuteAwarePlugin; +import cn.hippo4j.core.plugin.RejectedAwarePlugin; +import cn.hippo4j.core.plugin.ShutdownAwarePlugin; +import cn.hippo4j.core.plugin.TaskAwarePlugin; +import cn.hippo4j.core.plugin.manager.DefaultThreadPoolPluginManager; +import cn.hippo4j.core.plugin.manager.ThreadPoolPluginManager; +import lombok.Getter; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.List; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * test for {@link ExtensibleThreadPoolExecutor} + */ +public class ExtensibleThreadPoolExecutorTest { + + private final RejectedExecutionHandler originalHandler = new ThreadPoolExecutor.DiscardPolicy(); + + private ExtensibleThreadPoolExecutor executor; + + private ThreadPoolPluginManager manager; + + @Before + public void initExecutor() { + manager = new DefaultThreadPoolPluginManager(); + executor = new ExtensibleThreadPoolExecutor( + "test", manager, + 5, 5, 1000L, TimeUnit.MILLISECONDS, + new ArrayBlockingQueue<>(1), Thread::new, originalHandler); + } + + @Test + public void testGetThreadPoolId() { + Assert.assertEquals("test", executor.getThreadPoolId()); + } + + @Test + public void testGetThreadPoolExecutor() { + Assert.assertSame(executor, executor.getThreadPoolExecutor()); + } + + @Test + public void testGetThreadPoolPluginManager() { + Assert.assertSame(manager, executor.getThreadPoolPluginManager()); + } + + @Test + public void testGetOrSetRejectedHandler() { + RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy(); + executor.setRejectedExecutionHandler(handler); + Assert.assertSame(handler, executor.getRejectedExecutionHandler()); + } + + @Test + public void testInvokeTaskAwarePlugin() { + TestTaskAwarePlugin plugin = new TestTaskAwarePlugin(); + executor.register(plugin); + executor.submit(() -> { + }); + executor.submit(() -> true); + executor.submit(() -> { + }, false); + executor.execute(() -> { + }); + Assert.assertEquals(7, plugin.getInvokeCount().get()); + } + + @Test + public void testInvokeExecuteAwarePlugin() { + TestExecuteAwarePlugin plugin = new TestExecuteAwarePlugin(); + executor.register(plugin); + executor.execute(() -> { + }); + ThreadUtil.sleep(500L); + Assert.assertEquals(2, plugin.getInvokeCount().get()); + } + + @Test + public void testInvokeRejectedAwarePlugin() { + executor.setCorePoolSize(1); + executor.setMaximumPoolSize(1); + + TestRejectedAwarePlugin plugin = new TestRejectedAwarePlugin(); + executor.register(plugin); + // blocking pool and queue + executor.submit(() -> ThreadUtil.sleep(500L)); + executor.submit(() -> ThreadUtil.sleep(500L)); + // reject 3 tasks + executor.submit(() -> { + }); + executor.submit(() -> { + }); + executor.submit(() -> { + }); + + ThreadUtil.sleep(500L); + Assert.assertEquals(3, plugin.getInvokeCount().get()); + } + + @Test + public void testInvokeTestShutdownAwarePluginWhenShutdown() throws InterruptedException { + TestShutdownAwarePlugin plugin = new TestShutdownAwarePlugin(); + executor.register(plugin); + executor.shutdown(); + executor.submit(() -> { + throw new IllegalArgumentException("???"); + }); + if (executor.awaitTermination(500L, TimeUnit.MILLISECONDS)) { + Assert.assertEquals(3, plugin.getInvokeCount().get()); + } + } + + @Test + public void testInvokeTestShutdownAwarePluginWhenShutdownNow() throws InterruptedException { + TestShutdownAwarePlugin plugin = new TestShutdownAwarePlugin(); + executor.register(plugin); + executor.shutdownNow(); + if (executor.awaitTermination(500L, TimeUnit.MILLISECONDS)) { + Assert.assertEquals(3, plugin.getInvokeCount().get()); + } + } + + @Getter + private final static class TestTaskAwarePlugin implements TaskAwarePlugin { + + private final AtomicInteger invokeCount = new AtomicInteger(0); + private final String id = "TestTaskAwarePlugin"; + @Override + public Runnable beforeTaskCreate(ThreadPoolExecutor executor, Runnable runnable, V value) { + invokeCount.incrementAndGet(); + return TaskAwarePlugin.super.beforeTaskCreate(executor, runnable, value); + } + @Override + public Callable beforeTaskCreate(ThreadPoolExecutor executor, Callable future) { + invokeCount.incrementAndGet(); + return TaskAwarePlugin.super.beforeTaskCreate(executor, future); + } + @Override + public Runnable beforeTaskExecute(Runnable runnable) { + invokeCount.incrementAndGet(); + return TaskAwarePlugin.super.beforeTaskExecute(runnable); + } + } + + @Getter + private final static class TestExecuteAwarePlugin implements ExecuteAwarePlugin { + + private final AtomicInteger invokeCount = new AtomicInteger(0); + private final String id = "TestExecuteAwarePlugin"; + @Override + public void beforeExecute(Thread thread, Runnable runnable) { + invokeCount.incrementAndGet(); + ExecuteAwarePlugin.super.beforeExecute(thread, runnable); + } + @Override + public void afterExecute(Runnable runnable, Throwable throwable) { + invokeCount.incrementAndGet(); + ExecuteAwarePlugin.super.afterExecute(runnable, throwable); + } + } + + @Getter + private final static class TestRejectedAwarePlugin implements RejectedAwarePlugin { + + private final AtomicInteger invokeCount = new AtomicInteger(0); + private final String id = "TestRejectedAwarePlugin"; + @Override + public void beforeRejectedExecution(Runnable runnable, ThreadPoolExecutor executor) { + invokeCount.incrementAndGet(); + } + } + + @Getter + private final static class TestShutdownAwarePlugin implements ShutdownAwarePlugin { + + private final AtomicInteger invokeCount = new AtomicInteger(0); + private final String id = "TestShutdownAwarePlugin"; + @Override + public void beforeShutdown(ThreadPoolExecutor executor) { + invokeCount.incrementAndGet(); + ShutdownAwarePlugin.super.beforeShutdown(executor); + } + @Override + public void afterShutdown(ThreadPoolExecutor executor, List remainingTasks) { + invokeCount.incrementAndGet(); + ShutdownAwarePlugin.super.afterShutdown(executor, remainingTasks); + } + @Override + public void afterTerminated(ExtensibleThreadPoolExecutor executor) { + invokeCount.incrementAndGet(); + ShutdownAwarePlugin.super.afterTerminated(executor); + } + } + +}