From 9396b28535d85d61f5302eb3ef917e7a4459d57b Mon Sep 17 00:00:00 2001 From: huangchengxing <841396397@qq.com> Date: Thu, 27 Oct 2022 12:50:44 +0800 Subject: [PATCH] feat: Add plugins to support the default extensions of DynamicThreadPoolExecutor --- .../core/plugin/impl/TaskDecoratorPlugin.java | 105 +++++++++ .../impl/TaskRejectCountRecordPlugin.java | 83 +++++++ .../impl/TaskRejectNotifyAlarmPlugin.java | 61 +++++ .../plugin/impl/TaskTimeRecordPlugin.java | 213 ++++++++++++++++++ .../impl/TaskTimeoutNotifyAlarmPlugin.java | 79 +++++++ .../ThreadPoolExecutorShutdownPlugin.java | 152 +++++++++++++ .../DefaultThreadPoolPluginRegistrar.java | 82 +++++++ .../manager/ThreadPoolPluginRegistrar.java | 42 ++++ 8 files changed, 817 insertions(+) create mode 100644 hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskDecoratorPlugin.java create mode 100644 hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskRejectCountRecordPlugin.java create mode 100644 hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskRejectNotifyAlarmPlugin.java create mode 100644 hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskTimeRecordPlugin.java create mode 100644 hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskTimeoutNotifyAlarmPlugin.java create mode 100644 hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/ThreadPoolExecutorShutdownPlugin.java create mode 100644 hippo4j-core/src/main/java/cn/hippo4j/core/plugin/manager/DefaultThreadPoolPluginRegistrar.java create mode 100644 hippo4j-core/src/main/java/cn/hippo4j/core/plugin/manager/ThreadPoolPluginRegistrar.java diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskDecoratorPlugin.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskDecoratorPlugin.java new file mode 100644 index 00000000..99212e18 --- /dev/null +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskDecoratorPlugin.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.core.plugin.impl; + +import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor; +import cn.hippo4j.core.plugin.PluginRuntime; +import cn.hippo4j.core.plugin.TaskAwarePlugin; +import lombok.Getter; +import lombok.NonNull; +import org.springframework.core.task.TaskDecorator; + +import java.util.ArrayList; +import java.util.List; + +/** + * Decorate tasks when they are submitted to thread-pool. + */ +public class TaskDecoratorPlugin implements TaskAwarePlugin { + + public static final String PLUGIN_NAME = "task-decorator-plugin"; + + /** + * Get id. + * + * @return id + */ + @Override + public String getId() { + return PLUGIN_NAME; + } + + /** + * decorators + */ + @Getter + private final List decorators = new ArrayList<>(); + + /** + * Callback when task is executed. + * + * @param runnable runnable + * @return tasks to be execute + * @see ExtensibleThreadPoolExecutor#execute + */ + @Override + public Runnable beforeTaskExecute(Runnable runnable) { + for (TaskDecorator decorator : decorators) { + runnable = decorator.decorate(runnable); + } + return runnable; + } + + /** + * Get plugin runtime info. + * + * @return plugin runtime info + */ + @Override + public PluginRuntime getPluginRuntime() { + return new PluginRuntime(getId()) + .addInfo("decorators", decorators); + } + + /** + * Add a decorator + * + * @param decorator decorator + */ + public void addDecorator(@NonNull TaskDecorator decorator) { + decorators.remove(decorator); + decorators.add(decorator); + } + + /** + * Clear all decorators + * + */ + public void clearDecorators() { + decorators.clear(); + } + + /** + * Remove decorators + * + */ + public void removeDecorator(TaskDecorator decorator) { + decorators.remove(decorator); + } + +} diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskRejectCountRecordPlugin.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskRejectCountRecordPlugin.java new file mode 100644 index 00000000..0f478911 --- /dev/null +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskRejectCountRecordPlugin.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.core.plugin.impl; + +import cn.hippo4j.core.plugin.PluginRuntime; +import cn.hippo4j.core.plugin.RejectedAwarePlugin; +import lombok.Getter; +import lombok.Setter; + +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Record the number of tasks rejected by the thread pool. + */ +public class TaskRejectCountRecordPlugin implements RejectedAwarePlugin { + + public static final String PLUGIN_NAME = "task-reject-count-record-plugin"; + + /** + * Get id. + * + * @return id + */ + @Override + public String getId() { + return PLUGIN_NAME; + } + + /** + * rejection count + */ + @Setter + @Getter + private AtomicLong rejectCount = new AtomicLong(0); + + /** + * Get plugin runtime info. + * + * @return plugin runtime info + */ + @Override + public PluginRuntime getPluginRuntime() { + return new PluginRuntime(getId()) + .addInfo("rejectCount", getRejectCountNum()); + } + + /** + * Record rejection count. + * + * @param r task + * @param executor executor + */ + @Override + public void beforeRejectedExecution(Runnable r, ThreadPoolExecutor executor) { + rejectCount.incrementAndGet(); + } + + /** + * Get reject count num + * + * @return reject count num + */ + public Long getRejectCountNum() { + return rejectCount.get(); + } + +} diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskRejectNotifyAlarmPlugin.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskRejectNotifyAlarmPlugin.java new file mode 100644 index 00000000..6b865642 --- /dev/null +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskRejectNotifyAlarmPlugin.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.core.plugin.impl; + +import cn.hippo4j.common.config.ApplicationContextHolder; +import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor; +import cn.hippo4j.core.executor.ThreadPoolNotifyAlarmHandler; +import cn.hippo4j.core.plugin.RejectedAwarePlugin; + +import java.util.Optional; +import java.util.concurrent.ThreadPoolExecutor; + +/** + * Send alert notification when a task is rejected. + */ +public class TaskRejectNotifyAlarmPlugin implements RejectedAwarePlugin { + + public static final String PLUGIN_NAME = "task-reject-notify-alarm-plugin"; + + /** + * Get id. + * + * @return id + */ + @Override + public String getId() { + return PLUGIN_NAME; + } + + /** + * Callback before task is rejected. + * + * @param runnable task + * @param executor executor + */ + @Override + public void beforeRejectedExecution(Runnable runnable, ThreadPoolExecutor executor) { + if (!(executor instanceof ExtensibleThreadPoolExecutor)) { + return; + } + String threadPoolId = ((ExtensibleThreadPoolExecutor) executor).getThreadPoolId(); + Optional.ofNullable(ApplicationContextHolder.getInstance()) + .map(context -> context.getBean(ThreadPoolNotifyAlarmHandler.class)) + .ifPresent(handler -> handler.asyncSendRejectedAlarm(threadPoolId)); + } +} diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskTimeRecordPlugin.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskTimeRecordPlugin.java new file mode 100644 index 00000000..3ce5c5ad --- /dev/null +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskTimeRecordPlugin.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.core.plugin.impl; + +import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor; +import cn.hippo4j.core.plugin.ExecuteAwarePlugin; +import cn.hippo4j.core.plugin.PluginRuntime; +import cn.hippo4j.core.toolkit.SystemClock; +import lombok.Getter; +import lombok.RequiredArgsConstructor; + +import java.util.Objects; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** + * Record task execution time indicator. + * + * @see TaskTimeoutNotifyAlarmPlugin + */ +@RequiredArgsConstructor +public class TaskTimeRecordPlugin implements ExecuteAwarePlugin { + + public static final String PLUGIN_NAME = "task-time-record-plugin"; + + /** + * Lock instance. + */ + private final ReadWriteLock lock = new ReentrantReadWriteLock(); + + /** + * Total execution milli time of all tasks. + */ + private long totalTaskTimeMillis = 0L; + + /** + * Maximum task milli execution time, default -1. + */ + private long maxTaskTimeMillis = -1L; + + /** + * Minimal task milli execution time, default -1. + */ + private long minTaskTimeMillis = -1L; + + /** + * Count of completed task. + */ + private long taskCount = 0L; + + /** + * Get id. + * + * @return id + */ + @Override + public String getId() { + return PLUGIN_NAME; + } + + /** + * start times of executed tasks + */ + private final ThreadLocal startTimes = new ThreadLocal<>(); + + /** + * Record the time when the worker thread starts executing the task. + * + * @param thread thread of executing task + * @param runnable task + * @see ExtensibleThreadPoolExecutor#beforeExecute + */ + @Override + public void beforeExecute(Thread thread, Runnable runnable) { + startTimes.set(SystemClock.now()); + } + + /** + * Get plugin runtime info. + * + * @return plugin runtime info + */ + @Override + public PluginRuntime getPluginRuntime() { + Summary summary = summarize(); + return new PluginRuntime(getId()) + .addInfo("taskCount", summary.getTaskCount()) + .addInfo("minTaskTime", summary.getMinTaskTimeMillis() + "ms") + .addInfo("maxTaskTime", summary.getMaxTaskTimeMillis() + "ms") + .addInfo("totalTaskTime", summary.getTotalTaskTimeMillis() + "ms") + .addInfo("avgTaskTime", summary.getAvgTaskTimeMillis() + "ms"); + } + + /** + * Record the total time for the worker thread to complete the task, and update the time record. + * + * @param runnable runnable + * @param throwable exception thrown during execution + */ + @Override + public void afterExecute(Runnable runnable, Throwable throwable) { + try { + Long startTime = startTimes.get(); + if (Objects.isNull(startTime)) { + return; + } + long executeTime = SystemClock.now() - startTime; + recordTaskTime(executeTime); + } finally { + startTimes.remove(); + } + } + + /** + * Refresh time indicators of the current instance. + * + * @param taskExecutionTime millisecond + */ + protected void recordTaskTime(long taskExecutionTime) { + Lock writeLock = lock.writeLock(); + writeLock.lock(); + try { + if (taskCount == 0) { + maxTaskTimeMillis = taskExecutionTime; + minTaskTimeMillis = taskExecutionTime; + } else { + maxTaskTimeMillis = Math.max(taskExecutionTime, maxTaskTimeMillis); + minTaskTimeMillis = Math.min(taskExecutionTime, minTaskTimeMillis); + } + taskCount = taskCount + 1; + totalTaskTimeMillis += taskExecutionTime; + } finally { + writeLock.unlock(); + } + } + + /** + * Get the summary statistics of the instance at the current time. + * + * @return data snapshot + */ + public Summary summarize() { + Lock readLock = lock.readLock(); + Summary statistics; + readLock.lock(); + try { + statistics = new Summary( + this.totalTaskTimeMillis, + this.maxTaskTimeMillis, + this.minTaskTimeMillis, + this.taskCount); + } finally { + readLock.unlock(); + } + return statistics; + } + + /** + * Summary statistics of SyncTimeRecorder instance at a certain time. + */ + @Getter + @RequiredArgsConstructor + public static class Summary { + + /** + * Total execution nano time of all tasks. + */ + private final long totalTaskTimeMillis; + + /** + * Maximum task nano execution time. + */ + private final long maxTaskTimeMillis; + + /** + * Minimal task nano execution time. + */ + private final long minTaskTimeMillis; + + /** + * Count of completed task. + */ + private final long taskCount; + + /** + * Get the avg task time in milliseconds. + * + * @return avg task time + */ + public long getAvgTaskTimeMillis() { + long totalTaskCount = getTaskCount(); + return totalTaskCount > 0L ? getTotalTaskTimeMillis() / totalTaskCount : -1; + } + + } + +} diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskTimeoutNotifyAlarmPlugin.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskTimeoutNotifyAlarmPlugin.java new file mode 100644 index 00000000..12e522db --- /dev/null +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskTimeoutNotifyAlarmPlugin.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.core.plugin.impl; + +import cn.hippo4j.common.config.ApplicationContextHolder; +import cn.hippo4j.core.executor.ThreadPoolNotifyAlarmHandler; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.Setter; + +import java.util.Optional; +import java.util.concurrent.ThreadPoolExecutor; + +/** + * Record task execution time indicator, + * and send alarm notification when the execution time exceeds the threshold. + */ +@AllArgsConstructor +public class TaskTimeoutNotifyAlarmPlugin extends TaskTimeRecordPlugin { + + public static final String PLUGIN_NAME = "task-timeout-notify-alarm-plugin"; + + /** + * threadPoolId + */ + private final String threadPoolId; + + /** + * Get id. + * + * @return id + */ + @Override + public String getId() { + return PLUGIN_NAME; + } + + @Getter + @Setter + private Long executeTimeOut; + + /** + * thread-pool + */ + private final ThreadPoolExecutor threadPoolExecutor; + + /** + * Check whether the task execution time exceeds {@link #executeTimeOut}, + * if it exceeds this time, send an alarm notification. + * + * @param executeTime executeTime in nanosecond + */ + @Override + protected void recordTaskTime(long executeTime) { + super.recordTaskTime(executeTime); + if (executeTime <= executeTimeOut) { + return; + } + Optional.ofNullable(ApplicationContextHolder.getInstance()) + .map(context -> context.getBean(ThreadPoolNotifyAlarmHandler.class)) + .ifPresent(handler -> handler.asyncSendExecuteTimeOutAlarm( + threadPoolId, executeTime, executeTimeOut, threadPoolExecutor)); + } +} diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/ThreadPoolExecutorShutdownPlugin.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/ThreadPoolExecutorShutdownPlugin.java new file mode 100644 index 00000000..e61fdfe1 --- /dev/null +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/ThreadPoolExecutorShutdownPlugin.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.core.plugin.impl; + +import cn.hippo4j.common.toolkit.CollectionUtil; +import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor; +import cn.hippo4j.core.plugin.PluginRuntime; +import cn.hippo4j.core.plugin.ShutdownAwarePlugin; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.Setter; +import lombok.experimental.Accessors; +import lombok.extern.slf4j.Slf4j; + +import java.util.List; +import java.util.concurrent.*; + +/** + * After the thread pool calls {@link ThreadPoolExecutor#shutdown()} or {@link ThreadPoolExecutor#shutdownNow()}, + * if necessary, cancel the remaining tasks in the pool, + * and wait for the thread pool to terminate until + * the blocked main thread has timed out or the thread pool has completely terminated. + */ +@Accessors(chain = true) +@Getter +@Slf4j +@AllArgsConstructor +public class ThreadPoolExecutorShutdownPlugin implements ShutdownAwarePlugin { + + public static final String PLUGIN_NAME = "thread-pool-executor-shutdown-plugin"; + + /** + * Get id. + * + * @return id + */ + @Override + public String getId() { + return PLUGIN_NAME; + } + + /** + * await termination millis + */ + @Setter + public long awaitTerminationMillis; + + /** + * wait for tasks to complete on shutdown + */ + @Setter + public boolean waitForTasksToCompleteOnShutdown; + + /** + * Callback before pool shutdown. + * + * @param executor executor + */ + @Override + public void beforeShutdown(ThreadPoolExecutor executor) { + if (executor instanceof ExtensibleThreadPoolExecutor) { + ExtensibleThreadPoolExecutor dynamicThreadPoolExecutor = (ExtensibleThreadPoolExecutor) executor; + String threadPoolId = dynamicThreadPoolExecutor.getThreadPoolId(); + if (log.isInfoEnabled()) { + log.info("Before shutting down ExecutorService" + " '" + threadPoolId + "'"); + } + } + } + + /** + * Callback after pool shutdown. + * if {@link #waitForTasksToCompleteOnShutdown} return {@code true}, + * cancel the remaining tasks, + * then wait for pool to terminate according {@link #awaitTerminationMillis} if necessary. + * + * @param executor executor + * @param remainingTasks remainingTasks + */ + @Override + public void afterShutdown(ThreadPoolExecutor executor, List remainingTasks) { + if (executor instanceof ExtensibleThreadPoolExecutor) { + ExtensibleThreadPoolExecutor pool = (ExtensibleThreadPoolExecutor) executor; + if (!waitForTasksToCompleteOnShutdown && CollectionUtil.isNotEmpty(remainingTasks)) { + remainingTasks.forEach(this::cancelRemainingTask); + } + awaitTerminationIfNecessary(pool); + } + } + + /** + * Get plugin runtime info. + * + * @return plugin runtime info + */ + @Override + public PluginRuntime getPluginRuntime() { + return new PluginRuntime(getId()) + .addInfo("awaitTerminationMillis", awaitTerminationMillis) + .addInfo("waitForTasksToCompleteOnShutdown", waitForTasksToCompleteOnShutdown); + } + + /** + * Cancel the given remaining task which never commended execution, + * as returned from {@link ExecutorService#shutdownNow()}. + * + * @param task the task to cancel (typically a {@link RunnableFuture}) + * @see RunnableFuture#cancel(boolean) + * @since 5.0.5 + */ + protected void cancelRemainingTask(Runnable task) { + if (task instanceof Future) { + ((Future) task).cancel(true); + } + } + + /** + * Wait for the executor to terminate, according to the value of {@link #awaitTerminationMillis}. + */ + private void awaitTerminationIfNecessary(ExtensibleThreadPoolExecutor executor) { + String threadPoolId = executor.getThreadPoolId(); + if (this.awaitTerminationMillis <= 0) { + return; + } + try { + boolean isTerminated = executor.awaitTermination(this.awaitTerminationMillis, TimeUnit.MILLISECONDS); + if (!isTerminated && log.isWarnEnabled()) { + log.warn("Timed out while waiting for executor" + " '" + threadPoolId + "'" + " to terminate."); + } + } catch (InterruptedException ex) { + if (log.isWarnEnabled()) { + log.warn("Interrupted while waiting for executor" + " '" + threadPoolId + "'" + " to terminate."); + } + Thread.currentThread().interrupt(); + } + } + +} diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/manager/DefaultThreadPoolPluginRegistrar.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/manager/DefaultThreadPoolPluginRegistrar.java new file mode 100644 index 00000000..6177014c --- /dev/null +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/manager/DefaultThreadPoolPluginRegistrar.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.core.plugin.manager; + +import cn.hippo4j.core.plugin.ThreadPoolPlugin; +import cn.hippo4j.core.plugin.impl.*; +import lombok.AllArgsConstructor; +import lombok.NoArgsConstructor; + +/** + * Register default {@link ThreadPoolPlugin}. + * + * @see TaskDecoratorPlugin + * @see TaskTimeoutNotifyAlarmPlugin + * @see TaskRejectCountRecordPlugin + * @see TaskRejectNotifyAlarmPlugin + * @see ThreadPoolExecutorShutdownPlugin + */ +@NoArgsConstructor +@AllArgsConstructor +public class DefaultThreadPoolPluginRegistrar implements ThreadPoolPluginRegistrar { + + public static final String REGISTRAR_NAME = "DefaultThreadPoolPluginRegistrar"; + + /** + * execute time out + */ + private long executeTimeOut; + + /** + * await termination millis + */ + private long awaitTerminationMillis; + + /** + * wait for tasks to complete on shutdown + */ + private boolean waitForTasksToCompleteOnShutdown; + + /** + * Get id. + * + * @return id + */ + @Override + public String getId() { + return REGISTRAR_NAME; + } + + /** + * Create and register plugin for the specified thread-pool instance. + * + * @param support thread pool plugin manager delegate + */ + @Override + public void doRegister(ThreadPoolPluginSupport support) { + // callback when task execute + support.register(new TaskDecoratorPlugin()); + support.register(new TaskTimeoutNotifyAlarmPlugin(support.getThreadPoolId(), executeTimeOut, support.getThreadPoolExecutor())); + // callback when task rejected + support.register(new TaskRejectCountRecordPlugin()); + support.register(new TaskRejectNotifyAlarmPlugin()); + // callback when pool shutdown + support.register(new ThreadPoolExecutorShutdownPlugin(awaitTerminationMillis, waitForTasksToCompleteOnShutdown)); + } + +} diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/manager/ThreadPoolPluginRegistrar.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/manager/ThreadPoolPluginRegistrar.java new file mode 100644 index 00000000..23576c36 --- /dev/null +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/manager/ThreadPoolPluginRegistrar.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.core.plugin.manager; + +import cn.hippo4j.core.plugin.ThreadPoolPlugin; + +/** + * Registrar of {@link ThreadPoolPlugin}. + */ +public interface ThreadPoolPluginRegistrar { + + /** + * Get id. + * In spring container, the obtained id will be used as the alias of the bean name. + * + * @return id + */ + String getId(); + + /** + * Create and register plugin for the specified thread-pool instance. + * + * @param support thread pool plugin manager delegate + */ + void doRegister(ThreadPoolPluginSupport support); + +}