feat: Add a new thread-pool that supports the registration of callback interfaces

pull/854/head
huangchengxing 3 years ago
parent 4e68645ccb
commit 54b0adb695

@ -0,0 +1,320 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.executor;
import cn.hippo4j.core.plugin.*;
import cn.hippo4j.core.plugin.manager.ThreadPoolPluginManager;
import cn.hippo4j.core.plugin.manager.ThreadPoolPluginSupport;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NonNull;
import lombok.Setter;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.*;
/**
* <p>Extensible thread-pool executor. <br />
* Support the callback extension points provided on the basis of {@link ThreadPoolExecutor}.
* Each extension point corresponds to a different {@link ThreadPoolPlugin} interface,
* users can customize plug-ins and implement one or more {@link ThreadPoolPlugin} interface
* to enable plugins to sense thread pool behavior and provide extended functions.
*
* @see ThreadPoolPluginManager
* @see ThreadPoolPlugin
*/
public class ExtensibleThreadPoolExecutor extends ThreadPoolExecutor implements ThreadPoolPluginSupport {
/**
* thread pool id
*/
@Getter
@NonNull
private final String threadPoolId;
/**
* action aware registry
*/
@Getter
private final ThreadPoolPluginManager threadPoolPluginManager;
/**
* handler wrapper, any changes to the current instance {@link RejectedExecutionHandler} should be made through this wrapper
*/
private final RejectedAwareHandlerWrapper handlerWrapper;
/**
* Creates a new {@code ExtensibleThreadPoolExecutor} with the given initial parameters.
*
* @param threadPoolId thread-pool id
* @param threadPoolPluginManager action aware registry
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/
public ExtensibleThreadPoolExecutor(
@NonNull String threadPoolId,
@NonNull ThreadPoolPluginManager threadPoolPluginManager,
int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
@NonNull BlockingQueue<Runnable> workQueue,
@NonNull ThreadFactory threadFactory,
@NonNull RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
// pool extended info
this.threadPoolId = threadPoolId;
this.threadPoolPluginManager = threadPoolPluginManager;
// proxy handler to support Aware callback
while (handler instanceof RejectedAwareHandlerWrapper) {
handler = ((RejectedAwareHandlerWrapper) handler).getHandler();
}
this.handlerWrapper = new RejectedAwareHandlerWrapper(threadPoolPluginManager, handler);
super.setRejectedExecutionHandler(handlerWrapper);
}
/**
* {@inheritDoc}
*
* <p><b>Before calling the parent class method, {@link ExecuteAwarePlugin#beforeExecute} will be called first.
*
* @param thread the thread that will run task {@code r}
* @param runnable the task that will be executed
*/
@Override
protected void beforeExecute(Thread thread, Runnable runnable) {
Collection<ExecuteAwarePlugin> executeAwarePluginList = threadPoolPluginManager.getExecuteAwarePluginList();
executeAwarePluginList.forEach(aware -> aware.beforeExecute(thread, runnable));
}
/**
* {@inheritDoc}
*
* <p><b>Before calling the superclass method, {@link TaskAwarePlugin#beforeTaskExecute} will be called first.
*
* @param runnable the task to execute
*/
@Override
public void execute(@NonNull Runnable runnable) {
Collection<TaskAwarePlugin> taskAwarePluginList = threadPoolPluginManager.getTaskAwarePluginList();
for (TaskAwarePlugin taskAwarePlugin : taskAwarePluginList) {
runnable = taskAwarePlugin.beforeTaskExecute(runnable);
}
super.execute(runnable);
}
/**
* {@inheritDoc}
*
* <p><b>After calling the superclass method, {@link ExecuteAwarePlugin#afterExecute} will be called last.
*
* @param runnable the runnable that has completed
* @param throwable the exception that caused termination, or null if
* execution completed normally
*/
@Override
protected void afterExecute(Runnable runnable, Throwable throwable) {
Collection<ExecuteAwarePlugin> executeAwarePluginList = threadPoolPluginManager.getExecuteAwarePluginList();
executeAwarePluginList.forEach(aware -> aware.afterExecute(runnable, throwable));
}
/**
* {@inheritDoc}
*
* <p><b>Before calling the superclass method,
* {@link ShutdownAwarePlugin#beforeShutdown} will be called first.
* and then will be call {@link ShutdownAwarePlugin#afterShutdown}
*
* @throws SecurityException {@inheritDoc}
*/
@Override
public void shutdown() {
Collection<ShutdownAwarePlugin> shutdownAwarePluginList = threadPoolPluginManager.getShutdownAwarePluginList();
shutdownAwarePluginList.forEach(aware -> aware.beforeShutdown(this));
super.shutdown();
shutdownAwarePluginList.forEach(aware -> aware.afterShutdown(this, Collections.emptyList()));
}
/**
* {@inheritDoc}
*
* <p><b>Before calling the superclass method,
* {@link ShutdownAwarePlugin#beforeShutdown} will be called first.
* and then will be call {@link ShutdownAwarePlugin#afterShutdown}
*
* @throws SecurityException
*/
@Override
public List<Runnable> shutdownNow() {
Collection<ShutdownAwarePlugin> shutdownAwarePluginList = threadPoolPluginManager.getShutdownAwarePluginList();
shutdownAwarePluginList.forEach(aware -> aware.beforeShutdown(this));
List<Runnable> tasks = super.shutdownNow();
shutdownAwarePluginList.forEach(aware -> aware.afterShutdown(this, tasks));
return tasks;
}
/**
* {@inheritDoc}.
*
* <p><b>Before calling the superclass method, {@link ShutdownAwarePlugin#afterTerminated} will be called first.
*/
@Override
protected void terminated() {
super.terminated();
Collection<ShutdownAwarePlugin> shutdownAwarePluginList = threadPoolPluginManager.getShutdownAwarePluginList();
shutdownAwarePluginList.forEach(aware -> aware.afterTerminated(this));
}
/**
* {@inheritDoc}
*
* <p><b>Before calling the superclass method, {@link TaskAwarePlugin#beforeTaskCreate} will be called first.
*
* @param runnable the runnable task being wrapped
* @param value the default value for the returned future
* @return a {@code RunnableFuture} which, when run, will run the
* underlying runnable and which, as a {@code Future}, will yield
* the given value as its result and provide for cancellation of
* the underlying task
* @since 1.6
*/
@Override
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
Collection<TaskAwarePlugin> taskAwarePluginList = threadPoolPluginManager.getTaskAwarePluginList();
for (TaskAwarePlugin taskAwarePlugin : taskAwarePluginList) {
runnable = taskAwarePlugin.beforeTaskCreate(this, runnable, value);
}
return super.newTaskFor(runnable, value);
}
/**
* {@inheritDoc}
*
* <p><b>Before calling the superclass method, {@link TaskAwarePlugin#beforeTaskCreate} will be called first.
*
* @param callable the callable task being wrapped
* @return a {@code RunnableFuture} which, when run, will call the
* underlying callable and which, as a {@code Future}, will yield
* the callable's result as its result and provide for
* cancellation of the underlying task
* @since 1.6
*/
@Override
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
Collection<TaskAwarePlugin> taskAwarePluginList = threadPoolPluginManager.getTaskAwarePluginList();
for (TaskAwarePlugin taskAwarePlugin : taskAwarePluginList) {
callable = taskAwarePlugin.beforeTaskCreate(this, callable);
}
return super.newTaskFor(callable);
}
/**
* Sets a new handler for unexecutable tasks.
*
* @param handler the new handler
* @throws NullPointerException if handler is null
* @see #getRejectedExecutionHandler
*/
@Override
public void setRejectedExecutionHandler(@NonNull RejectedExecutionHandler handler) {
while (handler instanceof RejectedAwareHandlerWrapper) {
handler = ((RejectedAwareHandlerWrapper) handler).getHandler();
}
handlerWrapper.setHandler(handler);
}
/**
* Returns the current handler for unexecutable tasks.
*
* @return the current handler
* @see #setRejectedExecutionHandler(RejectedExecutionHandler)
*/
@Override
public RejectedExecutionHandler getRejectedExecutionHandler() {
return handlerWrapper.getHandler();
}
/**
* Get thread-pool executor.
*
* @return thread-pool executor
*/
@Override
public ThreadPoolExecutor getThreadPoolExecutor() {
return this;
}
/**
* Wrapper of original {@link RejectedExecutionHandler} of {@link ThreadPoolExecutor},
* It's used to support the {@link RejectedAwarePlugin} on the basis of the {@link RejectedExecutionHandler}.
*
* @see RejectedAwarePlugin
*/
@AllArgsConstructor
private static class RejectedAwareHandlerWrapper implements RejectedExecutionHandler {
/**
* thread-pool action aware registry
*/
private final ThreadPoolPluginManager registry;
/**
* original target
*/
@NonNull
@Setter
@Getter
private RejectedExecutionHandler handler;
/**
* Call {@link RejectedAwarePlugin#beforeRejectedExecution}, then reject the task
*
* @param r the runnable task requested to be executed
* @param executor the executor attempting to execute this task
*/
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
Collection<RejectedAwarePlugin> rejectedAwarePluginList = registry.getRejectedAwarePluginList();
rejectedAwarePluginList.forEach(aware -> aware.beforeRejectedExecution(r, executor));
handler.rejectedExecution(r, executor);
}
}
}

