refactor: Extract the logic about calculating task execution time to superclass (#879) (#882)

pull/886/head
黄成兴 2 years ago committed by GitHub
parent 175bfb9c54
commit b43c90bd5f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.plugin.impl;
import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor;
import cn.hippo4j.core.plugin.ExecuteAwarePlugin;
import cn.hippo4j.core.toolkit.SystemClock;
import java.util.Optional;
/**
* <p>An abstract task execution time recording plugin
* for thread-safe statistics the execution time of tasks.
*
* <p>Must override {@link #processTaskTime} to define the processing logic for task execution time. <br />
* Default time precision is milliseconds, may override {@link #currentTime} to redefine the time precision.
*
* @see TaskTimeRecordPlugin
* @see TaskTimeoutNotifyAlarmPlugin
*/
public abstract class AbstractTaskTimerPlugin implements ExecuteAwarePlugin {
/**
* start times of executed tasks
*/
private final ThreadLocal<Long> startTimes = new ThreadLocal<>();
/**
* Record the time when the worker thread starts executing the task.
*
* @param thread thread of executing task
* @param runnable task
* @see ExtensibleThreadPoolExecutor#beforeExecute
*/
@Override
public final void beforeExecute(Thread thread, Runnable runnable) {
startTimes.set(currentTime());
}
/**
* Record the total time for the worker thread to complete the task, and update the time record.
*
* @param runnable runnable
* @param throwable exception thrown during execution
*/
@Override
public final void afterExecute(Runnable runnable, Throwable throwable) {
try {
Optional.ofNullable(startTimes.get())
.map(startTime -> currentTime() - startTime)
.ifPresent(this::processTaskTime);
} finally {
startTimes.remove();
}
}
/**
* Get the current time.
*
* @return current time
*/
protected long currentTime() {
return SystemClock.now();
}
/**
* Processing the execution time of the task.
*
* @param taskExecuteTime execute time of task
*/
protected abstract void processTaskTime(long taskExecuteTime);
}

@ -17,25 +17,19 @@
package cn.hippo4j.core.plugin.impl; package cn.hippo4j.core.plugin.impl;
import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor;
import cn.hippo4j.core.plugin.ExecuteAwarePlugin;
import cn.hippo4j.core.plugin.PluginRuntime; import cn.hippo4j.core.plugin.PluginRuntime;
import cn.hippo4j.core.toolkit.SystemClock;
import lombok.Getter; import lombok.Getter;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import java.util.Objects;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
/** /**
* Record task execution time indicator. * Record task execution time indicator.
*
* @see TaskTimeoutNotifyAlarmPlugin
*/ */
@RequiredArgsConstructor @RequiredArgsConstructor
public class TaskTimeRecordPlugin implements ExecuteAwarePlugin { public class TaskTimeRecordPlugin extends AbstractTaskTimerPlugin {
public static final String PLUGIN_NAME = "task-time-record-plugin"; public static final String PLUGIN_NAME = "task-time-record-plugin";
@ -74,23 +68,6 @@ public class TaskTimeRecordPlugin implements ExecuteAwarePlugin {
return PLUGIN_NAME; return PLUGIN_NAME;
} }
/**
* start times of executed tasks
*/
private final ThreadLocal<Long> startTimes = new ThreadLocal<>();
/**
* Record the time when the worker thread starts executing the task.
*
* @param thread thread of executing task
* @param runnable task
* @see ExtensibleThreadPoolExecutor#beforeExecute
*/
@Override
public void beforeExecute(Thread thread, Runnable runnable) {
startTimes.set(SystemClock.now());
}
/** /**
* Get plugin runtime info. * Get plugin runtime info.
* *
@ -107,44 +84,25 @@ public class TaskTimeRecordPlugin implements ExecuteAwarePlugin {
.addInfo("avgTaskTime", summary.getAvgTaskTimeMillis() + "ms"); .addInfo("avgTaskTime", summary.getAvgTaskTimeMillis() + "ms");
} }
/**
* Record the total time for the worker thread to complete the task, and update the time record.
*
* @param runnable runnable
* @param throwable exception thrown during execution
*/
@Override
public void afterExecute(Runnable runnable, Throwable throwable) {
try {
Long startTime = startTimes.get();
if (Objects.isNull(startTime)) {
return;
}
long executeTime = SystemClock.now() - startTime;
recordTaskTime(executeTime);
} finally {
startTimes.remove();
}
}
/** /**
* Refresh time indicators of the current instance. * Refresh time indicators of the current instance.
* *
* @param taskExecutionTime millisecond * @param taskExecuteTime execute time of task
*/ */
protected void recordTaskTime(long taskExecutionTime) { @Override
protected void processTaskTime(long taskExecuteTime) {
Lock writeLock = lock.writeLock(); Lock writeLock = lock.writeLock();
writeLock.lock(); writeLock.lock();
try { try {
if (taskCount == 0) { if (taskCount == 0) {
maxTaskTimeMillis = taskExecutionTime; maxTaskTimeMillis = taskExecuteTime;
minTaskTimeMillis = taskExecutionTime; minTaskTimeMillis = taskExecuteTime;
} else { } else {
maxTaskTimeMillis = Math.max(taskExecutionTime, maxTaskTimeMillis); maxTaskTimeMillis = Math.max(taskExecuteTime, maxTaskTimeMillis);
minTaskTimeMillis = Math.min(taskExecutionTime, minTaskTimeMillis); minTaskTimeMillis = Math.min(taskExecuteTime, minTaskTimeMillis);
} }
taskCount = taskCount + 1; taskCount = taskCount + 1;
totalTaskTimeMillis += taskExecutionTime; totalTaskTimeMillis += taskExecuteTime;
} finally { } finally {
writeLock.unlock(); writeLock.unlock();
} }

@ -27,11 +27,10 @@ import java.util.Optional;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
/** /**
* Record task execution time indicator, * Send alarm notification when the execution time exceeds the threshold.
* and send alarm notification when the execution time exceeds the threshold.
*/ */
@AllArgsConstructor @AllArgsConstructor
public class TaskTimeoutNotifyAlarmPlugin extends TaskTimeRecordPlugin { public class TaskTimeoutNotifyAlarmPlugin extends AbstractTaskTimerPlugin {
public static final String PLUGIN_NAME = "task-timeout-notify-alarm-plugin"; public static final String PLUGIN_NAME = "task-timeout-notify-alarm-plugin";
@ -40,6 +39,15 @@ public class TaskTimeoutNotifyAlarmPlugin extends TaskTimeRecordPlugin {
*/ */
private final String threadPoolId; private final String threadPoolId;
@Getter
@Setter
private Long executeTimeOut;
/**
* thread-pool
*/
private final ThreadPoolExecutor threadPoolExecutor;
/** /**
* Get id. * Get id.
* *
@ -50,30 +58,21 @@ public class TaskTimeoutNotifyAlarmPlugin extends TaskTimeRecordPlugin {
return PLUGIN_NAME; return PLUGIN_NAME;
} }
@Getter
@Setter
private Long executeTimeOut;
/**
* thread-pool
*/
private final ThreadPoolExecutor threadPoolExecutor;
/** /**
* Check whether the task execution time exceeds {@link #executeTimeOut}, * Check whether the task execution time exceeds {@link #executeTimeOut},
* if it exceeds this time, send an alarm notification. * if it exceeds this time, send an alarm notification.
* *
* @param executeTime executeTime in nanosecond * @param taskExecuteTime execute time of task
*/ */
@Override @Override
protected void recordTaskTime(long executeTime) { protected void processTaskTime(long taskExecuteTime) {
super.recordTaskTime(executeTime); if (taskExecuteTime <= executeTimeOut) {
if (executeTime <= executeTimeOut) {
return; return;
} }
Optional.ofNullable(ApplicationContextHolder.getInstance()) Optional.ofNullable(ApplicationContextHolder.getInstance())
.map(context -> context.getBean(ThreadPoolNotifyAlarmHandler.class)) .map(context -> context.getBean(ThreadPoolNotifyAlarmHandler.class))
.ifPresent(handler -> handler.asyncSendExecuteTimeOutAlarm( .ifPresent(handler -> handler.asyncSendExecuteTimeOutAlarm(
threadPoolId, executeTime, executeTimeOut, threadPoolExecutor)); threadPoolId, taskExecuteTime, executeTimeOut, threadPoolExecutor));
} }
} }

Loading…
Cancel
Save