test: completed test cases

pull/836/head
huangchengxing 3 years ago
parent 313cedffc8
commit 90afc2369c

@ -1,9 +0,0 @@
package cn.hippo4j.core.executor;
/**
* test for {@link ExtensibleThreadPoolExecutor}
*
* @author huangchengxing
*/
public class ActionAwareThreadPoolExecutorTest {
}

@ -0,0 +1,167 @@
package cn.hippo4j.core.executor;
import cn.hippo4j.common.toolkit.ThreadUtil;
import cn.hippo4j.core.plugin.*;
import lombok.Getter;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* test for {@link ExtensibleThreadPoolExecutor}
*
* @author huangchengxing
*/
public class ExtensibleThreadPoolExecutorTest {
private final RejectedExecutionHandler originalHandler = new ThreadPoolExecutor.DiscardPolicy();
private ExtensibleThreadPoolExecutor executor;
@Before
public void initExecutor() {
executor = new ExtensibleThreadPoolExecutor(
"test", new DefaultThreadPoolPluginRegistry(),
5, 5, 1000L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(1), Thread::new, originalHandler
);
}
@Test
public void testGetOrSetRejectedHandler() {
RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy();
executor.setRejectedExecutionHandler(handler);
Assert.assertSame(handler, executor.getRejectedExecutionHandler());
}
@Test
public void testInvokeTaskAwarePlugin() {
TestTaskAwarePlugin plugin = new TestTaskAwarePlugin();
executor.register(plugin);
executor.submit(() -> {});
executor.submit(() -> true);
executor.submit(() -> {}, false);
Assert.assertEquals(3, plugin.getInvokeCount().get());
}
@Test
public void testInvokeExecuteAwarePlugin() {
TestExecuteAwarePlugin plugin = new TestExecuteAwarePlugin();
executor.register(plugin);
executor.execute(() -> {});
ThreadUtil.sleep(500L);
Assert.assertEquals(3, plugin.getInvokeCount().get());
}
@Test
public void testInvokeRejectedAwarePlugin() {
executor.setCorePoolSize(1);
executor.setMaximumPoolSize(1);
TestRejectedAwarePlugin plugin = new TestRejectedAwarePlugin();
executor.register(plugin);
// blocking pool and queue
executor.submit(() -> ThreadUtil.sleep(500L));
executor.submit(() -> ThreadUtil.sleep(500L));
// reject 3 tasks
executor.submit(() -> {});
executor.submit(() -> {});
executor.submit(() -> {});
ThreadUtil.sleep(500L);
Assert.assertEquals(3, plugin.getInvokeCount().get());
}
@Test
public void testInvokeTestShutdownAwarePluginWhenShutdown() throws InterruptedException {
TestShutdownAwarePlugin plugin = new TestShutdownAwarePlugin();
executor.register(plugin);
executor.shutdown();
if (executor.awaitTermination(500L, TimeUnit.MILLISECONDS)) {
Assert.assertEquals(3, plugin.getInvokeCount().get());
}
}
@Test
public void testInvokeTestShutdownAwarePluginWhenShutdownNow() throws InterruptedException {
TestShutdownAwarePlugin plugin = new TestShutdownAwarePlugin();
executor.register(plugin);
executor.shutdownNow();
if (executor.awaitTermination(500L, TimeUnit.MILLISECONDS)) {
Assert.assertEquals(3, plugin.getInvokeCount().get());
}
}
@Getter
private final static class TestTaskAwarePlugin implements TaskAwarePlugin {
private final AtomicInteger invokeCount = new AtomicInteger(0);
private final String id = "TestTaskAwarePlugin";
@Override
public <V> Runnable beforeTaskCreate(ThreadPoolExecutor executor, Runnable runnable, V value) {
invokeCount.incrementAndGet();
return TaskAwarePlugin.super.beforeTaskCreate(executor, runnable, value);
}
@Override
public <V> Callable<V> beforeTaskCreate(ThreadPoolExecutor executor, Callable<V> future) {
invokeCount.incrementAndGet();
return TaskAwarePlugin.super.beforeTaskCreate(executor, future);
}
}
@Getter
private final static class TestExecuteAwarePlugin implements ExecuteAwarePlugin {
private final AtomicInteger invokeCount = new AtomicInteger(0);
private final String id = "TestExecuteAwarePlugin";
@Override
public void beforeExecute(Thread thread, Runnable runnable) {
invokeCount.incrementAndGet();
ExecuteAwarePlugin.super.beforeExecute(thread, runnable);
}
@Override
public Runnable execute(Runnable runnable) {
invokeCount.incrementAndGet();
return ExecuteAwarePlugin.super.execute(runnable);
}
@Override
public void afterExecute(Runnable runnable, Throwable throwable) {
invokeCount.incrementAndGet();
ExecuteAwarePlugin.super.afterExecute(runnable, throwable);
}
}
@Getter
private final static class TestRejectedAwarePlugin implements RejectedAwarePlugin {
private final AtomicInteger invokeCount = new AtomicInteger(0);
private final String id = "TestRejectedAwarePlugin";
@Override
public void beforeRejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
invokeCount.incrementAndGet();
}
}
@Getter
private final static class TestShutdownAwarePlugin implements ShutdownAwarePlugin {
private final AtomicInteger invokeCount = new AtomicInteger(0);
private final String id = "TestShutdownAwarePlugin";
@Override
public void beforeShutdown(ThreadPoolExecutor executor) {
invokeCount.incrementAndGet();
ShutdownAwarePlugin.super.beforeShutdown(executor);
}
@Override
public void afterShutdown(ThreadPoolExecutor executor, List<Runnable> remainingTasks) {
invokeCount.incrementAndGet();
ShutdownAwarePlugin.super.afterShutdown(executor, remainingTasks);
}
@Override
public void afterTerminated(ExtensibleThreadPoolExecutor executor) {
invokeCount.incrementAndGet();
ShutdownAwarePlugin.super.afterTerminated(executor);
}
}
}

