From 05bfa3c66d28df4db1e5b2ec31ad991edc47b171 Mon Sep 17 00:00:00 2001 From: "chen.ma" Date: Tue, 1 Nov 2022 20:13:19 +0800 Subject: [PATCH] Field style refactoring and adding method logs (#878) --- .../executor/DynamicThreadPoolWrapper.java | 67 +++++++++++++++++ .../DynamicThreadPoolExecutorTest.java | 18 +++-- .../TaskTimeRecordPluginBenchmarkTest.java | 73 ++++++++++++------- 3 files changed, 125 insertions(+), 33 deletions(-) diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPoolWrapper.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPoolWrapper.java index e5dab547..8f4d2941 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPoolWrapper.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPoolWrapper.java @@ -25,7 +25,9 @@ import lombok.NoArgsConstructor; import org.springframework.beans.factory.DisposableBean; import java.util.concurrent.Callable; +import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadPoolExecutor; /** @@ -37,10 +39,27 @@ import java.util.concurrent.ThreadPoolExecutor; @AllArgsConstructor public class DynamicThreadPoolWrapper implements DisposableBean { + /** + * Determine the unique identifier of the thread pool in the project. + * + * @param tenantId tenant id + * @param itemId project id in the team + * @param threadPoolId Thread pool identifier under the project + */ private String tenantId, itemId, threadPoolId; + /** + * Whether the thread pool has completed initialization, + * and whether to subscribe to server-side configuration change events. + * + * @param subscribeFlag subscription server configuration id + * @param initFlag initial configuration complete flag + */ private boolean subscribeFlag, initFlag; + /** + * Thread pool executor. + */ private ThreadPoolExecutor executor; public DynamicThreadPoolWrapper(String threadPoolId) { @@ -53,14 +72,62 @@ public class DynamicThreadPoolWrapper implements DisposableBean { this.subscribeFlag = true; } + /** + * Executes the given task sometime in the future. The task + * may execute in a new thread or in an existing pooled thread. + *

+ * If the task cannot be submitted for execution, either because this + * executor has been shutdown or because its capacity has been reached, + * the task is handled by the current {@code RejectedExecutionHandler}. + * + * @param command the task to execute + * @throws RejectedExecutionException at discretion of + * {@code RejectedExecutionHandler}, if the task + * cannot be accepted for execution + * @throws NullPointerException if {@code command} is null + */ public void execute(Runnable command) { executor.execute(command); } + /** + * Submits a Runnable task for execution and returns a Future + * representing that task. The Future's {@code get} method will + * return {@code null} upon successful completion. + * + * @param task the task to submit + * @return a Future representing pending completion of the task + * @throws RejectedExecutionException if the task cannot be + * scheduled for execution + * @throws NullPointerException if the task is null + */ public Future submit(Runnable task) { return executor.submit(task); } + /** + * Submits a value-returning task for execution and returns a + * Future representing the pending results of the task. The + * Future's {@code get} method will return the task's result upon + * successful completion. + * + *

+ * If you would like to immediately block waiting + * for a task, you can use constructions of the form + * {@code result = exec.submit(aCallable).get();} + * + *

Note: The {@link Executors} class includes a set of methods + * that can convert some other common closure-like objects, + * for example, {@link java.security.PrivilegedAction} to + * {@link Callable} form so they can be submitted. + * + * @param task the task to submit + * @param the type of the task's result + * @return a Future representing pending completion of the task + * @throws RejectedExecutionException if the task cannot be + * scheduled for execution + * @throws NullPointerException if the task is null + */ public Future submit(Callable task) { return executor.submit(task); } diff --git a/hippo4j-core/src/test/java/cn/hippo4j/core/executor/DynamicThreadPoolExecutorTest.java b/hippo4j-core/src/test/java/cn/hippo4j/core/executor/DynamicThreadPoolExecutorTest.java index cfa344a0..63d362af 100644 --- a/hippo4j-core/src/test/java/cn/hippo4j/core/executor/DynamicThreadPoolExecutorTest.java +++ b/hippo4j-core/src/test/java/cn/hippo4j/core/executor/DynamicThreadPoolExecutorTest.java @@ -80,9 +80,9 @@ public class DynamicThreadPoolExecutorTest { @Test public void testDestroyWhenWaitForTask() { DynamicThreadPoolExecutor executor = new DynamicThreadPoolExecutor( - 1, 1, 1000L, TimeUnit.MILLISECONDS, - 1000L, true, 1000L, - new ArrayBlockingQueue<>(1), "test", Thread::new, new ThreadPoolExecutor.DiscardOldestPolicy()); + 1, 1, 1000L, TimeUnit.MILLISECONDS, + 1000L, true, 1000L, + new ArrayBlockingQueue<>(1), "test", Thread::new, new ThreadPoolExecutor.DiscardOldestPolicy()); AtomicInteger count = new AtomicInteger(0); executor.execute(() -> { @@ -96,16 +96,17 @@ public class DynamicThreadPoolExecutorTest { executor.destroy(); // waitting for terminated - while (!executor.isTerminated()){}; + while (!executor.isTerminated()) { + } ; Assert.assertEquals(2, count.get()); } @Test public void testDestroyWhenNotWaitForTask() { DynamicThreadPoolExecutor executor = new DynamicThreadPoolExecutor( - 1, 1, 1000L, TimeUnit.MILLISECONDS, - 1000L, false, 1000L, - new ArrayBlockingQueue<>(1), "test", Thread::new, new ThreadPoolExecutor.DiscardOldestPolicy()); + 1, 1, 1000L, TimeUnit.MILLISECONDS, + 1000L, false, 1000L, + new ArrayBlockingQueue<>(1), "test", Thread::new, new ThreadPoolExecutor.DiscardOldestPolicy()); AtomicInteger count = new AtomicInteger(0); executor.execute(() -> { @@ -119,7 +120,8 @@ public class DynamicThreadPoolExecutorTest { executor.destroy(); // waitting for terminated - while (!executor.isTerminated()){}; + while (!executor.isTerminated()) { + } ; Assert.assertEquals(1, count.get()); } diff --git a/hippo4j-example/hippo4j-example-core/src/test/java/cn/hippo4j/example/core/TaskTimeRecordPluginBenchmarkTest.java b/hippo4j-example/hippo4j-example-core/src/test/java/cn/hippo4j/example/core/TaskTimeRecordPluginBenchmarkTest.java index 2bf935a4..408af6e0 100644 --- a/hippo4j-example/hippo4j-example-core/src/test/java/cn/hippo4j/example/core/TaskTimeRecordPluginBenchmarkTest.java +++ b/hippo4j-example/hippo4j-example-core/src/test/java/cn/hippo4j/example/core/TaskTimeRecordPluginBenchmarkTest.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package cn.hippo4j.example.core; import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor; @@ -34,15 +51,16 @@ public class TaskTimeRecordPluginBenchmarkTest { public void origin_200(Blackhole blackhole) { int threadCount = 200; ThreadPoolExecutor executor = new ThreadPoolExecutor( - threadCount, threadCount, 1000L, TimeUnit.MILLISECONDS, - new ArrayBlockingQueue<>(threadCount), Thread::new, new ThreadPoolExecutor.DiscardPolicy()); + threadCount, threadCount, 1000L, TimeUnit.MILLISECONDS, + new ArrayBlockingQueue<>(threadCount), Thread::new, new ThreadPoolExecutor.DiscardPolicy()); executor.prestartAllCoreThreads(); List tasks = getTask(threadCount, blackhole); tasks.forEach(executor::execute); executor.shutdown(); - while (!executor.isTerminated()) {} + while (!executor.isTerminated()) { + } } @SneakyThrows @@ -50,15 +68,16 @@ public class TaskTimeRecordPluginBenchmarkTest { public void origin_50(Blackhole blackhole) { int threadCount = 50; ThreadPoolExecutor executor = new ThreadPoolExecutor( - threadCount, threadCount, 1000L, TimeUnit.MILLISECONDS, - new ArrayBlockingQueue<>(threadCount), Thread::new, new ThreadPoolExecutor.DiscardPolicy()); + threadCount, threadCount, 1000L, TimeUnit.MILLISECONDS, + new ArrayBlockingQueue<>(threadCount), Thread::new, new ThreadPoolExecutor.DiscardPolicy()); executor.prestartAllCoreThreads(); List tasks = getTask(threadCount, blackhole); tasks.forEach(executor::execute); executor.shutdown(); - while (!executor.isTerminated()) {} + while (!executor.isTerminated()) { + } } @SneakyThrows @@ -66,16 +85,17 @@ public class TaskTimeRecordPluginBenchmarkTest { public void not_plugin_50(Blackhole blackhole) { int threadCount = 50; ExtensibleThreadPoolExecutor executor = new ExtensibleThreadPoolExecutor( - "test", new DefaultThreadPoolPluginManager(), - threadCount, threadCount, 1000L, TimeUnit.MILLISECONDS, - new ArrayBlockingQueue<>(threadCount), Thread::new, new ThreadPoolExecutor.DiscardPolicy()); + "test", new DefaultThreadPoolPluginManager(), + threadCount, threadCount, 1000L, TimeUnit.MILLISECONDS, + new ArrayBlockingQueue<>(threadCount), Thread::new, new ThreadPoolExecutor.DiscardPolicy()); executor.prestartAllCoreThreads(); List tasks = getTask(threadCount, blackhole); tasks.forEach(executor::execute); executor.shutdown(); - while (!executor.isTerminated()) {} + while (!executor.isTerminated()) { + } } @SneakyThrows @@ -83,16 +103,17 @@ public class TaskTimeRecordPluginBenchmarkTest { public void not_plugin_200(Blackhole blackhole) { int threadCount = 200; ExtensibleThreadPoolExecutor executor = new ExtensibleThreadPoolExecutor( - "test", new DefaultThreadPoolPluginManager(), - threadCount, threadCount, 1000L, TimeUnit.MILLISECONDS, - new ArrayBlockingQueue<>(threadCount), Thread::new, new ThreadPoolExecutor.DiscardPolicy()); + "test", new DefaultThreadPoolPluginManager(), + threadCount, threadCount, 1000L, TimeUnit.MILLISECONDS, + new ArrayBlockingQueue<>(threadCount), Thread::new, new ThreadPoolExecutor.DiscardPolicy()); executor.prestartAllCoreThreads(); List tasks = getTask(threadCount, blackhole); tasks.forEach(executor::execute); executor.shutdown(); - while (!executor.isTerminated()) {} + while (!executor.isTerminated()) { + } } @SneakyThrows @@ -100,9 +121,9 @@ public class TaskTimeRecordPluginBenchmarkTest { public void plugin_50(Blackhole blackhole) { int threadCount = 50; ExtensibleThreadPoolExecutor executor = new ExtensibleThreadPoolExecutor( - "test", new DefaultThreadPoolPluginManager(), - threadCount, threadCount, 1000L, TimeUnit.MILLISECONDS, - new ArrayBlockingQueue<>(threadCount), Thread::new, new ThreadPoolExecutor.DiscardPolicy()); + "test", new DefaultThreadPoolPluginManager(), + threadCount, threadCount, 1000L, TimeUnit.MILLISECONDS, + new ArrayBlockingQueue<>(threadCount), Thread::new, new ThreadPoolExecutor.DiscardPolicy()); executor.prestartAllCoreThreads(); executor.register(new TaskTimeRecordPlugin()); @@ -110,7 +131,8 @@ public class TaskTimeRecordPluginBenchmarkTest { tasks.forEach(executor::execute); executor.shutdown(); - while (!executor.isTerminated()) {} + while (!executor.isTerminated()) { + } } @SneakyThrows @@ -118,9 +140,9 @@ public class TaskTimeRecordPluginBenchmarkTest { public void plugin_200(Blackhole blackhole) { int threadCount = 200; ExtensibleThreadPoolExecutor executor = new ExtensibleThreadPoolExecutor( - "test", new DefaultThreadPoolPluginManager(), - threadCount, threadCount, 1000L, TimeUnit.MILLISECONDS, - new ArrayBlockingQueue<>(threadCount), Thread::new, new ThreadPoolExecutor.DiscardPolicy()); + "test", new DefaultThreadPoolPluginManager(), + threadCount, threadCount, 1000L, TimeUnit.MILLISECONDS, + new ArrayBlockingQueue<>(threadCount), Thread::new, new ThreadPoolExecutor.DiscardPolicy()); executor.prestartAllCoreThreads(); executor.register(new TaskTimeRecordPlugin()); @@ -128,7 +150,8 @@ public class TaskTimeRecordPluginBenchmarkTest { tasks.forEach(executor::execute); executor.shutdown(); - while (!executor.isTerminated()) {} + while (!executor.isTerminated()) { + } } private List getTask(int count, Blackhole blackhole) { @@ -142,9 +165,9 @@ public class TaskTimeRecordPluginBenchmarkTest { public static void main(String[] args) throws Exception { Options opts = new OptionsBuilder() - .include(TaskTimeRecordPluginBenchmarkTest.class.getSimpleName()) - .resultFormat(ResultFormatType.JSON) - .build(); + .include(TaskTimeRecordPluginBenchmarkTest.class.getSimpleName()) + .resultFormat(ResultFormatType.JSON) + .build(); new Runner(opts).run(); }