mirror of https://github.com/longtai-cn/hippo4j
parent
d049ebc111
commit
9dedec99d3
@ -0,0 +1,111 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package cn.hippo4j.core.executor;
|
||||
|
||||
import cn.hippo4j.common.toolkit.ThreadUtil;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.springframework.core.task.TaskDecorator;
|
||||
|
||||
import java.util.concurrent.ArrayBlockingQueue;
|
||||
import java.util.concurrent.RejectedExecutionHandler;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* test for {@link DynamicThreadPoolExecutor}
|
||||
*/
|
||||
public class DynamicThreadPoolExecutorTest {
|
||||
|
||||
@Test
|
||||
public void testRedundancyHandler() {
|
||||
RejectedExecutionHandler handler = new ThreadPoolExecutor.DiscardOldestPolicy();
|
||||
|
||||
DynamicThreadPoolExecutor executor = new DynamicThreadPoolExecutor(
|
||||
1, 1, 1000L, TimeUnit.MILLISECONDS,
|
||||
1000L, true, 1000L,
|
||||
new ArrayBlockingQueue<>(1), "test", Thread::new, handler);
|
||||
|
||||
Assert.assertEquals(handler, executor.getRedundancyHandler());
|
||||
handler = new ThreadPoolExecutor.AbortPolicy();
|
||||
executor.setRedundancyHandler(handler);
|
||||
Assert.assertEquals(handler, executor.getRedundancyHandler());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTaskDecorator() {
|
||||
DynamicThreadPoolExecutor executor = new DynamicThreadPoolExecutor(
|
||||
1, 1, 1000L, TimeUnit.MILLISECONDS,
|
||||
1000L, true, 1000L,
|
||||
new ArrayBlockingQueue<>(1), "test", Thread::new, new ThreadPoolExecutor.DiscardOldestPolicy());
|
||||
|
||||
Assert.assertNull(executor.getTaskDecorator());
|
||||
TaskDecorator decorator = runnable -> runnable;
|
||||
executor.setTaskDecorator(decorator);
|
||||
Assert.assertEquals(decorator, executor.getTaskDecorator());
|
||||
|
||||
decorator = runnable -> runnable;
|
||||
executor.setTaskDecorator(decorator);
|
||||
Assert.assertEquals(decorator, executor.getTaskDecorator());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExecuteTimeOut() {
|
||||
DynamicThreadPoolExecutor executor = new DynamicThreadPoolExecutor(
|
||||
1, 1, 1000L, TimeUnit.MILLISECONDS,
|
||||
1000L, true, 1000L,
|
||||
new ArrayBlockingQueue<>(1), "test", Thread::new, new ThreadPoolExecutor.DiscardOldestPolicy());
|
||||
|
||||
Assert.assertEquals(1000L, executor.getExecuteTimeOut().longValue());
|
||||
executor.setExecuteTimeOut(500L);
|
||||
Assert.assertEquals(500L, executor.getExecuteTimeOut().longValue());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRejectCount() {
|
||||
DynamicThreadPoolExecutor executor = new DynamicThreadPoolExecutor(
|
||||
1, 1, 1000L, TimeUnit.MILLISECONDS,
|
||||
1000L, true, 1000L,
|
||||
new ArrayBlockingQueue<>(1), "test", Thread::new, new ThreadPoolExecutor.DiscardOldestPolicy());
|
||||
|
||||
Assert.assertEquals(0L, executor.getRejectCountNum().longValue());
|
||||
Assert.assertEquals(0L, executor.getRejectCount().get());
|
||||
|
||||
executor.submit(() -> ThreadUtil.sleep(100L));
|
||||
executor.submit(() -> ThreadUtil.sleep(100L));
|
||||
executor.submit(() -> ThreadUtil.sleep(100L));
|
||||
ThreadUtil.sleep(200L);
|
||||
Assert.assertEquals(1L, executor.getRejectCountNum().longValue());
|
||||
Assert.assertEquals(1L, executor.getRejectCount().get());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSupportParam() {
|
||||
DynamicThreadPoolExecutor executor = new DynamicThreadPoolExecutor(
|
||||
1, 1, 1000L, TimeUnit.MILLISECONDS,
|
||||
1000L, true, 1000L,
|
||||
new ArrayBlockingQueue<>(1), "test", Thread::new, new ThreadPoolExecutor.DiscardOldestPolicy());
|
||||
Assert.assertEquals(1000L, executor.getAwaitTerminationMillis());
|
||||
Assert.assertTrue(executor.isWaitForTasksToCompleteOnShutdown());
|
||||
|
||||
executor.setSupportParam(500L, false);
|
||||
Assert.assertEquals(500L, executor.getAwaitTerminationMillis());
|
||||
Assert.assertFalse(executor.isWaitForTasksToCompleteOnShutdown());
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,191 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package cn.hippo4j.core.executor;
|
||||
|
||||
import cn.hippo4j.common.toolkit.ThreadUtil;
|
||||
import cn.hippo4j.core.plugin.ExecuteAwarePlugin;
|
||||
import cn.hippo4j.core.plugin.RejectedAwarePlugin;
|
||||
import cn.hippo4j.core.plugin.ShutdownAwarePlugin;
|
||||
import cn.hippo4j.core.plugin.TaskAwarePlugin;
|
||||
import cn.hippo4j.core.plugin.manager.DefaultThreadPoolPluginManager;
|
||||
import lombok.Getter;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
/**
|
||||
* test for {@link ExtensibleThreadPoolExecutor}
|
||||
*/
|
||||
public class ExtensibleThreadPoolExecutorTest {
|
||||
|
||||
private final RejectedExecutionHandler originalHandler = new ThreadPoolExecutor.DiscardPolicy();
|
||||
|
||||
private ExtensibleThreadPoolExecutor executor;
|
||||
|
||||
@Before
|
||||
public void initExecutor() {
|
||||
executor = new ExtensibleThreadPoolExecutor(
|
||||
"test", new DefaultThreadPoolPluginManager(),
|
||||
5, 5, 1000L, TimeUnit.MILLISECONDS,
|
||||
new ArrayBlockingQueue<>(1), Thread::new, originalHandler);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetOrSetRejectedHandler() {
|
||||
RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy();
|
||||
executor.setRejectedExecutionHandler(handler);
|
||||
Assert.assertSame(handler, executor.getRejectedExecutionHandler());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInvokeTaskAwarePlugin() {
|
||||
TestTaskAwarePlugin plugin = new TestTaskAwarePlugin();
|
||||
executor.register(plugin);
|
||||
executor.submit(() -> {});
|
||||
executor.submit(() -> true);
|
||||
executor.submit(() -> {}, false);
|
||||
executor.execute(() -> {});
|
||||
Assert.assertEquals(7, plugin.getInvokeCount().get());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInvokeExecuteAwarePlugin() {
|
||||
TestExecuteAwarePlugin plugin = new TestExecuteAwarePlugin();
|
||||
executor.register(plugin);
|
||||
executor.execute(() -> {
|
||||
});
|
||||
ThreadUtil.sleep(500L);
|
||||
Assert.assertEquals(2, plugin.getInvokeCount().get());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInvokeRejectedAwarePlugin() {
|
||||
executor.setCorePoolSize(1);
|
||||
executor.setMaximumPoolSize(1);
|
||||
|
||||
TestRejectedAwarePlugin plugin = new TestRejectedAwarePlugin();
|
||||
executor.register(plugin);
|
||||
// blocking pool and queue
|
||||
executor.submit(() -> ThreadUtil.sleep(500L));
|
||||
executor.submit(() -> ThreadUtil.sleep(500L));
|
||||
// reject 3 tasks
|
||||
executor.submit(() -> {
|
||||
});
|
||||
executor.submit(() -> {
|
||||
});
|
||||
executor.submit(() -> {
|
||||
});
|
||||
|
||||
ThreadUtil.sleep(500L);
|
||||
Assert.assertEquals(3, plugin.getInvokeCount().get());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInvokeTestShutdownAwarePluginWhenShutdown() throws InterruptedException {
|
||||
TestShutdownAwarePlugin plugin = new TestShutdownAwarePlugin();
|
||||
executor.register(plugin);
|
||||
executor.shutdown();
|
||||
executor.submit(() -> {throw new IllegalArgumentException("???");});
|
||||
if (executor.awaitTermination(500L, TimeUnit.MILLISECONDS)) {
|
||||
Assert.assertEquals(3, plugin.getInvokeCount().get());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInvokeTestShutdownAwarePluginWhenShutdownNow() throws InterruptedException {
|
||||
TestShutdownAwarePlugin plugin = new TestShutdownAwarePlugin();
|
||||
executor.register(plugin);
|
||||
executor.shutdownNow();
|
||||
if (executor.awaitTermination(500L, TimeUnit.MILLISECONDS)) {
|
||||
Assert.assertEquals(3, plugin.getInvokeCount().get());
|
||||
}
|
||||
}
|
||||
|
||||
@Getter
|
||||
private final static class TestTaskAwarePlugin implements TaskAwarePlugin {
|
||||
private final AtomicInteger invokeCount = new AtomicInteger(0);
|
||||
private final String id = "TestTaskAwarePlugin";
|
||||
@Override
|
||||
public <V> Runnable beforeTaskCreate(ThreadPoolExecutor executor, Runnable runnable, V value) {
|
||||
invokeCount.incrementAndGet();
|
||||
return TaskAwarePlugin.super.beforeTaskCreate(executor, runnable, value);
|
||||
}
|
||||
@Override
|
||||
public <V> Callable<V> beforeTaskCreate(ThreadPoolExecutor executor, Callable<V> future) {
|
||||
invokeCount.incrementAndGet();
|
||||
return TaskAwarePlugin.super.beforeTaskCreate(executor, future);
|
||||
}
|
||||
@Override
|
||||
public Runnable beforeTaskExecute(Runnable runnable) {
|
||||
invokeCount.incrementAndGet();
|
||||
return TaskAwarePlugin.super.beforeTaskExecute(runnable);
|
||||
}
|
||||
}
|
||||
|
||||
@Getter
|
||||
private final static class TestExecuteAwarePlugin implements ExecuteAwarePlugin {
|
||||
private final AtomicInteger invokeCount = new AtomicInteger(0);
|
||||
private final String id = "TestExecuteAwarePlugin";
|
||||
@Override
|
||||
public void beforeExecute(Thread thread, Runnable runnable) {
|
||||
invokeCount.incrementAndGet();
|
||||
ExecuteAwarePlugin.super.beforeExecute(thread, runnable);
|
||||
}
|
||||
@Override
|
||||
public void afterExecute(Runnable runnable, Throwable throwable) {
|
||||
invokeCount.incrementAndGet();
|
||||
ExecuteAwarePlugin.super.afterExecute(runnable, throwable);
|
||||
}
|
||||
}
|
||||
|
||||
@Getter
|
||||
private final static class TestRejectedAwarePlugin implements RejectedAwarePlugin {
|
||||
private final AtomicInteger invokeCount = new AtomicInteger(0);
|
||||
private final String id = "TestRejectedAwarePlugin";
|
||||
@Override
|
||||
public void beforeRejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
|
||||
invokeCount.incrementAndGet();
|
||||
}
|
||||
}
|
||||
|
||||
@Getter
|
||||
private final static class TestShutdownAwarePlugin implements ShutdownAwarePlugin {
|
||||
private final AtomicInteger invokeCount = new AtomicInteger(0);
|
||||
private final String id = "TestShutdownAwarePlugin";
|
||||
@Override
|
||||
public void beforeShutdown(ThreadPoolExecutor executor) {
|
||||
invokeCount.incrementAndGet();
|
||||
ShutdownAwarePlugin.super.beforeShutdown(executor);
|
||||
}
|
||||
@Override
|
||||
public void afterShutdown(ThreadPoolExecutor executor, List<Runnable> remainingTasks) {
|
||||
invokeCount.incrementAndGet();
|
||||
ShutdownAwarePlugin.super.afterShutdown(executor, remainingTasks);
|
||||
}
|
||||
@Override
|
||||
public void afterTerminated(ExtensibleThreadPoolExecutor executor) {
|
||||
invokeCount.incrementAndGet();
|
||||
ShutdownAwarePlugin.super.afterTerminated(executor);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,119 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package cn.hippo4j.core.plugin;
|
||||
|
||||
import cn.hippo4j.core.plugin.manager.DefaultThreadPoolPluginManager;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
* test for {@link DefaultThreadPoolPluginManager}
|
||||
*/
|
||||
public class DefaultThreadPoolPluginManagerTest {
|
||||
|
||||
private DefaultThreadPoolPluginManager registry;
|
||||
|
||||
@Before
|
||||
public void initRegistry() {
|
||||
registry = new DefaultThreadPoolPluginManager();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRegister() {
|
||||
TaskAwarePlugin taskAwarePlugin = new TestTaskAwarePlugin();
|
||||
registry.register(taskAwarePlugin);
|
||||
Assert.assertThrows(IllegalArgumentException.class, () -> registry.register(taskAwarePlugin));
|
||||
Assert.assertTrue(registry.isRegistered(taskAwarePlugin.getId()));
|
||||
Assert.assertEquals(1, registry.getTaskAwarePluginList().size());
|
||||
Assert.assertSame(taskAwarePlugin, registry.getPlugin(taskAwarePlugin.getId()).orElse(null));
|
||||
registry.getPluginOfType(taskAwarePlugin.getId(), TestTaskAwarePlugin.class)
|
||||
.ifPresent(plugin -> Assert.assertSame(plugin, taskAwarePlugin));
|
||||
Assert.assertEquals(taskAwarePlugin.getId(), registry.getPluginOfType(taskAwarePlugin.getId(), TestTaskAwarePlugin.class).map(TestTaskAwarePlugin::getId).orElse(null));
|
||||
registry.unregister(taskAwarePlugin.getId());
|
||||
Assert.assertFalse(registry.getPlugin(taskAwarePlugin.getId()).isPresent());
|
||||
|
||||
ExecuteAwarePlugin executeAwarePlugin = new TestExecuteAwarePlugin();
|
||||
registry.register(executeAwarePlugin);
|
||||
Assert.assertTrue(registry.isRegistered(executeAwarePlugin.getId()));
|
||||
Assert.assertEquals(1, registry.getExecuteAwarePluginList().size());
|
||||
|
||||
RejectedAwarePlugin rejectedAwarePlugin = new TestRejectedAwarePlugin();
|
||||
registry.register(rejectedAwarePlugin);
|
||||
Assert.assertTrue(registry.isRegistered(rejectedAwarePlugin.getId()));
|
||||
Assert.assertEquals(1, registry.getRejectedAwarePluginList().size());
|
||||
|
||||
ShutdownAwarePlugin shutdownAwarePlugin = new TestShutdownAwarePlugin();
|
||||
registry.register(shutdownAwarePlugin);
|
||||
Assert.assertTrue(registry.isRegistered(shutdownAwarePlugin.getId()));
|
||||
Assert.assertEquals(1, registry.getShutdownAwarePluginList().size());
|
||||
}
|
||||
|
||||
private final static class TestTaskAwarePlugin implements TaskAwarePlugin {
|
||||
|
||||
/**
|
||||
* Get id.
|
||||
*
|
||||
* @return id
|
||||
*/
|
||||
@Override
|
||||
public String getId() {
|
||||
return "TestTaskAwarePlugin";
|
||||
}
|
||||
}
|
||||
|
||||
private final static class TestExecuteAwarePlugin implements ExecuteAwarePlugin {
|
||||
|
||||
/**
|
||||
* Get id.
|
||||
*
|
||||
* @return id
|
||||
*/
|
||||
@Override
|
||||
public String getId() {
|
||||
return "TestExecuteAwarePlugin";
|
||||
}
|
||||
}
|
||||
|
||||
private final static class TestRejectedAwarePlugin implements RejectedAwarePlugin {
|
||||
|
||||
/**
|
||||
* Get id.
|
||||
*
|
||||
* @return id
|
||||
*/
|
||||
@Override
|
||||
public String getId() {
|
||||
return "TestRejectedAwarePlugin";
|
||||
}
|
||||
}
|
||||
|
||||
private final static class TestShutdownAwarePlugin implements ShutdownAwarePlugin {
|
||||
|
||||
/**
|
||||
* Get id.
|
||||
*
|
||||
* @return id
|
||||
*/
|
||||
@Override
|
||||
public String getId() {
|
||||
return "TestShutdownAwarePlugin";
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,77 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package cn.hippo4j.core.plugin;
|
||||
|
||||
import cn.hippo4j.common.toolkit.ThreadUtil;
|
||||
import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor;
|
||||
import cn.hippo4j.core.plugin.impl.TaskRejectCountRecordPlugin;
|
||||
import cn.hippo4j.core.plugin.impl.TaskTimeRecordPlugin;
|
||||
import cn.hippo4j.core.plugin.impl.ThreadPoolExecutorShutdownPlugin;
|
||||
import cn.hippo4j.core.plugin.manager.DefaultThreadPoolPluginManager;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import lombok.SneakyThrows;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ArrayBlockingQueue;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* test {@link ThreadPoolPlugin}'s info to json
|
||||
*/
|
||||
public class PluginRuntimeTest {
|
||||
|
||||
private final ObjectMapper objectMapper = new ObjectMapper();
|
||||
|
||||
@SneakyThrows
|
||||
@Test
|
||||
public void testGetPluginRuntime() {
|
||||
ExtensibleThreadPoolExecutor executor = new ExtensibleThreadPoolExecutor(
|
||||
"test", new DefaultThreadPoolPluginManager(),
|
||||
1, 1, 1000L, TimeUnit.MILLISECONDS,
|
||||
new ArrayBlockingQueue<>(1), Thread::new, new ThreadPoolExecutor.DiscardPolicy());
|
||||
|
||||
// TaskRejectCountRecordPlugin
|
||||
TaskRejectCountRecordPlugin taskRejectCountRecordPlugin = new TaskRejectCountRecordPlugin();
|
||||
executor.register(taskRejectCountRecordPlugin);
|
||||
|
||||
// TaskRejectCountRecordPlugin
|
||||
TaskTimeRecordPlugin taskTimeRecordPlugin = new TaskTimeRecordPlugin();
|
||||
executor.register(taskTimeRecordPlugin);
|
||||
|
||||
// ThreadPoolExecutorShutdownPlugin
|
||||
ThreadPoolExecutorShutdownPlugin executorShutdownPlugin = new ThreadPoolExecutorShutdownPlugin(2000L, true);
|
||||
executor.register(executorShutdownPlugin);
|
||||
|
||||
executor.submit(() -> ThreadUtil.sleep(100L));
|
||||
executor.submit(() -> ThreadUtil.sleep(300L));
|
||||
executor.submit(() -> ThreadUtil.sleep(200L));
|
||||
|
||||
ThreadUtil.sleep(1000L);
|
||||
List<PluginRuntime> runtimeList = executor.getAllPlugins().stream()
|
||||
.map(ThreadPoolPlugin::getPluginRuntime)
|
||||
.collect(Collectors.toList());
|
||||
Assert.assertEquals(3, runtimeList.size());
|
||||
|
||||
System.out.println(objectMapper.writeValueAsString(runtimeList));
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,60 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package cn.hippo4j.core.plugin.impl;
|
||||
|
||||
import cn.hippo4j.common.toolkit.ThreadUtil;
|
||||
import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor;
|
||||
import cn.hippo4j.core.plugin.manager.DefaultThreadPoolPluginManager;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.concurrent.ArrayBlockingQueue;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
/**
|
||||
* test for {@link TaskDecoratorPlugin}
|
||||
*/
|
||||
public class TaskDecoratorPluginTest {
|
||||
|
||||
private final AtomicInteger taskExecuteCount = new AtomicInteger(0);
|
||||
|
||||
@Test
|
||||
public void testExecute() {
|
||||
ExtensibleThreadPoolExecutor executor = new ExtensibleThreadPoolExecutor(
|
||||
"test", new DefaultThreadPoolPluginManager(),
|
||||
5, 5, 1000L, TimeUnit.MILLISECONDS,
|
||||
new ArrayBlockingQueue<>(1), Thread::new, new ThreadPoolExecutor.DiscardPolicy());
|
||||
TaskDecoratorPlugin plugin = new TaskDecoratorPlugin();
|
||||
plugin.addDecorator(runnable -> () -> {
|
||||
taskExecuteCount.incrementAndGet();
|
||||
runnable.run();
|
||||
});
|
||||
plugin.addDecorator(runnable -> () -> {
|
||||
taskExecuteCount.incrementAndGet();
|
||||
runnable.run();
|
||||
});
|
||||
executor.register(plugin);
|
||||
executor.execute(() -> {
|
||||
});
|
||||
ThreadUtil.sleep(500L);
|
||||
Assert.assertEquals(2, taskExecuteCount.get());
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,52 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package cn.hippo4j.core.plugin.impl;
|
||||
|
||||
import cn.hippo4j.common.toolkit.ThreadUtil;
|
||||
import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor;
|
||||
import cn.hippo4j.core.plugin.manager.DefaultThreadPoolPluginManager;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.concurrent.ArrayBlockingQueue;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* test for {@link TaskRejectCountRecordPlugin}
|
||||
*/
|
||||
public class TaskRejectCountRecordPluginTest {
|
||||
|
||||
@Test
|
||||
public void testExecute() {
|
||||
ExtensibleThreadPoolExecutor executor = new ExtensibleThreadPoolExecutor(
|
||||
"test", new DefaultThreadPoolPluginManager(),
|
||||
1, 1, 1000L, TimeUnit.MILLISECONDS,
|
||||
new ArrayBlockingQueue<>(1), Thread::new, new ThreadPoolExecutor.DiscardPolicy());
|
||||
|
||||
TaskRejectCountRecordPlugin plugin = new TaskRejectCountRecordPlugin();
|
||||
executor.register(plugin);
|
||||
executor.submit(() -> ThreadUtil.sleep(500L));
|
||||
executor.submit(() -> ThreadUtil.sleep(500L));
|
||||
executor.submit(() -> ThreadUtil.sleep(500L));
|
||||
|
||||
ThreadUtil.sleep(500L);
|
||||
Assert.assertEquals((Long) 1L, plugin.getRejectCountNum());
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,74 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package cn.hippo4j.core.plugin.impl;
|
||||
|
||||
import cn.hippo4j.common.toolkit.ThreadUtil;
|
||||
import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor;
|
||||
import cn.hippo4j.core.plugin.manager.DefaultThreadPoolPluginManager;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.concurrent.ArrayBlockingQueue;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
/**
|
||||
* test for {@link TaskRejectNotifyAlarmPlugin}
|
||||
*/
|
||||
public class TaskRejectNotifyAlarmPluginTest {
|
||||
|
||||
@Test
|
||||
public void testBeforeRejectedExecution() {
|
||||
ExtensibleThreadPoolExecutor executor = new ExtensibleThreadPoolExecutor(
|
||||
"test", new DefaultThreadPoolPluginManager(),
|
||||
1, 1, 1000L, TimeUnit.MILLISECONDS,
|
||||
new ArrayBlockingQueue<>(1), Thread::new, new ThreadPoolExecutor.DiscardPolicy());
|
||||
|
||||
AtomicInteger rejectCount = new AtomicInteger(0);
|
||||
executor.register(new TestTaskRejectNotifyAlarmPlugin(rejectCount, executor));
|
||||
executor.submit(() -> ThreadUtil.sleep(200L));
|
||||
executor.submit(() -> ThreadUtil.sleep(200L));
|
||||
executor.submit(() -> ThreadUtil.sleep(200L));
|
||||
|
||||
ThreadUtil.sleep(1000L);
|
||||
Assert.assertEquals(1, rejectCount.get());
|
||||
}
|
||||
|
||||
@RequiredArgsConstructor
|
||||
private static class TestTaskRejectNotifyAlarmPlugin extends TaskRejectNotifyAlarmPlugin {
|
||||
|
||||
private final AtomicInteger count;
|
||||
private final ThreadPoolExecutor targetExecutor;
|
||||
|
||||
/**
|
||||
* Callback before task is rejected.
|
||||
*
|
||||
* @param runnable task
|
||||
* @param executor executor
|
||||
*/
|
||||
@Override
|
||||
public void beforeRejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
|
||||
count.incrementAndGet();
|
||||
Assert.assertEquals(targetExecutor, executor);
|
||||
super.beforeRejectedExecution(runnable, executor);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,55 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package cn.hippo4j.core.plugin.impl;
|
||||
|
||||
import cn.hippo4j.common.toolkit.ThreadUtil;
|
||||
import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor;
|
||||
import cn.hippo4j.core.plugin.manager.DefaultThreadPoolPluginManager;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.concurrent.ArrayBlockingQueue;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* test for {@link TaskTimeRecordPlugin}
|
||||
*/
|
||||
public class TaskTimeRecordPluginTest {
|
||||
|
||||
@Test
|
||||
public void testExecute() {
|
||||
ExtensibleThreadPoolExecutor executor = new ExtensibleThreadPoolExecutor(
|
||||
"test", new DefaultThreadPoolPluginManager(),
|
||||
3, 3, 1000L, TimeUnit.MILLISECONDS,
|
||||
new ArrayBlockingQueue<>(1), Thread::new, new ThreadPoolExecutor.DiscardPolicy());
|
||||
|
||||
TaskTimeRecordPlugin plugin = new TaskTimeRecordPlugin();
|
||||
executor.register(plugin);
|
||||
executor.submit(() -> ThreadUtil.sleep(100L));
|
||||
executor.submit(() -> ThreadUtil.sleep(300L));
|
||||
executor.submit(() -> ThreadUtil.sleep(200L));
|
||||
|
||||
ThreadUtil.sleep(1000L);
|
||||
TaskTimeRecordPlugin.Summary summary = plugin.summarize();
|
||||
Assert.assertEquals(1, summary.getMinTaskTimeMillis() / 100L);
|
||||
Assert.assertEquals(3, summary.getMaxTaskTimeMillis() / 100L);
|
||||
Assert.assertEquals(2, summary.getAvgTaskTimeMillis() / 100L);
|
||||
Assert.assertEquals(6, summary.getTotalTaskTimeMillis() / 100L);
|
||||
}
|
||||
}
|
@ -0,0 +1,83 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package cn.hippo4j.core.plugin.impl;
|
||||
|
||||
import cn.hippo4j.common.toolkit.ThreadUtil;
|
||||
import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor;
|
||||
import cn.hippo4j.core.plugin.ThreadPoolPlugin;
|
||||
import cn.hippo4j.core.plugin.manager.DefaultThreadPoolPluginManager;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.concurrent.ArrayBlockingQueue;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
/**
|
||||
* test for {@link ThreadPoolExecutorShutdownPlugin}
|
||||
*/
|
||||
public class ThreadPoolExecutorShutdownPluginTest {
|
||||
|
||||
public ExtensibleThreadPoolExecutor getExecutor(ThreadPoolPlugin plugin) {
|
||||
ExtensibleThreadPoolExecutor executor = new ExtensibleThreadPoolExecutor(
|
||||
"test", new DefaultThreadPoolPluginManager(),
|
||||
2, 2, 1000L, TimeUnit.MILLISECONDS,
|
||||
new ArrayBlockingQueue<>(1), Thread::new, new ThreadPoolExecutor.DiscardPolicy());
|
||||
executor.register(plugin);
|
||||
return executor;
|
||||
}
|
||||
|
||||
private static Callable<Integer> getCallable(AtomicInteger completedCount) {
|
||||
return () -> {
|
||||
ThreadUtil.sleep(1000L);
|
||||
return completedCount.incrementAndGet();
|
||||
};
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExecuteShutdownWhenWaitTaskCompleted() {
|
||||
ExtensibleThreadPoolExecutor executor = getExecutor(
|
||||
new ThreadPoolExecutorShutdownPlugin(2000L, true));
|
||||
|
||||
AtomicInteger completedCount = new AtomicInteger(0);
|
||||
Callable<Integer> future1 = getCallable(completedCount);
|
||||
Callable<Integer> future2 = getCallable(completedCount);
|
||||
executor.submit(future1);
|
||||
executor.submit(future2);
|
||||
|
||||
executor.shutdown();
|
||||
Assert.assertEquals(2, completedCount.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExecuteShutdownWhenNotWaitTaskCompleted() {
|
||||
ExtensibleThreadPoolExecutor executor = getExecutor(
|
||||
new ThreadPoolExecutorShutdownPlugin(-1L, true));
|
||||
|
||||
AtomicInteger completedCount = new AtomicInteger(0);
|
||||
Callable<Integer> future1 = getCallable(completedCount);
|
||||
Callable<Integer> future2 = getCallable(completedCount);
|
||||
executor.submit(future1);
|
||||
executor.submit(future2);
|
||||
|
||||
executor.shutdown();
|
||||
Assert.assertEquals(0, completedCount.get());
|
||||
}
|
||||
}
|
Loading…
Reference in new issue