@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.plugin;
import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor;
/**
* Callback during task execution.
*/
public interface ExecuteAwarePlugin extends ThreadPoolPlugin {
/**
* Callback before task execution.
*
* @param thread thread of executing task
* @param runnable task
* @see ExtensibleThreadPoolExecutor#beforeExecute
*/
default void beforeExecute(Thread thread, Runnable runnable) {
}
/**
* Callback after task execution.
*
* @param runnable runnable
* @param throwable exception thrown during execution
* @see ExtensibleThreadPoolExecutor#afterExecute
*/
default void afterExecute(Runnable runnable, Throwable throwable) {
// do nothing
}
}

@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.plugin;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import java.util.ArrayList;
import java.util.List;
/**
* Plug in runtime information.
*/
@RequiredArgsConstructor
@Getter
public class PluginRuntime {
/**
* plugin id
*/
private final String pluginId;
/**
* runtime info
*/
private final List<Info> infoList = new ArrayList<>();
/**
* Add a runtime info item.
*
* @param name name
* @param value value
* @return runtime info item
*/
public PluginRuntime addInfo(String name, Object value) {
infoList.add(new Info(name, value));
return this;
}
@Getter
@RequiredArgsConstructor
public static class Info {
private final String name;
private final Object value;
}
}

@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.plugin;
import java.util.concurrent.ThreadPoolExecutor;
/**
* Callback when task is rejected.
*/
public interface RejectedAwarePlugin extends ThreadPoolPlugin {
/**
* Callback before task is rejected.
*
* @param runnable task
* @param executor executor
*/
default void beforeRejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
// do nothing
}
}

