From c645ab19e4351a84f96370130577c27ed970ad34 Mon Sep 17 00:00:00 2001 From: huangchengxing <841396397@qq.com> Date: Thu, 27 Oct 2022 12:14:38 +0800 Subject: [PATCH] feat: Add a new thread-pool that supports the registration of callback interfaces --- .../ExtensibleThreadPoolExecutor.java | 320 ++++++++++++++++++ .../core/plugin/ExecuteAwarePlugin.java | 48 +++ .../cn/hippo4j/core/plugin/PluginRuntime.java | 63 ++++ .../core/plugin/RejectedAwarePlugin.java | 37 ++ .../core/plugin/ShutdownAwarePlugin.java | 63 ++++ .../hippo4j/core/plugin/TaskAwarePlugin.java | 65 ++++ .../hippo4j/core/plugin/ThreadPoolPlugin.java | 82 +++++ .../DefaultThreadPoolPluginManager.java | 285 ++++++++++++++++ .../manager/EmptyThreadPoolPluginManager.java | 154 +++++++++ .../manager/ThreadPoolPluginManager.java | 176 ++++++++++ .../manager/ThreadPoolPluginSupport.java | 168 +++++++++ 11 files changed, 1461 insertions(+) create mode 100644 hippo4j-core/src/main/java/cn/hippo4j/core/executor/ExtensibleThreadPoolExecutor.java create mode 100644 hippo4j-core/src/main/java/cn/hippo4j/core/plugin/ExecuteAwarePlugin.java create mode 100644 hippo4j-core/src/main/java/cn/hippo4j/core/plugin/PluginRuntime.java create mode 100644 hippo4j-core/src/main/java/cn/hippo4j/core/plugin/RejectedAwarePlugin.java create mode 100644 hippo4j-core/src/main/java/cn/hippo4j/core/plugin/ShutdownAwarePlugin.java create mode 100644 hippo4j-core/src/main/java/cn/hippo4j/core/plugin/TaskAwarePlugin.java create mode 100644 hippo4j-core/src/main/java/cn/hippo4j/core/plugin/ThreadPoolPlugin.java create mode 100644 hippo4j-core/src/main/java/cn/hippo4j/core/plugin/manager/DefaultThreadPoolPluginManager.java create mode 100644 hippo4j-core/src/main/java/cn/hippo4j/core/plugin/manager/EmptyThreadPoolPluginManager.java create mode 100644 hippo4j-core/src/main/java/cn/hippo4j/core/plugin/manager/ThreadPoolPluginManager.java create mode 100644 hippo4j-core/src/main/java/cn/hippo4j/core/plugin/manager/ThreadPoolPluginSupport.java diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/ExtensibleThreadPoolExecutor.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/ExtensibleThreadPoolExecutor.java new file mode 100644 index 00000000..4bd96b16 --- /dev/null +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/ExtensibleThreadPoolExecutor.java @@ -0,0 +1,320 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.core.executor; + +import cn.hippo4j.core.plugin.*; +import cn.hippo4j.core.plugin.manager.ThreadPoolPluginManager; +import cn.hippo4j.core.plugin.manager.ThreadPoolPluginSupport; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NonNull; +import lombok.Setter; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.*; + +/** + *

Extensible thread-pool executor.
+ * Support the callback extension points provided on the basis of {@link ThreadPoolExecutor}. + * Each extension point corresponds to a different {@link ThreadPoolPlugin} interface, + * users can customize plug-ins and implement one or more {@link ThreadPoolPlugin} interface + * to enable plugins to sense thread pool behavior and provide extended functions. + * + * @see ThreadPoolPluginManager + * @see ThreadPoolPlugin + */ +public class ExtensibleThreadPoolExecutor extends ThreadPoolExecutor implements ThreadPoolPluginSupport { + + /** + * thread pool id + */ + @Getter + @NonNull + private final String threadPoolId; + + /** + * action aware registry + */ + @Getter + private final ThreadPoolPluginManager threadPoolPluginManager; + + /** + * handler wrapper, any changes to the current instance {@link RejectedExecutionHandler} should be made through this wrapper + */ + private final RejectedAwareHandlerWrapper handlerWrapper; + + /** + * Creates a new {@code ExtensibleThreadPoolExecutor} with the given initial parameters. + * + * @param threadPoolId thread-pool id + * @param threadPoolPluginManager action aware registry + * @param corePoolSize the number of threads to keep in the pool, even + * if they are idle, unless {@code allowCoreThreadTimeOut} is set + * @param maximumPoolSize the maximum number of threads to allow in the + * pool + * @param keepAliveTime when the number of threads is greater than + * the core, this is the maximum time that excess idle threads + * will wait for new tasks before terminating. + * @param unit the time unit for the {@code keepAliveTime} argument + * @param workQueue the queue to use for holding tasks before they are + * executed. This queue will hold only the {@code Runnable} + * tasks submitted by the {@code execute} method. + * @param threadFactory the factory to use when the executor + * creates a new thread + * @param handler the handler to use when execution is blocked + * because the thread bounds and queue capacities are reached + * @throws IllegalArgumentException if one of the following holds:
+ * {@code corePoolSize < 0}
+ * {@code keepAliveTime < 0}
+ * {@code maximumPoolSize <= 0}
+ * {@code maximumPoolSize < corePoolSize} + * @throws NullPointerException if {@code workQueue} + * or {@code threadFactory} or {@code handler} is null + */ + public ExtensibleThreadPoolExecutor( + @NonNull String threadPoolId, + @NonNull ThreadPoolPluginManager threadPoolPluginManager, + int corePoolSize, int maximumPoolSize, + long keepAliveTime, TimeUnit unit, + @NonNull BlockingQueue workQueue, + @NonNull ThreadFactory threadFactory, + @NonNull RejectedExecutionHandler handler) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); + + // pool extended info + this.threadPoolId = threadPoolId; + this.threadPoolPluginManager = threadPoolPluginManager; + + // proxy handler to support Aware callback + while (handler instanceof RejectedAwareHandlerWrapper) { + handler = ((RejectedAwareHandlerWrapper) handler).getHandler(); + } + this.handlerWrapper = new RejectedAwareHandlerWrapper(threadPoolPluginManager, handler); + super.setRejectedExecutionHandler(handlerWrapper); + } + + /** + * {@inheritDoc} + * + *

