Merge remote-tracking branch 'upstream/develop' into develop

pull/887/head
weihu 3 years ago
commit 41126d74da

@ -17,9 +17,12 @@
coverage:
status:
patch:
project:
default:
threshold: 0.1%
target: auto
# adjust accordingly based on how flaky your tests are
# this allows a 10% drop from the previous base commit coverage
threshold: 10%
ignore:
- "hippo4j-example/.*"
- "docs/.*"

@ -12,7 +12,7 @@ sidebar_position: 3
## 谁在使用 Hippo4J
共计 20+ 家公司生产接入 Hippo4J。按照公司登记时间排序。
共计 21+ 家公司生产接入 Hippo4J。按照公司登记时间排序。
- [身边云](https://serviceshare.com)
- [Medbanks](https://www.medbanks.cn)
@ -34,3 +34,4 @@ sidebar_position: 3
- [新东方教育科技集团](https://www.xdf.cn/)
- [远眺网络科技有限公司](https://www.yuantiaokj.com/)
- [浙江吉利控股集团有限公司](https://www.geely.com/)
- [三立人(深圳)科技有限公司-焦内](https://www.bananain.com/)

@ -41,7 +41,6 @@ WECHART填写user_id会以@的消息发给用户,填写姓名则是普通
LARK填写ou_开头用户唯一标识会以@的消息发给用户,填写手机号则是普通的@
```
## 钉钉平台
[钉钉创建群机器人](https://www.dingtalk.com/qidian/help-detail-20781541.html)
@ -54,6 +53,10 @@ LARK填写ou_开头用户唯一标识会以@的消息发给用户,填写手
![](https://images-machen.oss-cn-beijing.aliyuncs.com/image-20220530200133377.png?x-oss-process=image/resize,h_500,w_800)
:::tip
如果使用 1.4.3 及以上版本,`警报` 替换为 `告警`
:::
## 企业微信
[企业微信创建群机器人](https://open.work.weixin.qq.com/help2/pc/14931?person_id=1&from=homesearch)

@ -12,7 +12,7 @@ sidebar_position: 3
## 谁在使用 Hippo4J
共计 20+ 家公司生产接入 Hippo4J。按照公司登记时间排序。
共计 21+ 家公司生产接入 Hippo4J。按照公司登记时间排序。
- [身边云](https://serviceshare.com)
- [Medbanks](https://www.medbanks.cn)
@ -34,3 +34,4 @@ sidebar_position: 3
- [新东方教育科技集团](https://www.xdf.cn/)
- [远眺网络科技有限公司](https://www.yuantiaokj.com/)
- [浙江吉利控股集团有限公司](https://www.geely.com/)
- [三立人(深圳)科技有限公司-焦内](https://www.bananain.com/)

@ -54,6 +54,10 @@ LARK填写ou_开头用户唯一标识会以@的消息发给用户,填写手
![](https://images-machen.oss-cn-beijing.aliyuncs.com/image-20220530200133377.png?x-oss-process=image/resize,h_500,w_800)
:::tip
如果使用 1.4.3 及以上版本,`警报` 替换为 `告警`
:::
## 企业微信
[企业微信创建群机器人](https://open.work.weixin.qq.com/help2/pc/14931?person_id=1&from=homesearch)

@ -12,7 +12,7 @@ sidebar_position: 3
## 谁在使用 Hippo4J
共计 20+ 家公司生产接入 Hippo4J。按照公司登记时间排序。
共计 21+ 家公司生产接入 Hippo4J。按照公司登记时间排序。
- [身边云](https://serviceshare.com)
- [Medbanks](https://www.medbanks.cn)
@ -34,3 +34,4 @@ sidebar_position: 3
- [新东方教育科技集团](https://www.xdf.cn/)
- [远眺网络科技有限公司](https://www.yuantiaokj.com/)
- [浙江吉利控股集团有限公司](https://www.geely.com/)
- [三立人(深圳)科技有限公司-焦内](https://www.bananain.com/)

@ -12,7 +12,7 @@ sidebar_position: 3
## 谁在使用 Hippo4J
共计 20+ 家公司生产接入 Hippo4J。按照公司登记时间排序。
共计 21+ 家公司生产接入 Hippo4J。按照公司登记时间排序。
- [身边云](https://serviceshare.com)
- [Medbanks](https://www.medbanks.cn)
@ -34,3 +34,4 @@ sidebar_position: 3
- [新东方教育科技集团](https://www.xdf.cn/)
- [远眺网络科技有限公司](https://www.yuantiaokj.com/)
- [浙江吉利控股集团有限公司](https://www.geely.com/)
- [三立人(深圳)科技有限公司-焦内](https://www.bananain.com/)

@ -10,6 +10,11 @@
<artifactId>hippo4j-core</artifactId>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>

@ -25,7 +25,9 @@ import lombok.NoArgsConstructor;
import org.springframework.beans.factory.DisposableBean;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
/**
@ -37,10 +39,27 @@ import java.util.concurrent.ThreadPoolExecutor;
@AllArgsConstructor
public class DynamicThreadPoolWrapper implements DisposableBean {
/**
* Determine the unique identifier of the thread pool in the project.
*
* @param tenantId tenant id
* @param itemId project id in the team
* @param threadPoolId Thread pool identifier under the project
*/
private String tenantId, itemId, threadPoolId;
/**
* Whether the thread pool has completed initialization,
* and whether to subscribe to server-side configuration change events.
*
* @param subscribeFlag subscription server configuration id
* @param initFlag initial configuration complete flag
*/
private boolean subscribeFlag, initFlag;
/**
* Thread pool executor.
*/
private ThreadPoolExecutor executor;
public DynamicThreadPoolWrapper(String threadPoolId) {
@ -53,14 +72,62 @@ public class DynamicThreadPoolWrapper implements DisposableBean {
this.subscribeFlag = true;
}
/**
* Executes the given task sometime in the future. The task
* may execute in a new thread or in an existing pooled thread.
* <p>
* If the task cannot be submitted for execution, either because this
* executor has been shutdown or because its capacity has been reached,
* the task is handled by the current {@code RejectedExecutionHandler}.
*
* @param command the task to execute
* @throws RejectedExecutionException at discretion of
* {@code RejectedExecutionHandler}, if the task
* cannot be accepted for execution
* @throws NullPointerException if {@code command} is null
*/
public void execute(Runnable command) {
executor.execute(command);
}
/**
* Submits a Runnable task for execution and returns a Future
* representing that task. The Future's {@code get} method will
* return {@code null} upon <em>successful</em> completion.
*
* @param task the task to submit
* @return a Future representing pending completion of the task
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
* @throws NullPointerException if the task is null
*/
public Future<?> submit(Runnable task) {
return executor.submit(task);
}
/**
* Submits a value-returning task for execution and returns a
* Future representing the pending results of the task. The
* Future's {@code get} method will return the task's result upon
* successful completion.
*
* <p>
* If you would like to immediately block waiting
* for a task, you can use constructions of the form
* {@code result = exec.submit(aCallable).get();}
*
* <p>Note: The {@link Executors} class includes a set of methods
* that can convert some other common closure-like objects,
* for example, {@link java.security.PrivilegedAction} to
* {@link Callable} form so they can be submitted.
*
* @param task the task to submit
* @param <T> the type of the task's result
* @return a Future representing pending completion of the task
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
* @throws NullPointerException if the task is null
*/
public <T> Future<T> submit(Callable<T> task) {
return executor.submit(task);
}

@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.plugin.impl;
import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor;
import cn.hippo4j.core.plugin.ExecuteAwarePlugin;
import cn.hippo4j.core.toolkit.SystemClock;
import java.util.Optional;
/**
* <p>An abstract task execution time recording plugin
* for thread-safe statistics the execution time of tasks.
*
* <p>Must override {@link #processTaskTime} to define the processing logic for task execution time. <br />
* Default time precision is milliseconds, may override {@link #currentTime} to redefine the time precision.
*
* @see TaskTimeRecordPlugin
* @see TaskTimeoutNotifyAlarmPlugin
*/
public abstract class AbstractTaskTimerPlugin implements ExecuteAwarePlugin {
/**
* start times of executed tasks
*/
private final ThreadLocal<Long> startTimes = new ThreadLocal<>();
/**
* Record the time when the worker thread starts executing the task.
*
* @param thread thread of executing task
* @param runnable task
* @see ExtensibleThreadPoolExecutor#beforeExecute
*/
@Override
public final void beforeExecute(Thread thread, Runnable runnable) {
startTimes.set(currentTime());
}
/**
* Record the total time for the worker thread to complete the task, and update the time record.
*
* @param runnable runnable
* @param throwable exception thrown during execution
*/
@Override
public final void afterExecute(Runnable runnable, Throwable throwable) {
try {
Optional.ofNullable(startTimes.get())
.map(startTime -> currentTime() - startTime)
.ifPresent(this::processTaskTime);
} finally {
startTimes.remove();
}
}
/**
* Get the current time.
*
* @return current time
*/
protected long currentTime() {
return SystemClock.now();
}
/**
* Processing the execution time of the task.
*
* @param taskExecuteTime execute time of task
*/
protected abstract void processTaskTime(long taskExecuteTime);
}

@ -17,25 +17,19 @@
package cn.hippo4j.core.plugin.impl;
import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor;
import cn.hippo4j.core.plugin.ExecuteAwarePlugin;
import cn.hippo4j.core.plugin.PluginRuntime;
import cn.hippo4j.core.toolkit.SystemClock;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import java.util.Objects;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* Record task execution time indicator.
*
* @see TaskTimeoutNotifyAlarmPlugin
*/
@RequiredArgsConstructor
public class TaskTimeRecordPlugin implements ExecuteAwarePlugin {
public class TaskTimeRecordPlugin extends AbstractTaskTimerPlugin {
public static final String PLUGIN_NAME = "task-time-record-plugin";
@ -74,23 +68,6 @@ public class TaskTimeRecordPlugin implements ExecuteAwarePlugin {
return PLUGIN_NAME;
}
/**
* start times of executed tasks
*/
private final ThreadLocal<Long> startTimes = new ThreadLocal<>();
/**
* Record the time when the worker thread starts executing the task.
*
* @param thread thread of executing task
* @param runnable task
* @see ExtensibleThreadPoolExecutor#beforeExecute
*/
@Override
public void beforeExecute(Thread thread, Runnable runnable) {
startTimes.set(SystemClock.now());
}
/**
* Get plugin runtime info.
*
@ -107,44 +84,25 @@ public class TaskTimeRecordPlugin implements ExecuteAwarePlugin {
.addInfo("avgTaskTime", summary.getAvgTaskTimeMillis() + "ms");
}
/**
* Record the total time for the worker thread to complete the task, and update the time record.
*
* @param runnable runnable
* @param throwable exception thrown during execution
*/
@Override
public void afterExecute(Runnable runnable, Throwable throwable) {
try {
Long startTime = startTimes.get();
if (Objects.isNull(startTime)) {
return;
}
long executeTime = SystemClock.now() - startTime;
recordTaskTime(executeTime);
} finally {
startTimes.remove();
}
}
/**
* Refresh time indicators of the current instance.
*
* @param taskExecutionTime millisecond
* @param taskExecuteTime execute time of task
*/
protected void recordTaskTime(long taskExecutionTime) {
@Override
protected void processTaskTime(long taskExecuteTime) {
Lock writeLock = lock.writeLock();
writeLock.lock();
try {
if (taskCount == 0) {
maxTaskTimeMillis = taskExecutionTime;
minTaskTimeMillis = taskExecutionTime;
maxTaskTimeMillis = taskExecuteTime;
minTaskTimeMillis = taskExecuteTime;
} else {
maxTaskTimeMillis = Math.max(taskExecutionTime, maxTaskTimeMillis);
minTaskTimeMillis = Math.min(taskExecutionTime, minTaskTimeMillis);
maxTaskTimeMillis = Math.max(taskExecuteTime, maxTaskTimeMillis);
minTaskTimeMillis = Math.min(taskExecuteTime, minTaskTimeMillis);
}
taskCount = taskCount + 1;
totalTaskTimeMillis += taskExecutionTime;
totalTaskTimeMillis += taskExecuteTime;
} finally {
writeLock.unlock();
}

@ -27,11 +27,10 @@ import java.util.Optional;
import java.util.concurrent.ThreadPoolExecutor;
/**
* Record task execution time indicator,
* and send alarm notification when the execution time exceeds the threshold.
* Send alarm notification when the execution time exceeds the threshold.
*/
@AllArgsConstructor
public class TaskTimeoutNotifyAlarmPlugin extends TaskTimeRecordPlugin {
public class TaskTimeoutNotifyAlarmPlugin extends AbstractTaskTimerPlugin {
public static final String PLUGIN_NAME = "task-timeout-notify-alarm-plugin";
@ -40,6 +39,15 @@ public class TaskTimeoutNotifyAlarmPlugin extends TaskTimeRecordPlugin {
*/
private final String threadPoolId;
@Getter
@Setter
private Long executeTimeOut;
/**
* thread-pool
*/
private final ThreadPoolExecutor threadPoolExecutor;
/**
* Get id.
*
@ -50,30 +58,21 @@ public class TaskTimeoutNotifyAlarmPlugin extends TaskTimeRecordPlugin {
return PLUGIN_NAME;
}
@Getter
@Setter
private Long executeTimeOut;
/**
* thread-pool
*/
private final ThreadPoolExecutor threadPoolExecutor;
/**
* Check whether the task execution time exceeds {@link #executeTimeOut},
* if it exceeds this time, send an alarm notification.
*
* @param executeTime executeTime in nanosecond
* @param taskExecuteTime execute time of task
*/
@Override
protected void recordTaskTime(long executeTime) {
super.recordTaskTime(executeTime);
if (executeTime <= executeTimeOut) {
protected void processTaskTime(long taskExecuteTime) {
if (taskExecuteTime <= executeTimeOut) {
return;
}
Optional.ofNullable(ApplicationContextHolder.getInstance())
.map(context -> context.getBean(ThreadPoolNotifyAlarmHandler.class))
.ifPresent(handler -> handler.asyncSendExecuteTimeOutAlarm(
threadPoolId, executeTime, executeTimeOut, threadPoolExecutor));
threadPoolId, taskExecuteTime, executeTimeOut, threadPoolExecutor));
}
}

@ -0,0 +1,160 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.executor;
import cn.hippo4j.common.toolkit.ThreadUtil;
import org.junit.Assert;
import org.junit.Test;
import org.springframework.core.task.TaskDecorator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* test for {@link DynamicThreadPoolExecutor}
*/
public class DynamicThreadPoolExecutorTest {
@Test
public void testRedundancyHandler() {
RejectedExecutionHandler handler = new ThreadPoolExecutor.DiscardOldestPolicy();
DynamicThreadPoolExecutor executor = new DynamicThreadPoolExecutor(
1, 1, 1000L, TimeUnit.MILLISECONDS,
1000L, true, 1000L,
new ArrayBlockingQueue<>(1), "test", Thread::new, handler);
Assert.assertEquals(handler, executor.getRedundancyHandler());
handler = new ThreadPoolExecutor.AbortPolicy();
executor.setRedundancyHandler(handler);
Assert.assertEquals(handler, executor.getRedundancyHandler());
}
@Test
public void testTaskDecorator() {
DynamicThreadPoolExecutor executor = new DynamicThreadPoolExecutor(
1, 1, 1000L, TimeUnit.MILLISECONDS,
1000L, true, 1000L,
new ArrayBlockingQueue<>(1), "test", Thread::new, new ThreadPoolExecutor.DiscardOldestPolicy());
Assert.assertNull(executor.getTaskDecorator());
TaskDecorator decorator = runnable -> runnable;
executor.setTaskDecorator(decorator);
Assert.assertEquals(decorator, executor.getTaskDecorator());
decorator = runnable -> runnable;
executor.setTaskDecorator(decorator);
Assert.assertEquals(decorator, executor.getTaskDecorator());
}
@Test
public void testExecuteTimeOut() {
DynamicThreadPoolExecutor executor = new DynamicThreadPoolExecutor(
1, 1, 1000L, TimeUnit.MILLISECONDS,
1000L, true, 1000L,
new ArrayBlockingQueue<>(1), "test", Thread::new, new ThreadPoolExecutor.DiscardOldestPolicy());
Assert.assertEquals(1000L, executor.getExecuteTimeOut().longValue());
executor.setExecuteTimeOut(500L);
Assert.assertEquals(500L, executor.getExecuteTimeOut().longValue());
}
@Test
public void testDestroyWhenWaitForTask() {
DynamicThreadPoolExecutor executor = new DynamicThreadPoolExecutor(
1, 1, 1000L, TimeUnit.MILLISECONDS,
1000L, true, 1000L,
new ArrayBlockingQueue<>(1), "test", Thread::new, new ThreadPoolExecutor.DiscardOldestPolicy());
AtomicInteger count = new AtomicInteger(0);
executor.execute(() -> {
ThreadUtil.sleep(500L);
count.incrementAndGet();
});
executor.execute(() -> {
ThreadUtil.sleep(500L);
count.incrementAndGet();
});
executor.destroy();
// waitting for terminated
while (!executor.isTerminated()) {
} ;
Assert.assertEquals(2, count.get());
}
@Test
public void testDestroyWhenNotWaitForTask() {
DynamicThreadPoolExecutor executor = new DynamicThreadPoolExecutor(
1, 1, 1000L, TimeUnit.MILLISECONDS,
1000L, false, 1000L,
new ArrayBlockingQueue<>(1), "test", Thread::new, new ThreadPoolExecutor.DiscardOldestPolicy());
AtomicInteger count = new AtomicInteger(0);
executor.execute(() -> {
ThreadUtil.sleep(500L);
count.incrementAndGet();
});
executor.execute(() -> {
ThreadUtil.sleep(500L);
count.incrementAndGet();
});
executor.destroy();
// waitting for terminated
while (!executor.isTerminated()) {
} ;
Assert.assertEquals(1, count.get());
}
@Test
public void testRejectCount() {
DynamicThreadPoolExecutor executor = new DynamicThreadPoolExecutor(
1, 1, 1000L, TimeUnit.MILLISECONDS,
1000L, true, 1000L,
new ArrayBlockingQueue<>(1), "test", Thread::new, new ThreadPoolExecutor.DiscardOldestPolicy());
Assert.assertEquals(0L, executor.getRejectCountNum().longValue());
Assert.assertEquals(0L, executor.getRejectCount().get());
executor.submit(() -> ThreadUtil.sleep(100L));
executor.submit(() -> ThreadUtil.sleep(100L));
executor.submit(() -> ThreadUtil.sleep(100L));
ThreadUtil.sleep(200L);
Assert.assertEquals(1L, executor.getRejectCountNum().longValue());
Assert.assertEquals(1L, executor.getRejectCount().get());
}
@Test
public void testSupportParam() {
DynamicThreadPoolExecutor executor = new DynamicThreadPoolExecutor(
1, 1, 1000L, TimeUnit.MILLISECONDS,
1000L, true, 1000L,
new ArrayBlockingQueue<>(1), "test", Thread::new, new ThreadPoolExecutor.DiscardOldestPolicy());
Assert.assertEquals(1000L, executor.getAwaitTerminationMillis());
Assert.assertTrue(executor.isWaitForTasksToCompleteOnShutdown());
executor.setSupportParam(500L, false);
Assert.assertEquals(500L, executor.getAwaitTerminationMillis());
Assert.assertFalse(executor.isWaitForTasksToCompleteOnShutdown());
}
}

@ -0,0 +1,219 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.executor;
import cn.hippo4j.common.toolkit.ThreadUtil;
import cn.hippo4j.core.plugin.ExecuteAwarePlugin;
import cn.hippo4j.core.plugin.RejectedAwarePlugin;
import cn.hippo4j.core.plugin.ShutdownAwarePlugin;
import cn.hippo4j.core.plugin.TaskAwarePlugin;
import cn.hippo4j.core.plugin.manager.DefaultThreadPoolPluginManager;
import cn.hippo4j.core.plugin.manager.ThreadPoolPluginManager;
import lombok.Getter;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* test for {@link ExtensibleThreadPoolExecutor}
*/
public class ExtensibleThreadPoolExecutorTest {
private final RejectedExecutionHandler originalHandler = new ThreadPoolExecutor.DiscardPolicy();
private ExtensibleThreadPoolExecutor executor;
private ThreadPoolPluginManager manager;
@Before
public void initExecutor() {
manager = new DefaultThreadPoolPluginManager();
executor = new ExtensibleThreadPoolExecutor(
"test", manager,
5, 5, 1000L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(1), Thread::new, originalHandler);
}
@Test
public void testGetThreadPoolId() {
Assert.assertEquals("test", executor.getThreadPoolId());
}
@Test
public void testGetThreadPoolExecutor() {
Assert.assertSame(executor, executor.getThreadPoolExecutor());
}
@Test
public void testGetThreadPoolPluginManager() {
Assert.assertSame(manager, executor.getThreadPoolPluginManager());
}
@Test
public void testGetOrSetRejectedHandler() {
RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy();
executor.setRejectedExecutionHandler(handler);
Assert.assertSame(handler, executor.getRejectedExecutionHandler());
}
@Test
public void testInvokeTaskAwarePlugin() {
TestTaskAwarePlugin plugin = new TestTaskAwarePlugin();
executor.register(plugin);
executor.submit(() -> {
});
executor.submit(() -> true);
executor.submit(() -> {
}, false);
executor.execute(() -> {
});
Assert.assertEquals(7, plugin.getInvokeCount().get());
}
@Test
public void testInvokeExecuteAwarePlugin() {
TestExecuteAwarePlugin plugin = new TestExecuteAwarePlugin();
executor.register(plugin);
executor.execute(() -> {
});
ThreadUtil.sleep(500L);
Assert.assertEquals(2, plugin.getInvokeCount().get());
}
@Test
public void testInvokeRejectedAwarePlugin() {
executor.setCorePoolSize(1);
executor.setMaximumPoolSize(1);
TestRejectedAwarePlugin plugin = new TestRejectedAwarePlugin();
executor.register(plugin);
// blocking pool and queue
executor.submit(() -> ThreadUtil.sleep(500L));
executor.submit(() -> ThreadUtil.sleep(500L));
// reject 3 tasks
executor.submit(() -> {
});
executor.submit(() -> {
});
executor.submit(() -> {
});
ThreadUtil.sleep(500L);
Assert.assertEquals(3, plugin.getInvokeCount().get());
}
@Test
public void testInvokeTestShutdownAwarePluginWhenShutdown() throws InterruptedException {
TestShutdownAwarePlugin plugin = new TestShutdownAwarePlugin();
executor.register(plugin);
executor.shutdown();
executor.submit(() -> {
throw new IllegalArgumentException("???");
});
if (executor.awaitTermination(500L, TimeUnit.MILLISECONDS)) {
Assert.assertEquals(3, plugin.getInvokeCount().get());
}
}
@Test
public void testInvokeTestShutdownAwarePluginWhenShutdownNow() throws InterruptedException {
TestShutdownAwarePlugin plugin = new TestShutdownAwarePlugin();
executor.register(plugin);
executor.shutdownNow();
if (executor.awaitTermination(500L, TimeUnit.MILLISECONDS)) {
Assert.assertEquals(3, plugin.getInvokeCount().get());
}
}
@Getter
private final static class TestTaskAwarePlugin implements TaskAwarePlugin {
private final AtomicInteger invokeCount = new AtomicInteger(0);
private final String id = "TestTaskAwarePlugin";
@Override
public <V> Runnable beforeTaskCreate(ThreadPoolExecutor executor, Runnable runnable, V value) {
invokeCount.incrementAndGet();
return TaskAwarePlugin.super.beforeTaskCreate(executor, runnable, value);
}
@Override
public <V> Callable<V> beforeTaskCreate(ThreadPoolExecutor executor, Callable<V> future) {
invokeCount.incrementAndGet();
return TaskAwarePlugin.super.beforeTaskCreate(executor, future);
}
@Override
public Runnable beforeTaskExecute(Runnable runnable) {
invokeCount.incrementAndGet();
return TaskAwarePlugin.super.beforeTaskExecute(runnable);
}
}
@Getter
private final static class TestExecuteAwarePlugin implements ExecuteAwarePlugin {
private final AtomicInteger invokeCount = new AtomicInteger(0);
private final String id = "TestExecuteAwarePlugin";
@Override
public void beforeExecute(Thread thread, Runnable runnable) {
invokeCount.incrementAndGet();
ExecuteAwarePlugin.super.beforeExecute(thread, runnable);
}
@Override
public void afterExecute(Runnable runnable, Throwable throwable) {
invokeCount.incrementAndGet();
ExecuteAwarePlugin.super.afterExecute(runnable, throwable);
}
}
@Getter
private final static class TestRejectedAwarePlugin implements RejectedAwarePlugin {
private final AtomicInteger invokeCount = new AtomicInteger(0);
private final String id = "TestRejectedAwarePlugin";
@Override
public void beforeRejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
invokeCount.incrementAndGet();
}
}
@Getter
private final static class TestShutdownAwarePlugin implements ShutdownAwarePlugin {
private final AtomicInteger invokeCount = new AtomicInteger(0);
private final String id = "TestShutdownAwarePlugin";
@Override
public void beforeShutdown(ThreadPoolExecutor executor) {
invokeCount.incrementAndGet();
ShutdownAwarePlugin.super.beforeShutdown(executor);
}
@Override
public void afterShutdown(ThreadPoolExecutor executor, List<Runnable> remainingTasks) {
invokeCount.incrementAndGet();
ShutdownAwarePlugin.super.afterShutdown(executor, remainingTasks);
}
@Override
public void afterTerminated(ExtensibleThreadPoolExecutor executor) {
invokeCount.incrementAndGet();
ShutdownAwarePlugin.super.afterTerminated(executor);
}
}
}

@ -34,5 +34,20 @@
<groupId>com.alibaba</groupId>
<artifactId>transmittable-thread-local</artifactId>
</dependency>
<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-core</artifactId>
<version>1.23</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-generator-annprocess</artifactId>
<version>1.23</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>

@ -0,0 +1,174 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.example.core;
import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor;
import cn.hippo4j.core.plugin.impl.TaskTimeRecordPlugin;
import cn.hippo4j.core.plugin.manager.DefaultThreadPoolPluginManager;
import lombok.SneakyThrows;
import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.infra.Blackhole;
import org.openjdk.jmh.results.format.ResultFormatType;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* benchmark test for {@link cn.hippo4j.core.plugin.impl.TaskTimeRecordPlugin}
*/
@BenchmarkMode(Mode.All)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@State(Scope.Thread)
@Warmup(iterations = 1)
@Measurement(iterations = 3)
@Fork(1)
@Threads(6)
public class TaskTimeRecordPluginBenchmarkTest {
@SneakyThrows
@Benchmark
public void origin_200(Blackhole blackhole) {
int threadCount = 200;
ThreadPoolExecutor executor = new ThreadPoolExecutor(
threadCount, threadCount, 1000L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(threadCount), Thread::new, new ThreadPoolExecutor.DiscardPolicy());
executor.prestartAllCoreThreads();
List<Runnable> tasks = getTask(threadCount, blackhole);
tasks.forEach(executor::execute);
executor.shutdown();
while (!executor.isTerminated()) {
}
}
@SneakyThrows
@Benchmark
public void origin_50(Blackhole blackhole) {
int threadCount = 50;
ThreadPoolExecutor executor = new ThreadPoolExecutor(
threadCount, threadCount, 1000L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(threadCount), Thread::new, new ThreadPoolExecutor.DiscardPolicy());
executor.prestartAllCoreThreads();
List<Runnable> tasks = getTask(threadCount, blackhole);
tasks.forEach(executor::execute);
executor.shutdown();
while (!executor.isTerminated()) {
}
}
@SneakyThrows
@Benchmark
public void not_plugin_50(Blackhole blackhole) {
int threadCount = 50;
ExtensibleThreadPoolExecutor executor = new ExtensibleThreadPoolExecutor(
"test", new DefaultThreadPoolPluginManager(),
threadCount, threadCount, 1000L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(threadCount), Thread::new, new ThreadPoolExecutor.DiscardPolicy());
executor.prestartAllCoreThreads();
List<Runnable> tasks = getTask(threadCount, blackhole);
tasks.forEach(executor::execute);
executor.shutdown();
while (!executor.isTerminated()) {
}
}
@SneakyThrows
@Benchmark
public void not_plugin_200(Blackhole blackhole) {
int threadCount = 200;
ExtensibleThreadPoolExecutor executor = new ExtensibleThreadPoolExecutor(
"test", new DefaultThreadPoolPluginManager(),
threadCount, threadCount, 1000L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(threadCount), Thread::new, new ThreadPoolExecutor.DiscardPolicy());
executor.prestartAllCoreThreads();
List<Runnable> tasks = getTask(threadCount, blackhole);
tasks.forEach(executor::execute);
executor.shutdown();
while (!executor.isTerminated()) {
}
}
@SneakyThrows
@Benchmark
public void plugin_50(Blackhole blackhole) {
int threadCount = 50;
ExtensibleThreadPoolExecutor executor = new ExtensibleThreadPoolExecutor(
"test", new DefaultThreadPoolPluginManager(),
threadCount, threadCount, 1000L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(threadCount), Thread::new, new ThreadPoolExecutor.DiscardPolicy());
executor.prestartAllCoreThreads();
executor.register(new TaskTimeRecordPlugin());
List<Runnable> tasks = getTask(threadCount, blackhole);
tasks.forEach(executor::execute);
executor.shutdown();
while (!executor.isTerminated()) {
}
}
@SneakyThrows
@Benchmark
public void plugin_200(Blackhole blackhole) {
int threadCount = 200;
ExtensibleThreadPoolExecutor executor = new ExtensibleThreadPoolExecutor(
"test", new DefaultThreadPoolPluginManager(),
threadCount, threadCount, 1000L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(threadCount), Thread::new, new ThreadPoolExecutor.DiscardPolicy());
executor.prestartAllCoreThreads();
executor.register(new TaskTimeRecordPlugin());
List<Runnable> tasks = getTask(threadCount, blackhole);
tasks.forEach(executor::execute);
executor.shutdown();
while (!executor.isTerminated()) {
}
}
private List<Runnable> getTask(int count, Blackhole blackhole) {
List<Runnable> tasks = new ArrayList<>(count);
for (int i = 1; i < count * 2; i++) {
int index = i;
tasks.add(() -> blackhole.consume(index));
}
return tasks;
}
public static void main(String[] args) throws Exception {
Options opts = new OptionsBuilder()
.include(TaskTimeRecordPluginBenchmarkTest.class.getSimpleName())
.resultFormat(ResultFormatType.JSON)
.build();
new Runner(opts).run();
}
}

@ -1,4 +1,4 @@
**<font color=#FF0000>[警] </font>%s - 动态线程池运行告警(%s**
**<font color=#FF0000>[警] </font>%s - 动态线程池运行告警(%s**
---

@ -7,7 +7,7 @@
"header": {
"template": "red",
"title": {
"content": "[🔥] %s 动态线程池运行告警(%s",
"content": "[🔥警] %s 动态线程池运行告警(%s",
"tag": "plain_text"
}
},

@ -1,4 +1,4 @@
### <font color='#FF0000'>[警] </font>%s - 动态线程池运行告警(%s
### <font color='#FF0000'>[警] </font>%s - 动态线程池运行告警(%s
> 线程池ID<font color='warning'>%s</font>
> 应用名称:<font color='warning'>%s</font>

@ -73,12 +73,11 @@ public class JWTAuthenticationFilter extends UsernamePasswordAuthenticationFilte
authenticate = authenticationManager.authenticate(
new UsernamePasswordAuthenticationToken(loginUser.getUsername(), loginUser.getPassword(), new ArrayList()));
} catch (BadCredentialsException e) {
log.warn("BadCredentialsException:{}", e.getMessage());
log.warn("Bad credentials exception: {}", e.getMessage());
} catch (Exception e) {
log.error("attemptauthentication error:", e);
} finally {
return authenticate;
log.error("Attempt authentication error", e);
}
return authenticate;
}
@Override

@ -34,7 +34,7 @@ public class RewriteUserInfoApiFilter implements Filter {
@Override
public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException {
boolean enableAuthentication = AuthUtil.enableAuthentication;
boolean enableAuthentication = AuthUtil.ENABLE_AUTHENTICATION;
HttpServletRequest httpRequest = (HttpServletRequest) servletRequest;
String path = httpRequest.getRequestURI();
if (!enableAuthentication && path.contains("users/info")) {

@ -20,13 +20,19 @@ package cn.hippo4j.auth.toolkit;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
* Auth util.
*/
@Component
public class AuthUtil {
public static boolean enableAuthentication;
/**
* Enable authentication
*/
public static boolean ENABLE_AUTHENTICATION;
@Value("${hippo4j.core.auth.enabled:true}")
public void setEnableAuthentication(boolean enabled) {
AuthUtil.enableAuthentication = enabled;
AuthUtil.ENABLE_AUTHENTICATION = enabled;
}
}

@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.auth.toolkit;
import cn.hippo4j.common.toolkit.Assert;
import org.junit.Before;
import org.junit.Test;
public final class AuthUtilTest {
private AuthUtil authUtil;
@Before
public void beforeInit() {
authUtil = new AuthUtil();
authUtil.setEnableAuthentication(true);
}
@Test
public void assertGetEnableAuthentication() {
Assert.isTrue(AuthUtil.ENABLE_AUTHENTICATION);
}
}

@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.auth.toolkit;
import cn.hippo4j.common.toolkit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.Objects;
public final class ReturnTTest {
private ReturnT returnT;
@Before
public void beforeInit() {
returnT = new ReturnT("success");
}
@Test
public void assertGetCode() {
Assert.isTrue(Objects.equals(returnT.getCode(), 200));
}
@Test
public void assertGetMessage() {
Assert.isNull(returnT.getMsg());
}
@Test
public void assertGetContent() {
Assert.isTrue(Objects.equals(returnT.getContent(), "success"));
}
}

@ -17,6 +17,12 @@
package cn.hippo4j.config.controller;
import java.net.URLDecoder;
import java.util.Map;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import cn.hippo4j.common.constant.ConfigModifyTypeConstants;
import cn.hippo4j.common.constant.Constants;
import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterWrapper;
@ -35,13 +41,14 @@ import cn.hippo4j.config.toolkit.Md5ConfigUtil;
import cn.hippo4j.config.verify.ConfigModificationVerifyServiceChoose;
import lombok.AllArgsConstructor;
import lombok.SneakyThrows;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.*;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.net.URLDecoder;
import java.util.Map;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
* Server configuration controller.
@ -76,7 +83,7 @@ public class ConfigController {
modifySaveReqDTO.setCorePoolSize(config.getCoreSize());
modifySaveReqDTO.setMaximumPoolSize(config.getMaxSize());
modifySaveReqDTO.setModifyUser(UserContext.getUserName());
modifySaveReqDTO.setModifyAll(StringUtil.isEmpty(identify) ? true : false);
modifySaveReqDTO.setModifyAll(StringUtil.isEmpty(identify));
modifySaveReqDTO.setIdentify(identify);
modifySaveReqDTO.setType(ConfigModifyTypeConstants.THREAD_POOL_INSTANCE);
configModificationVerifyServiceChoose.choose(modifySaveReqDTO.getType()).saveConfigModifyApplication(modifySaveReqDTO);

Loading…
Cancel
Save