From b5054b8530a8aeabf8c9f3ed892be1902796a82c Mon Sep 17 00:00:00 2001 From: huangchengxing <841396397@qq.com> Date: Mon, 7 Nov 2022 12:57:22 +0800 Subject: [PATCH] refactor: Optimize the lock logic of TaskTimeRecordPlugin --- .../plugin/impl/TaskTimeRecordPlugin.java | 200 ++++++++++++++---- .../plugin/impl/TaskTimeRecordPluginTest.java | 2 +- 2 files changed, 159 insertions(+), 43 deletions(-) diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskTimeRecordPlugin.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskTimeRecordPlugin.java index dd6ad982..5114be6e 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskTimeRecordPlugin.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskTimeRecordPlugin.java @@ -17,46 +17,61 @@ package cn.hippo4j.core.plugin.impl; +import cn.hippo4j.common.toolkit.Assert; import cn.hippo4j.core.plugin.PluginRuntime; import lombok.Getter; import lombok.RequiredArgsConstructor; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; /** - * Record task execution time indicator. + *

Record task execution time indicator.
+ * The initialization size of the timer container can be specified during construction, + * It will route it to different timers in the container according to the {@link Thread#getId}, + * to reduce the lock competition strength for a single timer. */ -@RequiredArgsConstructor public class TaskTimeRecordPlugin extends AbstractTaskTimerPlugin { + static final int MAXIMUM_CAPACITY = 1 << 30; public static final String PLUGIN_NAME = "task-time-record-plugin"; /** - * Lock instance + * modulo */ - private final ReadWriteLock lock = new ReentrantReadWriteLock(); + private final int modulo; /** - * Total execution milli time of all tasks + * timers */ - private long totalTaskTimeMillis = 0L; + public final List timerTable; /** - * Maximum task milli execution time, default -1 - */ - private long maxTaskTimeMillis = -1L; - - /** - * Minimal task milli execution time, default -1 + * Create a {@link TaskTimeRecordPlugin} + * + * @param initialCapacity initial capacity of timer table */ - private long minTaskTimeMillis = -1L; + public TaskTimeRecordPlugin(int initialCapacity) { + Assert.isTrue(initialCapacity >= 1, "count must great then 0"); + initialCapacity = tableSizeFor(initialCapacity); + timerTable = new ArrayList<>(initialCapacity); + for (int i = 0; i < initialCapacity; i++) { + timerTable.add(new Timer()); + } + modulo = initialCapacity - 1; + } /** - * Count of completed task + * Create a {@link TaskTimeRecordPlugin} */ - private long taskCount = 0L; + public TaskTimeRecordPlugin() { + this(1); + } /** * Get id. @@ -91,21 +106,8 @@ public class TaskTimeRecordPlugin extends AbstractTaskTimerPlugin { */ @Override protected void processTaskTime(long taskExecuteTime) { - Lock writeLock = lock.writeLock(); - writeLock.lock(); - try { - if (taskCount == 0) { - maxTaskTimeMillis = taskExecuteTime; - minTaskTimeMillis = taskExecuteTime; - } else { - maxTaskTimeMillis = Math.max(taskExecuteTime, maxTaskTimeMillis); - minTaskTimeMillis = Math.min(taskExecuteTime, minTaskTimeMillis); - } - taskCount = taskCount + 1; - totalTaskTimeMillis += taskExecuteTime; - } finally { - writeLock.unlock(); - } + Timer timer = getTimerForCurrentThread(); + timer.recordTaskTime(taskExecuteTime); } /** @@ -114,19 +116,132 @@ public class TaskTimeRecordPlugin extends AbstractTaskTimerPlugin { * @return data snapshot */ public Summary summarize() { - Lock readLock = lock.readLock(); - Summary statistics; - readLock.lock(); - try { - statistics = new Summary( - this.totalTaskTimeMillis, - this.maxTaskTimeMillis, - this.minTaskTimeMillis, - this.taskCount); - } finally { - readLock.unlock(); + // ignore unused timers + List

summaries = timerTable.stream() + .map(Timer::summarize) + .filter(s -> s.getTaskCount() > 0) + .collect(Collectors.toList()); + + // summarize data + long totalTaskTimeMillis = 0L; + long maxTaskTimeMillis = -1L; + long minTaskTimeMillis = -1L; + long taskCount = 0L; + for (Summary summary : summaries) { + if (taskCount > 0) { + maxTaskTimeMillis = Math.max(maxTaskTimeMillis, summary.getMaxTaskTimeMillis()); + minTaskTimeMillis = Math.min(minTaskTimeMillis, summary.getMinTaskTimeMillis()); + } else { + maxTaskTimeMillis = summary.getMaxTaskTimeMillis(); + minTaskTimeMillis = summary.getMinTaskTimeMillis(); + } + totalTaskTimeMillis += summary.getTotalTaskTimeMillis(); + taskCount += summary.getTaskCount(); + } + return new Summary(totalTaskTimeMillis, maxTaskTimeMillis, minTaskTimeMillis, taskCount); + } + + private Timer getTimerForCurrentThread() { + /* + * use table tableSize - 1 to take modulus for tid, and the remainder obtained is the subscript of the timer corresponding to the thread in the table. eg: tid = 10086, tableSize = 8, 10086 & + * (8 - 1) = 4 + */ + long threadId = Thread.currentThread().getId(); + int index = (int) (threadId & modulo); + return timerTable.get(index); + } + + /** + * copy from {@link HashMap#tableSizeFor} + */ + static int tableSizeFor(int cap) { + int n = cap - 1; + n |= n >>> 1; + n |= n >>> 2; + n |= n >>> 4; + n |= n >>> 8; + n |= n >>> 16; + if (n < 0) { + return 1; } - return statistics; + return n >= MAXIMUM_CAPACITY ? MAXIMUM_CAPACITY : n + 1; + } + + /** + *

Independent unit for providing time recording function.
+ * Support thread-safe operations when reading and writing in a concurrent environment. + */ + private static class Timer { + + /** + * Lock instance + */ + private final ReadWriteLock lock = new ReentrantReadWriteLock(); + + /** + * Total execution milli time of all tasks + */ + private long totalTaskTimeMillis = 0L; + + /** + * Maximum task milli execution time, default -1 + */ + private long maxTaskTimeMillis = -1L; + + /** + * Minimal task milli execution time, default -1 + */ + private long minTaskTimeMillis = -1L; + + /** + * Count of completed task + */ + private long taskCount = 0L; + + /** + * Record task execute time. + * + * @param taskExecuteTime task execute time + */ + public void recordTaskTime(long taskExecuteTime) { + Lock writeLock = lock.writeLock(); + writeLock.lock(); + try { + if (taskCount == 0) { + maxTaskTimeMillis = taskExecuteTime; + minTaskTimeMillis = taskExecuteTime; + } else { + maxTaskTimeMillis = Math.max(taskExecuteTime, maxTaskTimeMillis); + minTaskTimeMillis = Math.min(taskExecuteTime, minTaskTimeMillis); + } + taskCount = taskCount + 1; + totalTaskTimeMillis += taskExecuteTime; + } finally { + writeLock.unlock(); + } + } + + /** + * Get the summary statistics of the instance at the current time. + * + * @return data snapshot + */ + public Summary summarize() { + Lock readLock = lock.readLock(); + Summary statistics; + readLock.lock(); + try { + statistics = new Summary( + this.totalTaskTimeMillis, + this.maxTaskTimeMillis, + this.minTaskTimeMillis, + this.taskCount); + } finally { + readLock.unlock(); + } + return statistics; + } + } /** @@ -166,4 +281,5 @@ public class TaskTimeRecordPlugin extends AbstractTaskTimerPlugin { return totalTaskCount > 0L ? getTotalTaskTimeMillis() / totalTaskCount : -1; } } + } diff --git a/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/TaskTimeRecordPluginTest.java b/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/TaskTimeRecordPluginTest.java index d7866a28..ae3194e7 100644 --- a/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/TaskTimeRecordPluginTest.java +++ b/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/TaskTimeRecordPluginTest.java @@ -49,7 +49,7 @@ public class TaskTimeRecordPluginTest { 3, 3, 1000L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1), Thread::new, new ThreadPoolExecutor.DiscardPolicy()); - TaskTimeRecordPlugin plugin = new TaskTimeRecordPlugin(); + TaskTimeRecordPlugin plugin = new TaskTimeRecordPlugin(3); executor.register(plugin); executor.submit(() -> ThreadUtil.sleep(1000L)); executor.submit(() -> ThreadUtil.sleep(3000L));