Merge branch 'opengoofy:develop' into develop

pull/1007/head
Lijx 3 years ago committed by GitHub
commit 4d4dc3c505
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -1,3 +1,3 @@
# These are supported funding model platforms # These are supported funding model platforms
custom: ['https://hippo4j.cn/docs/community/sponsor'] custom: ['https://hippo4j.cn/community/sponsor']

@ -47,6 +47,16 @@ spring:
thread-pool-types: dynamic # 采集线程池的类型。egdynamic、web、adapter。可任意配置默认 dynamic thread-pool-types: dynamic # 采集线程池的类型。egdynamic、web、adapter。可任意配置默认 dynamic
``` ```
如果使用 `micrometer` 类型的监控指标,需要添加以下依赖。
```xml
<dependency>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-spring-boot-starter-monitor-micrometer</artifactId>
<version>1.4.3-upgrade</version>
</dependency>
```
项目启动,访问 `http://localhost:29999/actuator/prometheus` 出现 `dynamic_thread_pool_` 前缀的指标,即为成功。 项目启动,访问 `http://localhost:29999/actuator/prometheus` 出现 `dynamic_thread_pool_` 前缀的指标,即为成功。
![](https://images-machen.oss-cn-beijing.aliyuncs.com/image-20220912220401016.png) ![](https://images-machen.oss-cn-beijing.aliyuncs.com/image-20220912220401016.png)

@ -116,7 +116,7 @@ const config = {
}, },
{ {
href: 'https://xiaomage.info/knowledge-planet', href: 'https://xiaomage.info/knowledge-planet',
label: '🥇代码实战课', label: '知识星球',
position: 'left', position: 'left',
}, },
{ {

@ -47,6 +47,16 @@ spring:
thread-pool-types: dynamic # 采集线程池的类型。egdynamic、web、adapter。可任意配置默认 dynamic thread-pool-types: dynamic # 采集线程池的类型。egdynamic、web、adapter。可任意配置默认 dynamic
``` ```
如果使用 `micrometer` 类型的监控指标,需要添加以下依赖。
```xml
<dependency>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-spring-boot-starter-monitor-micrometer</artifactId>
<version>1.4.3-upgrade</version>
</dependency>
```
项目启动,访问 `http://localhost:29999/actuator/prometheus` 出现 `dynamic_thread_pool_` 前缀的指标,即为成功。 项目启动,访问 `http://localhost:29999/actuator/prometheus` 出现 `dynamic_thread_pool_` 前缀的指标,即为成功。
![](https://images-machen.oss-cn-beijing.aliyuncs.com/image-20220912220401016.png) ![](https://images-machen.oss-cn-beijing.aliyuncs.com/image-20220912220401016.png)

@ -47,6 +47,16 @@ spring:
thread-pool-types: dynamic # 采集线程池的类型。egdynamic、web、adapter。可任意配置默认 dynamic thread-pool-types: dynamic # 采集线程池的类型。egdynamic、web、adapter。可任意配置默认 dynamic
``` ```
如果使用 `micrometer` 类型的监控指标,需要添加以下依赖。
```xml
<dependency>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-spring-boot-starter-monitor-micrometer</artifactId>
<version>1.4.3-upgrade</version>
</dependency>
```
项目启动,访问 `http://localhost:29999/actuator/prometheus` 出现 `dynamic_thread_pool_` 前缀的指标,即为成功。 项目启动,访问 `http://localhost:29999/actuator/prometheus` 出现 `dynamic_thread_pool_` 前缀的指标,即为成功。
![](https://images-machen.oss-cn-beijing.aliyuncs.com/image-20220912220401016.png) ![](https://images-machen.oss-cn-beijing.aliyuncs.com/image-20220912220401016.png)

@ -47,6 +47,16 @@ spring:
thread-pool-types: dynamic # 采集线程池的类型。egdynamic、web、adapter。可任意配置默认 dynamic thread-pool-types: dynamic # 采集线程池的类型。egdynamic、web、adapter。可任意配置默认 dynamic
``` ```
如果使用 `micrometer` 类型的监控指标,需要添加以下依赖。
```xml
<dependency>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-spring-boot-starter-monitor-micrometer</artifactId>
<version>1.4.3-upgrade</version>
</dependency>
```
项目启动,访问 `http://localhost:29999/actuator/prometheus` 出现 `dynamic_thread_pool_` 前缀的指标,即为成功。 项目启动,访问 `http://localhost:29999/actuator/prometheus` 出现 `dynamic_thread_pool_` 前缀的指标,即为成功。
![](https://images-machen.oss-cn-beijing.aliyuncs.com/image-20220912220401016.png) ![](https://images-machen.oss-cn-beijing.aliyuncs.com/image-20220912220401016.png)

@ -17,5 +17,74 @@
package cn.hippo4j.common.executor; package cn.hippo4j.common.executor;
import cn.hippo4j.common.design.builder.ThreadFactoryBuilder;
import cn.hippo4j.common.toolkit.MapUtil;
import cn.hippo4j.common.toolkit.ReflectUtil;
import org.junit.Assert;
import org.junit.Test;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.stream.IntStream;
public final class ExecutorFactoryTest { public final class ExecutorFactoryTest {
ThreadFactory threadFactory = new ThreadFactoryBuilder().prefix("test").build();
/**
* data range min
*/
Integer rangeMin = 1;
/**
* data range max
*/
Integer rangeMax = 10;
/**
* default test index
*/
Integer defaultIndex = 0;
@Test
public void assertNewSingleScheduledExecutorService() {
// init data snapshot
ThreadPoolManager poolManager = (ThreadPoolManager) ReflectUtil.getFieldValue(ExecutorFactory.Managed.class, "THREAD_POOL_MANAGER");
String poolName = (String) ReflectUtil.getFieldValue(ExecutorFactory.Managed.class, "DEFAULT_NAMESPACE");
Map<String, Map<String, Set<ExecutorService>>> manager = (Map<String, Map<String, Set<ExecutorService>>>) ReflectUtil.getFieldValue(poolManager, "resourcesManager");
Map<String, Set<ExecutorService>> initRelationMap = manager.get(poolName);
int defaultManagerSize = manager.size();
int defaultRelationSize = MapUtil.isEmpty(initRelationMap) ? 0 : initRelationMap.size();
// test begin
ScheduledExecutorService executorService = ExecutorFactory.Managed.newSingleScheduledExecutorService(String.format("test-group-%s", defaultIndex), threadFactory);
Assert.assertNotNull(executorService);
// check default init
Assert.assertEquals(1, manager.size() - defaultManagerSize);
// check multiple registrations and check to see if it is still an instance
IntStream.rangeClosed(rangeMin, rangeMax).forEach(index -> ExecutorFactory.Managed.newSingleScheduledExecutorService(String.format("test-group-%s", index), threadFactory));
Assert.assertEquals(1, manager.size() - defaultManagerSize);
// check group size
Map<String, Set<ExecutorService>> relationMap = manager.get(poolName);
Assert.assertEquals(11, relationMap.size() - defaultRelationSize);
// check the number of threads between the group and the thread pool
IntStream.rangeClosed(rangeMin, rangeMax).forEach(index -> {
String relationKey = String.format("test-group-%s", index);
Assert.assertNotNull(relationMap.get(relationKey));
Assert.assertEquals(1, relationMap.get(relationKey).size());
});
// instantiate the same group a second time and check the corresponding quantitative relationship
IntStream.rangeClosed(defaultIndex, rangeMax).forEach(index -> ExecutorFactory.Managed.newSingleScheduledExecutorService(String.format("test-group-%s", index), threadFactory));
// chek group size
Assert.assertEquals(11, manager.get(poolName).size() - defaultRelationSize);
// check the number of threads between the group and the thread pool
IntStream.rangeClosed(rangeMin, rangeMax).forEach(index -> Assert.assertEquals(2, relationMap.get(String.format("test-group-%s", index)).size()));
}
} }

@ -37,6 +37,7 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
/** /**
@ -45,6 +46,12 @@ import java.util.concurrent.atomic.AtomicLong;
@Slf4j @Slf4j
public class DynamicThreadPoolExecutor extends ExtensibleThreadPoolExecutor implements DisposableBean { public class DynamicThreadPoolExecutor extends ExtensibleThreadPoolExecutor implements DisposableBean {
/**
* A flag used to indicate whether destroy() method has been called,
* after the flag is set to false, calling destroy() method again will not take effect
*/
private final AtomicBoolean active;
/** /**
* Wait for tasks to complete on shutdown * Wait for tasks to complete on shutdown
*/ */
@ -92,11 +99,22 @@ public class DynamicThreadPoolExecutor extends ExtensibleThreadPoolExecutor impl
threadPoolId, new DefaultThreadPoolPluginManager().setPluginComparator(AnnotationAwareOrderComparator.INSTANCE), threadPoolId, new DefaultThreadPoolPluginManager().setPluginComparator(AnnotationAwareOrderComparator.INSTANCE),
corePoolSize, maximumPoolSize, keepAliveTime, unit, corePoolSize, maximumPoolSize, keepAliveTime, unit,
blockingQueue, threadFactory, rejectedExecutionHandler); blockingQueue, threadFactory, rejectedExecutionHandler);
log.info("Initializing ExecutorService {}", threadPoolId); log.info("Initializing ExecutorService '{}'", threadPoolId);
this.waitForTasksToCompleteOnShutdown = waitForTasksToCompleteOnShutdown; this.waitForTasksToCompleteOnShutdown = waitForTasksToCompleteOnShutdown;
// Init default plugins. // Init default plugins.
new DefaultThreadPoolPluginRegistrar(executeTimeOut, awaitTerminationMillis) new DefaultThreadPoolPluginRegistrar(executeTimeOut, awaitTerminationMillis)
.doRegister(this); .doRegister(this);
this.active = new AtomicBoolean(true);
}
/**
* <p>Whether the current instance is in the active state. <br />
* It returns false when the xx method is called at least once.
*
* @return true if current instance is in the active state, false otherwise
*/
public boolean isActive() {
return active.get();
} }
/** /**
@ -104,12 +122,21 @@ public class DynamicThreadPoolExecutor extends ExtensibleThreadPoolExecutor impl
*/ */
@Override @Override
public void destroy() { public void destroy() {
// instance has been destroyed, not need to call this method again
if (!isActive()) {
log.warn("Failed to destroy ExecutorService '{}' because it has already been destroyed", getThreadPoolId());
return;
}
if (isWaitForTasksToCompleteOnShutdown()) { if (isWaitForTasksToCompleteOnShutdown()) {
super.shutdown(); super.shutdown();
} else { } else {
super.shutdownNow(); super.shutdownNow();
} }
getThreadPoolPluginManager().clear(); getThreadPoolPluginManager().clear();
log.info("ExecutorService '{}' has been destroyed", getThreadPoolId());
// modify the flag to false avoid the method being called repeatedly
active.set(false);
} }
/** /**

@ -95,7 +95,7 @@ public class DynamicThreadPoolExecutorTest {
}); });
executor.destroy(); executor.destroy();
// waitting for terminated // waiting for terminated
while (!executor.isTerminated()) { while (!executor.isTerminated()) {
} ; } ;
Assert.assertEquals(2, count.get()); Assert.assertEquals(2, count.get());
@ -119,9 +119,9 @@ public class DynamicThreadPoolExecutorTest {
}); });
executor.destroy(); executor.destroy();
// waitting for terminated // waiting for terminated
while (!executor.isTerminated()) { while (!executor.isTerminated()) {
} ; }
Assert.assertEquals(1, count.get()); Assert.assertEquals(1, count.get());
} }
@ -157,4 +157,21 @@ public class DynamicThreadPoolExecutorTest {
Assert.assertFalse(executor.isWaitForTasksToCompleteOnShutdown()); Assert.assertFalse(executor.isWaitForTasksToCompleteOnShutdown());
} }
@Test
public void testIsActive() {
DynamicThreadPoolExecutor executor = new DynamicThreadPoolExecutor(
1, 1, 1000L, TimeUnit.MILLISECONDS,
1000L, true, 1000L,
new ArrayBlockingQueue<>(1), "test", Thread::new, new ThreadPoolExecutor.DiscardOldestPolicy());
Assert.assertTrue(executor.isActive());
// waiting for terminated
executor.destroy();
while (!executor.isTerminated()) {
}
Assert.assertFalse(executor.isActive());
executor.destroy();
Assert.assertFalse(executor.isActive());
}
} }

@ -92,13 +92,11 @@ public class DingSendMessageHandler extends AbstractRobotSendMessageHandler {
String title = Objects.equals(notifyConfig.getType(), "CONFIG") ? DING_NOTICE_TITLE : DING_ALARM_TITLE; String title = Objects.equals(notifyConfig.getType(), "CONFIG") ? DING_NOTICE_TITLE : DING_ALARM_TITLE;
String text = robotMessageExecuteDTO.getText(); String text = robotMessageExecuteDTO.getText();
ArrayList<String> atMobiles = CollectionUtil.newArrayList(notifyConfig.getReceives().split(",")); ArrayList<String> atMobiles = CollectionUtil.newArrayList(notifyConfig.getReceives().split(","));
HashMap<String, Object> markdown = new HashMap<>(); HashMap<String, Object> markdown = new HashMap<>();
markdown.put("title", title); markdown.put("title", title);
markdown.put("text", text); markdown.put("text", text);
HashMap<String, Object> at = new HashMap<>(); HashMap<String, Object> at = new HashMap<>();
at.put("atMobiles", atMobiles); at.put("atMobiles", atMobiles);
HashMap<String, Object> markdownJson = new HashMap<>(); HashMap<String, Object> markdownJson = new HashMap<>();
markdownJson.put("msgtype", "markdown"); markdownJson.put("msgtype", "markdown");
markdownJson.put("markdown", markdown); markdownJson.put("markdown", markdown);
@ -106,18 +104,29 @@ public class DingSendMessageHandler extends AbstractRobotSendMessageHandler {
try { try {
String responseBody = HttpUtil.post(serverUrl, markdownJson); String responseBody = HttpUtil.post(serverUrl, markdownJson);
DingRobotResponse response = JSONUtil.parseObject(responseBody, DingRobotResponse.class); DingRobotResponse response = JSONUtil.parseObject(responseBody, DingRobotResponse.class);
Assert.isTrue(response != null, "response is null"); Assert.isTrue(response != null, "Response is null.");
if (response.getErrcode() != 0) { if (response.getErrcode() != 0) {
log.error("Ding failed to send message,reason : {}", response.errmsg); log.error("Ding failed to send message, reason : {}", response.errmsg);
} }
} catch (Exception ex) { } catch (Exception ex) {
log.error("Ding failed to send message", ex); log.error("Ding failed to send message.", ex);
} }
} }
/**
* Ding robot response.
*/
@Data @Data
static class DingRobotResponse { static class DingRobotResponse {
/**
* Error code
*/
private Long errcode; private Long errcode;
/**
* Error message
*/
private String errmsg; private String errmsg;
} }
} }