@ -22,23 +22,38 @@ public class DefaultThreadPoolPluginRegistryTest {
public void testRegister() {
TaskAwarePlugin taskAwarePlugin = new TestTaskAwarePlugin();
registry.register(taskAwarePlugin);
Assert.assertTrue(registry.isRegistered(taskAwarePlugin));
Assert.assertThrows(IllegalArgumentException.class, () -> registry.register(taskAwarePlugin));
Assert.assertTrue(registry.isRegistered(taskAwarePlugin.getId()));
Assert.assertEquals(1, registry.getTaskAwareList().size());
Assert.assertSame(taskAwarePlugin, registry.getPlugin(taskAwarePlugin.getId()));
registry.getAndThen(taskAwarePlugin.getId(), TestTaskAwarePlugin.class, plugin -> Assert.assertSame(plugin, taskAwarePlugin));
Assert.assertEquals(taskAwarePlugin.getId(), registry.getAndThen(taskAwarePlugin.getId(), TestTaskAwarePlugin.class, TestTaskAwarePlugin::getId, null));
registry.unregister(taskAwarePlugin.getId());
Assert.assertNull(registry.getPlugin(taskAwarePlugin.getId()));
ExecuteAwarePlugin executeAwarePlugin = new TestExecuteAwarePlugin();
registry.register(executeAwarePlugin);
Assert.assertTrue(registry.isRegistered(executeAwarePlugin));
Assert.assertTrue(registry.isRegistered(executeAwarePlugin.getId()));
Assert.assertEquals(1, registry.getExecuteAwareList().size());
Assert.assertSame(executeAwarePlugin, registry.getPlugin(executeAwarePlugin.getId()));
registry.unregister(executeAwarePlugin.getId());
Assert.assertNull(registry.getPlugin(executeAwarePlugin.getId()));
RejectedAwarePlugin rejectedAwarePlugin = new TestRejectedAwarePlugin();
registry.register(rejectedAwarePlugin);
Assert.assertTrue(registry.isRegistered(rejectedAwarePlugin));
Assert.assertTrue(registry.isRegistered(rejectedAwarePlugin.getId()));
Assert.assertEquals(1, registry.getRejectedAwareList().size());
Assert.assertSame(rejectedAwarePlugin, registry.getPlugin(rejectedAwarePlugin.getId()));
registry.unregister(rejectedAwarePlugin.getId());
Assert.assertNull(registry.getPlugin(rejectedAwarePlugin.getId()));
ShutdownAwarePlugin shutdownAwarePlugin = new TestShutdownAwarePlugin();
registry.register(shutdownAwarePlugin);
Assert.assertTrue(registry.isRegistered(shutdownAwarePlugin));
Assert.assertTrue(registry.isRegistered(shutdownAwarePlugin.getId()));
Assert.assertEquals(1, registry.getShutdownAwareList().size());
Assert.assertSame(shutdownAwarePlugin, registry.getPlugin(shutdownAwarePlugin.getId()));
registry.unregister(shutdownAwarePlugin.getId());
Assert.assertNull(registry.getPlugin(shutdownAwarePlugin.getId()));
}
private final static class TestTaskAwarePlugin implements TaskAwarePlugin {

@ -1,9 +1,45 @@
package cn.hippo4j.core.plugin.impl;
import cn.hippo4j.common.toolkit.ThreadUtil;
import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor;
import cn.hippo4j.core.plugin.DefaultThreadPoolPluginRegistry;
import org.junit.Assert;
import org.junit.Test;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* test for {@link TaskDecoratorPlugin}
*
* @author huangchengxing
*/
public class TaskDecoratorPluginTest {
private final AtomicInteger taskExecuteCount = new AtomicInteger(0);
@Test
public void testExecute() {
ExtensibleThreadPoolExecutor executor = new ExtensibleThreadPoolExecutor(
"test", new DefaultThreadPoolPluginRegistry(),
5, 5, 1000L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(1), Thread::new, new ThreadPoolExecutor.DiscardPolicy()
);
TaskDecoratorPlugin plugin = new TaskDecoratorPlugin();
plugin.addDecorator(runnable -> () -> {
taskExecuteCount.incrementAndGet();
runnable.run();
});
plugin.addDecorator(runnable -> () -> {
taskExecuteCount.incrementAndGet();
runnable.run();
});
executor.register(plugin);
executor.execute(() -> {});
ThreadUtil.sleep(500L);
Assert.assertEquals(2, taskExecuteCount.get());
}
}

@ -1,9 +1,38 @@
package cn.hippo4j.core.plugin.impl;
import cn.hippo4j.common.toolkit.ThreadUtil;
import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor;
import cn.hippo4j.core.plugin.DefaultThreadPoolPluginRegistry;
import org.junit.Assert;
import org.junit.Test;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* test for {@link TaskRejectCountRecordPlugin}
*
* @author huangchengxing
*/
public class TaskRejectCountRecordPluginTest {
@Test
public void testExecute() {
ExtensibleThreadPoolExecutor executor = new ExtensibleThreadPoolExecutor(
"test", new DefaultThreadPoolPluginRegistry(),
1, 1, 1000L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(1), Thread::new, new ThreadPoolExecutor.DiscardPolicy()
);
TaskRejectCountRecordPlugin plugin = new TaskRejectCountRecordPlugin();
executor.register(plugin);
executor.submit(() -> ThreadUtil.sleep(500L));
executor.submit(() -> ThreadUtil.sleep(500L));
executor.submit(() -> ThreadUtil.sleep(500L));
ThreadUtil.sleep(500L);
Assert.assertEquals((Long)1L, plugin.getRejectCountNum());
}
}

@ -1,9 +1,42 @@
package cn.hippo4j.core.plugin.impl;
import cn.hippo4j.common.toolkit.SyncTimeRecorder;
import cn.hippo4j.common.toolkit.ThreadUtil;
import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor;
import cn.hippo4j.core.plugin.DefaultThreadPoolPluginRegistry;
import org.junit.Assert;
import org.junit.Test;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* test for {@link TaskTimeRecordAwareProcessorPlugin}
* test for {@link TaskTimeRecordPlugin}
*
* @author huangchengxing
*/
public class TaskTimeRecordPluginTest {
@Test
public void testExecute() {
ExtensibleThreadPoolExecutor executor = new ExtensibleThreadPoolExecutor(
"test", new DefaultThreadPoolPluginRegistry(),
3, 3, 1000L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(1), Thread::new, new ThreadPoolExecutor.DiscardPolicy()
);
TaskTimeRecordPlugin plugin = new TaskTimeRecordPlugin();
executor.register(plugin);
executor.submit(() -> ThreadUtil.sleep(100L));
executor.submit(() -> ThreadUtil.sleep(300L));
executor.submit(() -> ThreadUtil.sleep(200L));
ThreadUtil.sleep(1000L);
SyncTimeRecorder.Summary summary = plugin.summarize();
Assert.assertEquals(1, summary.getMinTaskTime() / 100L);
Assert.assertEquals(3, summary.getMaxTaskTime() / 100L);
Assert.assertEquals(2, summary.getAvgTaskTimeMillis() / 100L);
Assert.assertEquals(6, summary.getTotalTaskTime() / 100L);
}
}

@ -1,9 +1,71 @@
package cn.hippo4j.core.plugin.impl;
import cn.hippo4j.common.toolkit.ThreadUtil;
import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor;
import cn.hippo4j.core.plugin.DefaultThreadPoolPluginRegistry;
import cn.hippo4j.core.plugin.ThreadPoolPlugin;
import org.junit.Assert;
import org.junit.Test;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* test for {@link ThreadPoolExecutorShutdownPlugin}
*
* @author huangchengxing
*/
public class ThreadPoolExecutorShutdownPluginTest {
public ExtensibleThreadPoolExecutor getExecutor(ThreadPoolPlugin plugin) {
ExtensibleThreadPoolExecutor executor = new ExtensibleThreadPoolExecutor(
"test", new DefaultThreadPoolPluginRegistry(),
2, 2, 1000L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(1), Thread::new, new ThreadPoolExecutor.DiscardPolicy()
);
executor.register(plugin);
return executor;
}
private static Callable<Integer> getCallable(AtomicInteger completedCount) {
return () -> {
ThreadUtil.sleep(1000L);
return completedCount.incrementAndGet();
};
}
@Test
public void testExecuteShutdownWhenWaitTaskCompleted() {
ExtensibleThreadPoolExecutor executor = getExecutor(
new ThreadPoolExecutorShutdownPlugin(2000L, true)
);
AtomicInteger completedCount = new AtomicInteger(0);
Callable<Integer> future1 = getCallable(completedCount);
Callable<Integer> future2 = getCallable(completedCount);
executor.submit(future1);
executor.submit(future2);
executor.shutdown();
Assert.assertEquals(2, completedCount.get());
}
@Test
public void testExecuteShutdownWhenNotWaitTaskCompleted() {
ExtensibleThreadPoolExecutor executor = getExecutor(
new ThreadPoolExecutorShutdownPlugin(-1L, true)
);
AtomicInteger completedCount = new AtomicInteger(0);
Callable<Integer> future1 = getCallable(completedCount);
Callable<Integer> future2 = getCallable(completedCount);
executor.submit(future1);
executor.submit(future2);
executor.shutdown();
Assert.assertEquals(0, completedCount.get());
}
}
Loading…
Cancel
Save