@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.plugin;
import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor;
import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;
/**
* Callback before thread-pool shutdown.
*/
public interface ShutdownAwarePlugin extends ThreadPoolPlugin {
/**
* Callback before pool shutdown.
*
* @param executor executor
* @see ThreadPoolExecutor#shutdown()
* @see ThreadPoolExecutor#shutdownNow()
*/
default void beforeShutdown(ThreadPoolExecutor executor) {
// do nothing
}
/**
* Callback after pool shutdown.
*
* @param executor executor
* @param remainingTasks remainingTasks, or empty if no tasks left or {@link ThreadPoolExecutor#shutdown()} called
* @see ThreadPoolExecutor#shutdown()
* @see ThreadPoolExecutor#shutdownNow()
*/
default void afterShutdown(ThreadPoolExecutor executor, List<Runnable> remainingTasks) {
// do nothing
}
/**
* Callback after pool terminated.
*
* @param executor executor
* @see ThreadPoolExecutor#terminated()
*/
default void afterTerminated(ExtensibleThreadPoolExecutor executor) {
// do nothing
}
}

@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.plugin;
import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor;
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadPoolExecutor;
/**
* Callback during task submit in thread-pool.
*/
public interface TaskAwarePlugin extends ThreadPoolPlugin {
/**
* Callback during the {@link java.util.concurrent.RunnableFuture} task create in thread-pool.
*
* @param executor executor
* @param runnable original task
* @return Tasks that really need to be performed
* @see ThreadPoolExecutor#newTaskFor(Runnable, Object)
*/
default <V> Runnable beforeTaskCreate(ThreadPoolExecutor executor, Runnable runnable, V value) {
return runnable;
}
/**
* Callback during the {@link java.util.concurrent.RunnableFuture} task create in thread-pool.
*
* @param executor executor
* @param future original task
* @return Tasks that really need to be performed
* @see ThreadPoolExecutor#newTaskFor(Callable)
*/
default <V> Callable<V> beforeTaskCreate(ThreadPoolExecutor executor, Callable<V> future) {
return future;
}
/**
* Callback when task is execute.
*
* @param runnable runnable
* @return tasks to be execute
* @see ExtensibleThreadPoolExecutor#execute
*/
default Runnable beforeTaskExecute(Runnable runnable) {
return runnable;
}
}