@ -21,7 +21,7 @@ import cn.hippo4j.common.api.ThreadPoolCheckAlarm;
import cn.hippo4j.common.api.ThreadPoolConfigChange; import cn.hippo4j.common.api.ThreadPoolConfigChange;
import cn.hippo4j.common.config.ApplicationContextHolder; import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.config.springboot.starter.monitor.ThreadPoolMonitorExecutor; import cn.hippo4j.config.springboot.starter.monitor.ThreadPoolMonitorExecutor;
import cn.hippo4j.config.springboot.starter.notify.CoreNotifyConfigBuilder; import cn.hippo4j.config.springboot.starter.notify.ConfigModeNotifyConfigBuilder;
import cn.hippo4j.config.springboot.starter.refresher.event.AdapterExecutorsRefreshListener; import cn.hippo4j.config.springboot.starter.refresher.event.AdapterExecutorsRefreshListener;
import cn.hippo4j.config.springboot.starter.refresher.event.DynamicThreadPoolRefreshListener; import cn.hippo4j.config.springboot.starter.refresher.event.DynamicThreadPoolRefreshListener;
import cn.hippo4j.config.springboot.starter.refresher.event.PlatformsRefreshListener; import cn.hippo4j.config.springboot.starter.refresher.event.PlatformsRefreshListener;
@ -77,7 +77,7 @@ public class DynamicThreadPoolAutoConfiguration {
@Bean @Bean
public NotifyConfigBuilder notifyConfigBuilder(AlarmControlHandler alarmControlHandler) { public NotifyConfigBuilder notifyConfigBuilder(AlarmControlHandler alarmControlHandler) {
return new CoreNotifyConfigBuilder(alarmControlHandler, bootstrapConfigProperties); return new ConfigModeNotifyConfigBuilder(alarmControlHandler, bootstrapConfigProperties);
} }
@Bean @Bean
@ -105,9 +105,9 @@ public class DynamicThreadPoolAutoConfiguration {
@Bean @Bean
@SuppressWarnings("all") @SuppressWarnings("all")
public DynamicThreadPoolRefreshListener hippo4jExecutorsListener(ThreadPoolConfigChange threadPoolConfigChange, public DynamicThreadPoolRefreshListener hippo4jExecutorsListener(ThreadPoolConfigChange threadPoolConfigChange,
CoreNotifyConfigBuilder coreNotifyConfigBuilder, ConfigModeNotifyConfigBuilder configModeNotifyConfigBuilder,
Hippo4jBaseSendMessageService hippoBaseSendMessageService) { Hippo4jBaseSendMessageService hippoBaseSendMessageService) {
return new DynamicThreadPoolRefreshListener(threadPoolConfigChange, coreNotifyConfigBuilder, hippoBaseSendMessageService); return new DynamicThreadPoolRefreshListener(threadPoolConfigChange, configModeNotifyConfigBuilder, hippoBaseSendMessageService);
} }
@Bean @Bean

@ -32,11 +32,11 @@ import java.util.*;
import java.util.stream.Collectors; import java.util.stream.Collectors;
/** /**
* Core notify config builder. * Config mode notify config builder.
*/ */
@AllArgsConstructor @AllArgsConstructor
@Slf4j @Slf4j
public class CoreNotifyConfigBuilder implements NotifyConfigBuilder { public class ConfigModeNotifyConfigBuilder implements NotifyConfigBuilder {
private final AlarmControlHandler alarmControlHandler; private final AlarmControlHandler alarmControlHandler;

@ -24,7 +24,7 @@ import cn.hippo4j.common.executor.support.ResizableCapacityLinkedBlockingQueue;
import cn.hippo4j.common.toolkit.CollectionUtil; import cn.hippo4j.common.toolkit.CollectionUtil;
import cn.hippo4j.config.springboot.starter.config.BootstrapConfigProperties; import cn.hippo4j.config.springboot.starter.config.BootstrapConfigProperties;
import cn.hippo4j.config.springboot.starter.config.ExecutorProperties; import cn.hippo4j.config.springboot.starter.config.ExecutorProperties;
import cn.hippo4j.config.springboot.starter.notify.CoreNotifyConfigBuilder; import cn.hippo4j.config.springboot.starter.notify.ConfigModeNotifyConfigBuilder;
import cn.hippo4j.config.springboot.starter.support.GlobalCoreThreadPoolManage; import cn.hippo4j.config.springboot.starter.support.GlobalCoreThreadPoolManage;
import cn.hippo4j.core.executor.DynamicThreadPoolExecutor; import cn.hippo4j.core.executor.DynamicThreadPoolExecutor;
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage; import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
@ -60,7 +60,7 @@ public class DynamicThreadPoolRefreshListener extends AbstractRefreshListener<Ex
private final ThreadPoolConfigChange threadPoolConfigChange; private final ThreadPoolConfigChange threadPoolConfigChange;
private final CoreNotifyConfigBuilder coreNotifyConfigBuilder; private final ConfigModeNotifyConfigBuilder configModeNotifyConfigBuilder;
private final Hippo4jBaseSendMessageService hippo4jBaseSendMessageService; private final Hippo4jBaseSendMessageService hippo4jBaseSendMessageService;
@ -165,7 +165,7 @@ public class DynamicThreadPoolRefreshListener extends AbstractRefreshListener<Ex
boolean checkNotifyConfig = false; boolean checkNotifyConfig = false;
boolean checkNotifyAlarm = false; boolean checkNotifyAlarm = false;
List<String> changeKeys = new ArrayList<>(); List<String> changeKeys = new ArrayList<>();
Map<String, List<NotifyConfigDTO>> newDynamicThreadPoolNotifyMap = coreNotifyConfigBuilder.buildSingleNotifyConfig(executorProperties); Map<String, List<NotifyConfigDTO>> newDynamicThreadPoolNotifyMap = configModeNotifyConfigBuilder.buildSingleNotifyConfig(executorProperties);
Map<String, List<NotifyConfigDTO>> notifyConfigs = hippo4jBaseSendMessageService.getNotifyConfigs(); Map<String, List<NotifyConfigDTO>> notifyConfigs = hippo4jBaseSendMessageService.getNotifyConfigs();
if (CollectionUtil.isNotEmpty(notifyConfigs)) { if (CollectionUtil.isNotEmpty(notifyConfigs)) {
for (Map.Entry<String, List<NotifyConfigDTO>> each : newDynamicThreadPoolNotifyMap.entrySet()) { for (Map.Entry<String, List<NotifyConfigDTO>> each : newDynamicThreadPoolNotifyMap.entrySet()) {
@ -183,7 +183,7 @@ public class DynamicThreadPoolRefreshListener extends AbstractRefreshListener<Ex
} }
} }
if (checkNotifyConfig) { if (checkNotifyConfig) {
coreNotifyConfigBuilder.initCacheAndLock(newDynamicThreadPoolNotifyMap); configModeNotifyConfigBuilder.initCacheAndLock(newDynamicThreadPoolNotifyMap);
hippo4jBaseSendMessageService.putPlatform(newDynamicThreadPoolNotifyMap); hippo4jBaseSendMessageService.putPlatform(newDynamicThreadPoolNotifyMap);
} }
ThreadPoolNotifyAlarm threadPoolNotifyAlarm = GlobalNotifyAlarmManage.get(executorProperties.getThreadPoolId()); ThreadPoolNotifyAlarm threadPoolNotifyAlarm = GlobalNotifyAlarmManage.get(executorProperties.getThreadPoolId());

@ -20,7 +20,7 @@ package cn.hippo4j.config.springboot.starter.refresher.event;
import cn.hippo4j.common.config.ApplicationContextHolder; import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.config.springboot.starter.config.BootstrapConfigProperties; import cn.hippo4j.config.springboot.starter.config.BootstrapConfigProperties;
import cn.hippo4j.config.springboot.starter.config.ExecutorProperties; import cn.hippo4j.config.springboot.starter.config.ExecutorProperties;
import cn.hippo4j.config.springboot.starter.notify.CoreNotifyConfigBuilder; import cn.hippo4j.config.springboot.starter.notify.ConfigModeNotifyConfigBuilder;
import cn.hippo4j.core.executor.DynamicThreadPoolWrapper; import cn.hippo4j.core.executor.DynamicThreadPoolWrapper;
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage; import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.message.dto.NotifyConfigDTO; import cn.hippo4j.message.dto.NotifyConfigDTO;
@ -47,7 +47,7 @@ public class PlatformsRefreshListener extends AbstractRefreshListener<ExecutorPr
DynamicThreadPoolWrapper wrapper = GlobalThreadPoolManage.getExecutorService(threadPoolId); DynamicThreadPoolWrapper wrapper = GlobalThreadPoolManage.getExecutorService(threadPoolId);
if (wrapper != null && !wrapper.isInitFlag()) { if (wrapper != null && !wrapper.isInitFlag()) {
Hippo4jBaseSendMessageService sendMessageService = ApplicationContextHolder.getBean(Hippo4jBaseSendMessageService.class); Hippo4jBaseSendMessageService sendMessageService = ApplicationContextHolder.getBean(Hippo4jBaseSendMessageService.class);
CoreNotifyConfigBuilder configBuilder = ApplicationContextHolder.getBean(CoreNotifyConfigBuilder.class); ConfigModeNotifyConfigBuilder configBuilder = ApplicationContextHolder.getBean(ConfigModeNotifyConfigBuilder.class);
Map<String, List<NotifyConfigDTO>> notifyConfig = configBuilder.buildSingleNotifyConfig(executorProperties); Map<String, List<NotifyConfigDTO>> notifyConfig = configBuilder.buildSingleNotifyConfig(executorProperties);
sendMessageService.putPlatform(notifyConfig); sendMessageService.putPlatform(notifyConfig);
wrapper.setInitFlag(Boolean.TRUE); wrapper.setInitFlag(Boolean.TRUE);

@ -52,11 +52,12 @@ import cn.hippo4j.springboot.starter.monitor.ReportingEventExecutor;
import cn.hippo4j.springboot.starter.monitor.collect.RunTimeInfoCollector; import cn.hippo4j.springboot.starter.monitor.collect.RunTimeInfoCollector;
import cn.hippo4j.springboot.starter.monitor.send.MessageSender; import cn.hippo4j.springboot.starter.monitor.send.MessageSender;
import cn.hippo4j.springboot.starter.monitor.send.http.HttpConnectSender; import cn.hippo4j.springboot.starter.monitor.send.http.HttpConnectSender;
import cn.hippo4j.springboot.starter.notify.ServerNotifyConfigBuilder; import cn.hippo4j.springboot.starter.notify.ServerModeNotifyConfigBuilder;
import cn.hippo4j.springboot.starter.remote.HttpAgent; import cn.hippo4j.springboot.starter.remote.HttpAgent;
import cn.hippo4j.springboot.starter.remote.HttpScheduledHealthCheck; import cn.hippo4j.springboot.starter.remote.HttpScheduledHealthCheck;
import cn.hippo4j.springboot.starter.remote.ServerHealthCheck; import cn.hippo4j.springboot.starter.remote.ServerHealthCheck;
import cn.hippo4j.springboot.starter.remote.ServerHttpAgent; import cn.hippo4j.springboot.starter.remote.ServerHttpAgent;
import cn.hippo4j.springboot.starter.support.AdaptedThreadPoolDestroyPostProcessor;
import cn.hippo4j.springboot.starter.support.DynamicThreadPoolConfigService; import cn.hippo4j.springboot.starter.support.DynamicThreadPoolConfigService;
import cn.hippo4j.springboot.starter.support.DynamicThreadPoolPostProcessor; import cn.hippo4j.springboot.starter.support.DynamicThreadPoolPostProcessor;
import cn.hippo4j.springboot.starter.support.ThreadPoolPluginRegisterPostProcessor; import cn.hippo4j.springboot.starter.support.ThreadPoolPluginRegisterPostProcessor;
@ -68,6 +69,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.info.BuildProperties; import org.springframework.boot.info.BuildProperties;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.core.Ordered; import org.springframework.core.Ordered;
@ -114,10 +116,15 @@ public class DynamicThreadPoolAutoConfiguration {
@SuppressWarnings("all") @SuppressWarnings("all")
public DynamicThreadPoolService dynamicThreadPoolConfigService(HttpAgent httpAgent, public DynamicThreadPoolService dynamicThreadPoolConfigService(HttpAgent httpAgent,
ServerHealthCheck serverHealthCheck, ServerHealthCheck serverHealthCheck,
ServerNotifyConfigBuilder notifyConfigBuilder, ServerModeNotifyConfigBuilder serverModeNotifyConfigBuilder,
Hippo4jBaseSendMessageService hippo4jBaseSendMessageService, Hippo4jBaseSendMessageService hippo4jBaseSendMessageService,
DynamicThreadPoolSubscribeConfig dynamicThreadPoolSubscribeConfig) { DynamicThreadPoolSubscribeConfig dynamicThreadPoolSubscribeConfig) {
return new DynamicThreadPoolConfigService(httpAgent, properties, notifyConfigBuilder, hippo4jBaseSendMessageService, dynamicThreadPoolSubscribeConfig); return new DynamicThreadPoolConfigService(httpAgent, properties, serverModeNotifyConfigBuilder, hippo4jBaseSendMessageService, dynamicThreadPoolSubscribeConfig);
}
@Bean
public AdaptedThreadPoolDestroyPostProcessor adaptedThreadPoolDestroyPostProcessor(ApplicationContext applicationContext) {
return new AdaptedThreadPoolDestroyPostProcessor(applicationContext);
} }
@Bean @Bean
@ -198,7 +205,7 @@ public class DynamicThreadPoolAutoConfiguration {
public NotifyConfigBuilder serverNotifyConfigBuilder(HttpAgent httpAgent, public NotifyConfigBuilder serverNotifyConfigBuilder(HttpAgent httpAgent,
BootstrapProperties properties, BootstrapProperties properties,
AlarmControlHandler alarmControlHandler) { AlarmControlHandler alarmControlHandler) {
return new ServerNotifyConfigBuilder(httpAgent, properties, alarmControlHandler); return new ServerModeNotifyConfigBuilder(httpAgent, properties, alarmControlHandler);
} }
@Bean @Bean

@ -41,11 +41,11 @@ import java.util.Objects;
import static cn.hippo4j.common.constant.Constants.BASE_PATH; import static cn.hippo4j.common.constant.Constants.BASE_PATH;
/** /**
* Server notify config builder. * Server mode notify config builder.
*/ */
@Slf4j @Slf4j
@AllArgsConstructor @AllArgsConstructor
public class ServerNotifyConfigBuilder implements NotifyConfigBuilder { public class ServerModeNotifyConfigBuilder implements NotifyConfigBuilder {
private final HttpAgent httpAgent; private final HttpAgent httpAgent;

@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.springboot.starter.support;
import cn.hippo4j.core.executor.DynamicThreadPoolExecutor;
import cn.hippo4j.core.executor.DynamicThreadPoolWrapper;
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.core.executor.support.adpter.DynamicThreadPoolAdapter;
import cn.hippo4j.core.executor.support.adpter.DynamicThreadPoolAdapterChoose;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.DestructionAwareBeanPostProcessor;
import org.springframework.context.ApplicationContext;
import java.util.Optional;
/**
* <p>Adapted thread pool destroy post processor. <br />
* The processor is used to destroy the internal {@link DynamicThreadPoolExecutor} instance
* in the instance adapted by {@link DynamicThreadPoolAdapter} in the spring context.
*
* @see DynamicThreadPoolAdapter
*/
@RequiredArgsConstructor
@Slf4j
public class AdaptedThreadPoolDestroyPostProcessor implements DestructionAwareBeanPostProcessor {
/**
* Application context.
*/
private final ApplicationContext applicationContext;
/**
* If {@link DynamicThreadPoolAdapterChoose#match} method returns true,
* try to destroy its internal {@link DynamicThreadPoolExecutor} instance.
*
* @param bean the bean instance to check
* @return {@code true} if {@link DynamicThreadPoolAdapterChoose#match} method returns true, false otherwise
* @see DynamicThreadPoolAdapterChoose#match
*/
@Override
public boolean requiresDestruction(Object bean) {
return DynamicThreadPoolAdapterChoose.match(bean);
}
/**
* If the internal {@link DynamicThreadPoolExecutor} instance in the adapted bean is not managed by spring,
* call its {@link DynamicThreadPoolExecutor#destroy()} directly.
*
* @param bean the bean instance to be destroyed
* @param beanName the name of the bean
* @throws BeansException in case of errors
* @see DynamicThreadPoolExecutor#destroy()
*/
@Override
public void postProcessBeforeDestruction(Object bean, String beanName) throws BeansException {
Optional.ofNullable(DynamicThreadPoolAdapterChoose.unwrap(bean))
.map(DynamicThreadPoolExecutor::getThreadPoolId)
// the internal thread pool is also managed by spring, no manual destruction required
.filter(applicationContext::containsBeanDefinition)
.map(GlobalThreadPoolManage::getExecutorService)
.ifPresent(executor -> destroyAdaptedThreadPoolExecutor(beanName, executor));
}
private void destroyAdaptedThreadPoolExecutor(String beanName, DynamicThreadPoolWrapper executor) {
try {
if (log.isDebugEnabled()) {
log.debug("Destroy internal dynamic thread pool '{}' for bean '{}'", executor.getThreadPoolId(), beanName);
}
executor.destroy();
} catch (Exception e) {
log.warn("Failed to destroy internal dynamic thread pool '{}' for bean '{}'", executor.getThreadPoolId(), beanName);
}
}
}

@ -35,7 +35,7 @@ import cn.hippo4j.message.service.Hippo4jBaseSendMessageService;
import cn.hippo4j.message.service.ThreadPoolNotifyAlarm; import cn.hippo4j.message.service.ThreadPoolNotifyAlarm;
import cn.hippo4j.springboot.starter.config.BootstrapProperties; import cn.hippo4j.springboot.starter.config.BootstrapProperties;
import cn.hippo4j.springboot.starter.core.DynamicThreadPoolSubscribeConfig; import cn.hippo4j.springboot.starter.core.DynamicThreadPoolSubscribeConfig;
import cn.hippo4j.springboot.starter.notify.ServerNotifyConfigBuilder; import cn.hippo4j.springboot.starter.notify.ServerModeNotifyConfigBuilder;
import cn.hippo4j.springboot.starter.remote.HttpAgent; import cn.hippo4j.springboot.starter.remote.HttpAgent;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -58,7 +58,7 @@ public class DynamicThreadPoolConfigService extends AbstractDynamicThreadPoolSer
private final BootstrapProperties properties; private final BootstrapProperties properties;
private final ServerNotifyConfigBuilder notifyConfigBuilder; private final ServerModeNotifyConfigBuilder serverModeNotifyConfigBuilder;
private final Hippo4jBaseSendMessageService hippo4jBaseSendMessageService; private final Hippo4jBaseSendMessageService hippo4jBaseSendMessageService;
@ -108,7 +108,7 @@ public class DynamicThreadPoolConfigService extends AbstractDynamicThreadPoolSer
registerParameter.getActiveAlarm(), registerParameter.getActiveAlarm(),
registerParameter.getCapacityAlarm()); registerParameter.getCapacityAlarm());
GlobalNotifyAlarmManage.put(registerParameter.getThreadPoolId(), threadPoolNotifyAlarm); GlobalNotifyAlarmManage.put(registerParameter.getThreadPoolId(), threadPoolNotifyAlarm);
Map<String, List<NotifyConfigDTO>> builderNotify = notifyConfigBuilder.getAndInitNotify(CollectionUtil.newArrayList(registerParameter.getThreadPoolId())); Map<String, List<NotifyConfigDTO>> builderNotify = serverModeNotifyConfigBuilder.getAndInitNotify(CollectionUtil.newArrayList(registerParameter.getThreadPoolId()));
hippo4jBaseSendMessageService.putPlatform(builderNotify); hippo4jBaseSendMessageService.putPlatform(builderNotify);
} }

Loading…
Cancel
Save