diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskTimeRecordPlugin.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskTimeRecordPlugin.java index dd6ad982..0c3f2bf5 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskTimeRecordPlugin.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskTimeRecordPlugin.java @@ -17,46 +17,62 @@ package cn.hippo4j.core.plugin.impl; +import cn.hippo4j.common.toolkit.Assert; import cn.hippo4j.core.plugin.PluginRuntime; import lombok.Getter; import lombok.RequiredArgsConstructor; +import java.lang.reflect.Array; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; /** - * Record task execution time indicator. + *

Record task execution time indicator.
+ * The initialization size of the timer container can be specified during construction, + * It will route it to different timers in the container according to the {@link Thread#getId}, + * to reduce the lock competition strength for a single timer. */ -@RequiredArgsConstructor public class TaskTimeRecordPlugin extends AbstractTaskTimerPlugin { + private static final int MAXIMUM_CAPACITY = 1 << 30; public static final String PLUGIN_NAME = "task-time-record-plugin"; /** - * Lock instance + * modulo */ - private final ReadWriteLock lock = new ReentrantReadWriteLock(); + private final int modulo; /** - * Total execution milli time of all tasks + * timers */ - private long totalTaskTimeMillis = 0L; + public final Timer[] timerTable; /** - * Maximum task milli execution time, default -1 - */ - private long maxTaskTimeMillis = -1L; - - /** - * Minimal task milli execution time, default -1 + * Create a {@link TaskTimeRecordPlugin} + * + * @param initialCapacity initial capacity of timer table */ - private long minTaskTimeMillis = -1L; + public TaskTimeRecordPlugin(int initialCapacity) { + Assert.isTrue(initialCapacity >= 1, "count must great then 0"); + initialCapacity = tableSizeFor(initialCapacity); + timerTable = (Timer[]) Array.newInstance(Timer.class, initialCapacity); + for (int i = 0; i < timerTable.length; i++) { + timerTable[i] = new Timer(); + } + modulo = initialCapacity - 1; + } /** - * Count of completed task + * Create a {@link TaskTimeRecordPlugin} */ - private long taskCount = 0L; + public TaskTimeRecordPlugin() { + this(1); + } /** * Get id. @@ -91,21 +107,8 @@ public class TaskTimeRecordPlugin extends AbstractTaskTimerPlugin { */ @Override protected void processTaskTime(long taskExecuteTime) { - Lock writeLock = lock.writeLock(); - writeLock.lock(); - try { - if (taskCount == 0) { - maxTaskTimeMillis = taskExecuteTime; - minTaskTimeMillis = taskExecuteTime; - } else { - maxTaskTimeMillis = Math.max(taskExecuteTime, maxTaskTimeMillis); - minTaskTimeMillis = Math.min(taskExecuteTime, minTaskTimeMillis); - } - taskCount = taskCount + 1; - totalTaskTimeMillis += taskExecuteTime; - } finally { - writeLock.unlock(); - } + Timer timer = getTimerForCurrentThread(); + timer.recordTaskTime(taskExecuteTime); } /** @@ -114,19 +117,129 @@ public class TaskTimeRecordPlugin extends AbstractTaskTimerPlugin { * @return data snapshot */ public Summary summarize() { - Lock readLock = lock.readLock(); - Summary statistics; - readLock.lock(); - try { - statistics = new Summary( - this.totalTaskTimeMillis, - this.maxTaskTimeMillis, - this.minTaskTimeMillis, - this.taskCount); - } finally { - readLock.unlock(); + // ignore unused timers + List

summaries = Arrays.stream(timerTable) + .map(Timer::summarize) + .filter(s -> s.getTaskCount() > 0) + .collect(Collectors.toList()); + + // summarize data + long totalTaskTimeMillis = 0L; + long maxTaskTimeMillis = -1L; + long minTaskTimeMillis = -1L; + long taskCount = 0L; + for (Summary summary : summaries) { + if (taskCount > 0) { + maxTaskTimeMillis = Math.max(maxTaskTimeMillis, summary.getMaxTaskTimeMillis()); + minTaskTimeMillis = Math.min(minTaskTimeMillis, summary.getMinTaskTimeMillis()); + } else { + maxTaskTimeMillis = summary.getMaxTaskTimeMillis(); + minTaskTimeMillis = summary.getMinTaskTimeMillis(); + } + totalTaskTimeMillis += summary.getTotalTaskTimeMillis(); + taskCount += summary.getTaskCount(); + } + return new Summary(totalTaskTimeMillis, maxTaskTimeMillis, minTaskTimeMillis, taskCount); + } + + private Timer getTimerForCurrentThread() { + /* + * use table tableSize - 1 to take modulus for tid, and the remainder obtained is the subscript of the timer corresponding to the thread in the table. eg: tid = 10086, tableSize = 8, then we + * get 10086 & (8 - 1) = 4 + */ + long threadId = Thread.currentThread().getId(); + int index = (int) (threadId & modulo); + return timerTable[index]; + } + + /** + * copy from {@link HashMap#tableSizeFor} + */ + static int tableSizeFor(int cap) { + int n = cap - 1; + n |= n >>> 1; + n |= n >>> 2; + n |= n >>> 4; + n |= n >>> 8; + n |= n >>> 16; + return n >= MAXIMUM_CAPACITY ? MAXIMUM_CAPACITY : n + 1; + } + + /** + *

Independent unit for providing time recording function.
+ * Support thread-safe operations when reading and writing in a concurrent environment. + */ + private static class Timer { + + /** + * Lock instance + */ + private final ReadWriteLock lock = new ReentrantReadWriteLock(); + + /** + * Total execution milli time of all tasks + */ + private long totalTaskTimeMillis = 0L; + + /** + * Maximum task milli execution time, default -1 + */ + private long maxTaskTimeMillis = -1L; + + /** + * Minimal task milli execution time, default -1 + */ + private long minTaskTimeMillis = -1L; + + /** + * Count of completed task + */ + private long taskCount = 0L; + + /** + * Record task execute time. + * + * @param taskExecuteTime task execute time + */ + public void recordTaskTime(long taskExecuteTime) { + Lock writeLock = lock.writeLock(); + writeLock.lock(); + try { + if (taskCount > 0) { + maxTaskTimeMillis = Math.max(taskExecuteTime, maxTaskTimeMillis); + minTaskTimeMillis = Math.min(taskExecuteTime, minTaskTimeMillis); + } else { + maxTaskTimeMillis = taskExecuteTime; + minTaskTimeMillis = taskExecuteTime; + } + taskCount = taskCount + 1; + totalTaskTimeMillis += taskExecuteTime; + } finally { + writeLock.unlock(); + } } - return statistics; + + /** + * Get the summary statistics of the instance at the current time. + * + * @return data snapshot + */ + public Summary summarize() { + Lock readLock = lock.readLock(); + Summary statistics; + readLock.lock(); + try { + statistics = new Summary( + this.totalTaskTimeMillis, + this.maxTaskTimeMillis, + this.minTaskTimeMillis, + this.taskCount); + } finally { + readLock.unlock(); + } + return statistics; + } + } /** @@ -166,4 +279,5 @@ public class TaskTimeRecordPlugin extends AbstractTaskTimerPlugin { return totalTaskCount > 0L ? getTotalTaskTimeMillis() / totalTaskCount : -1; } } + } diff --git a/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/TaskTimeRecordPluginTest.java b/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/TaskTimeRecordPluginTest.java index d7866a28..d4751554 100644 --- a/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/TaskTimeRecordPluginTest.java +++ b/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/TaskTimeRecordPluginTest.java @@ -20,6 +20,7 @@ package cn.hippo4j.core.plugin.impl; import cn.hippo4j.common.toolkit.ThreadUtil; import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor; import cn.hippo4j.core.plugin.manager.DefaultThreadPoolPluginManager; +import lombok.extern.slf4j.Slf4j; import org.junit.Assert; import org.junit.Test; @@ -30,6 +31,7 @@ import java.util.concurrent.TimeUnit; /** * test for {@link TaskTimeRecordPlugin} */ +@Slf4j public class TaskTimeRecordPluginTest { @Test @@ -49,20 +51,29 @@ public class TaskTimeRecordPluginTest { 3, 3, 1000L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1), Thread::new, new ThreadPoolExecutor.DiscardPolicy()); - TaskTimeRecordPlugin plugin = new TaskTimeRecordPlugin(); + TaskTimeRecordPlugin plugin = new TaskTimeRecordPlugin(3); executor.register(plugin); executor.submit(() -> ThreadUtil.sleep(1000L)); executor.submit(() -> ThreadUtil.sleep(3000L)); executor.submit(() -> ThreadUtil.sleep(2000L)); + executor.submit(() -> ThreadUtil.sleep(2000L)); // waiting for shutdown executor.shutdown(); while (!executor.isTerminated()) { } TaskTimeRecordPlugin.Summary summary = plugin.summarize(); - Assert.assertEquals(1, summary.getMinTaskTimeMillis() / 1000L); - Assert.assertEquals(3, summary.getMaxTaskTimeMillis() / 1000L); - Assert.assertEquals(2, summary.getAvgTaskTimeMillis() / 1000L); - Assert.assertEquals(6, summary.getTotalTaskTimeMillis() / 1000L); + Assert.assertTrue(testInDeviation(summary.getMinTaskTimeMillis(), 1000L, 300L)); + Assert.assertTrue(testInDeviation(summary.getMaxTaskTimeMillis(), 3000L, 300L)); + Assert.assertTrue(testInDeviation(summary.getAvgTaskTimeMillis(), 2000L, 300L)); + Assert.assertTrue(testInDeviation(summary.getTotalTaskTimeMillis(), 8000L, 300L)); + } + + private boolean testInDeviation(long except, long actual, long offer) { + long exceptLower = except - offer; + long exceptUpper = except + offer; + log.info("test {} < [{}] < {}", exceptLower, actual, exceptUpper); + return exceptLower < actual && actual < exceptUpper; } + }