feat: Add plugins to support the default extensions of DynamicThreadPoolExecutor

pull/854/head
huangchengxing 3 years ago
parent 54b0adb695
commit 9396b28535

@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.plugin.impl;
import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor;
import cn.hippo4j.core.plugin.PluginRuntime;
import cn.hippo4j.core.plugin.TaskAwarePlugin;
import lombok.Getter;
import lombok.NonNull;
import org.springframework.core.task.TaskDecorator;
import java.util.ArrayList;
import java.util.List;
/**
* Decorate tasks when they are submitted to thread-pool.
*/
public class TaskDecoratorPlugin implements TaskAwarePlugin {
public static final String PLUGIN_NAME = "task-decorator-plugin";
/**
* Get id.
*
* @return id
*/
@Override
public String getId() {
return PLUGIN_NAME;
}
/**
* decorators
*/
@Getter
private final List<TaskDecorator> decorators = new ArrayList<>();
/**
* Callback when task is executed.
*
* @param runnable runnable
* @return tasks to be execute
* @see ExtensibleThreadPoolExecutor#execute
*/
@Override
public Runnable beforeTaskExecute(Runnable runnable) {
for (TaskDecorator decorator : decorators) {
runnable = decorator.decorate(runnable);
}
return runnable;
}
/**
* Get plugin runtime info.
*
* @return plugin runtime info
*/
@Override
public PluginRuntime getPluginRuntime() {
return new PluginRuntime(getId())
.addInfo("decorators", decorators);
}
/**
* Add a decorator
*
* @param decorator decorator
*/
public void addDecorator(@NonNull TaskDecorator decorator) {
decorators.remove(decorator);
decorators.add(decorator);
}
/**
* Clear all decorators
*
*/
public void clearDecorators() {
decorators.clear();
}
/**
* Remove decorators
*
*/
public void removeDecorator(TaskDecorator decorator) {
decorators.remove(decorator);
}
}

@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.plugin.impl;
import cn.hippo4j.core.plugin.PluginRuntime;
import cn.hippo4j.core.plugin.RejectedAwarePlugin;
import lombok.Getter;
import lombok.Setter;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicLong;
/**
* Record the number of tasks rejected by the thread pool.
*/
public class TaskRejectCountRecordPlugin implements RejectedAwarePlugin {
public static final String PLUGIN_NAME = "task-reject-count-record-plugin";
/**
* Get id.
*
* @return id
*/
@Override
public String getId() {
return PLUGIN_NAME;
}
/**
* rejection count
*/
@Setter
@Getter
private AtomicLong rejectCount = new AtomicLong(0);
/**
* Get plugin runtime info.
*
* @return plugin runtime info
*/
@Override
public PluginRuntime getPluginRuntime() {
return new PluginRuntime(getId())
.addInfo("rejectCount", getRejectCountNum());
}
/**
* Record rejection count.
*
* @param r task
* @param executor executor
*/
@Override
public void beforeRejectedExecution(Runnable r, ThreadPoolExecutor executor) {
rejectCount.incrementAndGet();
}
/**
* Get reject count num
*
* @return reject count num
*/
public Long getRejectCountNum() {
return rejectCount.get();
}
}

@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.plugin.impl;
import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor;
import cn.hippo4j.core.executor.ThreadPoolNotifyAlarmHandler;
import cn.hippo4j.core.plugin.RejectedAwarePlugin;
import java.util.Optional;
import java.util.concurrent.ThreadPoolExecutor;
/**
* Send alert notification when a task is rejected.
*/
public class TaskRejectNotifyAlarmPlugin implements RejectedAwarePlugin {
public static final String PLUGIN_NAME = "task-reject-notify-alarm-plugin";
/**
* Get id.
*
* @return id
*/
@Override
public String getId() {
return PLUGIN_NAME;
}
/**
* Callback before task is rejected.
*
* @param runnable task
* @param executor executor
*/
@Override
public void beforeRejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
if (!(executor instanceof ExtensibleThreadPoolExecutor)) {
return;
}
String threadPoolId = ((ExtensibleThreadPoolExecutor) executor).getThreadPoolId();
Optional.ofNullable(ApplicationContextHolder.getInstance())
.map(context -> context.getBean(ThreadPoolNotifyAlarmHandler.class))
.ifPresent(handler -> handler.asyncSendRejectedAlarm(threadPoolId));
}
}

