mirror of https://github.com/longtai-cn/hippo4j
Merge b2c6ab3694
into 7d78be3cab
commit
2cb69f2bb3
@ -0,0 +1,279 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package cn.hippo4j.core.executor.plugin.impl;
|
||||||
|
|
||||||
|
import cn.hippo4j.core.executor.plugin.ExecuteAwarePlugin;
|
||||||
|
import cn.hippo4j.core.executor.plugin.PluginRuntime;
|
||||||
|
import cn.hippo4j.core.executor.plugin.manager.MaximumActiveThreadCountCheckerRegistrar;
|
||||||
|
import cn.hippo4j.core.executor.plugin.manager.ThreadPoolPluginSupport;
|
||||||
|
import lombok.RequiredArgsConstructor;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
|
||||||
|
import java.util.concurrent.RunnableFuture;
|
||||||
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <p>A plugin that checks whether the number of active threads in a thread pool exceeds the maximum thread count.
|
||||||
|
*
|
||||||
|
* <p>When task is submitted to the thread pool, before the current worker thread executes the task:
|
||||||
|
* <ol>
|
||||||
|
* <li>check whether the maximum number of threads in the thread pool is exceeded;</li>
|
||||||
|
* <li>
|
||||||
|
* if already exceeded, interrupt the worker thread,
|
||||||
|
* and throw an {@link IllegalMaximumActiveCountException} exception to destroy the worker thread;
|
||||||
|
* </li>
|
||||||
|
* </ol>
|
||||||
|
*
|
||||||
|
* <p><b>NOTE</b>: if custom {@link Thread.UncaughtExceptionHandler} is set for the thread pool,
|
||||||
|
* it may catch the {@link IllegalMaximumActiveCountException} exception and cause the worker thread to not be destroyed.
|
||||||
|
*
|
||||||
|
* @see MaximumActiveThreadCountCheckerRegistrar
|
||||||
|
* @see IllegalMaximumActiveCountException
|
||||||
|
*/
|
||||||
|
@RequiredArgsConstructor
|
||||||
|
@Slf4j
|
||||||
|
public class MaximumActiveThreadCountChecker implements ExecuteAwarePlugin {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Thread pool.
|
||||||
|
*/
|
||||||
|
public final ThreadPoolPluginSupport threadPoolPluginSupport;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <p>A pointer to a number object used to record the number of threads in the thread pool
|
||||||
|
* that exceeds the maximum thread count.
|
||||||
|
* There are two possible scenarios:
|
||||||
|
* <ul>
|
||||||
|
* <li>Pointing to {@code null}: indicating that there are no threads in the thread pool exceeding the maximum count;</li>
|
||||||
|
* <li>
|
||||||
|
* Pointing to a number object, indicating that there are threads in the thread pool
|
||||||
|
* exceeding the maximum count, and the number represents the overflow thread count;
|
||||||
|
* </li>
|
||||||
|
* </ul>
|
||||||
|
*
|
||||||
|
* <p>After performing operations based on the original number object pointed by the pointer,
|
||||||
|
* the pointer may change:
|
||||||
|
* <ul>
|
||||||
|
* <li>The pointer remains unchanged, indicating the operations performed on the original number object are valid;</li>
|
||||||
|
* <li>
|
||||||
|
* The pointer points to another number object, indicating that the operations performed
|
||||||
|
* on the original number object should be considered invalid.
|
||||||
|
* It requires obtaining the number object again and retrying the operations
|
||||||
|
* </li>
|
||||||
|
* <li>
|
||||||
|
* The pointer points to {@code null}, indicating that there are no threads
|
||||||
|
* in the thread pool exceeding the maximum count.
|
||||||
|
* The operations based on the old reference are still effective;
|
||||||
|
* </li>
|
||||||
|
* </ul>
|
||||||
|
*/
|
||||||
|
public final AtomicReference<AtomicInteger> overflowThreadNumber = new AtomicReference<>(null);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <p>Set the number of overflow threads.
|
||||||
|
*
|
||||||
|
* @param number number of overflow threads
|
||||||
|
*/
|
||||||
|
public void setOverflowThreadNumber(int number) {
|
||||||
|
AtomicInteger currentOverflowThreads = (number > 0) ? new AtomicInteger(number) : null;
|
||||||
|
overflowThreadNumber.set(currentOverflowThreads);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <p>Get the number of overflow threads.
|
||||||
|
*/
|
||||||
|
public Integer getOverflowThreadNumber() {
|
||||||
|
AtomicInteger currentOverflowThreads = overflowThreadNumber.get();
|
||||||
|
return currentOverflowThreads == null ? 0 : currentOverflowThreads.get();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <p>Check {@link ThreadPoolExecutor#getActiveCount()} whether greater than {@link ThreadPoolExecutor#getMaximumPoolSize()}.
|
||||||
|
* if the number of threads in the thread pool exceeds the maximum thread count, set the number of overflow threads.
|
||||||
|
*
|
||||||
|
* @return number of overflow threads
|
||||||
|
* @see #setOverflowThreadNumber(int)
|
||||||
|
*/
|
||||||
|
public Integer checkOverflowThreads() {
|
||||||
|
ThreadPoolExecutor executor = threadPoolPluginSupport.getThreadPoolExecutor();
|
||||||
|
int activeCount = executor.getActiveCount();
|
||||||
|
int maximumActiveCount = executor.getMaximumPoolSize();
|
||||||
|
int number = activeCount > maximumActiveCount ? activeCount - maximumActiveCount : 0;
|
||||||
|
if (number > 0) {
|
||||||
|
setOverflowThreadNumber(number);
|
||||||
|
}
|
||||||
|
return number;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get plugin runtime info.
|
||||||
|
*
|
||||||
|
* @return plugin runtime info
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public PluginRuntime getPluginRuntime() {
|
||||||
|
return new PluginRuntime(getId())
|
||||||
|
.addInfo("overflowThreadNumber", getOverflowThreadNumber());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <p>Check the maximum number of threads in the thread pool before the task is executed,
|
||||||
|
* if the maximum number of threads is exceeded,
|
||||||
|
* an {@link IllegalMaximumActiveCountException} will be thrown.
|
||||||
|
*
|
||||||
|
* @param thread thread of executing task
|
||||||
|
* @param runnable task
|
||||||
|
* @throws IllegalMaximumActiveCountException thread if the maximum number of threads is exceeded
|
||||||
|
* @see ThreadPoolExecutor#beforeExecute
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void beforeExecute(Thread thread, Runnable runnable) {
|
||||||
|
if (!hasOverflowThread()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
log.warn(
|
||||||
|
"The maximum number of threads in the thread pool '{}' has been exceeded, task '{}' will redelivery",
|
||||||
|
threadPoolPluginSupport.getThreadPoolId(), runnable);
|
||||||
|
submitTaskAfterCheckFail(runnable);
|
||||||
|
interruptAndThrowException(thread);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <p>check the maximum number of threads in the thread pool after the task is executed.
|
||||||
|
*
|
||||||
|
* @return true if the maximum number of threads in the thread pool has been exceeded
|
||||||
|
*/
|
||||||
|
@SuppressWarnings("all")
|
||||||
|
private boolean hasOverflowThread() {
|
||||||
|
AtomicInteger current;
|
||||||
|
|
||||||
|
// has overflow thread?
|
||||||
|
while ((current = overflowThreadNumber.get()) != null) {
|
||||||
|
int curr = current.get();
|
||||||
|
|
||||||
|
// it's already no overflow thread
|
||||||
|
if (curr < 1) {
|
||||||
|
// check whether the current value is effective, if not, try again
|
||||||
|
if (overflowThreadNumber.compareAndSet(current, null)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// has overflow thread, try to cas to decrease the number of overflow threads
|
||||||
|
int expect = curr - 1;
|
||||||
|
if (!current.compareAndSet(curr, expect)) {
|
||||||
|
// cas fail, the current value is uneffective, try again
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
cas success, decrease the number of overflow threads,
|
||||||
|
then, use cas to check the current value is effective:
|
||||||
|
|
||||||
|
1.if already no overflow thread, set the overflowThreadNumber to null;
|
||||||
|
2.if still has other overflow thread, keep the overflowThreadNumber reference unchanged
|
||||||
|
*/
|
||||||
|
AtomicInteger newRef = (expect > 0) ? current : null;
|
||||||
|
if (overflowThreadNumber.compareAndSet(current, newRef)) {
|
||||||
|
|
||||||
|
/*
|
||||||
|
cas success, it's means the current value is effective.
|
||||||
|
|
||||||
|
if this thread is the last overflow thread,
|
||||||
|
and already set the overflowThreadNumber to null,
|
||||||
|
then we return true to throw an exception.
|
||||||
|
*/
|
||||||
|
if (expect == 0) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
if this thread is not the last overflow thread,
|
||||||
|
the value may still be decreased by other threads before this step.
|
||||||
|
|
||||||
|
1.if current value still greater than 0,
|
||||||
|
we must try again to check whether the current value is effective.
|
||||||
|
|
||||||
|
2.if current value is set to null,
|
||||||
|
it's means that after the current value is decremented to 0 by other threads, it is set to null.
|
||||||
|
|
||||||
|
in addition, here may be a ABA problem:
|
||||||
|
when cas success, if the current value is still greater than 0,
|
||||||
|
its means still has other overflow thread can set it to null after the value is cas to 0,
|
||||||
|
we can't determine whether the null value is set by same check,
|
||||||
|
so we may make an error and throw an exception in the next check after the current check.
|
||||||
|
it's low probability, and the error is acceptable.
|
||||||
|
*/
|
||||||
|
AtomicInteger nc = overflowThreadNumber.get();
|
||||||
|
if (nc == current || nc == null) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// the current value is uneffective, try again
|
||||||
|
}
|
||||||
|
|
||||||
|
// no overflow thread
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <p>Submit task to thread pool after check fail.
|
||||||
|
*
|
||||||
|
* @param runnable task
|
||||||
|
*/
|
||||||
|
protected void submitTaskAfterCheckFail(Runnable runnable) {
|
||||||
|
if (runnable instanceof RunnableFuture) {
|
||||||
|
RunnableFuture<?> future = (RunnableFuture<?>) runnable;
|
||||||
|
if (future.isDone()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (future.isCancelled()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
threadPoolPluginSupport.getThreadPoolExecutor().execute(runnable);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void interruptAndThrowException(Thread thread) {
|
||||||
|
thread.interrupt();
|
||||||
|
throw new IllegalMaximumActiveCountException(
|
||||||
|
"The maximum number of threads in the thread pool '" + threadPoolPluginSupport.getThreadPoolId()
|
||||||
|
+ "' has been exceeded.");
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A {@link RuntimeException} that indicates that the maximum number of threads in the thread pool has been exceeded.
|
||||||
|
*/
|
||||||
|
public static class IllegalMaximumActiveCountException extends RuntimeException {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructs a new runtime exception with the specified detail message.
|
||||||
|
* The cause is not initialized, and may subsequently be initialized by a
|
||||||
|
* call to {@link #initCause}.
|
||||||
|
*
|
||||||
|
* @param message the detail message. The detail message is saved for
|
||||||
|
* later retrieval by the {@link #getMessage()} method.
|
||||||
|
*/
|
||||||
|
public IllegalMaximumActiveCountException(String message) {
|
||||||
|
super(message);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,87 @@
|
|||||||
|
package cn.hippo4j.core.executor.plugin.manager;
|
||||||
|
|
||||||
|
import cn.hippo4j.common.toolkit.Assert;
|
||||||
|
import cn.hippo4j.core.executor.plugin.impl.MaximumActiveThreadCountChecker;
|
||||||
|
import lombok.RequiredArgsConstructor;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.CopyOnWriteArrayList;
|
||||||
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
|
import java.util.concurrent.ScheduledFuture;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <p>A register for {@link MaximumActiveThreadCountChecker}.<br />
|
||||||
|
* After {@link #startCheckOverflowThreads} has been invoked,
|
||||||
|
* it will start a task for check whether the thread pool has overflow threads in the specified interval.
|
||||||
|
*
|
||||||
|
* @author huangchengxing
|
||||||
|
* @see MaximumActiveThreadCountChecker
|
||||||
|
*/
|
||||||
|
@Slf4j
|
||||||
|
@RequiredArgsConstructor
|
||||||
|
public class MaximumActiveThreadCountCheckerRegistrar implements ThreadPoolPluginRegistrar {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* registered checkers
|
||||||
|
*/
|
||||||
|
private final List<MaximumActiveThreadCountChecker> checkers = new CopyOnWriteArrayList<>();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* scheduled executor service
|
||||||
|
*/
|
||||||
|
private final ScheduledExecutorService scheduledExecutorService;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* task for check whether the thread pool has overflow threads.
|
||||||
|
*/
|
||||||
|
private ScheduledFuture<?> task;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <p>Start task for check whether the thread pool has overflow threads.
|
||||||
|
* if task is already running, it will be canceled and restarted.
|
||||||
|
*/
|
||||||
|
public void startCheckOverflowThreads(long checkInterval, TimeUnit checkIntervalTimeUnit) {
|
||||||
|
Assert.isTrue(checkInterval > 0, "checkInterval must be greater than 0");
|
||||||
|
Assert.notNull(checkIntervalTimeUnit, "checkIntervalTimeUnit must not be null");
|
||||||
|
if (task != null) {
|
||||||
|
stopCheckOverflowThreads();
|
||||||
|
}
|
||||||
|
log.info("start check overflow threads task, checkInterval: {}, checkIntervalTimeUnit: {}", checkInterval, checkIntervalTimeUnit);
|
||||||
|
this.task = scheduledExecutorService.scheduleAtFixedRate(
|
||||||
|
() -> checkers.forEach(MaximumActiveThreadCountChecker::checkOverflowThreads),
|
||||||
|
checkInterval, checkInterval, checkIntervalTimeUnit
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Cancel task for check whether the thread pool has overflow threads.
|
||||||
|
*
|
||||||
|
* @return true if the task was canceled before it completed normally
|
||||||
|
*/
|
||||||
|
public boolean stopCheckOverflowThreads() {
|
||||||
|
// is acceptable to cancel a task even if it is already running,
|
||||||
|
// because this behavior will not affect the normal operation of the thread pool
|
||||||
|
if (task != null && task.cancel(true)) {
|
||||||
|
log.info("cancel check overflow threads task");
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create and register plugin for the specified thread-pool instance.
|
||||||
|
*
|
||||||
|
* @param support thread pool plugin manager delegate
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void doRegister(ThreadPoolPluginSupport support) {
|
||||||
|
MaximumActiveThreadCountChecker checker = new MaximumActiveThreadCountChecker(support);
|
||||||
|
if (log.isDebugEnabled()) {
|
||||||
|
log.debug("register maximum active thread count checker for thread pool: {}", support.getThreadPoolId());
|
||||||
|
}
|
||||||
|
support.register(checker);
|
||||||
|
checkers.add(checker);
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,118 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package cn.hippo4j.core.executor.plugin.impl;
|
||||||
|
|
||||||
|
import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor;
|
||||||
|
import cn.hippo4j.core.executor.plugin.PluginRuntime;
|
||||||
|
import cn.hippo4j.core.executor.plugin.manager.DefaultThreadPoolPluginManager;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.UUID;
|
||||||
|
import java.util.concurrent.*;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* test for {@link cn.hippo4j.core.executor.plugin.impl.MaximumActiveThreadCountChecker}
|
||||||
|
*
|
||||||
|
* @author huangchengxing
|
||||||
|
*/
|
||||||
|
public class MaximumActiveThreadCountCheckerTest {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetPluginRuntime() {
|
||||||
|
ExtensibleThreadPoolExecutor executor = new ExtensibleThreadPoolExecutor(
|
||||||
|
"test", new DefaultThreadPoolPluginManager(),
|
||||||
|
5, 5, 0L, TimeUnit.MILLISECONDS,
|
||||||
|
new ArrayBlockingQueue<>(10), t -> new Thread(t, UUID.randomUUID().toString()), new ThreadPoolExecutor.AbortPolicy());
|
||||||
|
MaximumActiveThreadCountChecker checker = new MaximumActiveThreadCountChecker(executor);
|
||||||
|
Assert.assertEquals(checker.getClass().getSimpleName(), checker.getPluginRuntime().getPluginId());
|
||||||
|
// check plugin info
|
||||||
|
checker.checkOverflowThreads();
|
||||||
|
List<PluginRuntime.Info> infoList = checker.getPluginRuntime().getInfoList();
|
||||||
|
Assert.assertEquals(1, infoList.size());
|
||||||
|
Assert.assertEquals("overflowThreadNumber", infoList.get(0).getName());
|
||||||
|
Assert.assertEquals(0, infoList.get(0).getValue());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testWhenEnableSubmitTaskAfterCheckFail() {
|
||||||
|
int maximumThreadNum = 3;
|
||||||
|
BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(1);
|
||||||
|
ExtensibleThreadPoolExecutor executor = new ExtensibleThreadPoolExecutor(
|
||||||
|
"test", new DefaultThreadPoolPluginManager(),
|
||||||
|
maximumThreadNum, maximumThreadNum, 0L, TimeUnit.MILLISECONDS,
|
||||||
|
queue, t -> new Thread(t, UUID.randomUUID().toString()), new ThreadPoolExecutor.AbortPolicy());
|
||||||
|
MaximumActiveThreadCountChecker checker = new MaximumActiveThreadCountChecker(executor);
|
||||||
|
executor.register(checker);
|
||||||
|
|
||||||
|
// create 2 workers and block them
|
||||||
|
CountDownLatch latch1 = submitTaskForBlockingThread(maximumThreadNum - 1, executor);
|
||||||
|
try {
|
||||||
|
// wait for the 2 workers to be executed task
|
||||||
|
Thread.sleep(500L);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* after 2 worker blocked,
|
||||||
|
* submit task and create the last worker,
|
||||||
|
* make overflow thread number to 1
|
||||||
|
*/
|
||||||
|
CountDownLatch latch2 = submitTaskForBlockingThread(1, executor);
|
||||||
|
executor.setCorePoolSize(maximumThreadNum - 1);
|
||||||
|
executor.setMaximumPoolSize(maximumThreadNum - 1);
|
||||||
|
checker.setOverflowThreadNumber(1);
|
||||||
|
|
||||||
|
// wait for plugin to re-deliver the task to the queue
|
||||||
|
try {
|
||||||
|
Thread.sleep(500L);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
Assert.assertEquals(1, queue.size());
|
||||||
|
// last worker destroyed due to the exception which thrown by plugin
|
||||||
|
Assert.assertEquals(2, executor.getActiveCount());
|
||||||
|
|
||||||
|
// free resources
|
||||||
|
latch1.countDown();
|
||||||
|
latch2.countDown();
|
||||||
|
executor.shutdown();
|
||||||
|
while (executor.isTerminated()) {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private CountDownLatch submitTaskForBlockingThread(int num, ThreadPoolExecutor executor) {
|
||||||
|
CountDownLatch latch = new CountDownLatch(1);
|
||||||
|
for (int i = 0; i < num; i++) {
|
||||||
|
Runnable runnable = () -> {
|
||||||
|
System.out.println(Thread.currentThread().getName() + " start");
|
||||||
|
try {
|
||||||
|
latch.await();
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
System.out.println(Thread.currentThread().getName() + " has been interrupted");
|
||||||
|
}
|
||||||
|
System.out.println(Thread.currentThread().getName() + " completed");
|
||||||
|
};
|
||||||
|
System.out.println("submit task@" + runnable.hashCode());
|
||||||
|
executor.execute(runnable);
|
||||||
|
}
|
||||||
|
return latch;
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in new issue