diff --git a/hippo4j-core/pom.xml b/hippo4j-core/pom.xml index bd891a49..e9e05f10 100644 --- a/hippo4j-core/pom.xml +++ b/hippo4j-core/pom.xml @@ -10,6 +10,11 @@ hippo4j-core + + org.springframework.boot + spring-boot-starter-test + test + org.projectlombok lombok diff --git a/hippo4j-core/src/test/java/cn/hippo4j/core/executor/DynamicThreadPoolExecutorTest.java b/hippo4j-core/src/test/java/cn/hippo4j/core/executor/DynamicThreadPoolExecutorTest.java new file mode 100644 index 00000000..75f27875 --- /dev/null +++ b/hippo4j-core/src/test/java/cn/hippo4j/core/executor/DynamicThreadPoolExecutorTest.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.core.executor; + +import cn.hippo4j.common.toolkit.ThreadUtil; +import org.junit.Assert; +import org.junit.Test; +import org.springframework.core.task.TaskDecorator; + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * test for {@link DynamicThreadPoolExecutor} + */ +public class DynamicThreadPoolExecutorTest { + + @Test + public void testRedundancyHandler() { + RejectedExecutionHandler handler = new ThreadPoolExecutor.DiscardOldestPolicy(); + + DynamicThreadPoolExecutor executor = new DynamicThreadPoolExecutor( + 1, 1, 1000L, TimeUnit.MILLISECONDS, + 1000L, true, 1000L, + new ArrayBlockingQueue<>(1), "test", Thread::new, handler); + + Assert.assertEquals(handler, executor.getRedundancyHandler()); + handler = new ThreadPoolExecutor.AbortPolicy(); + executor.setRedundancyHandler(handler); + Assert.assertEquals(handler, executor.getRedundancyHandler()); + } + + @Test + public void testTaskDecorator() { + DynamicThreadPoolExecutor executor = new DynamicThreadPoolExecutor( + 1, 1, 1000L, TimeUnit.MILLISECONDS, + 1000L, true, 1000L, + new ArrayBlockingQueue<>(1), "test", Thread::new, new ThreadPoolExecutor.DiscardOldestPolicy()); + + Assert.assertNull(executor.getTaskDecorator()); + TaskDecorator decorator = runnable -> runnable; + executor.setTaskDecorator(decorator); + Assert.assertEquals(decorator, executor.getTaskDecorator()); + + decorator = runnable -> runnable; + executor.setTaskDecorator(decorator); + Assert.assertEquals(decorator, executor.getTaskDecorator()); + } + + @Test + public void testExecuteTimeOut() { + DynamicThreadPoolExecutor executor = new DynamicThreadPoolExecutor( + 1, 1, 1000L, TimeUnit.MILLISECONDS, + 1000L, true, 1000L, + new ArrayBlockingQueue<>(1), "test", Thread::new, new ThreadPoolExecutor.DiscardOldestPolicy()); + + Assert.assertEquals(1000L, executor.getExecuteTimeOut().longValue()); + executor.setExecuteTimeOut(500L); + Assert.assertEquals(500L, executor.getExecuteTimeOut().longValue()); + } + + @Test + public void testRejectCount() { + DynamicThreadPoolExecutor executor = new DynamicThreadPoolExecutor( + 1, 1, 1000L, TimeUnit.MILLISECONDS, + 1000L, true, 1000L, + new ArrayBlockingQueue<>(1), "test", Thread::new, new ThreadPoolExecutor.DiscardOldestPolicy()); + + Assert.assertEquals(0L, executor.getRejectCountNum().longValue()); + Assert.assertEquals(0L, executor.getRejectCount().get()); + + executor.submit(() -> ThreadUtil.sleep(100L)); + executor.submit(() -> ThreadUtil.sleep(100L)); + executor.submit(() -> ThreadUtil.sleep(100L)); + ThreadUtil.sleep(200L); + Assert.assertEquals(1L, executor.getRejectCountNum().longValue()); + Assert.assertEquals(1L, executor.getRejectCount().get()); + } + + @Test + public void testSupportParam() { + DynamicThreadPoolExecutor executor = new DynamicThreadPoolExecutor( + 1, 1, 1000L, TimeUnit.MILLISECONDS, + 1000L, true, 1000L, + new ArrayBlockingQueue<>(1), "test", Thread::new, new ThreadPoolExecutor.DiscardOldestPolicy()); + Assert.assertEquals(1000L, executor.getAwaitTerminationMillis()); + Assert.assertTrue(executor.isWaitForTasksToCompleteOnShutdown()); + + executor.setSupportParam(500L, false); + Assert.assertEquals(500L, executor.getAwaitTerminationMillis()); + Assert.assertFalse(executor.isWaitForTasksToCompleteOnShutdown()); + } + +} diff --git a/hippo4j-core/src/test/java/cn/hippo4j/core/executor/ExtensibleThreadPoolExecutorTest.java b/hippo4j-core/src/test/java/cn/hippo4j/core/executor/ExtensibleThreadPoolExecutorTest.java new file mode 100644 index 00000000..6479aeb5 --- /dev/null +++ b/hippo4j-core/src/test/java/cn/hippo4j/core/executor/ExtensibleThreadPoolExecutorTest.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.core.executor; + +import cn.hippo4j.common.toolkit.ThreadUtil; +import cn.hippo4j.core.plugin.ExecuteAwarePlugin; +import cn.hippo4j.core.plugin.RejectedAwarePlugin; +import cn.hippo4j.core.plugin.ShutdownAwarePlugin; +import cn.hippo4j.core.plugin.TaskAwarePlugin; +import cn.hippo4j.core.plugin.manager.DefaultThreadPoolPluginManager; +import lombok.Getter; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.List; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * test for {@link ExtensibleThreadPoolExecutor} + */ +public class ExtensibleThreadPoolExecutorTest { + + private final RejectedExecutionHandler originalHandler = new ThreadPoolExecutor.DiscardPolicy(); + + private ExtensibleThreadPoolExecutor executor; + + @Before + public void initExecutor() { + executor = new ExtensibleThreadPoolExecutor( + "test", new DefaultThreadPoolPluginManager(), + 5, 5, 1000L, TimeUnit.MILLISECONDS, + new ArrayBlockingQueue<>(1), Thread::new, originalHandler); + } + + @Test + public void testGetOrSetRejectedHandler() { + RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy(); + executor.setRejectedExecutionHandler(handler); + Assert.assertSame(handler, executor.getRejectedExecutionHandler()); + } + + @Test + public void testInvokeTaskAwarePlugin() { + TestTaskAwarePlugin plugin = new TestTaskAwarePlugin(); + executor.register(plugin); + executor.submit(() -> {}); + executor.submit(() -> true); + executor.submit(() -> {}, false); + executor.execute(() -> {}); + Assert.assertEquals(7, plugin.getInvokeCount().get()); + } + + @Test + public void testInvokeExecuteAwarePlugin() { + TestExecuteAwarePlugin plugin = new TestExecuteAwarePlugin(); + executor.register(plugin); + executor.execute(() -> { + }); + ThreadUtil.sleep(500L); + Assert.assertEquals(2, plugin.getInvokeCount().get()); + } + + @Test + public void testInvokeRejectedAwarePlugin() { + executor.setCorePoolSize(1); + executor.setMaximumPoolSize(1); + + TestRejectedAwarePlugin plugin = new TestRejectedAwarePlugin(); + executor.register(plugin); + // blocking pool and queue + executor.submit(() -> ThreadUtil.sleep(500L)); + executor.submit(() -> ThreadUtil.sleep(500L)); + // reject 3 tasks + executor.submit(() -> { + }); + executor.submit(() -> { + }); + executor.submit(() -> { + }); + + ThreadUtil.sleep(500L); + Assert.assertEquals(3, plugin.getInvokeCount().get()); + } + + @Test + public void testInvokeTestShutdownAwarePluginWhenShutdown() throws InterruptedException { + TestShutdownAwarePlugin plugin = new TestShutdownAwarePlugin(); + executor.register(plugin); + executor.shutdown(); + executor.submit(() -> {throw new IllegalArgumentException("???");}); + if (executor.awaitTermination(500L, TimeUnit.MILLISECONDS)) { + Assert.assertEquals(3, plugin.getInvokeCount().get()); + } + } + + @Test + public void testInvokeTestShutdownAwarePluginWhenShutdownNow() throws InterruptedException { + TestShutdownAwarePlugin plugin = new TestShutdownAwarePlugin(); + executor.register(plugin); + executor.shutdownNow(); + if (executor.awaitTermination(500L, TimeUnit.MILLISECONDS)) { + Assert.assertEquals(3, plugin.getInvokeCount().get()); + } + } + + @Getter + private final static class TestTaskAwarePlugin implements TaskAwarePlugin { + private final AtomicInteger invokeCount = new AtomicInteger(0); + private final String id = "TestTaskAwarePlugin"; + @Override + public Runnable beforeTaskCreate(ThreadPoolExecutor executor, Runnable runnable, V value) { + invokeCount.incrementAndGet(); + return TaskAwarePlugin.super.beforeTaskCreate(executor, runnable, value); + } + @Override + public Callable beforeTaskCreate(ThreadPoolExecutor executor, Callable future) { + invokeCount.incrementAndGet(); + return TaskAwarePlugin.super.beforeTaskCreate(executor, future); + } + @Override + public Runnable beforeTaskExecute(Runnable runnable) { + invokeCount.incrementAndGet(); + return TaskAwarePlugin.super.beforeTaskExecute(runnable); + } + } + + @Getter + private final static class TestExecuteAwarePlugin implements ExecuteAwarePlugin { + private final AtomicInteger invokeCount = new AtomicInteger(0); + private final String id = "TestExecuteAwarePlugin"; + @Override + public void beforeExecute(Thread thread, Runnable runnable) { + invokeCount.incrementAndGet(); + ExecuteAwarePlugin.super.beforeExecute(thread, runnable); + } + @Override + public void afterExecute(Runnable runnable, Throwable throwable) { + invokeCount.incrementAndGet(); + ExecuteAwarePlugin.super.afterExecute(runnable, throwable); + } + } + + @Getter + private final static class TestRejectedAwarePlugin implements RejectedAwarePlugin { + private final AtomicInteger invokeCount = new AtomicInteger(0); + private final String id = "TestRejectedAwarePlugin"; + @Override + public void beforeRejectedExecution(Runnable runnable, ThreadPoolExecutor executor) { + invokeCount.incrementAndGet(); + } + } + + @Getter + private final static class TestShutdownAwarePlugin implements ShutdownAwarePlugin { + private final AtomicInteger invokeCount = new AtomicInteger(0); + private final String id = "TestShutdownAwarePlugin"; + @Override + public void beforeShutdown(ThreadPoolExecutor executor) { + invokeCount.incrementAndGet(); + ShutdownAwarePlugin.super.beforeShutdown(executor); + } + @Override + public void afterShutdown(ThreadPoolExecutor executor, List remainingTasks) { + invokeCount.incrementAndGet(); + ShutdownAwarePlugin.super.afterShutdown(executor, remainingTasks); + } + @Override + public void afterTerminated(ExtensibleThreadPoolExecutor executor) { + invokeCount.incrementAndGet(); + ShutdownAwarePlugin.super.afterTerminated(executor); + } + } + +} diff --git a/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/DefaultThreadPoolPluginManagerTest.java b/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/DefaultThreadPoolPluginManagerTest.java new file mode 100644 index 00000000..fecbcd5f --- /dev/null +++ b/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/DefaultThreadPoolPluginManagerTest.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.core.plugin; + +import cn.hippo4j.core.plugin.manager.DefaultThreadPoolPluginManager; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + * test for {@link DefaultThreadPoolPluginManager} + */ +public class DefaultThreadPoolPluginManagerTest { + + private DefaultThreadPoolPluginManager registry; + + @Before + public void initRegistry() { + registry = new DefaultThreadPoolPluginManager(); + } + + @Test + public void testRegister() { + TaskAwarePlugin taskAwarePlugin = new TestTaskAwarePlugin(); + registry.register(taskAwarePlugin); + Assert.assertThrows(IllegalArgumentException.class, () -> registry.register(taskAwarePlugin)); + Assert.assertTrue(registry.isRegistered(taskAwarePlugin.getId())); + Assert.assertEquals(1, registry.getTaskAwarePluginList().size()); + Assert.assertSame(taskAwarePlugin, registry.getPlugin(taskAwarePlugin.getId()).orElse(null)); + registry.getPluginOfType(taskAwarePlugin.getId(), TestTaskAwarePlugin.class) + .ifPresent(plugin -> Assert.assertSame(plugin, taskAwarePlugin)); + Assert.assertEquals(taskAwarePlugin.getId(), registry.getPluginOfType(taskAwarePlugin.getId(), TestTaskAwarePlugin.class).map(TestTaskAwarePlugin::getId).orElse(null)); + registry.unregister(taskAwarePlugin.getId()); + Assert.assertFalse(registry.getPlugin(taskAwarePlugin.getId()).isPresent()); + + ExecuteAwarePlugin executeAwarePlugin = new TestExecuteAwarePlugin(); + registry.register(executeAwarePlugin); + Assert.assertTrue(registry.isRegistered(executeAwarePlugin.getId())); + Assert.assertEquals(1, registry.getExecuteAwarePluginList().size()); + + RejectedAwarePlugin rejectedAwarePlugin = new TestRejectedAwarePlugin(); + registry.register(rejectedAwarePlugin); + Assert.assertTrue(registry.isRegistered(rejectedAwarePlugin.getId())); + Assert.assertEquals(1, registry.getRejectedAwarePluginList().size()); + + ShutdownAwarePlugin shutdownAwarePlugin = new TestShutdownAwarePlugin(); + registry.register(shutdownAwarePlugin); + Assert.assertTrue(registry.isRegistered(shutdownAwarePlugin.getId())); + Assert.assertEquals(1, registry.getShutdownAwarePluginList().size()); + } + + private final static class TestTaskAwarePlugin implements TaskAwarePlugin { + + /** + * Get id. + * + * @return id + */ + @Override + public String getId() { + return "TestTaskAwarePlugin"; + } + } + + private final static class TestExecuteAwarePlugin implements ExecuteAwarePlugin { + + /** + * Get id. + * + * @return id + */ + @Override + public String getId() { + return "TestExecuteAwarePlugin"; + } + } + + private final static class TestRejectedAwarePlugin implements RejectedAwarePlugin { + + /** + * Get id. + * + * @return id + */ + @Override + public String getId() { + return "TestRejectedAwarePlugin"; + } + } + + private final static class TestShutdownAwarePlugin implements ShutdownAwarePlugin { + + /** + * Get id. + * + * @return id + */ + @Override + public String getId() { + return "TestShutdownAwarePlugin"; + } + } + +} diff --git a/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/PluginRuntimeTest.java b/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/PluginRuntimeTest.java new file mode 100644 index 00000000..f5806157 --- /dev/null +++ b/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/PluginRuntimeTest.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.core.plugin; + +import cn.hippo4j.common.toolkit.ThreadUtil; +import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor; +import cn.hippo4j.core.plugin.impl.TaskRejectCountRecordPlugin; +import cn.hippo4j.core.plugin.impl.TaskTimeRecordPlugin; +import cn.hippo4j.core.plugin.impl.ThreadPoolExecutorShutdownPlugin; +import cn.hippo4j.core.plugin.manager.DefaultThreadPoolPluginManager; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.SneakyThrows; +import org.junit.Assert; +import org.junit.Test; + +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +/** + * test {@link ThreadPoolPlugin}'s info to json + */ +public class PluginRuntimeTest { + + private final ObjectMapper objectMapper = new ObjectMapper(); + + @SneakyThrows + @Test + public void testGetPluginRuntime() { + ExtensibleThreadPoolExecutor executor = new ExtensibleThreadPoolExecutor( + "test", new DefaultThreadPoolPluginManager(), + 1, 1, 1000L, TimeUnit.MILLISECONDS, + new ArrayBlockingQueue<>(1), Thread::new, new ThreadPoolExecutor.DiscardPolicy()); + + // TaskRejectCountRecordPlugin + TaskRejectCountRecordPlugin taskRejectCountRecordPlugin = new TaskRejectCountRecordPlugin(); + executor.register(taskRejectCountRecordPlugin); + + // TaskRejectCountRecordPlugin + TaskTimeRecordPlugin taskTimeRecordPlugin = new TaskTimeRecordPlugin(); + executor.register(taskTimeRecordPlugin); + + // ThreadPoolExecutorShutdownPlugin + ThreadPoolExecutorShutdownPlugin executorShutdownPlugin = new ThreadPoolExecutorShutdownPlugin(2000L, true); + executor.register(executorShutdownPlugin); + + executor.submit(() -> ThreadUtil.sleep(100L)); + executor.submit(() -> ThreadUtil.sleep(300L)); + executor.submit(() -> ThreadUtil.sleep(200L)); + + ThreadUtil.sleep(1000L); + List runtimeList = executor.getAllPlugins().stream() + .map(ThreadPoolPlugin::getPluginRuntime) + .collect(Collectors.toList()); + Assert.assertEquals(3, runtimeList.size()); + + System.out.println(objectMapper.writeValueAsString(runtimeList)); + } + +} diff --git a/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/TaskDecoratorPluginTest.java b/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/TaskDecoratorPluginTest.java new file mode 100644 index 00000000..21445799 --- /dev/null +++ b/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/TaskDecoratorPluginTest.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.core.plugin.impl; + +import cn.hippo4j.common.toolkit.ThreadUtil; +import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor; +import cn.hippo4j.core.plugin.manager.DefaultThreadPoolPluginManager; +import org.junit.Assert; +import org.junit.Test; + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * test for {@link TaskDecoratorPlugin} + */ +public class TaskDecoratorPluginTest { + + private final AtomicInteger taskExecuteCount = new AtomicInteger(0); + + @Test + public void testExecute() { + ExtensibleThreadPoolExecutor executor = new ExtensibleThreadPoolExecutor( + "test", new DefaultThreadPoolPluginManager(), + 5, 5, 1000L, TimeUnit.MILLISECONDS, + new ArrayBlockingQueue<>(1), Thread::new, new ThreadPoolExecutor.DiscardPolicy()); + TaskDecoratorPlugin plugin = new TaskDecoratorPlugin(); + plugin.addDecorator(runnable -> () -> { + taskExecuteCount.incrementAndGet(); + runnable.run(); + }); + plugin.addDecorator(runnable -> () -> { + taskExecuteCount.incrementAndGet(); + runnable.run(); + }); + executor.register(plugin); + executor.execute(() -> { + }); + ThreadUtil.sleep(500L); + Assert.assertEquals(2, taskExecuteCount.get()); + } + +} diff --git a/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/TaskRejectCountRecordPluginTest.java b/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/TaskRejectCountRecordPluginTest.java new file mode 100644 index 00000000..ca9b492d --- /dev/null +++ b/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/TaskRejectCountRecordPluginTest.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.core.plugin.impl; + +import cn.hippo4j.common.toolkit.ThreadUtil; +import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor; +import cn.hippo4j.core.plugin.manager.DefaultThreadPoolPluginManager; +import org.junit.Assert; +import org.junit.Test; + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * test for {@link TaskRejectCountRecordPlugin} + */ +public class TaskRejectCountRecordPluginTest { + + @Test + public void testExecute() { + ExtensibleThreadPoolExecutor executor = new ExtensibleThreadPoolExecutor( + "test", new DefaultThreadPoolPluginManager(), + 1, 1, 1000L, TimeUnit.MILLISECONDS, + new ArrayBlockingQueue<>(1), Thread::new, new ThreadPoolExecutor.DiscardPolicy()); + + TaskRejectCountRecordPlugin plugin = new TaskRejectCountRecordPlugin(); + executor.register(plugin); + executor.submit(() -> ThreadUtil.sleep(500L)); + executor.submit(() -> ThreadUtil.sleep(500L)); + executor.submit(() -> ThreadUtil.sleep(500L)); + + ThreadUtil.sleep(500L); + Assert.assertEquals((Long) 1L, plugin.getRejectCountNum()); + } + +} diff --git a/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/TaskRejectNotifyAlarmPluginTest.java b/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/TaskRejectNotifyAlarmPluginTest.java new file mode 100644 index 00000000..9032c9c9 --- /dev/null +++ b/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/TaskRejectNotifyAlarmPluginTest.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.core.plugin.impl; + +import cn.hippo4j.common.toolkit.ThreadUtil; +import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor; +import cn.hippo4j.core.plugin.manager.DefaultThreadPoolPluginManager; +import lombok.RequiredArgsConstructor; +import org.junit.Assert; +import org.junit.Test; + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * test for {@link TaskRejectNotifyAlarmPlugin} + */ +public class TaskRejectNotifyAlarmPluginTest { + + @Test + public void testBeforeRejectedExecution() { + ExtensibleThreadPoolExecutor executor = new ExtensibleThreadPoolExecutor( + "test", new DefaultThreadPoolPluginManager(), + 1, 1, 1000L, TimeUnit.MILLISECONDS, + new ArrayBlockingQueue<>(1), Thread::new, new ThreadPoolExecutor.DiscardPolicy()); + + AtomicInteger rejectCount = new AtomicInteger(0); + executor.register(new TestTaskRejectNotifyAlarmPlugin(rejectCount, executor)); + executor.submit(() -> ThreadUtil.sleep(200L)); + executor.submit(() -> ThreadUtil.sleep(200L)); + executor.submit(() -> ThreadUtil.sleep(200L)); + + ThreadUtil.sleep(1000L); + Assert.assertEquals(1, rejectCount.get()); + } + + @RequiredArgsConstructor + private static class TestTaskRejectNotifyAlarmPlugin extends TaskRejectNotifyAlarmPlugin { + + private final AtomicInteger count; + private final ThreadPoolExecutor targetExecutor; + + /** + * Callback before task is rejected. + * + * @param runnable task + * @param executor executor + */ + @Override + public void beforeRejectedExecution(Runnable runnable, ThreadPoolExecutor executor) { + count.incrementAndGet(); + Assert.assertEquals(targetExecutor, executor); + super.beforeRejectedExecution(runnable, executor); + } + } + +} diff --git a/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/TaskTimeRecordPluginTest.java b/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/TaskTimeRecordPluginTest.java new file mode 100644 index 00000000..2a6a2731 --- /dev/null +++ b/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/TaskTimeRecordPluginTest.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.core.plugin.impl; + +import cn.hippo4j.common.toolkit.ThreadUtil; +import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor; +import cn.hippo4j.core.plugin.manager.DefaultThreadPoolPluginManager; +import org.junit.Assert; +import org.junit.Test; + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * test for {@link TaskTimeRecordPlugin} + */ +public class TaskTimeRecordPluginTest { + + @Test + public void testExecute() { + ExtensibleThreadPoolExecutor executor = new ExtensibleThreadPoolExecutor( + "test", new DefaultThreadPoolPluginManager(), + 3, 3, 1000L, TimeUnit.MILLISECONDS, + new ArrayBlockingQueue<>(1), Thread::new, new ThreadPoolExecutor.DiscardPolicy()); + + TaskTimeRecordPlugin plugin = new TaskTimeRecordPlugin(); + executor.register(plugin); + executor.submit(() -> ThreadUtil.sleep(100L)); + executor.submit(() -> ThreadUtil.sleep(300L)); + executor.submit(() -> ThreadUtil.sleep(200L)); + + ThreadUtil.sleep(1000L); + TaskTimeRecordPlugin.Summary summary = plugin.summarize(); + Assert.assertEquals(1, summary.getMinTaskTimeMillis() / 100L); + Assert.assertEquals(3, summary.getMaxTaskTimeMillis() / 100L); + Assert.assertEquals(2, summary.getAvgTaskTimeMillis() / 100L); + Assert.assertEquals(6, summary.getTotalTaskTimeMillis() / 100L); + } +} diff --git a/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/ThreadPoolExecutorShutdownPluginTest.java b/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/ThreadPoolExecutorShutdownPluginTest.java new file mode 100644 index 00000000..c0fb97a6 --- /dev/null +++ b/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/ThreadPoolExecutorShutdownPluginTest.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.core.plugin.impl; + +import cn.hippo4j.common.toolkit.ThreadUtil; +import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor; +import cn.hippo4j.core.plugin.ThreadPoolPlugin; +import cn.hippo4j.core.plugin.manager.DefaultThreadPoolPluginManager; +import org.junit.Assert; +import org.junit.Test; + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * test for {@link ThreadPoolExecutorShutdownPlugin} + */ +public class ThreadPoolExecutorShutdownPluginTest { + + public ExtensibleThreadPoolExecutor getExecutor(ThreadPoolPlugin plugin) { + ExtensibleThreadPoolExecutor executor = new ExtensibleThreadPoolExecutor( + "test", new DefaultThreadPoolPluginManager(), + 2, 2, 1000L, TimeUnit.MILLISECONDS, + new ArrayBlockingQueue<>(1), Thread::new, new ThreadPoolExecutor.DiscardPolicy()); + executor.register(plugin); + return executor; + } + + private static Callable getCallable(AtomicInteger completedCount) { + return () -> { + ThreadUtil.sleep(1000L); + return completedCount.incrementAndGet(); + }; + } + + @Test + public void testExecuteShutdownWhenWaitTaskCompleted() { + ExtensibleThreadPoolExecutor executor = getExecutor( + new ThreadPoolExecutorShutdownPlugin(2000L, true)); + + AtomicInteger completedCount = new AtomicInteger(0); + Callable future1 = getCallable(completedCount); + Callable future2 = getCallable(completedCount); + executor.submit(future1); + executor.submit(future2); + + executor.shutdown(); + Assert.assertEquals(2, completedCount.get()); + } + + @Test + public void testExecuteShutdownWhenNotWaitTaskCompleted() { + ExtensibleThreadPoolExecutor executor = getExecutor( + new ThreadPoolExecutorShutdownPlugin(-1L, true)); + + AtomicInteger completedCount = new AtomicInteger(0); + Callable future1 = getCallable(completedCount); + Callable future2 = getCallable(completedCount); + executor.submit(future1); + executor.submit(future2); + + executor.shutdown(); + Assert.assertEquals(0, completedCount.get()); + } +} \ No newline at end of file