@ -0,0 +1,213 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.plugin.impl;
import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor;
import cn.hippo4j.core.plugin.ExecuteAwarePlugin;
import cn.hippo4j.core.plugin.PluginRuntime;
import cn.hippo4j.core.toolkit.SystemClock;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import java.util.Objects;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* Record task execution time indicator.
*
* @see TaskTimeoutNotifyAlarmPlugin
*/
@RequiredArgsConstructor
public class TaskTimeRecordPlugin implements ExecuteAwarePlugin {
public static final String PLUGIN_NAME = "task-time-record-plugin";
/**
* Lock instance.
*/
private final ReadWriteLock lock = new ReentrantReadWriteLock();
/**
* Total execution milli time of all tasks.
*/
private long totalTaskTimeMillis = 0L;
/**
* Maximum task milli execution time, default -1.
*/
private long maxTaskTimeMillis = -1L;
/**
* Minimal task milli execution time, default -1.
*/
private long minTaskTimeMillis = -1L;
/**
* Count of completed task.
*/
private long taskCount = 0L;
/**
* Get id.
*
* @return id
*/
@Override
public String getId() {
return PLUGIN_NAME;
}
/**
* start times of executed tasks
*/
private final ThreadLocal<Long> startTimes = new ThreadLocal<>();
/**
* Record the time when the worker thread starts executing the task.
*
* @param thread thread of executing task
* @param runnable task
* @see ExtensibleThreadPoolExecutor#beforeExecute
*/
@Override
public void beforeExecute(Thread thread, Runnable runnable) {
startTimes.set(SystemClock.now());
}
/**
* Get plugin runtime info.
*
* @return plugin runtime info
*/
@Override
public PluginRuntime getPluginRuntime() {
Summary summary = summarize();
return new PluginRuntime(getId())
.addInfo("taskCount", summary.getTaskCount())
.addInfo("minTaskTime", summary.getMinTaskTimeMillis() + "ms")
.addInfo("maxTaskTime", summary.getMaxTaskTimeMillis() + "ms")
.addInfo("totalTaskTime", summary.getTotalTaskTimeMillis() + "ms")
.addInfo("avgTaskTime", summary.getAvgTaskTimeMillis() + "ms");
}
/**
* Record the total time for the worker thread to complete the task, and update the time record.
*
* @param runnable runnable
* @param throwable exception thrown during execution
*/
@Override
public void afterExecute(Runnable runnable, Throwable throwable) {
try {
Long startTime = startTimes.get();
if (Objects.isNull(startTime)) {
return;
}
long executeTime = SystemClock.now() - startTime;
recordTaskTime(executeTime);
} finally {
startTimes.remove();
}
}
/**
* Refresh time indicators of the current instance.
*
* @param taskExecutionTime millisecond
*/
protected void recordTaskTime(long taskExecutionTime) {
Lock writeLock = lock.writeLock();
writeLock.lock();
try {
if (taskCount == 0) {
maxTaskTimeMillis = taskExecutionTime;
minTaskTimeMillis = taskExecutionTime;
} else {
maxTaskTimeMillis = Math.max(taskExecutionTime, maxTaskTimeMillis);
minTaskTimeMillis = Math.min(taskExecutionTime, minTaskTimeMillis);
}
taskCount = taskCount + 1;
totalTaskTimeMillis += taskExecutionTime;
} finally {
writeLock.unlock();
}
}
/**
* Get the summary statistics of the instance at the current time.
*
* @return data snapshot
*/
public Summary summarize() {
Lock readLock = lock.readLock();
Summary statistics;
readLock.lock();
try {
statistics = new Summary(
this.totalTaskTimeMillis,
this.maxTaskTimeMillis,
this.minTaskTimeMillis,
this.taskCount);
} finally {
readLock.unlock();
}
return statistics;
}
/**
* Summary statistics of SyncTimeRecorder instance at a certain time.
*/
@Getter
@RequiredArgsConstructor
public static class Summary {
/**
* Total execution nano time of all tasks.
*/
private final long totalTaskTimeMillis;
/**
* Maximum task nano execution time.
*/
private final long maxTaskTimeMillis;
/**
* Minimal task nano execution time.
*/
private final long minTaskTimeMillis;
/**
* Count of completed task.
*/
private final long taskCount;
/**
* Get the avg task time in milliseconds.
*
* @return avg task time
*/
public long getAvgTaskTimeMillis() {
long totalTaskCount = getTaskCount();
return totalTaskCount > 0L ? getTotalTaskTimeMillis() / totalTaskCount : -1;
}
}
}

