Field style refactoring and adding method logs (#878)

pull/892/head
chen.ma 2 years ago
parent 334fae1f12
commit 05bfa3c66d

@ -25,7 +25,9 @@ import lombok.NoArgsConstructor;
import org.springframework.beans.factory.DisposableBean;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
/**
@ -37,10 +39,27 @@ import java.util.concurrent.ThreadPoolExecutor;
@AllArgsConstructor
public class DynamicThreadPoolWrapper implements DisposableBean {
/**
* Determine the unique identifier of the thread pool in the project.
*
* @param tenantId tenant id
* @param itemId project id in the team
* @param threadPoolId Thread pool identifier under the project
*/
private String tenantId, itemId, threadPoolId;
/**
* Whether the thread pool has completed initialization,
* and whether to subscribe to server-side configuration change events.
*
* @param subscribeFlag subscription server configuration id
* @param initFlag initial configuration complete flag
*/
private boolean subscribeFlag, initFlag;
/**
* Thread pool executor.
*/
private ThreadPoolExecutor executor;
public DynamicThreadPoolWrapper(String threadPoolId) {
@ -53,14 +72,62 @@ public class DynamicThreadPoolWrapper implements DisposableBean {
this.subscribeFlag = true;
}
/**
* Executes the given task sometime in the future. The task
* may execute in a new thread or in an existing pooled thread.
* <p>
* If the task cannot be submitted for execution, either because this
* executor has been shutdown or because its capacity has been reached,
* the task is handled by the current {@code RejectedExecutionHandler}.
*
* @param command the task to execute
* @throws RejectedExecutionException at discretion of
* {@code RejectedExecutionHandler}, if the task
* cannot be accepted for execution
* @throws NullPointerException if {@code command} is null
*/
public void execute(Runnable command) {
executor.execute(command);
}
/**
* Submits a Runnable task for execution and returns a Future
* representing that task. The Future's {@code get} method will
* return {@code null} upon <em>successful</em> completion.
*
* @param task the task to submit
* @return a Future representing pending completion of the task
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
* @throws NullPointerException if the task is null
*/
public Future<?> submit(Runnable task) {
return executor.submit(task);
}
/**
* Submits a value-returning task for execution and returns a
* Future representing the pending results of the task. The
* Future's {@code get} method will return the task's result upon
* successful completion.
*
* <p>
* If you would like to immediately block waiting
* for a task, you can use constructions of the form
* {@code result = exec.submit(aCallable).get();}
*
* <p>Note: The {@link Executors} class includes a set of methods
* that can convert some other common closure-like objects,
* for example, {@link java.security.PrivilegedAction} to
* {@link Callable} form so they can be submitted.
*
* @param task the task to submit
* @param <T> the type of the task's result
* @return a Future representing pending completion of the task
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
* @throws NullPointerException if the task is null
*/
public <T> Future<T> submit(Callable<T> task) {
return executor.submit(task);
}

@ -80,9 +80,9 @@ public class DynamicThreadPoolExecutorTest {
@Test
public void testDestroyWhenWaitForTask() {
DynamicThreadPoolExecutor executor = new DynamicThreadPoolExecutor(
1, 1, 1000L, TimeUnit.MILLISECONDS,
1000L, true, 1000L,
new ArrayBlockingQueue<>(1), "test", Thread::new, new ThreadPoolExecutor.DiscardOldestPolicy());
1, 1, 1000L, TimeUnit.MILLISECONDS,
1000L, true, 1000L,
new ArrayBlockingQueue<>(1), "test", Thread::new, new ThreadPoolExecutor.DiscardOldestPolicy());
AtomicInteger count = new AtomicInteger(0);
executor.execute(() -> {
@ -96,16 +96,17 @@ public class DynamicThreadPoolExecutorTest {
executor.destroy();
// waitting for terminated
while (!executor.isTerminated()){};
while (!executor.isTerminated()) {
} ;
Assert.assertEquals(2, count.get());
}
@Test
public void testDestroyWhenNotWaitForTask() {
DynamicThreadPoolExecutor executor = new DynamicThreadPoolExecutor(
1, 1, 1000L, TimeUnit.MILLISECONDS,
1000L, false, 1000L,
new ArrayBlockingQueue<>(1), "test", Thread::new, new ThreadPoolExecutor.DiscardOldestPolicy());
1, 1, 1000L, TimeUnit.MILLISECONDS,
1000L, false, 1000L,
new ArrayBlockingQueue<>(1), "test", Thread::new, new ThreadPoolExecutor.DiscardOldestPolicy());
AtomicInteger count = new AtomicInteger(0);
executor.execute(() -> {
@ -119,7 +120,8 @@ public class DynamicThreadPoolExecutorTest {
executor.destroy();
// waitting for terminated
while (!executor.isTerminated()){};
while (!executor.isTerminated()) {
} ;
Assert.assertEquals(1, count.get());
}

@ -1,3 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.example.core;
import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor;
@ -34,15 +51,16 @@ public class TaskTimeRecordPluginBenchmarkTest {
public void origin_200(Blackhole blackhole) {
int threadCount = 200;
ThreadPoolExecutor executor = new ThreadPoolExecutor(
threadCount, threadCount, 1000L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(threadCount), Thread::new, new ThreadPoolExecutor.DiscardPolicy());
threadCount, threadCount, 1000L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(threadCount), Thread::new, new ThreadPoolExecutor.DiscardPolicy());
executor.prestartAllCoreThreads();
List<Runnable> tasks = getTask(threadCount, blackhole);
tasks.forEach(executor::execute);
executor.shutdown();
while (!executor.isTerminated()) {}
while (!executor.isTerminated()) {
}
}
@SneakyThrows
@ -50,15 +68,16 @@ public class TaskTimeRecordPluginBenchmarkTest {
public void origin_50(Blackhole blackhole) {
int threadCount = 50;
ThreadPoolExecutor executor = new ThreadPoolExecutor(
threadCount, threadCount, 1000L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(threadCount), Thread::new, new ThreadPoolExecutor.DiscardPolicy());
threadCount, threadCount, 1000L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(threadCount), Thread::new, new ThreadPoolExecutor.DiscardPolicy());
executor.prestartAllCoreThreads();
List<Runnable> tasks = getTask(threadCount, blackhole);
tasks.forEach(executor::execute);
executor.shutdown();
while (!executor.isTerminated()) {}
while (!executor.isTerminated()) {
}
}
@SneakyThrows
@ -66,16 +85,17 @@ public class TaskTimeRecordPluginBenchmarkTest {
public void not_plugin_50(Blackhole blackhole) {
int threadCount = 50;
ExtensibleThreadPoolExecutor executor = new ExtensibleThreadPoolExecutor(
"test", new DefaultThreadPoolPluginManager(),
threadCount, threadCount, 1000L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(threadCount), Thread::new, new ThreadPoolExecutor.DiscardPolicy());
"test", new DefaultThreadPoolPluginManager(),
threadCount, threadCount, 1000L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(threadCount), Thread::new, new ThreadPoolExecutor.DiscardPolicy());
executor.prestartAllCoreThreads();
List<Runnable> tasks = getTask(threadCount, blackhole);
tasks.forEach(executor::execute);
executor.shutdown();
while (!executor.isTerminated()) {}
while (!executor.isTerminated()) {
}
}
@SneakyThrows
@ -83,16 +103,17 @@ public class TaskTimeRecordPluginBenchmarkTest {
public void not_plugin_200(Blackhole blackhole) {
int threadCount = 200;
ExtensibleThreadPoolExecutor executor = new ExtensibleThreadPoolExecutor(
"test", new DefaultThreadPoolPluginManager(),
threadCount, threadCount, 1000L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(threadCount), Thread::new, new ThreadPoolExecutor.DiscardPolicy());
"test", new DefaultThreadPoolPluginManager(),
threadCount, threadCount, 1000L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(threadCount), Thread::new, new ThreadPoolExecutor.DiscardPolicy());
executor.prestartAllCoreThreads();
List<Runnable> tasks = getTask(threadCount, blackhole);
tasks.forEach(executor::execute);
executor.shutdown();
while (!executor.isTerminated()) {}
while (!executor.isTerminated()) {
}
}
@SneakyThrows
@ -100,9 +121,9 @@ public class TaskTimeRecordPluginBenchmarkTest {
public void plugin_50(Blackhole blackhole) {
int threadCount = 50;
ExtensibleThreadPoolExecutor executor = new ExtensibleThreadPoolExecutor(
"test", new DefaultThreadPoolPluginManager(),
threadCount, threadCount, 1000L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(threadCount), Thread::new, new ThreadPoolExecutor.DiscardPolicy());
"test", new DefaultThreadPoolPluginManager(),
threadCount, threadCount, 1000L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(threadCount), Thread::new, new ThreadPoolExecutor.DiscardPolicy());
executor.prestartAllCoreThreads();
executor.register(new TaskTimeRecordPlugin());
@ -110,7 +131,8 @@ public class TaskTimeRecordPluginBenchmarkTest {
tasks.forEach(executor::execute);
executor.shutdown();
while (!executor.isTerminated()) {}
while (!executor.isTerminated()) {
}
}
@SneakyThrows
@ -118,9 +140,9 @@ public class TaskTimeRecordPluginBenchmarkTest {
public void plugin_200(Blackhole blackhole) {
int threadCount = 200;
ExtensibleThreadPoolExecutor executor = new ExtensibleThreadPoolExecutor(
"test", new DefaultThreadPoolPluginManager(),
threadCount, threadCount, 1000L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(threadCount), Thread::new, new ThreadPoolExecutor.DiscardPolicy());
"test", new DefaultThreadPoolPluginManager(),
threadCount, threadCount, 1000L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(threadCount), Thread::new, new ThreadPoolExecutor.DiscardPolicy());
executor.prestartAllCoreThreads();
executor.register(new TaskTimeRecordPlugin());
@ -128,7 +150,8 @@ public class TaskTimeRecordPluginBenchmarkTest {
tasks.forEach(executor::execute);
executor.shutdown();
while (!executor.isTerminated()) {}
while (!executor.isTerminated()) {
}
}
private List<Runnable> getTask(int count, Blackhole blackhole) {
@ -142,9 +165,9 @@ public class TaskTimeRecordPluginBenchmarkTest {
public static void main(String[] args) throws Exception {
Options opts = new OptionsBuilder()
.include(TaskTimeRecordPluginBenchmarkTest.class.getSimpleName())
.resultFormat(ResultFormatType.JSON)
.build();
.include(TaskTimeRecordPluginBenchmarkTest.class.getSimpleName())
.resultFormat(ResultFormatType.JSON)
.build();
new Runner(opts).run();
}

Loading…
Cancel
Save