@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.plugin;
import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor;
import cn.hippo4j.core.plugin.manager.ThreadPoolPluginManager;
import cn.hippo4j.core.plugin.manager.ThreadPoolPluginSupport;
/**
* <p>A marker superinterface indicating that
* an instance class is eligible to be sense and intercept
* some operations of the specific thread-pool instance.
*
* <p>Generally, any thread-pool that implements the {@link ThreadPoolPluginSupport}
* can be register multiple plugins by {@link ThreadPoolPluginSupport#register},
* and the plugin will provide some extension function of original
* {@link java.util.concurrent.ThreadPoolExecutor} does not support.
*
* <p>During runtime, plugins can dynamically modify some configurable parameters
* and provide some runtime information by {@link #getPluginRuntime()}.
* When the thread-pool is destroyed, the plugin will also be destroyed.
*
* @see ExtensibleThreadPoolExecutor
* @see ThreadPoolPluginManager
* @see TaskAwarePlugin
* @see ExecuteAwarePlugin
* @see ShutdownAwarePlugin
* @see RejectedAwarePlugin
*/
public interface ThreadPoolPlugin {
/**
* Get id.
*
* @return id
*/
String getId();
/**
* Callback when plugin register into manager
*
* @see ThreadPoolPluginManager#register
*/
default void start() {
// do nothing
}
/**
* Callback when plugin unregister from manager
*
* @see ThreadPoolPluginManager#unregister
* @see ThreadPoolPluginManager#clear
*/
default void stop() {
// do nothing
}
/**
* Get plugin runtime info.
*
* @return plugin runtime info
*/
default PluginRuntime getPluginRuntime() {
return new PluginRuntime(getId());
}
}

