pull/1592/merge
Haotian Ma 6 months ago committed by GitHub
commit cff7abd524
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -19,8 +19,10 @@ package cn.hippo4j.common.executor.support;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/** /**
* Synchronous put queue policy. * Synchronous put queue policy.
@ -28,13 +30,39 @@ import java.util.concurrent.ThreadPoolExecutor;
@Slf4j @Slf4j
public class SyncPutQueuePolicy implements RejectedExecutionHandler { public class SyncPutQueuePolicy implements RejectedExecutionHandler {
// The timeout value for the offer method (ms).
private int timeout;
private final boolean enableTimeout;
public SyncPutQueuePolicy(int timeout){
if (timeout < 0){
throw new IllegalArgumentException("timeout must be greater than 0");
}
this.timeout = timeout;
this.enableTimeout = true;
}
public SyncPutQueuePolicy (){
this.enableTimeout = false;
}
@Override @Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if (executor.isShutdown()) { if (executor.isShutdown()) {
return; return;
} }
try { try {
if (enableTimeout) {
if (!executor.getQueue().offer(r, timeout, TimeUnit.MILLISECONDS)) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
executor.toString() + " with timeout " + timeout + "ms.");
}
}
else {
executor.getQueue().put(r); executor.getQueue().put(r);
}
} catch (InterruptedException e) { } catch (InterruptedException e) {
log.error("Adding Queue task to thread pool failed.", e); log.error("Adding Queue task to thread pool failed.", e);
} }

@ -22,20 +22,23 @@ import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream; import java.util.stream.IntStream;
import static org.junit.Assert.fail;
/** /**
* Synchronous placement queue policy implementation test * Synchronous placement queue policy implementation test
*/ */
public class SyncPutQueuePolicyTest { public class SyncPutQueuePolicyTest {
/** /**
* test thread pool rejected execution * test thread pool rejected execution without timeout
*/ */
@Test @Test
public void testRejectedExecution() { public void testRejectedExecutionWithoutTimeout() {
SyncPutQueuePolicy syncPutQueuePolicy = new SyncPutQueuePolicy(); SyncPutQueuePolicy syncPutQueuePolicy = new SyncPutQueuePolicy();
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 2, ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 2,
60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1), syncPutQueuePolicy); 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1), syncPutQueuePolicy);
@ -50,4 +53,34 @@ public class SyncPutQueuePolicyTest {
} }
Assert.assertEquals(4, threadPoolExecutor.getCompletedTaskCount()); Assert.assertEquals(4, threadPoolExecutor.getCompletedTaskCount());
} }
/**
* test thread pool rejected execution with timeout
*/
@Test
public void testRejectedExecutionWithTimeout() {
SyncPutQueuePolicy syncPutQueuePolicy = new SyncPutQueuePolicy(300);
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1), syncPutQueuePolicy);
threadPoolExecutor.prestartAllCoreThreads();
Assert.assertSame(syncPutQueuePolicy, threadPoolExecutor.getRejectedExecutionHandler());
IntStream.range(0, 4).forEach(s -> {
threadPoolExecutor.execute(() -> ThreadUtil.sleep(200L));
});
IntStream.range(0, 2).forEach(s -> {
threadPoolExecutor.execute(() -> ThreadUtil.sleep(500L));
});
try {
threadPoolExecutor.execute(() -> ThreadUtil.sleep(100L));
ThreadUtil.sleep(1000L);
fail("should throw RejectedExecutionException");
} catch (Exception e) {
Assert.assertTrue(e instanceof RejectedExecutionException);
}
threadPoolExecutor.shutdown();
while (!threadPoolExecutor.isTerminated()) {
}
Assert.assertEquals(6, threadPoolExecutor.getCompletedTaskCount());
}
} }

Loading…
Cancel
Save