test: Add unit test for DynamicThreadPoolExecutor and ExtensibleThreadPoolExecutor (#884)

pull/886/head
黄成兴 2 years ago committed by GitHub
parent cc52829e24
commit 175bfb9c54
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -10,6 +10,11 @@
<artifactId>hippo4j-core</artifactId>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>

@ -0,0 +1,158 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.executor;
import cn.hippo4j.common.toolkit.ThreadUtil;
import org.junit.Assert;
import org.junit.Test;
import org.springframework.core.task.TaskDecorator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* test for {@link DynamicThreadPoolExecutor}
*/
public class DynamicThreadPoolExecutorTest {
@Test
public void testRedundancyHandler() {
RejectedExecutionHandler handler = new ThreadPoolExecutor.DiscardOldestPolicy();
DynamicThreadPoolExecutor executor = new DynamicThreadPoolExecutor(
1, 1, 1000L, TimeUnit.MILLISECONDS,
1000L, true, 1000L,
new ArrayBlockingQueue<>(1), "test", Thread::new, handler);
Assert.assertEquals(handler, executor.getRedundancyHandler());
handler = new ThreadPoolExecutor.AbortPolicy();
executor.setRedundancyHandler(handler);
Assert.assertEquals(handler, executor.getRedundancyHandler());
}
@Test
public void testTaskDecorator() {
DynamicThreadPoolExecutor executor = new DynamicThreadPoolExecutor(
1, 1, 1000L, TimeUnit.MILLISECONDS,
1000L, true, 1000L,
new ArrayBlockingQueue<>(1), "test", Thread::new, new ThreadPoolExecutor.DiscardOldestPolicy());
Assert.assertNull(executor.getTaskDecorator());
TaskDecorator decorator = runnable -> runnable;
executor.setTaskDecorator(decorator);
Assert.assertEquals(decorator, executor.getTaskDecorator());
decorator = runnable -> runnable;
executor.setTaskDecorator(decorator);
Assert.assertEquals(decorator, executor.getTaskDecorator());
}
@Test
public void testExecuteTimeOut() {
DynamicThreadPoolExecutor executor = new DynamicThreadPoolExecutor(
1, 1, 1000L, TimeUnit.MILLISECONDS,
1000L, true, 1000L,
new ArrayBlockingQueue<>(1), "test", Thread::new, new ThreadPoolExecutor.DiscardOldestPolicy());
Assert.assertEquals(1000L, executor.getExecuteTimeOut().longValue());
executor.setExecuteTimeOut(500L);
Assert.assertEquals(500L, executor.getExecuteTimeOut().longValue());
}
@Test
public void testDestroyWhenWaitForTask() {
DynamicThreadPoolExecutor executor = new DynamicThreadPoolExecutor(
1, 1, 1000L, TimeUnit.MILLISECONDS,
1000L, true, 1000L,
new ArrayBlockingQueue<>(1), "test", Thread::new, new ThreadPoolExecutor.DiscardOldestPolicy());
AtomicInteger count = new AtomicInteger(0);
executor.execute(() -> {
ThreadUtil.sleep(500L);
count.incrementAndGet();
});
executor.execute(() -> {
ThreadUtil.sleep(500L);
count.incrementAndGet();
});
executor.destroy();
// waitting for terminated
while (!executor.isTerminated()){};
Assert.assertEquals(2, count.get());
}
@Test
public void testDestroyWhenNotWaitForTask() {
DynamicThreadPoolExecutor executor = new DynamicThreadPoolExecutor(
1, 1, 1000L, TimeUnit.MILLISECONDS,
1000L, false, 1000L,
new ArrayBlockingQueue<>(1), "test", Thread::new, new ThreadPoolExecutor.DiscardOldestPolicy());
AtomicInteger count = new AtomicInteger(0);
executor.execute(() -> {
ThreadUtil.sleep(500L);
count.incrementAndGet();
});
executor.execute(() -> {
ThreadUtil.sleep(500L);
count.incrementAndGet();
});
executor.destroy();
// waitting for terminated
while (!executor.isTerminated()){};
Assert.assertEquals(1, count.get());
}
@Test
public void testRejectCount() {
DynamicThreadPoolExecutor executor = new DynamicThreadPoolExecutor(
1, 1, 1000L, TimeUnit.MILLISECONDS,
1000L, true, 1000L,
new ArrayBlockingQueue<>(1), "test", Thread::new, new ThreadPoolExecutor.DiscardOldestPolicy());
Assert.assertEquals(0L, executor.getRejectCountNum().longValue());
Assert.assertEquals(0L, executor.getRejectCount().get());
executor.submit(() -> ThreadUtil.sleep(100L));
executor.submit(() -> ThreadUtil.sleep(100L));
executor.submit(() -> ThreadUtil.sleep(100L));
ThreadUtil.sleep(200L);
Assert.assertEquals(1L, executor.getRejectCountNum().longValue());
Assert.assertEquals(1L, executor.getRejectCount().get());
}
@Test
public void testSupportParam() {
DynamicThreadPoolExecutor executor = new DynamicThreadPoolExecutor(
1, 1, 1000L, TimeUnit.MILLISECONDS,
1000L, true, 1000L,
new ArrayBlockingQueue<>(1), "test", Thread::new, new ThreadPoolExecutor.DiscardOldestPolicy());
Assert.assertEquals(1000L, executor.getAwaitTerminationMillis());
Assert.assertTrue(executor.isWaitForTasksToCompleteOnShutdown());
executor.setSupportParam(500L, false);
Assert.assertEquals(500L, executor.getAwaitTerminationMillis());
Assert.assertFalse(executor.isWaitForTasksToCompleteOnShutdown());
}
}

@ -0,0 +1,219 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.executor;
import cn.hippo4j.common.toolkit.ThreadUtil;
import cn.hippo4j.core.plugin.ExecuteAwarePlugin;
import cn.hippo4j.core.plugin.RejectedAwarePlugin;
import cn.hippo4j.core.plugin.ShutdownAwarePlugin;
import cn.hippo4j.core.plugin.TaskAwarePlugin;
import cn.hippo4j.core.plugin.manager.DefaultThreadPoolPluginManager;
import cn.hippo4j.core.plugin.manager.ThreadPoolPluginManager;
import lombok.Getter;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* test for {@link ExtensibleThreadPoolExecutor}
*/
public class ExtensibleThreadPoolExecutorTest {
private final RejectedExecutionHandler originalHandler = new ThreadPoolExecutor.DiscardPolicy();
private ExtensibleThreadPoolExecutor executor;
private ThreadPoolPluginManager manager;
@Before
public void initExecutor() {
manager = new DefaultThreadPoolPluginManager();
executor = new ExtensibleThreadPoolExecutor(
"test", manager,
5, 5, 1000L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(1), Thread::new, originalHandler);
}
@Test
public void testGetThreadPoolId() {
Assert.assertEquals("test", executor.getThreadPoolId());
}
@Test
public void testGetThreadPoolExecutor() {
Assert.assertSame(executor, executor.getThreadPoolExecutor());
}
@Test
public void testGetThreadPoolPluginManager() {
Assert.assertSame(manager, executor.getThreadPoolPluginManager());
}
@Test
public void testGetOrSetRejectedHandler() {
RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy();
executor.setRejectedExecutionHandler(handler);
Assert.assertSame(handler, executor.getRejectedExecutionHandler());
}
@Test
public void testInvokeTaskAwarePlugin() {
TestTaskAwarePlugin plugin = new TestTaskAwarePlugin();
executor.register(plugin);
executor.submit(() -> {
});
executor.submit(() -> true);
executor.submit(() -> {
}, false);
executor.execute(() -> {
});
Assert.assertEquals(7, plugin.getInvokeCount().get());
}
@Test
public void testInvokeExecuteAwarePlugin() {
TestExecuteAwarePlugin plugin = new TestExecuteAwarePlugin();
executor.register(plugin);
executor.execute(() -> {
});
ThreadUtil.sleep(500L);
Assert.assertEquals(2, plugin.getInvokeCount().get());
}
@Test
public void testInvokeRejectedAwarePlugin() {
executor.setCorePoolSize(1);
executor.setMaximumPoolSize(1);
TestRejectedAwarePlugin plugin = new TestRejectedAwarePlugin();
executor.register(plugin);
// blocking pool and queue
executor.submit(() -> ThreadUtil.sleep(500L));
executor.submit(() -> ThreadUtil.sleep(500L));
// reject 3 tasks
executor.submit(() -> {
});
executor.submit(() -> {
});
executor.submit(() -> {
});
ThreadUtil.sleep(500L);
Assert.assertEquals(3, plugin.getInvokeCount().get());
}
@Test
public void testInvokeTestShutdownAwarePluginWhenShutdown() throws InterruptedException {
TestShutdownAwarePlugin plugin = new TestShutdownAwarePlugin();
executor.register(plugin);
executor.shutdown();
executor.submit(() -> {
throw new IllegalArgumentException("???");
});
if (executor.awaitTermination(500L, TimeUnit.MILLISECONDS)) {
Assert.assertEquals(3, plugin.getInvokeCount().get());
}
}
@Test
public void testInvokeTestShutdownAwarePluginWhenShutdownNow() throws InterruptedException {
TestShutdownAwarePlugin plugin = new TestShutdownAwarePlugin();
executor.register(plugin);
executor.shutdownNow();
if (executor.awaitTermination(500L, TimeUnit.MILLISECONDS)) {
Assert.assertEquals(3, plugin.getInvokeCount().get());
}
}
@Getter
private final static class TestTaskAwarePlugin implements TaskAwarePlugin {
private final AtomicInteger invokeCount = new AtomicInteger(0);
private final String id = "TestTaskAwarePlugin";
@Override
public <V> Runnable beforeTaskCreate(ThreadPoolExecutor executor, Runnable runnable, V value) {
invokeCount.incrementAndGet();
return TaskAwarePlugin.super.beforeTaskCreate(executor, runnable, value);
}
@Override
public <V> Callable<V> beforeTaskCreate(ThreadPoolExecutor executor, Callable<V> future) {
invokeCount.incrementAndGet();
return TaskAwarePlugin.super.beforeTaskCreate(executor, future);
}
@Override
public Runnable beforeTaskExecute(Runnable runnable) {
invokeCount.incrementAndGet();
return TaskAwarePlugin.super.beforeTaskExecute(runnable);
}
}
@Getter
private final static class TestExecuteAwarePlugin implements ExecuteAwarePlugin {
private final AtomicInteger invokeCount = new AtomicInteger(0);
private final String id = "TestExecuteAwarePlugin";
@Override
public void beforeExecute(Thread thread, Runnable runnable) {
invokeCount.incrementAndGet();
ExecuteAwarePlugin.super.beforeExecute(thread, runnable);
}
@Override
public void afterExecute(Runnable runnable, Throwable throwable) {
invokeCount.incrementAndGet();
ExecuteAwarePlugin.super.afterExecute(runnable, throwable);
}
}
@Getter
private final static class TestRejectedAwarePlugin implements RejectedAwarePlugin {
private final AtomicInteger invokeCount = new AtomicInteger(0);
private final String id = "TestRejectedAwarePlugin";
@Override
public void beforeRejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
invokeCount.incrementAndGet();
}
}
@Getter
private final static class TestShutdownAwarePlugin implements ShutdownAwarePlugin {
private final AtomicInteger invokeCount = new AtomicInteger(0);
private final String id = "TestShutdownAwarePlugin";
@Override
public void beforeShutdown(ThreadPoolExecutor executor) {
invokeCount.incrementAndGet();
ShutdownAwarePlugin.super.beforeShutdown(executor);
}
@Override
public void afterShutdown(ThreadPoolExecutor executor, List<Runnable> remainingTasks) {
invokeCount.incrementAndGet();
ShutdownAwarePlugin.super.afterShutdown(executor, remainingTasks);
}
@Override
public void afterTerminated(ExtensibleThreadPoolExecutor executor) {
invokeCount.incrementAndGet();
ShutdownAwarePlugin.super.afterTerminated(executor);
}
}
}
Loading…
Cancel
Save