@ -0,0 +1,285 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.plugin.manager;
import cn.hippo4j.common.toolkit.Assert;
import cn.hippo4j.core.plugin.*;
import lombok.NonNull;
import java.util.*;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* The default implementation of {@link ThreadPoolPluginManager}.
*/
public class DefaultThreadPoolPluginManager implements ThreadPoolPluginManager {
/**
* lock of this instance
*/
private final ReadWriteLock instanceLock = new ReentrantReadWriteLock();
/**
* Registered {@link ThreadPoolPlugin}.
*/
private final Map<String, ThreadPoolPlugin> registeredPlugins = new HashMap<>(16);
/**
* Registered {@link TaskAwarePlugin}.
*/
private final List<TaskAwarePlugin> taskAwarePluginList = new ArrayList<>();
/**
* Registered {@link ExecuteAwarePlugin}.
*/
private final List<ExecuteAwarePlugin> executeAwarePluginList = new ArrayList<>();
/**
* Registered {@link RejectedAwarePlugin}.
*/
private final List<RejectedAwarePlugin> rejectedAwarePluginList = new ArrayList<>();
/**
* Registered {@link ShutdownAwarePlugin}.
*/
private final List<ShutdownAwarePlugin> shutdownAwarePluginList = new ArrayList<>();
/**
* Clear all.
*/
@Override
public synchronized void clear() {
Lock writeLock = instanceLock.writeLock();
writeLock.lock();
try {
Collection<ThreadPoolPlugin> plugins = registeredPlugins.values();
registeredPlugins.clear();
taskAwarePluginList.clear();
executeAwarePluginList.clear();
rejectedAwarePluginList.clear();
shutdownAwarePluginList.clear();
plugins.forEach(ThreadPoolPlugin::stop);
} finally {
writeLock.unlock();
}
}
/**
* Register a {@link ThreadPoolPlugin}
*
* @param plugin plugin
* @throws IllegalArgumentException thrown when a plugin with the same {@link ThreadPoolPlugin#getId()} already exists in the registry
* @see ThreadPoolPlugin#getId()
*/
@Override
public void register(@NonNull ThreadPoolPlugin plugin) {
Lock writeLock = instanceLock.writeLock();
writeLock.lock();
try {
String id = plugin.getId();
Assert.isTrue(!isRegistered(id), "The plug-in with id [" + id + "] has been registered");
// register plugin
registeredPlugins.put(id, plugin);
// quick index
if (plugin instanceof TaskAwarePlugin) {
taskAwarePluginList.add((TaskAwarePlugin) plugin);
}
if (plugin instanceof ExecuteAwarePlugin) {
executeAwarePluginList.add((ExecuteAwarePlugin) plugin);
}
if (plugin instanceof RejectedAwarePlugin) {
rejectedAwarePluginList.add((RejectedAwarePlugin) plugin);
}
if (plugin instanceof ShutdownAwarePlugin) {
shutdownAwarePluginList.add((ShutdownAwarePlugin) plugin);
}
plugin.start();
} finally {
writeLock.unlock();
}
}
/**
* Register plugin if it's not registered.
*
* @param plugin plugin
* @return return true if successful register new plugin, false otherwise
*/
@Override
public boolean tryRegister(ThreadPoolPlugin plugin) {
Lock writeLock = instanceLock.writeLock();
writeLock.lock();
try {
if (registeredPlugins.containsKey(plugin.getId())) {
return false;
}
register(plugin);
return true;
} finally {
writeLock.unlock();
}
}
/**
* Unregister {@link ThreadPoolPlugin}
*
* @param pluginId plugin id
*/
@Override
public void unregister(String pluginId) {
Lock writeLock = instanceLock.writeLock();
writeLock.lock();
try {
Optional.ofNullable(pluginId)
.map(registeredPlugins::remove)
.ifPresent(plugin -> {
// remove quick index if necessary
if (plugin instanceof TaskAwarePlugin) {
taskAwarePluginList.remove(plugin);
}
if (plugin instanceof ExecuteAwarePlugin) {
executeAwarePluginList.remove(plugin);
}
if (plugin instanceof RejectedAwarePlugin) {
rejectedAwarePluginList.remove(plugin);
}
if (plugin instanceof ShutdownAwarePlugin) {
shutdownAwarePluginList.remove(plugin);
}
plugin.stop();
});
} finally {
writeLock.unlock();
}
}
@Override
public Collection<ThreadPoolPlugin> getAllPlugins() {
Lock readLock = instanceLock.readLock();
readLock.lock();
try {
return registeredPlugins.values();
} finally {
readLock.unlock();
}
}
/**
* Whether the {@link ThreadPoolPlugin} has been registered.
*
* @param pluginId plugin id
* @return ture if target has been registered, false otherwise
*/
@Override
public boolean isRegistered(String pluginId) {
Lock readLock = instanceLock.readLock();
readLock.lock();
try {
return registeredPlugins.containsKey(pluginId);
} finally {
readLock.unlock();
}
}
/**
* Get {@link ThreadPoolPlugin}
*
* @param pluginId plugin id
* @param <A> plugin type
* @return {@link ThreadPoolPlugin}, null if unregister
*/
@Override
@SuppressWarnings("unchecked")
public <A extends ThreadPoolPlugin> Optional<A> getPlugin(String pluginId) {
Lock readLock = instanceLock.readLock();
readLock.lock();
try {
return (Optional<A>) Optional.ofNullable(registeredPlugins.get(pluginId));
} finally {
readLock.unlock();
}
}
/**
* Get execute plugin list.
*
* @return {@link ExecuteAwarePlugin}
*/
@Override
public Collection<ExecuteAwarePlugin> getExecuteAwarePluginList() {
Lock readLock = instanceLock.readLock();
readLock.lock();
try {
return executeAwarePluginList;
} finally {
readLock.unlock();
}
}
/**
* Get rejected plugin list.
*
* @return {@link RejectedAwarePlugin}
*/
@Override
public Collection<RejectedAwarePlugin> getRejectedAwarePluginList() {
Lock readLock = instanceLock.readLock();
readLock.lock();
try {
return rejectedAwarePluginList;
} finally {
readLock.unlock();
}
}
/**
* Get shutdown plugin list.
*
* @return {@link ShutdownAwarePlugin}
*/
@Override
public Collection<ShutdownAwarePlugin> getShutdownAwarePluginList() {
Lock readLock = instanceLock.readLock();
readLock.lock();
try {
return shutdownAwarePluginList;
} finally {
readLock.unlock();
}
}
/**
* Get shutdown plugin list.
*
* @return {@link ShutdownAwarePlugin}
*/
@Override
public Collection<TaskAwarePlugin> getTaskAwarePluginList() {
Lock readLock = instanceLock.readLock();
readLock.lock();
try {
return taskAwarePluginList;
} finally {
readLock.unlock();
}
}
}