@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.plugin.impl;
import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.core.executor.ThreadPoolNotifyAlarmHandler;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
import java.util.Optional;
import java.util.concurrent.ThreadPoolExecutor;
/**
* Record task execution time indicator,
* and send alarm notification when the execution time exceeds the threshold.
*/
@AllArgsConstructor
public class TaskTimeoutNotifyAlarmPlugin extends TaskTimeRecordPlugin {
public static final String PLUGIN_NAME = "task-timeout-notify-alarm-plugin";
/**
* threadPoolId
*/
private final String threadPoolId;
/**
* Get id.
*
* @return id
*/
@Override
public String getId() {
return PLUGIN_NAME;
}
@Getter
@Setter
private Long executeTimeOut;
/**
* thread-pool
*/
private final ThreadPoolExecutor threadPoolExecutor;
/**
* Check whether the task execution time exceeds {@link #executeTimeOut},
* if it exceeds this time, send an alarm notification.
*
* @param executeTime executeTime in nanosecond
*/
@Override
protected void recordTaskTime(long executeTime) {
super.recordTaskTime(executeTime);
if (executeTime <= executeTimeOut) {
return;
}
Optional.ofNullable(ApplicationContextHolder.getInstance())
.map(context -> context.getBean(ThreadPoolNotifyAlarmHandler.class))
.ifPresent(handler -> handler.asyncSendExecuteTimeOutAlarm(
threadPoolId, executeTime, executeTimeOut, threadPoolExecutor));
}
}

@ -0,0 +1,152 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.plugin.impl;
import cn.hippo4j.common.toolkit.CollectionUtil;
import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor;
import cn.hippo4j.core.plugin.PluginRuntime;
import cn.hippo4j.core.plugin.ShutdownAwarePlugin;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.concurrent.*;
/**
* After the thread pool calls {@link ThreadPoolExecutor#shutdown()} or {@link ThreadPoolExecutor#shutdownNow()},
* if necessary, cancel the remaining tasks in the pool,
* and wait for the thread pool to terminate until
* the blocked main thread has timed out or the thread pool has completely terminated.
*/
@Accessors(chain = true)
@Getter
@Slf4j
@AllArgsConstructor
public class ThreadPoolExecutorShutdownPlugin implements ShutdownAwarePlugin {
public static final String PLUGIN_NAME = "thread-pool-executor-shutdown-plugin";
/**
* Get id.
*
* @return id
*/
@Override
public String getId() {
return PLUGIN_NAME;
}
/**
* await termination millis
*/
@Setter
public long awaitTerminationMillis;
/**
* wait for tasks to complete on shutdown
*/
@Setter
public boolean waitForTasksToCompleteOnShutdown;
/**
* Callback before pool shutdown.
*
* @param executor executor
*/
@Override
public void beforeShutdown(ThreadPoolExecutor executor) {
if (executor instanceof ExtensibleThreadPoolExecutor) {
ExtensibleThreadPoolExecutor dynamicThreadPoolExecutor = (ExtensibleThreadPoolExecutor) executor;
String threadPoolId = dynamicThreadPoolExecutor.getThreadPoolId();
if (log.isInfoEnabled()) {
log.info("Before shutting down ExecutorService" + " '" + threadPoolId + "'");
}
}
}
/**
* Callback after pool shutdown.
* if {@link #waitForTasksToCompleteOnShutdown} return {@code true}
* cancel the remaining tasks,
* then wait for pool to terminate according {@link #awaitTerminationMillis} if necessary.
*
* @param executor executor
* @param remainingTasks remainingTasks
*/
@Override
public void afterShutdown(ThreadPoolExecutor executor, List<Runnable> remainingTasks) {
if (executor instanceof ExtensibleThreadPoolExecutor) {
ExtensibleThreadPoolExecutor pool = (ExtensibleThreadPoolExecutor) executor;
if (!waitForTasksToCompleteOnShutdown && CollectionUtil.isNotEmpty(remainingTasks)) {
remainingTasks.forEach(this::cancelRemainingTask);
}
awaitTerminationIfNecessary(pool);
}
}
/**
* Get plugin runtime info.
*
* @return plugin runtime info
*/
@Override
public PluginRuntime getPluginRuntime() {
return new PluginRuntime(getId())
.addInfo("awaitTerminationMillis", awaitTerminationMillis)
.addInfo("waitForTasksToCompleteOnShutdown", waitForTasksToCompleteOnShutdown);
}
/**
* Cancel the given remaining task which never commended execution,
* as returned from {@link ExecutorService#shutdownNow()}.
*
* @param task the task to cancel (typically a {@link RunnableFuture})
* @see RunnableFuture#cancel(boolean)
* @since 5.0.5
*/
protected void cancelRemainingTask(Runnable task) {
if (task instanceof Future) {
((Future<?>) task).cancel(true);
}
}
/**
* Wait for the executor to terminate, according to the value of {@link #awaitTerminationMillis}.
*/
private void awaitTerminationIfNecessary(ExtensibleThreadPoolExecutor executor) {
String threadPoolId = executor.getThreadPoolId();
if (this.awaitTerminationMillis <= 0) {
return;
}
try {
boolean isTerminated = executor.awaitTermination(this.awaitTerminationMillis, TimeUnit.MILLISECONDS);
if (!isTerminated && log.isWarnEnabled()) {
log.warn("Timed out while waiting for executor" + " '" + threadPoolId + "'" + " to terminate.");
}
} catch (InterruptedException ex) {
if (log.isWarnEnabled()) {
log.warn("Interrupted while waiting for executor" + " '" + threadPoolId + "'" + " to terminate.");
}
Thread.currentThread().interrupt();
}
}
}