Before calling the parent class method, {@link ExecuteAwarePlugin#beforeExecute} will be called first. + * + * @param thread the thread that will run task {@code r} + * @param runnable the task that will be executed + */ + @Override + protected void beforeExecute(Thread thread, Runnable runnable) { + Collection executeAwarePluginList = threadPoolPluginManager.getExecuteAwarePluginList(); + executeAwarePluginList.forEach(aware -> aware.beforeExecute(thread, runnable)); + } + + /** + * {@inheritDoc} + * + *

Before calling the superclass method, {@link TaskAwarePlugin#beforeTaskExecute} will be called first. + * + * @param runnable the task to execute + */ + @Override + public void execute(@NonNull Runnable runnable) { + Collection taskAwarePluginList = threadPoolPluginManager.getTaskAwarePluginList(); + for (TaskAwarePlugin taskAwarePlugin : taskAwarePluginList) { + runnable = taskAwarePlugin.beforeTaskExecute(runnable); + } + super.execute(runnable); + } + + /** + * {@inheritDoc} + * + *

After calling the superclass method, {@link ExecuteAwarePlugin#afterExecute} will be called last. + * + * @param runnable the runnable that has completed + * @param throwable the exception that caused termination, or null if + * execution completed normally + */ + @Override + protected void afterExecute(Runnable runnable, Throwable throwable) { + Collection executeAwarePluginList = threadPoolPluginManager.getExecuteAwarePluginList(); + executeAwarePluginList.forEach(aware -> aware.afterExecute(runnable, throwable)); + } + + /** + * {@inheritDoc} + * + *

Before calling the superclass method, + * {@link ShutdownAwarePlugin#beforeShutdown} will be called first. + * and then will be call {@link ShutdownAwarePlugin#afterShutdown} + * + * @throws SecurityException {@inheritDoc} + */ + @Override + public void shutdown() { + Collection shutdownAwarePluginList = threadPoolPluginManager.getShutdownAwarePluginList(); + shutdownAwarePluginList.forEach(aware -> aware.beforeShutdown(this)); + super.shutdown(); + shutdownAwarePluginList.forEach(aware -> aware.afterShutdown(this, Collections.emptyList())); + } + + /** + * {@inheritDoc} + * + *

Before calling the superclass method, + * {@link ShutdownAwarePlugin#beforeShutdown} will be called first. + * and then will be call {@link ShutdownAwarePlugin#afterShutdown} + * + * @throws SecurityException + */ + @Override + public List shutdownNow() { + Collection shutdownAwarePluginList = threadPoolPluginManager.getShutdownAwarePluginList(); + shutdownAwarePluginList.forEach(aware -> aware.beforeShutdown(this)); + List tasks = super.shutdownNow(); + shutdownAwarePluginList.forEach(aware -> aware.afterShutdown(this, tasks)); + return tasks; + } + + /** + * {@inheritDoc}. + * + *

Before calling the superclass method, {@link ShutdownAwarePlugin#afterTerminated} will be called first. + */ + @Override + protected void terminated() { + super.terminated(); + Collection shutdownAwarePluginList = threadPoolPluginManager.getShutdownAwarePluginList(); + shutdownAwarePluginList.forEach(aware -> aware.afterTerminated(this)); + } + + /** + * {@inheritDoc} + * + *

Before calling the superclass method, {@link TaskAwarePlugin#beforeTaskCreate} will be called first. + * + * @param runnable the runnable task being wrapped + * @param value the default value for the returned future + * @return a {@code RunnableFuture} which, when run, will run the + * underlying runnable and which, as a {@code Future}, will yield + * the given value as its result and provide for cancellation of + * the underlying task + * @since 1.6 + */ + @Override + protected RunnableFuture newTaskFor(Runnable runnable, T value) { + Collection taskAwarePluginList = threadPoolPluginManager.getTaskAwarePluginList(); + for (TaskAwarePlugin taskAwarePlugin : taskAwarePluginList) { + runnable = taskAwarePlugin.beforeTaskCreate(this, runnable, value); + } + return super.newTaskFor(runnable, value); + } + + /** + * {@inheritDoc} + * + *

Before calling the superclass method, {@link TaskAwarePlugin#beforeTaskCreate} will be called first. + * + * @param callable the callable task being wrapped + * @return a {@code RunnableFuture} which, when run, will call the + * underlying callable and which, as a {@code Future}, will yield + * the callable's result as its result and provide for + * cancellation of the underlying task + * @since 1.6 + */ + @Override + protected RunnableFuture newTaskFor(Callable callable) { + Collection taskAwarePluginList = threadPoolPluginManager.getTaskAwarePluginList(); + for (TaskAwarePlugin taskAwarePlugin : taskAwarePluginList) { + callable = taskAwarePlugin.beforeTaskCreate(this, callable); + } + return super.newTaskFor(callable); + } + + /** + * Sets a new handler for unexecutable tasks. + * + * @param handler the new handler + * @throws NullPointerException if handler is null + * @see #getRejectedExecutionHandler + */ + @Override + public void setRejectedExecutionHandler(@NonNull RejectedExecutionHandler handler) { + while (handler instanceof RejectedAwareHandlerWrapper) { + handler = ((RejectedAwareHandlerWrapper) handler).getHandler(); + } + handlerWrapper.setHandler(handler); + } + + /** + * Returns the current handler for unexecutable tasks. + * + * @return the current handler + * @see #setRejectedExecutionHandler(RejectedExecutionHandler) + */ + @Override + public RejectedExecutionHandler getRejectedExecutionHandler() { + return handlerWrapper.getHandler(); + } + + /** + * Get thread-pool executor. + * + * @return thread-pool executor + */ + @Override + public ThreadPoolExecutor getThreadPoolExecutor() { + return this; + } + + /** + * Wrapper of original {@link RejectedExecutionHandler} of {@link ThreadPoolExecutor}, + * It's used to support the {@link RejectedAwarePlugin} on the basis of the {@link RejectedExecutionHandler}. + * + * @see RejectedAwarePlugin + */ + @AllArgsConstructor + private static class RejectedAwareHandlerWrapper implements RejectedExecutionHandler { + + /** + * thread-pool action aware registry + */ + private final ThreadPoolPluginManager registry; + + /** + * original target + */ + @NonNull + @Setter + @Getter + private RejectedExecutionHandler handler; + + /** + * Call {@link RejectedAwarePlugin#beforeRejectedExecution}, then reject the task + * + * @param r the runnable task requested to be executed + * @param executor the executor attempting to execute this task + */ + @Override + public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { + Collection rejectedAwarePluginList = registry.getRejectedAwarePluginList(); + rejectedAwarePluginList.forEach(aware -> aware.beforeRejectedExecution(r, executor)); + handler.rejectedExecution(r, executor); + } + + } +} diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/ExecuteAwarePlugin.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/ExecuteAwarePlugin.java new file mode 100644 index 00000000..d6c8638a --- /dev/null +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/ExecuteAwarePlugin.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.core.plugin; + +import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor; + +/** + * Callback during task execution. + */ +public interface ExecuteAwarePlugin extends ThreadPoolPlugin { + + /** + * Callback before task execution. + * + * @param thread thread of executing task + * @param runnable task + * @see ExtensibleThreadPoolExecutor#beforeExecute + */ + default void beforeExecute(Thread thread, Runnable runnable) { + } + + /** + * Callback after task execution. + * + * @param runnable runnable + * @param throwable exception thrown during execution + * @see ExtensibleThreadPoolExecutor#afterExecute + */ + default void afterExecute(Runnable runnable, Throwable throwable) { + // do nothing + } + +} diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/PluginRuntime.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/PluginRuntime.java new file mode 100644 index 00000000..a4177eb4 --- /dev/null +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/PluginRuntime.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.core.plugin; + +import lombok.Getter; +import lombok.RequiredArgsConstructor; + +import java.util.ArrayList; +import java.util.List; + +/** + * Plug in runtime information. + */ +@RequiredArgsConstructor +@Getter +public class PluginRuntime { + + /** + * plugin id + */ + private final String pluginId; + + /** + * runtime info + */ + private final List infoList = new ArrayList<>(); + + /** + * Add a runtime info item. + * + * @param name name + * @param value value + * @return runtime info item + */ + public PluginRuntime addInfo(String name, Object value) { + infoList.add(new Info(name, value)); + return this; + } + + @Getter + @RequiredArgsConstructor + public static class Info { + + private final String name; + private final Object value; + } + +} diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/RejectedAwarePlugin.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/RejectedAwarePlugin.java new file mode 100644 index 00000000..3df9629b --- /dev/null +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/RejectedAwarePlugin.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.core.plugin; + +import java.util.concurrent.ThreadPoolExecutor; + +/** + * Callback when task is rejected. + */ +public interface RejectedAwarePlugin extends ThreadPoolPlugin { + + /** + * Callback before task is rejected. + * + * @param runnable task + * @param executor executor + */ + default void beforeRejectedExecution(Runnable runnable, ThreadPoolExecutor executor) { + // do nothing + } + +} diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/ShutdownAwarePlugin.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/ShutdownAwarePlugin.java new file mode 100644 index 00000000..52396077 --- /dev/null +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/ShutdownAwarePlugin.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.core.plugin; + +import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor; + +import java.util.List; +import java.util.concurrent.ThreadPoolExecutor; + +/** + * Callback before thread-pool shutdown. + */ +public interface ShutdownAwarePlugin extends ThreadPoolPlugin { + + /** + * Callback before pool shutdown. + * + * @param executor executor + * @see ThreadPoolExecutor#shutdown() + * @see ThreadPoolExecutor#shutdownNow() + */ + default void beforeShutdown(ThreadPoolExecutor executor) { + // do nothing + } + + /** + * Callback after pool shutdown. + * + * @param executor executor + * @param remainingTasks remainingTasks, or empty if no tasks left or {@link ThreadPoolExecutor#shutdown()} called + * @see ThreadPoolExecutor#shutdown() + * @see ThreadPoolExecutor#shutdownNow() + */ + default void afterShutdown(ThreadPoolExecutor executor, List remainingTasks) { + // do nothing + } + + /** + * Callback after pool terminated. + * + * @param executor executor + * @see ThreadPoolExecutor#terminated() + */ + default void afterTerminated(ExtensibleThreadPoolExecutor executor) { + // do nothing + } + +} diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/TaskAwarePlugin.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/TaskAwarePlugin.java new file mode 100644 index 00000000..33c56599 --- /dev/null +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/TaskAwarePlugin.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.core.plugin; + +import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor; + +import java.util.concurrent.Callable; +import java.util.concurrent.ThreadPoolExecutor; + +/** + * Callback during task submit in thread-pool. + */ +public interface TaskAwarePlugin extends ThreadPoolPlugin { + + /** + * Callback during the {@link java.util.concurrent.RunnableFuture} task create in thread-pool. + * + * @param executor executor + * @param runnable original task + * @return Tasks that really need to be performed + * @see ThreadPoolExecutor#newTaskFor(Runnable, Object) + */ + default Runnable beforeTaskCreate(ThreadPoolExecutor executor, Runnable runnable, V value) { + return runnable; + } + + /** + * Callback during the {@link java.util.concurrent.RunnableFuture} task create in thread-pool. + * + * @param executor executor + * @param future original task + * @return Tasks that really need to be performed + * @see ThreadPoolExecutor#newTaskFor(Callable) + */ + default Callable beforeTaskCreate(ThreadPoolExecutor executor, Callable future) { + return future; + } + + /** + * Callback when task is execute. + * + * @param runnable runnable + * @return tasks to be execute + * @see ExtensibleThreadPoolExecutor#execute + */ + default Runnable beforeTaskExecute(Runnable runnable) { + return runnable; + } + +} diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/ThreadPoolPlugin.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/ThreadPoolPlugin.java new file mode 100644 index 00000000..76238432 --- /dev/null +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/ThreadPoolPlugin.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.core.plugin; + +import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor; +import cn.hippo4j.core.plugin.manager.ThreadPoolPluginManager; +import cn.hippo4j.core.plugin.manager.ThreadPoolPluginSupport; + +/** + *

A marker superinterface indicating that + * an instance class is eligible to be sense and intercept + * some operations of the specific thread-pool instance. + * + *

Generally, any thread-pool that implements the {@link ThreadPoolPluginSupport} + * can be register multiple plugins by {@link ThreadPoolPluginSupport#register}, + * and the plugin will provide some extension function of original + * {@link java.util.concurrent.ThreadPoolExecutor} does not support. + * + *

During runtime, plugins can dynamically modify some configurable parameters + * and provide some runtime information by {@link #getPluginRuntime()}. + * When the thread-pool is destroyed, the plugin will also be destroyed. + * + * @see ExtensibleThreadPoolExecutor + * @see ThreadPoolPluginManager + * @see TaskAwarePlugin + * @see ExecuteAwarePlugin + * @see ShutdownAwarePlugin + * @see RejectedAwarePlugin + */ +public interface ThreadPoolPlugin { + + /** + * Get id. + * + * @return id + */ + String getId(); + + /** + * Callback when plugin register into manager + * + * @see ThreadPoolPluginManager#register + */ + default void start() { + // do nothing + } + + /** + * Callback when plugin unregister from manager + * + * @see ThreadPoolPluginManager#unregister + * @see ThreadPoolPluginManager#clear + */ + default void stop() { + // do nothing + } + + /** + * Get plugin runtime info. + * + * @return plugin runtime info + */ + default PluginRuntime getPluginRuntime() { + return new PluginRuntime(getId()); + } + +} diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/manager/DefaultThreadPoolPluginManager.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/manager/DefaultThreadPoolPluginManager.java new file mode 100644 index 00000000..bf7cf538 --- /dev/null +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/manager/DefaultThreadPoolPluginManager.java @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.core.plugin.manager; + +import cn.hippo4j.common.toolkit.Assert; +import cn.hippo4j.core.plugin.*; +import lombok.NonNull; + +import java.util.*; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** + * The default implementation of {@link ThreadPoolPluginManager}. + */ +public class DefaultThreadPoolPluginManager implements ThreadPoolPluginManager { + + /** + * lock of this instance + */ + private final ReadWriteLock instanceLock = new ReentrantReadWriteLock(); + + /** + * Registered {@link ThreadPoolPlugin}. + */ + private final Map registeredPlugins = new HashMap<>(16); + + /** + * Registered {@link TaskAwarePlugin}. + */ + private final List taskAwarePluginList = new ArrayList<>(); + + /** + * Registered {@link ExecuteAwarePlugin}. + */ + private final List executeAwarePluginList = new ArrayList<>(); + + /** + * Registered {@link RejectedAwarePlugin}. + */ + private final List rejectedAwarePluginList = new ArrayList<>(); + + /** + * Registered {@link ShutdownAwarePlugin}. + */ + private final List shutdownAwarePluginList = new ArrayList<>(); + + /** + * Clear all. + */ + @Override + public synchronized void clear() { + Lock writeLock = instanceLock.writeLock(); + writeLock.lock(); + try { + Collection plugins = registeredPlugins.values(); + registeredPlugins.clear(); + taskAwarePluginList.clear(); + executeAwarePluginList.clear(); + rejectedAwarePluginList.clear(); + shutdownAwarePluginList.clear(); + plugins.forEach(ThreadPoolPlugin::stop); + } finally { + writeLock.unlock(); + } + } + + /** + * Register a {@link ThreadPoolPlugin} + * + * @param plugin plugin + * @throws IllegalArgumentException thrown when a plugin with the same {@link ThreadPoolPlugin#getId()} already exists in the registry + * @see ThreadPoolPlugin#getId() + */ + @Override + public void register(@NonNull ThreadPoolPlugin plugin) { + Lock writeLock = instanceLock.writeLock(); + writeLock.lock(); + try { + String id = plugin.getId(); + Assert.isTrue(!isRegistered(id), "The plug-in with id [" + id + "] has been registered"); + + // register plugin + registeredPlugins.put(id, plugin); + // quick index + if (plugin instanceof TaskAwarePlugin) { + taskAwarePluginList.add((TaskAwarePlugin) plugin); + } + if (plugin instanceof ExecuteAwarePlugin) { + executeAwarePluginList.add((ExecuteAwarePlugin) plugin); + } + if (plugin instanceof RejectedAwarePlugin) { + rejectedAwarePluginList.add((RejectedAwarePlugin) plugin); + } + if (plugin instanceof ShutdownAwarePlugin) { + shutdownAwarePluginList.add((ShutdownAwarePlugin) plugin); + } + plugin.start(); + } finally { + writeLock.unlock(); + } + } + + /** + * Register plugin if it's not registered. + * + * @param plugin plugin + * @return return true if successful register new plugin, false otherwise + */ + @Override + public boolean tryRegister(ThreadPoolPlugin plugin) { + Lock writeLock = instanceLock.writeLock(); + writeLock.lock(); + try { + if (registeredPlugins.containsKey(plugin.getId())) { + return false; + } + register(plugin); + return true; + } finally { + writeLock.unlock(); + } + } + + /** + * Unregister {@link ThreadPoolPlugin} + * + * @param pluginId plugin id + */ + @Override + public void unregister(String pluginId) { + Lock writeLock = instanceLock.writeLock(); + writeLock.lock(); + try { + Optional.ofNullable(pluginId) + .map(registeredPlugins::remove) + .ifPresent(plugin -> { + // remove quick index if necessary + if (plugin instanceof TaskAwarePlugin) { + taskAwarePluginList.remove(plugin); + } + if (plugin instanceof ExecuteAwarePlugin) { + executeAwarePluginList.remove(plugin); + } + if (plugin instanceof RejectedAwarePlugin) { + rejectedAwarePluginList.remove(plugin); + } + if (plugin instanceof ShutdownAwarePlugin) { + shutdownAwarePluginList.remove(plugin); + } + plugin.stop(); + }); + } finally { + writeLock.unlock(); + } + } + + @Override + public Collection getAllPlugins() { + Lock readLock = instanceLock.readLock(); + readLock.lock(); + try { + return registeredPlugins.values(); + } finally { + readLock.unlock(); + } + } + + /** + * Whether the {@link ThreadPoolPlugin} has been registered. + * + * @param pluginId plugin id + * @return ture if target has been registered, false otherwise + */ + @Override + public boolean isRegistered(String pluginId) { + Lock readLock = instanceLock.readLock(); + readLock.lock(); + try { + return registeredPlugins.containsKey(pluginId); + } finally { + readLock.unlock(); + } + } + + /** + * Get {@link ThreadPoolPlugin} + * + * @param pluginId plugin id + * @param plugin type + * @return {@link ThreadPoolPlugin}, null if unregister + */ + @Override + @SuppressWarnings("unchecked") + public Optional getPlugin(String pluginId) { + Lock readLock = instanceLock.readLock(); + readLock.lock(); + try { + return (Optional) Optional.ofNullable(registeredPlugins.get(pluginId)); + } finally { + readLock.unlock(); + } + } + + /** + * Get execute plugin list. + * + * @return {@link ExecuteAwarePlugin} + */ + @Override + public Collection getExecuteAwarePluginList() { + Lock readLock = instanceLock.readLock(); + readLock.lock(); + try { + return executeAwarePluginList; + } finally { + readLock.unlock(); + } + } + + /** + * Get rejected plugin list. + * + * @return {@link RejectedAwarePlugin} + */ + @Override + public Collection getRejectedAwarePluginList() { + Lock readLock = instanceLock.readLock(); + readLock.lock(); + try { + return rejectedAwarePluginList; + } finally { + readLock.unlock(); + } + } + + /** + * Get shutdown plugin list. + * + * @return {@link ShutdownAwarePlugin} + */ + @Override + public Collection getShutdownAwarePluginList() { + Lock readLock = instanceLock.readLock(); + readLock.lock(); + try { + return shutdownAwarePluginList; + } finally { + readLock.unlock(); + } + } + + /** + * Get shutdown plugin list. + * + * @return {@link ShutdownAwarePlugin} + */ + @Override + public Collection getTaskAwarePluginList() { + Lock readLock = instanceLock.readLock(); + readLock.lock(); + try { + return taskAwarePluginList; + } finally { + readLock.unlock(); + } + } + +} diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/manager/EmptyThreadPoolPluginManager.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/manager/EmptyThreadPoolPluginManager.java new file mode 100644 index 00000000..dff975b4 --- /dev/null +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/manager/EmptyThreadPoolPluginManager.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.core.plugin.manager; + +import cn.hippo4j.core.plugin.*; +import lombok.AccessLevel; +import lombok.NoArgsConstructor; + +import java.util.Collection; +import java.util.Collections; +import java.util.Optional; + +/** + * Empty thread pool plugin manager. + */ +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public class EmptyThreadPoolPluginManager implements ThreadPoolPluginManager { + + /** + * default instance + */ + public static final EmptyThreadPoolPluginManager INSTANCE = new EmptyThreadPoolPluginManager(); + + /** + * Clear all. + */ + @Override + public void clear() { + // do nothing + } + + /** + * Get all registered plugins. + * + * @return plugins + */ + @Override + public Collection getAllPlugins() { + return Collections.emptyList(); + } + + /** + * Register a {@link ThreadPoolPlugin} + * + * @param plugin plugin + * @throws IllegalArgumentException thrown when a plugin with the same {@link ThreadPoolPlugin#getId()} + * already exists in the registry + * @see ThreadPoolPlugin#getId() + */ + @Override + public void register(ThreadPoolPlugin plugin) { + // do nothing + } + + /** + * Register plugin if it's not registered. + * + * @param plugin plugin + * @return return true if successful register new plugin, false otherwise + */ + @Override + public boolean tryRegister(ThreadPoolPlugin plugin) { + return false; + } + + /** + * Whether the {@link ThreadPoolPlugin} has been registered. + * + * @param pluginId plugin id + * @return ture if target has been registered, false otherwise + */ + @Override + public boolean isRegistered(String pluginId) { + return false; + } + + /** + * Unregister {@link ThreadPoolPlugin} + * + * @param pluginId plugin id + */ + @Override + public void unregister(String pluginId) { + // do nothing + } + + /** + * Get {@link ThreadPoolPlugin} + * + * @param pluginId plugin id + * @return {@link ThreadPoolPlugin} + * @throws ClassCastException thrown when the object obtained by name cannot be converted to target type + */ + @Override + public Optional getPlugin(String pluginId) { + return Optional.empty(); + } + + /** + * Get execute aware plugin list. + * + * @return {@link ExecuteAwarePlugin} + */ + @Override + public Collection getExecuteAwarePluginList() { + return Collections.emptyList(); + } + + /** + * Get rejected aware plugin list. + * + * @return {@link RejectedAwarePlugin} + */ + @Override + public Collection getRejectedAwarePluginList() { + return Collections.emptyList(); + } + + /** + * Get shutdown aware plugin list. + * + * @return {@link ShutdownAwarePlugin} + */ + @Override + public Collection getShutdownAwarePluginList() { + return Collections.emptyList(); + } + + /** + * Get shutdown aware plugin list. + * + * @return {@link ShutdownAwarePlugin} + */ + @Override + public Collection getTaskAwarePluginList() { + return Collections.emptyList(); + } + +} diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/manager/ThreadPoolPluginManager.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/manager/ThreadPoolPluginManager.java new file mode 100644 index 00000000..dd2a7777 --- /dev/null +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/manager/ThreadPoolPluginManager.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.core.plugin.manager; + +import cn.hippo4j.core.plugin.*; + +import java.util.Collection; +import java.util.Optional; +import java.util.stream.Collectors; + +/** + * Manager of {@link ThreadPoolPlugin}. + * Bind with the specified thread-pool instance to register and manage plugins. + * when the thread pool is destroyed, please ensure that the manager will also be destroyed. + * + * @see DefaultThreadPoolPluginManager + */ +public interface ThreadPoolPluginManager { + + /** + * Get an empty manager. + * + * @return {@link EmptyThreadPoolPluginManager} + */ + static ThreadPoolPluginManager empty() { + return EmptyThreadPoolPluginManager.INSTANCE; + } + + /** + * Clear all. + */ + void clear(); + + /** + * Get all registered plugins. + * + * @return plugins + */ + Collection getAllPlugins(); + + /** + * Register a {@link ThreadPoolPlugin} + * + * @param plugin plugin + * @throws IllegalArgumentException thrown when a plugin with the same {@link ThreadPoolPlugin#getId()} + * already exists in the registry + * @see ThreadPoolPlugin#getId() + */ + void register(ThreadPoolPlugin plugin); + + /** + * Register plugin if it's not registered. + * + * @param plugin plugin + * @return return true if successful register new plugin, false otherwise + */ + boolean tryRegister(ThreadPoolPlugin plugin); + + /** + * Whether the {@link ThreadPoolPlugin} has been registered. + * + * @param pluginId plugin id + * @return ture if target has been registered, false otherwise + */ + boolean isRegistered(String pluginId); + + /** + * Unregister {@link ThreadPoolPlugin} + * + * @param pluginId plugin id + */ + void unregister(String pluginId); + + /** + * Get {@link ThreadPoolPlugin} + * + * @param pluginId plugin id + * @param target aware type + * @return {@link ThreadPoolPlugin} + * @throws ClassCastException thrown when the object obtained by name cannot be converted to target type + */ + Optional getPlugin(String pluginId); + + /** + * Get execute aware plugin list. + * + * @return {@link ExecuteAwarePlugin} + */ + Collection getExecuteAwarePluginList(); + + /** + * Get rejected aware plugin list. + * + * @return {@link RejectedAwarePlugin} + */ + Collection getRejectedAwarePluginList(); + + /** + * Get shutdown aware plugin list. + * + * @return {@link ShutdownAwarePlugin} + */ + Collection getShutdownAwarePluginList(); + + /** + * Get shutdown aware plugin list. + * + * @return {@link ShutdownAwarePlugin} + */ + Collection getTaskAwarePluginList(); + + // ==================== default methods ==================== + + /** + * Get plugin of type. + * + * @param pluginId plugin id + * @param pluginType plugin type + * @return target plugin + */ + default Optional getPluginOfType(String pluginId, Class pluginType) { + return getPlugin(pluginId) + .filter(pluginType::isInstance) + .map(pluginType::cast); + } + + /** + * Get all plugins of type. + * + * @param pluginType plugin type + * @return all plugins of type + */ + default Collection getAllPluginsOfType(Class pluginType) { + return getAllPlugins().stream() + .filter(pluginType::isInstance) + .map(pluginType::cast) + .collect(Collectors.toList()); + } + + /** + * Get {@link PluginRuntime} of all registered plugins. + * + * @return {@link PluginRuntime} of all registered plugins + */ + default Collection getAllPluginRuntimes() { + return getAllPlugins().stream() + .map(ThreadPoolPlugin::getPluginRuntime) + .collect(Collectors.toList()); + } + + /** + * Get {@link PluginRuntime} of registered plugin. + * + * @return {@link PluginRuntime} of registered plugin + */ + default Optional getRuntime(String pluginId) { + return getPlugin(pluginId) + .map(ThreadPoolPlugin::getPluginRuntime); + } + +} diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/manager/ThreadPoolPluginSupport.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/manager/ThreadPoolPluginSupport.java new file mode 100644 index 00000000..c3bd4187 --- /dev/null +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/manager/ThreadPoolPluginSupport.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.core.plugin.manager; + +import cn.hippo4j.core.plugin.*; +import org.checkerframework.checker.nullness.qual.NonNull; + +import java.util.Collection; +import java.util.Optional; +import java.util.concurrent.ThreadPoolExecutor; + +/** + * Used to support the binding of {@link ThreadPoolPluginManager} and {@link ThreadPoolExecutor}. + */ +public interface ThreadPoolPluginSupport extends ThreadPoolPluginManager { + + /** + * Get thread pool action aware registry. + * + * @return {@link ThreadPoolPluginManager} + */ + @NonNull + ThreadPoolPluginManager getThreadPoolPluginManager(); + + /** + * Get thread-pool id + * + * @return thread-pool id + */ + String getThreadPoolId(); + + /** + * Get thread-pool executor. + * + * @return thread-pool executor + */ + ThreadPoolExecutor getThreadPoolExecutor(); + + // ======================== delegate methods ======================== + + /** + * Clear all. + */ + @Override + default void clear() { + getThreadPoolPluginManager().clear(); + } + + /** + * Register a {@link ThreadPoolPlugin} + * + * @param plugin aware + */ + @Override + default void register(ThreadPoolPlugin plugin) { + getThreadPoolPluginManager().register(plugin); + } + + /** + * Register plugin if it's not registered. + * + * @param plugin plugin + * @return return true if successful register new plugin, false otherwise + */ + @Override + default boolean tryRegister(ThreadPoolPlugin plugin) { + return getThreadPoolPluginManager().tryRegister(plugin); + } + + /** + * Whether the {@link ThreadPoolPlugin} has been registered. + * + * @param pluginId name + * @return ture if target has been registered, false otherwise + */ + @Override + default boolean isRegistered(String pluginId) { + return getThreadPoolPluginManager().isRegistered(pluginId); + } + + /** + * Unregister {@link ThreadPoolPlugin} + * + * @param pluginId name + */ + @Override + default void unregister(String pluginId) { + getThreadPoolPluginManager().unregister(pluginId); + } + + /** + * Get all registered plugins. + * + * @return plugins + */ + @Override + default Collection getAllPlugins() { + return getThreadPoolPluginManager().getAllPlugins(); + } + + /** + * Get {@link ThreadPoolPlugin} + * + * @param pluginId target name + * @return {@link ThreadPoolPlugin}, null if unregister + * @throws ClassCastException thrown when the object obtained by name cannot be converted to target type + */ + @Override + default Optional getPlugin(String pluginId) { + return getThreadPoolPluginManager().getPlugin(pluginId); + } + + /** + * Get execute aware list. + * + * @return {@link ExecuteAwarePlugin} + */ + @Override + default Collection getExecuteAwarePluginList() { + return getThreadPoolPluginManager().getExecuteAwarePluginList(); + } + + /** + * Get rejected aware list. + * + * @return {@link RejectedAwarePlugin} + */ + @Override + default Collection getRejectedAwarePluginList() { + return getThreadPoolPluginManager().getRejectedAwarePluginList(); + } + + /** + * Get shutdown aware list. + * + * @return {@link ShutdownAwarePlugin} + */ + @Override + default Collection getShutdownAwarePluginList() { + return getThreadPoolPluginManager().getShutdownAwarePluginList(); + } + + /** + * Get shutdown aware list. + * + * @return {@link ShutdownAwarePlugin} + */ + @Override + default Collection getTaskAwarePluginList() { + return getThreadPoolPluginManager().getTaskAwarePluginList(); + } + +}