@ -0,0 +1,154 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.plugin.manager;
import cn.hippo4j.core.plugin.*;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import java.util.Collection;
import java.util.Collections;
import java.util.Optional;
/**
* Empty thread pool plugin manager.
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class EmptyThreadPoolPluginManager implements ThreadPoolPluginManager {
/**
* default instance
*/
public static final EmptyThreadPoolPluginManager INSTANCE = new EmptyThreadPoolPluginManager();
/**
* Clear all.
*/
@Override
public void clear() {
// do nothing
}
/**
* Get all registered plugins.
*
* @return plugins
*/
@Override
public Collection<ThreadPoolPlugin> getAllPlugins() {
return Collections.emptyList();
}
/**
* Register a {@link ThreadPoolPlugin}
*
* @param plugin plugin
* @throws IllegalArgumentException thrown when a plugin with the same {@link ThreadPoolPlugin#getId()}
* already exists in the registry
* @see ThreadPoolPlugin#getId()
*/
@Override
public void register(ThreadPoolPlugin plugin) {
// do nothing
}
/**
* Register plugin if it's not registered.
*
* @param plugin plugin
* @return return true if successful register new plugin, false otherwise
*/
@Override
public boolean tryRegister(ThreadPoolPlugin plugin) {
return false;
}
/**
* Whether the {@link ThreadPoolPlugin} has been registered.
*
* @param pluginId plugin id
* @return ture if target has been registered, false otherwise
*/
@Override
public boolean isRegistered(String pluginId) {
return false;
}
/**
* Unregister {@link ThreadPoolPlugin}
*
* @param pluginId plugin id
*/
@Override
public void unregister(String pluginId) {
// do nothing
}
/**
* Get {@link ThreadPoolPlugin}
*
* @param pluginId plugin id
* @return {@link ThreadPoolPlugin}
* @throws ClassCastException thrown when the object obtained by name cannot be converted to target type
*/
@Override
public <A extends ThreadPoolPlugin> Optional<A> getPlugin(String pluginId) {
return Optional.empty();
}
/**
* Get execute aware plugin list.
*
* @return {@link ExecuteAwarePlugin}
*/
@Override
public Collection<ExecuteAwarePlugin> getExecuteAwarePluginList() {
return Collections.emptyList();
}
/**
* Get rejected aware plugin list.
*
* @return {@link RejectedAwarePlugin}
*/
@Override
public Collection<RejectedAwarePlugin> getRejectedAwarePluginList() {
return Collections.emptyList();
}
/**
* Get shutdown aware plugin list.
*
* @return {@link ShutdownAwarePlugin}
*/
@Override
public Collection<ShutdownAwarePlugin> getShutdownAwarePluginList() {
return Collections.emptyList();
}
/**
* Get shutdown aware plugin list.
*
* @return {@link ShutdownAwarePlugin}
*/
@Override
public Collection<TaskAwarePlugin> getTaskAwarePluginList() {
return Collections.emptyList();
}
}