@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.plugin.manager;
import cn.hippo4j.core.plugin.ThreadPoolPlugin;
import cn.hippo4j.core.plugin.impl.*;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
/**
* Register default {@link ThreadPoolPlugin}.
*
* @see TaskDecoratorPlugin
* @see TaskTimeoutNotifyAlarmPlugin
* @see TaskRejectCountRecordPlugin
* @see TaskRejectNotifyAlarmPlugin
* @see ThreadPoolExecutorShutdownPlugin
*/
@NoArgsConstructor
@AllArgsConstructor
public class DefaultThreadPoolPluginRegistrar implements ThreadPoolPluginRegistrar {
public static final String REGISTRAR_NAME = "DefaultThreadPoolPluginRegistrar";
/**
* execute time out
*/
private long executeTimeOut;
/**
* await termination millis
*/
private long awaitTerminationMillis;
/**
* wait for tasks to complete on shutdown
*/
private boolean waitForTasksToCompleteOnShutdown;
/**
* Get id.
*
* @return id
*/
@Override
public String getId() {
return REGISTRAR_NAME;
}
/**
* Create and register plugin for the specified thread-pool instance.
*
* @param support thread pool plugin manager delegate
*/
@Override
public void doRegister(ThreadPoolPluginSupport support) {
// callback when task execute
support.register(new TaskDecoratorPlugin());
support.register(new TaskTimeoutNotifyAlarmPlugin(support.getThreadPoolId(), executeTimeOut, support.getThreadPoolExecutor()));
// callback when task rejected
support.register(new TaskRejectCountRecordPlugin());
support.register(new TaskRejectNotifyAlarmPlugin());
// callback when pool shutdown
support.register(new ThreadPoolExecutorShutdownPlugin(awaitTerminationMillis, waitForTasksToCompleteOnShutdown));
}
}

@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.plugin.manager;
import cn.hippo4j.core.plugin.ThreadPoolPlugin;
/**
* Registrar of {@link ThreadPoolPlugin}.
*/
public interface ThreadPoolPluginRegistrar {
/**
* Get id.
* In spring container, the obtained id will be used as the alias of the bean name.
*
* @return id
*/
String getId();
/**
* Create and register plugin for the specified thread-pool instance.
*
* @param support thread pool plugin manager delegate
*/
void doRegister(ThreadPoolPluginSupport support);
}
Loading…
Cancel
Save