diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPoolWrapper.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPoolWrapper.java
index e5dab547..8f4d2941 100644
--- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPoolWrapper.java
+++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPoolWrapper.java
@@ -25,7 +25,9 @@ import lombok.NoArgsConstructor;
import org.springframework.beans.factory.DisposableBean;
import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
/**
@@ -37,10 +39,27 @@ import java.util.concurrent.ThreadPoolExecutor;
@AllArgsConstructor
public class DynamicThreadPoolWrapper implements DisposableBean {
+ /**
+ * Determine the unique identifier of the thread pool in the project.
+ *
+ * @param tenantId tenant id
+ * @param itemId project id in the team
+ * @param threadPoolId Thread pool identifier under the project
+ */
private String tenantId, itemId, threadPoolId;
+ /**
+ * Whether the thread pool has completed initialization,
+ * and whether to subscribe to server-side configuration change events.
+ *
+ * @param subscribeFlag subscription server configuration id
+ * @param initFlag initial configuration complete flag
+ */
private boolean subscribeFlag, initFlag;
+ /**
+ * Thread pool executor.
+ */
private ThreadPoolExecutor executor;
public DynamicThreadPoolWrapper(String threadPoolId) {
@@ -53,14 +72,62 @@ public class DynamicThreadPoolWrapper implements DisposableBean {
this.subscribeFlag = true;
}
+ /**
+ * Executes the given task sometime in the future. The task
+ * may execute in a new thread or in an existing pooled thread.
+ *
+ * If the task cannot be submitted for execution, either because this
+ * executor has been shutdown or because its capacity has been reached,
+ * the task is handled by the current {@code RejectedExecutionHandler}.
+ *
+ * @param command the task to execute
+ * @throws RejectedExecutionException at discretion of
+ * {@code RejectedExecutionHandler}, if the task
+ * cannot be accepted for execution
+ * @throws NullPointerException if {@code command} is null
+ */
public void execute(Runnable command) {
executor.execute(command);
}
+ /**
+ * Submits a Runnable task for execution and returns a Future
+ * representing that task. The Future's {@code get} method will
+ * return {@code null} upon successful completion.
+ *
+ * @param task the task to submit
+ * @return a Future representing pending completion of the task
+ * @throws RejectedExecutionException if the task cannot be
+ * scheduled for execution
+ * @throws NullPointerException if the task is null
+ */
public Future> submit(Runnable task) {
return executor.submit(task);
}
+ /**
+ * Submits a value-returning task for execution and returns a
+ * Future representing the pending results of the task. The
+ * Future's {@code get} method will return the task's result upon
+ * successful completion.
+ *
+ *
+ * If you would like to immediately block waiting
+ * for a task, you can use constructions of the form
+ * {@code result = exec.submit(aCallable).get();}
+ *
+ *
Note: The {@link Executors} class includes a set of methods
+ * that can convert some other common closure-like objects,
+ * for example, {@link java.security.PrivilegedAction} to
+ * {@link Callable} form so they can be submitted.
+ *
+ * @param task the task to submit
+ * @param the type of the task's result
+ * @return a Future representing pending completion of the task
+ * @throws RejectedExecutionException if the task cannot be
+ * scheduled for execution
+ * @throws NullPointerException if the task is null
+ */
public Future submit(Callable task) {
return executor.submit(task);
}
diff --git a/hippo4j-core/src/test/java/cn/hippo4j/core/executor/DynamicThreadPoolExecutorTest.java b/hippo4j-core/src/test/java/cn/hippo4j/core/executor/DynamicThreadPoolExecutorTest.java
index cfa344a0..63d362af 100644
--- a/hippo4j-core/src/test/java/cn/hippo4j/core/executor/DynamicThreadPoolExecutorTest.java
+++ b/hippo4j-core/src/test/java/cn/hippo4j/core/executor/DynamicThreadPoolExecutorTest.java
@@ -80,9 +80,9 @@ public class DynamicThreadPoolExecutorTest {
@Test
public void testDestroyWhenWaitForTask() {
DynamicThreadPoolExecutor executor = new DynamicThreadPoolExecutor(
- 1, 1, 1000L, TimeUnit.MILLISECONDS,
- 1000L, true, 1000L,
- new ArrayBlockingQueue<>(1), "test", Thread::new, new ThreadPoolExecutor.DiscardOldestPolicy());
+ 1, 1, 1000L, TimeUnit.MILLISECONDS,
+ 1000L, true, 1000L,
+ new ArrayBlockingQueue<>(1), "test", Thread::new, new ThreadPoolExecutor.DiscardOldestPolicy());
AtomicInteger count = new AtomicInteger(0);
executor.execute(() -> {
@@ -96,16 +96,17 @@ public class DynamicThreadPoolExecutorTest {
executor.destroy();
// waitting for terminated
- while (!executor.isTerminated()){};
+ while (!executor.isTerminated()) {
+ } ;
Assert.assertEquals(2, count.get());
}
@Test
public void testDestroyWhenNotWaitForTask() {
DynamicThreadPoolExecutor executor = new DynamicThreadPoolExecutor(
- 1, 1, 1000L, TimeUnit.MILLISECONDS,
- 1000L, false, 1000L,
- new ArrayBlockingQueue<>(1), "test", Thread::new, new ThreadPoolExecutor.DiscardOldestPolicy());
+ 1, 1, 1000L, TimeUnit.MILLISECONDS,
+ 1000L, false, 1000L,
+ new ArrayBlockingQueue<>(1), "test", Thread::new, new ThreadPoolExecutor.DiscardOldestPolicy());
AtomicInteger count = new AtomicInteger(0);
executor.execute(() -> {
@@ -119,7 +120,8 @@ public class DynamicThreadPoolExecutorTest {
executor.destroy();
// waitting for terminated
- while (!executor.isTerminated()){};
+ while (!executor.isTerminated()) {
+ } ;
Assert.assertEquals(1, count.get());
}
diff --git a/hippo4j-example/hippo4j-example-core/src/test/java/cn/hippo4j/example/core/TaskTimeRecordPluginBenchmarkTest.java b/hippo4j-example/hippo4j-example-core/src/test/java/cn/hippo4j/example/core/TaskTimeRecordPluginBenchmarkTest.java
index 2bf935a4..408af6e0 100644
--- a/hippo4j-example/hippo4j-example-core/src/test/java/cn/hippo4j/example/core/TaskTimeRecordPluginBenchmarkTest.java
+++ b/hippo4j-example/hippo4j-example-core/src/test/java/cn/hippo4j/example/core/TaskTimeRecordPluginBenchmarkTest.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package cn.hippo4j.example.core;
import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor;
@@ -34,15 +51,16 @@ public class TaskTimeRecordPluginBenchmarkTest {
public void origin_200(Blackhole blackhole) {
int threadCount = 200;
ThreadPoolExecutor executor = new ThreadPoolExecutor(
- threadCount, threadCount, 1000L, TimeUnit.MILLISECONDS,
- new ArrayBlockingQueue<>(threadCount), Thread::new, new ThreadPoolExecutor.DiscardPolicy());
+ threadCount, threadCount, 1000L, TimeUnit.MILLISECONDS,
+ new ArrayBlockingQueue<>(threadCount), Thread::new, new ThreadPoolExecutor.DiscardPolicy());
executor.prestartAllCoreThreads();
List tasks = getTask(threadCount, blackhole);
tasks.forEach(executor::execute);
executor.shutdown();
- while (!executor.isTerminated()) {}
+ while (!executor.isTerminated()) {
+ }
}
@SneakyThrows
@@ -50,15 +68,16 @@ public class TaskTimeRecordPluginBenchmarkTest {
public void origin_50(Blackhole blackhole) {
int threadCount = 50;
ThreadPoolExecutor executor = new ThreadPoolExecutor(
- threadCount, threadCount, 1000L, TimeUnit.MILLISECONDS,
- new ArrayBlockingQueue<>(threadCount), Thread::new, new ThreadPoolExecutor.DiscardPolicy());
+ threadCount, threadCount, 1000L, TimeUnit.MILLISECONDS,
+ new ArrayBlockingQueue<>(threadCount), Thread::new, new ThreadPoolExecutor.DiscardPolicy());
executor.prestartAllCoreThreads();
List tasks = getTask(threadCount, blackhole);
tasks.forEach(executor::execute);
executor.shutdown();
- while (!executor.isTerminated()) {}
+ while (!executor.isTerminated()) {
+ }
}
@SneakyThrows
@@ -66,16 +85,17 @@ public class TaskTimeRecordPluginBenchmarkTest {
public void not_plugin_50(Blackhole blackhole) {
int threadCount = 50;
ExtensibleThreadPoolExecutor executor = new ExtensibleThreadPoolExecutor(
- "test", new DefaultThreadPoolPluginManager(),
- threadCount, threadCount, 1000L, TimeUnit.MILLISECONDS,
- new ArrayBlockingQueue<>(threadCount), Thread::new, new ThreadPoolExecutor.DiscardPolicy());
+ "test", new DefaultThreadPoolPluginManager(),
+ threadCount, threadCount, 1000L, TimeUnit.MILLISECONDS,
+ new ArrayBlockingQueue<>(threadCount), Thread::new, new ThreadPoolExecutor.DiscardPolicy());
executor.prestartAllCoreThreads();
List tasks = getTask(threadCount, blackhole);
tasks.forEach(executor::execute);
executor.shutdown();
- while (!executor.isTerminated()) {}
+ while (!executor.isTerminated()) {
+ }
}
@SneakyThrows
@@ -83,16 +103,17 @@ public class TaskTimeRecordPluginBenchmarkTest {
public void not_plugin_200(Blackhole blackhole) {
int threadCount = 200;
ExtensibleThreadPoolExecutor executor = new ExtensibleThreadPoolExecutor(
- "test", new DefaultThreadPoolPluginManager(),
- threadCount, threadCount, 1000L, TimeUnit.MILLISECONDS,
- new ArrayBlockingQueue<>(threadCount), Thread::new, new ThreadPoolExecutor.DiscardPolicy());
+ "test", new DefaultThreadPoolPluginManager(),
+ threadCount, threadCount, 1000L, TimeUnit.MILLISECONDS,
+ new ArrayBlockingQueue<>(threadCount), Thread::new, new ThreadPoolExecutor.DiscardPolicy());
executor.prestartAllCoreThreads();
List tasks = getTask(threadCount, blackhole);
tasks.forEach(executor::execute);
executor.shutdown();
- while (!executor.isTerminated()) {}
+ while (!executor.isTerminated()) {
+ }
}
@SneakyThrows
@@ -100,9 +121,9 @@ public class TaskTimeRecordPluginBenchmarkTest {
public void plugin_50(Blackhole blackhole) {
int threadCount = 50;
ExtensibleThreadPoolExecutor executor = new ExtensibleThreadPoolExecutor(
- "test", new DefaultThreadPoolPluginManager(),
- threadCount, threadCount, 1000L, TimeUnit.MILLISECONDS,
- new ArrayBlockingQueue<>(threadCount), Thread::new, new ThreadPoolExecutor.DiscardPolicy());
+ "test", new DefaultThreadPoolPluginManager(),
+ threadCount, threadCount, 1000L, TimeUnit.MILLISECONDS,
+ new ArrayBlockingQueue<>(threadCount), Thread::new, new ThreadPoolExecutor.DiscardPolicy());
executor.prestartAllCoreThreads();
executor.register(new TaskTimeRecordPlugin());
@@ -110,7 +131,8 @@ public class TaskTimeRecordPluginBenchmarkTest {
tasks.forEach(executor::execute);
executor.shutdown();
- while (!executor.isTerminated()) {}
+ while (!executor.isTerminated()) {
+ }
}
@SneakyThrows
@@ -118,9 +140,9 @@ public class TaskTimeRecordPluginBenchmarkTest {
public void plugin_200(Blackhole blackhole) {
int threadCount = 200;
ExtensibleThreadPoolExecutor executor = new ExtensibleThreadPoolExecutor(
- "test", new DefaultThreadPoolPluginManager(),
- threadCount, threadCount, 1000L, TimeUnit.MILLISECONDS,
- new ArrayBlockingQueue<>(threadCount), Thread::new, new ThreadPoolExecutor.DiscardPolicy());
+ "test", new DefaultThreadPoolPluginManager(),
+ threadCount, threadCount, 1000L, TimeUnit.MILLISECONDS,
+ new ArrayBlockingQueue<>(threadCount), Thread::new, new ThreadPoolExecutor.DiscardPolicy());
executor.prestartAllCoreThreads();
executor.register(new TaskTimeRecordPlugin());
@@ -128,7 +150,8 @@ public class TaskTimeRecordPluginBenchmarkTest {
tasks.forEach(executor::execute);
executor.shutdown();
- while (!executor.isTerminated()) {}
+ while (!executor.isTerminated()) {
+ }
}
private List getTask(int count, Blackhole blackhole) {
@@ -142,9 +165,9 @@ public class TaskTimeRecordPluginBenchmarkTest {
public static void main(String[] args) throws Exception {
Options opts = new OptionsBuilder()
- .include(TaskTimeRecordPluginBenchmarkTest.class.getSimpleName())
- .resultFormat(ResultFormatType.JSON)
- .build();
+ .include(TaskTimeRecordPluginBenchmarkTest.class.getSimpleName())
+ .resultFormat(ResultFormatType.JSON)
+ .build();
new Runner(opts).run();
}