@ -0,0 +1,176 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.plugin.manager;
import cn.hippo4j.core.plugin.*;
import java.util.Collection;
import java.util.Optional;
import java.util.stream.Collectors;
/**
* Manager of {@link ThreadPoolPlugin}.
* Bind with the specified thread-pool instance to register and manage plugins.
* when the thread pool is destroyed, please ensure that the manager will also be destroyed.
*
* @see DefaultThreadPoolPluginManager
*/
public interface ThreadPoolPluginManager {
/**
* Get an empty manager.
*
* @return {@link EmptyThreadPoolPluginManager}
*/
static ThreadPoolPluginManager empty() {
return EmptyThreadPoolPluginManager.INSTANCE;
}
/**
* Clear all.
*/
void clear();
/**
* Get all registered plugins.
*
* @return plugins
*/
Collection<ThreadPoolPlugin> getAllPlugins();
/**
* Register a {@link ThreadPoolPlugin}
*
* @param plugin plugin
* @throws IllegalArgumentException thrown when a plugin with the same {@link ThreadPoolPlugin#getId()}
* already exists in the registry
* @see ThreadPoolPlugin#getId()
*/
void register(ThreadPoolPlugin plugin);
/**
* Register plugin if it's not registered.
*
* @param plugin plugin
* @return return true if successful register new plugin, false otherwise
*/
boolean tryRegister(ThreadPoolPlugin plugin);
/**
* Whether the {@link ThreadPoolPlugin} has been registered.
*
* @param pluginId plugin id
* @return ture if target has been registered, false otherwise
*/
boolean isRegistered(String pluginId);
/**
* Unregister {@link ThreadPoolPlugin}
*
* @param pluginId plugin id
*/
void unregister(String pluginId);
/**
* Get {@link ThreadPoolPlugin}
*
* @param pluginId plugin id
* @param <A> target aware type
* @return {@link ThreadPoolPlugin}
* @throws ClassCastException thrown when the object obtained by name cannot be converted to target type
*/
<A extends ThreadPoolPlugin> Optional<A> getPlugin(String pluginId);
/**
* Get execute aware plugin list.
*
* @return {@link ExecuteAwarePlugin}
*/
Collection<ExecuteAwarePlugin> getExecuteAwarePluginList();
/**
* Get rejected aware plugin list.
*
* @return {@link RejectedAwarePlugin}
*/
Collection<RejectedAwarePlugin> getRejectedAwarePluginList();
/**
* Get shutdown aware plugin list.
*
* @return {@link ShutdownAwarePlugin}
*/
Collection<ShutdownAwarePlugin> getShutdownAwarePluginList();
/**
* Get shutdown aware plugin list.
*
* @return {@link ShutdownAwarePlugin}
*/
Collection<TaskAwarePlugin> getTaskAwarePluginList();
// ==================== default methods ====================
/**
* Get plugin of type.
*
* @param pluginId plugin id
* @param pluginType plugin type
* @return target plugin
*/
default <A extends ThreadPoolPlugin> Optional<A> getPluginOfType(String pluginId, Class<A> pluginType) {
return getPlugin(pluginId)
.filter(pluginType::isInstance)
.map(pluginType::cast);
}
/**
* Get all plugins of type.
*
* @param pluginType plugin type
* @return all plugins of type
*/
default <A extends ThreadPoolPlugin> Collection<A> getAllPluginsOfType(Class<A> pluginType) {
return getAllPlugins().stream()
.filter(pluginType::isInstance)
.map(pluginType::cast)
.collect(Collectors.toList());
}
/**
* Get {@link PluginRuntime} of all registered plugins.
*
* @return {@link PluginRuntime} of all registered plugins
*/
default Collection<PluginRuntime> getAllPluginRuntimes() {
return getAllPlugins().stream()
.map(ThreadPoolPlugin::getPluginRuntime)
.collect(Collectors.toList());
}
/**
* Get {@link PluginRuntime} of registered plugin.
*
* @return {@link PluginRuntime} of registered plugin
*/
default Optional<PluginRuntime> getRuntime(String pluginId) {
return getPlugin(pluginId)
.map(ThreadPoolPlugin::getPluginRuntime);
}
}

