Optimize the lock logic of TaskTimeRecordPlugin (#922)

* refactor: Optimize the lock logic of TaskTimeRecordPlugin

* Simplify data structure and remove useless verification

* test: Adjust test cases
pull/938/head
黄成兴 3 years ago committed by GitHub
parent 1c80bc0d4c
commit 6ecbf41779
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -17,46 +17,62 @@
package cn.hippo4j.core.plugin.impl; package cn.hippo4j.core.plugin.impl;
import cn.hippo4j.common.toolkit.Assert;
import cn.hippo4j.core.plugin.PluginRuntime; import cn.hippo4j.core.plugin.PluginRuntime;
import lombok.Getter; import lombok.Getter;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import java.lang.reflect.Array;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
/** /**
* Record task execution time indicator. * <p>Record task execution time indicator. <br />
* The initialization size of the timer container can be specified during construction,
* It will route it to different timers in the container according to the {@link Thread#getId},
* to reduce the lock competition strength for a single timer.
*/ */
@RequiredArgsConstructor
public class TaskTimeRecordPlugin extends AbstractTaskTimerPlugin { public class TaskTimeRecordPlugin extends AbstractTaskTimerPlugin {
private static final int MAXIMUM_CAPACITY = 1 << 30;
public static final String PLUGIN_NAME = "task-time-record-plugin"; public static final String PLUGIN_NAME = "task-time-record-plugin";
/** /**
* Lock instance * modulo
*/ */
private final ReadWriteLock lock = new ReentrantReadWriteLock(); private final int modulo;
/** /**
* Total execution milli time of all tasks * timers
*/ */
private long totalTaskTimeMillis = 0L; public final Timer[] timerTable;
/** /**
* Maximum task milli execution time, default -1 * Create a {@link TaskTimeRecordPlugin}
*/ *
private long maxTaskTimeMillis = -1L; * @param initialCapacity initial capacity of timer table
/**
* Minimal task milli execution time, default -1
*/ */
private long minTaskTimeMillis = -1L; public TaskTimeRecordPlugin(int initialCapacity) {
Assert.isTrue(initialCapacity >= 1, "count must great then 0");
initialCapacity = tableSizeFor(initialCapacity);
timerTable = (Timer[]) Array.newInstance(Timer.class, initialCapacity);
for (int i = 0; i < timerTable.length; i++) {
timerTable[i] = new Timer();
}
modulo = initialCapacity - 1;
}
/** /**
* Count of completed task * Create a {@link TaskTimeRecordPlugin}
*/ */
private long taskCount = 0L; public TaskTimeRecordPlugin() {
this(1);
}
/** /**
* Get id. * Get id.
@ -91,21 +107,8 @@ public class TaskTimeRecordPlugin extends AbstractTaskTimerPlugin {
*/ */
@Override @Override
protected void processTaskTime(long taskExecuteTime) { protected void processTaskTime(long taskExecuteTime) {
Lock writeLock = lock.writeLock(); Timer timer = getTimerForCurrentThread();
writeLock.lock(); timer.recordTaskTime(taskExecuteTime);
try {
if (taskCount == 0) {
maxTaskTimeMillis = taskExecuteTime;
minTaskTimeMillis = taskExecuteTime;
} else {
maxTaskTimeMillis = Math.max(taskExecuteTime, maxTaskTimeMillis);
minTaskTimeMillis = Math.min(taskExecuteTime, minTaskTimeMillis);
}
taskCount = taskCount + 1;
totalTaskTimeMillis += taskExecuteTime;
} finally {
writeLock.unlock();
}
} }
/** /**
@ -114,19 +117,129 @@ public class TaskTimeRecordPlugin extends AbstractTaskTimerPlugin {
* @return data snapshot * @return data snapshot
*/ */
public Summary summarize() { public Summary summarize() {
Lock readLock = lock.readLock(); // ignore unused timers
Summary statistics; List<Summary> summaries = Arrays.stream(timerTable)
readLock.lock(); .map(Timer::summarize)
try { .filter(s -> s.getTaskCount() > 0)
statistics = new Summary( .collect(Collectors.toList());
this.totalTaskTimeMillis,
this.maxTaskTimeMillis, // summarize data
this.minTaskTimeMillis, long totalTaskTimeMillis = 0L;
this.taskCount); long maxTaskTimeMillis = -1L;
} finally { long minTaskTimeMillis = -1L;
readLock.unlock(); long taskCount = 0L;
for (Summary summary : summaries) {
if (taskCount > 0) {
maxTaskTimeMillis = Math.max(maxTaskTimeMillis, summary.getMaxTaskTimeMillis());
minTaskTimeMillis = Math.min(minTaskTimeMillis, summary.getMinTaskTimeMillis());
} else {
maxTaskTimeMillis = summary.getMaxTaskTimeMillis();
minTaskTimeMillis = summary.getMinTaskTimeMillis();
}
totalTaskTimeMillis += summary.getTotalTaskTimeMillis();
taskCount += summary.getTaskCount();
}
return new Summary(totalTaskTimeMillis, maxTaskTimeMillis, minTaskTimeMillis, taskCount);
}
private Timer getTimerForCurrentThread() {
/*
* use table tableSize - 1 to take modulus for tid, and the remainder obtained is the subscript of the timer corresponding to the thread in the table. eg: tid = 10086, tableSize = 8, then we
* get 10086 & (8 - 1) = 4
*/
long threadId = Thread.currentThread().getId();
int index = (int) (threadId & modulo);
return timerTable[index];
}
/**
* copy from {@link HashMap#tableSizeFor}
*/
static int tableSizeFor(int cap) {
int n = cap - 1;
n |= n >>> 1;
n |= n >>> 2;
n |= n >>> 4;
n |= n >>> 8;
n |= n >>> 16;
return n >= MAXIMUM_CAPACITY ? MAXIMUM_CAPACITY : n + 1;
}
/**
* <p>Independent unit for providing time recording function.<br />
* Support thread-safe operations when reading and writing in a concurrent environment.
*/
private static class Timer {
/**
* Lock instance
*/
private final ReadWriteLock lock = new ReentrantReadWriteLock();
/**
* Total execution milli time of all tasks
*/
private long totalTaskTimeMillis = 0L;
/**
* Maximum task milli execution time, default -1
*/
private long maxTaskTimeMillis = -1L;
/**
* Minimal task milli execution time, default -1
*/
private long minTaskTimeMillis = -1L;
/**
* Count of completed task
*/
private long taskCount = 0L;
/**
* Record task execute time.
*
* @param taskExecuteTime task execute time
*/
public void recordTaskTime(long taskExecuteTime) {
Lock writeLock = lock.writeLock();
writeLock.lock();
try {
if (taskCount > 0) {
maxTaskTimeMillis = Math.max(taskExecuteTime, maxTaskTimeMillis);
minTaskTimeMillis = Math.min(taskExecuteTime, minTaskTimeMillis);
} else {
maxTaskTimeMillis = taskExecuteTime;
minTaskTimeMillis = taskExecuteTime;
}
taskCount = taskCount + 1;
totalTaskTimeMillis += taskExecuteTime;
} finally {
writeLock.unlock();
}
} }
return statistics;
/**
* Get the summary statistics of the instance at the current time.
*
* @return data snapshot
*/
public Summary summarize() {
Lock readLock = lock.readLock();
Summary statistics;
readLock.lock();
try {
statistics = new Summary(
this.totalTaskTimeMillis,
this.maxTaskTimeMillis,
this.minTaskTimeMillis,
this.taskCount);
} finally {
readLock.unlock();
}
return statistics;
}
} }
/** /**
@ -166,4 +279,5 @@ public class TaskTimeRecordPlugin extends AbstractTaskTimerPlugin {
return totalTaskCount > 0L ? getTotalTaskTimeMillis() / totalTaskCount : -1; return totalTaskCount > 0L ? getTotalTaskTimeMillis() / totalTaskCount : -1;
} }
} }
} }

@ -20,6 +20,7 @@ package cn.hippo4j.core.plugin.impl;
import cn.hippo4j.common.toolkit.ThreadUtil; import cn.hippo4j.common.toolkit.ThreadUtil;
import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor; import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor;
import cn.hippo4j.core.plugin.manager.DefaultThreadPoolPluginManager; import cn.hippo4j.core.plugin.manager.DefaultThreadPoolPluginManager;
import lombok.extern.slf4j.Slf4j;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -30,6 +31,7 @@ import java.util.concurrent.TimeUnit;
/** /**
* test for {@link TaskTimeRecordPlugin} * test for {@link TaskTimeRecordPlugin}
*/ */
@Slf4j
public class TaskTimeRecordPluginTest { public class TaskTimeRecordPluginTest {
@Test @Test
@ -49,20 +51,29 @@ public class TaskTimeRecordPluginTest {
3, 3, 1000L, TimeUnit.MILLISECONDS, 3, 3, 1000L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(1), Thread::new, new ThreadPoolExecutor.DiscardPolicy()); new ArrayBlockingQueue<>(1), Thread::new, new ThreadPoolExecutor.DiscardPolicy());
TaskTimeRecordPlugin plugin = new TaskTimeRecordPlugin(); TaskTimeRecordPlugin plugin = new TaskTimeRecordPlugin(3);
executor.register(plugin); executor.register(plugin);
executor.submit(() -> ThreadUtil.sleep(1000L)); executor.submit(() -> ThreadUtil.sleep(1000L));
executor.submit(() -> ThreadUtil.sleep(3000L)); executor.submit(() -> ThreadUtil.sleep(3000L));
executor.submit(() -> ThreadUtil.sleep(2000L)); executor.submit(() -> ThreadUtil.sleep(2000L));
executor.submit(() -> ThreadUtil.sleep(2000L));
// waiting for shutdown // waiting for shutdown
executor.shutdown(); executor.shutdown();
while (!executor.isTerminated()) { while (!executor.isTerminated()) {
} }
TaskTimeRecordPlugin.Summary summary = plugin.summarize(); TaskTimeRecordPlugin.Summary summary = plugin.summarize();
Assert.assertEquals(1, summary.getMinTaskTimeMillis() / 1000L); Assert.assertTrue(testInDeviation(summary.getMinTaskTimeMillis(), 1000L, 300L));
Assert.assertEquals(3, summary.getMaxTaskTimeMillis() / 1000L); Assert.assertTrue(testInDeviation(summary.getMaxTaskTimeMillis(), 3000L, 300L));
Assert.assertEquals(2, summary.getAvgTaskTimeMillis() / 1000L); Assert.assertTrue(testInDeviation(summary.getAvgTaskTimeMillis(), 2000L, 300L));
Assert.assertEquals(6, summary.getTotalTaskTimeMillis() / 1000L); Assert.assertTrue(testInDeviation(summary.getTotalTaskTimeMillis(), 8000L, 300L));
}
private boolean testInDeviation(long except, long actual, long offer) {
long exceptLower = except - offer;
long exceptUpper = except + offer;
log.info("test {} < [{}] < {}", exceptLower, actual, exceptUpper);
return exceptLower < actual && actual < exceptUpper;
} }
} }

Loading…
Cancel
Save