@ -0,0 +1,168 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.plugin.manager;
import cn.hippo4j.core.plugin.*;
import org.checkerframework.checker.nullness.qual.NonNull;
import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.ThreadPoolExecutor;
/**
* Used to support the binding of {@link ThreadPoolPluginManager} and {@link ThreadPoolExecutor}.
*/
public interface ThreadPoolPluginSupport extends ThreadPoolPluginManager {
/**
* Get thread pool action aware registry.
*
* @return {@link ThreadPoolPluginManager}
*/
@NonNull
ThreadPoolPluginManager getThreadPoolPluginManager();
/**
* Get thread-pool id
*
* @return thread-pool id
*/
String getThreadPoolId();
/**
* Get thread-pool executor.
*
* @return thread-pool executor
*/
ThreadPoolExecutor getThreadPoolExecutor();
// ======================== delegate methods ========================
/**
* Clear all.
*/
@Override
default void clear() {
getThreadPoolPluginManager().clear();
}
/**
* Register a {@link ThreadPoolPlugin}
*
* @param plugin aware
*/
@Override
default void register(ThreadPoolPlugin plugin) {
getThreadPoolPluginManager().register(plugin);
}
/**
* Register plugin if it's not registered.
*
* @param plugin plugin
* @return return true if successful register new plugin, false otherwise
*/
@Override
default boolean tryRegister(ThreadPoolPlugin plugin) {
return getThreadPoolPluginManager().tryRegister(plugin);
}
/**
* Whether the {@link ThreadPoolPlugin} has been registered.
*
* @param pluginId name
* @return ture if target has been registered, false otherwise
*/
@Override
default boolean isRegistered(String pluginId) {
return getThreadPoolPluginManager().isRegistered(pluginId);
}
/**
* Unregister {@link ThreadPoolPlugin}
*
* @param pluginId name
*/
@Override
default void unregister(String pluginId) {
getThreadPoolPluginManager().unregister(pluginId);
}
/**
* Get all registered plugins.
*
* @return plugins
*/
@Override
default Collection<ThreadPoolPlugin> getAllPlugins() {
return getThreadPoolPluginManager().getAllPlugins();
}
/**
* Get {@link ThreadPoolPlugin}
*
* @param pluginId target name
* @return {@link ThreadPoolPlugin}, null if unregister
* @throws ClassCastException thrown when the object obtained by name cannot be converted to target type
*/
@Override
default <A extends ThreadPoolPlugin> Optional<A> getPlugin(String pluginId) {
return getThreadPoolPluginManager().getPlugin(pluginId);
}
/**
* Get execute aware list.
*
* @return {@link ExecuteAwarePlugin}
*/
@Override
default Collection<ExecuteAwarePlugin> getExecuteAwarePluginList() {
return getThreadPoolPluginManager().getExecuteAwarePluginList();
}
/**
* Get rejected aware list.
*
* @return {@link RejectedAwarePlugin}
*/
@Override
default Collection<RejectedAwarePlugin> getRejectedAwarePluginList() {
return getThreadPoolPluginManager().getRejectedAwarePluginList();
}
/**
* Get shutdown aware list.
*
* @return {@link ShutdownAwarePlugin}
*/
@Override
default Collection<ShutdownAwarePlugin> getShutdownAwarePluginList() {
return getThreadPoolPluginManager().getShutdownAwarePluginList();
}
/**
* Get shutdown aware list.
*
* @return {@link ShutdownAwarePlugin}
*/
@Override
default Collection<TaskAwarePlugin> getTaskAwarePluginList() {
return getThreadPoolPluginManager().getTaskAwarePluginList();
}
}
Loading…
Cancel
Save