Refactor the notification module to enable notification capability for the web container.

pull/1133/head
yanrongzhen 3 years ago
parent 0550dbc3a1
commit 382ff3a0ec

@ -22,15 +22,22 @@ import cn.hippo4j.common.model.ThreadPoolBaseInfo;
import cn.hippo4j.common.model.ThreadPoolParameter;
import cn.hippo4j.common.model.ThreadPoolParameterInfo;
import cn.hippo4j.common.model.ThreadPoolRunStateInfo;
import io.undertow.Undertow;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.util.ReflectionUtils;
import java.lang.reflect.Field;
import java.util.concurrent.Executor;
/**
* Abstract web thread pool service.
*/
@Slf4j
@Order(Ordered.HIGHEST_PRECEDENCE)
public abstract class AbstractWebThreadPoolService implements WebThreadPoolService, ApplicationRunner {
private final IWebThreadPoolHandlerSupport support;

@ -18,22 +18,29 @@
package cn.hippo4j.adapter.web;
import cn.hippo4j.common.config.ApplicationContextHolder;
import io.undertow.Undertow;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.web.context.WebServerApplicationContext;
import org.springframework.boot.web.server.WebServer;
import org.springframework.context.ApplicationContext;
import org.springframework.util.ReflectionUtils;
import java.lang.reflect.Field;
import java.util.concurrent.Executor;
/**
* Default WebThreadPoolService abstract class,
* reuses common capabilities for web container operations.
*/
@Slf4j
public abstract class DefaultAbstractWebThreadPoolService extends AbstractWebThreadPoolService {
public DefaultAbstractWebThreadPoolService(IWebThreadPoolHandlerSupport support) {
super(support);
}
private static final String STARTED_FIELD_NAME = "started";
/**
* Get the internal abstract method of the web container thread pool,
* to be implemented by subclasses.
@ -53,6 +60,19 @@ public abstract class DefaultAbstractWebThreadPoolService extends AbstractWebThr
return getWebServer().getPort();
}
@Override
public boolean isContainerStarted() {
try {
WebServer container = getWebServer();
Field field = ReflectionUtils.findField(WebServer.class, STARTED_FIELD_NAME);
ReflectionUtils.makeAccessible(field);
return (boolean) ReflectionUtils.getField(field, container);
} catch (Throwable th) {
log.error("Failed to get isStarted flag.", th);
return false;
}
}
/**
* Get the thread pool object of the current web container based on the WebServer.
* @param webServer current Web-Server.

@ -30,6 +30,11 @@ import java.util.concurrent.Executor;
*/
public interface WebThreadPoolService {
/**
* Determine if the web container has started.
*/
boolean isContainerStarted();
/**
* Get web thread pool.
*

@ -44,7 +44,7 @@ import static cn.hippo4j.message.platform.constant.LarkAlarmConstants.*;
*/
@Slf4j
@RequiredArgsConstructor
public class LarkSendMessageHandler implements SendMessageHandler<AlarmNotifyRequest, ChangeParameterNotifyRequest> {
public class LarkSendMessageHandler implements SendMessageHandler {
@Override
public String getType() {

@ -32,7 +32,7 @@ import java.util.Objects;
/**
* Abstract robot send message handler.
*/
public abstract class AbstractRobotSendMessageHandler implements SendMessageHandler<AlarmNotifyRequest, ChangeParameterNotifyRequest> {
public abstract class AbstractRobotSendMessageHandler implements SendMessageHandler {
/**
* Build message actual content.

@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.message.request;
import cn.hippo4j.message.request.base.BaseNotifyRequest;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* Change parameter notify request for web thread pool.
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class WebChangeParameterNotifyRequest extends BaseNotifyRequest {
private String active;
private String appName;
private String identify;
private Integer beforeCorePoolSize;
private Integer nowCorePoolSize;
private Integer beforeMaximumPoolSize;
private Integer nowMaximumPoolSize;
private Long beforeKeepAliveTime;
private Long nowKeepAliveTime;
}

@ -18,6 +18,7 @@
package cn.hippo4j.message.request.base;
import cn.hippo4j.common.api.NotifyRequest;
import cn.hippo4j.message.enums.NotifyTypeEnum;
import lombok.Data;
/**

@ -25,6 +25,7 @@ import cn.hippo4j.message.dto.NotifyConfigDTO;
import cn.hippo4j.message.enums.NotifyTypeEnum;
import cn.hippo4j.message.request.AlarmNotifyRequest;
import cn.hippo4j.message.request.ChangeParameterNotifyRequest;
import cn.hippo4j.message.request.WebChangeParameterNotifyRequest;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@ -53,11 +54,7 @@ public class Hippo4jBaseSendMessageService implements Hippo4jSendMessageService,
@Override
public void sendAlarmMessage(NotifyTypeEnum typeEnum, AlarmNotifyRequest alarmNotifyRequest) {
String threadPoolId = alarmNotifyRequest.getThreadPoolId();
String buildKey = new StringBuilder()
.append(threadPoolId)
.append("+")
.append("ALARM")
.toString();
String buildKey = generateAlarmKey(threadPoolId);
List<NotifyConfigDTO> notifyList = notifyConfigs.get(buildKey);
if (CollectionUtil.isEmpty(notifyList)) {
return;
@ -82,11 +79,7 @@ public class Hippo4jBaseSendMessageService implements Hippo4jSendMessageService,
@Override
public void sendChangeMessage(ChangeParameterNotifyRequest changeParameterNotifyRequest) {
String threadPoolId = changeParameterNotifyRequest.getThreadPoolId();
String buildKey = new StringBuilder()
.append(threadPoolId)
.append("+")
.append("CONFIG")
.toString();
String buildKey = generateConfigKey(threadPoolId);
List<NotifyConfigDTO> notifyList = notifyConfigs.get(buildKey);
if (CollectionUtil.isEmpty(notifyList)) {
log.warn("[{}] Please configure alarm notification on the server.", threadPoolId);
@ -106,6 +99,45 @@ public class Hippo4jBaseSendMessageService implements Hippo4jSendMessageService,
});
}
@Override
public void sendChangeMessage(WebChangeParameterNotifyRequest webChangeParameterNotifyRequest) {
String threadPoolId = webChangeParameterNotifyRequest.getThreadPoolId();
String buildKey = generateConfigKey(threadPoolId);
List<NotifyConfigDTO> notifyList = notifyConfigs.get(buildKey);
if (CollectionUtil.isEmpty(notifyList)) {
log.warn("[{}] Please configure alarm notification on the server.", threadPoolId);
return;
}
notifyList.forEach(each -> {
try {
SendMessageHandler messageHandler = sendMessageHandlers.get(each.getPlatform());
if (messageHandler == null) {
log.warn("[{}] Please configure alarm notification on the server.", threadPoolId);
return;
}
messageHandler.sendWebChangeMessage(each, webChangeParameterNotifyRequest);
} catch (Exception ex) {
log.warn("Failed to send thread pool change notification. key: [{}]", threadPoolId, ex);
}
});
}
private String generateConfigKey(String threadPoolId) {
return new StringBuilder()
.append(threadPoolId)
.append("+")
.append("CONFIG")
.toString();
}
private String generateAlarmKey(String threadPoolId) {
return new StringBuilder()
.append(threadPoolId)
.append("+")
.append("ALARM")
.toString();
}
/**
* Is send alarm.
*

@ -20,6 +20,7 @@ package cn.hippo4j.message.service;
import cn.hippo4j.message.enums.NotifyTypeEnum;
import cn.hippo4j.message.request.AlarmNotifyRequest;
import cn.hippo4j.message.request.ChangeParameterNotifyRequest;
import cn.hippo4j.message.request.WebChangeParameterNotifyRequest;
/**
* Hippo-4j send message service.
@ -40,4 +41,11 @@ public interface Hippo4jSendMessageService {
* @param changeParameterNotifyRequest change parameter notify request
*/
void sendChangeMessage(ChangeParameterNotifyRequest changeParameterNotifyRequest);
/**
* Send web thread pool parameter change notification.
*
* @param webChangeParameterNotifyRequest change parameter notify request
*/
void sendChangeMessage(WebChangeParameterNotifyRequest webChangeParameterNotifyRequest);
}

@ -19,11 +19,14 @@ package cn.hippo4j.message.service;
import cn.hippo4j.message.dto.NotifyConfigDTO;
import cn.hippo4j.common.api.NotifyRequest;
import cn.hippo4j.message.request.AlarmNotifyRequest;
import cn.hippo4j.message.request.ChangeParameterNotifyRequest;
import cn.hippo4j.message.request.WebChangeParameterNotifyRequest;
/**
* Send message handler.
*/
public interface SendMessageHandler<T extends NotifyRequest, R extends NotifyRequest> {
public interface SendMessageHandler {
/**
* Get the message send type.
@ -38,7 +41,7 @@ public interface SendMessageHandler<T extends NotifyRequest, R extends NotifyReq
* @param notifyConfig notify config
* @param alarmNotifyRequest alarm notify request
*/
void sendAlarmMessage(NotifyConfigDTO notifyConfig, T alarmNotifyRequest);
void sendAlarmMessage(NotifyConfigDTO notifyConfig, AlarmNotifyRequest alarmNotifyRequest);
/**
* Send change message.
@ -46,5 +49,15 @@ public interface SendMessageHandler<T extends NotifyRequest, R extends NotifyReq
* @param notifyConfig notify config
* @param changeParameterNotifyRequest change parameter notify request
*/
void sendChangeMessage(NotifyConfigDTO notifyConfig, R changeParameterNotifyRequest);
void sendChangeMessage(NotifyConfigDTO notifyConfig, ChangeParameterNotifyRequest changeParameterNotifyRequest);
/**
* Send web change message.
*
* @param notifyConfig notify config
* @param changeParameterNotifyRequest parameter notify request
*/
default void sendWebChangeMessage(NotifyConfigDTO notifyConfig, WebChangeParameterNotifyRequest changeParameterNotifyRequest) throws IllegalAccessException {
throw new IllegalAccessException("Please implement this method before using it.");
}
}

@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.message.service;
import cn.hippo4j.common.api.ThreadPoolConfigChange;
import cn.hippo4j.common.toolkit.StringUtil;
import cn.hippo4j.core.toolkit.IdentifyUtil;
import cn.hippo4j.message.request.ChangeParameterNotifyRequest;
import cn.hippo4j.message.request.WebChangeParameterNotifyRequest;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
/**
* Web thread-pool config change handler.
*/
@RequiredArgsConstructor
@Slf4j
public class WebThreadPoolConfigChangeHandler implements ThreadPoolConfigChange<WebChangeParameterNotifyRequest> {
@Value("${spring.application.name:UNKNOWN}")
private String applicationName;
private final Hippo4jSendMessageService hippo4jSendMessageService;
/**
* Send pool config change.
*
* @param requestParam change parameter notify request
*/
@Override
public void sendPoolConfigChange(WebChangeParameterNotifyRequest requestParam) {
try {
requestParam.setAppName(applicationName);
requestParam.setIdentify(IdentifyUtil.getIdentify());
hippo4jSendMessageService.sendChangeMessage(requestParam);
} catch (Throwable th) {
log.error("send web thread pool config change message failed.", th);
}
}
}

@ -20,18 +20,25 @@ package cn.hippo4j.config.springboot1x.starter.web;
import cn.hippo4j.adapter.web.AbstractWebThreadPoolService;
import cn.hippo4j.adapter.web.IWebThreadPoolHandlerSupport;
import cn.hippo4j.common.config.ApplicationContextHolder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.context.embedded.EmbeddedServletContainer;
import org.springframework.boot.context.embedded.EmbeddedWebApplicationContext;
import org.springframework.util.ReflectionUtils;
import java.lang.reflect.Field;
/**
* Abstract class for adapting WebThreadPoolService to Spring 1.x version.
*/
@Slf4j
public abstract class AbstractWebThreadPoolService1x extends AbstractWebThreadPoolService {
public AbstractWebThreadPoolService1x(IWebThreadPoolHandlerSupport support) {
super(support);
}
private static final String STARTED_FIELD_NAME = "started";
/**
* Get the embedded Servlet container from the Spring application context.
*/
@ -46,4 +53,17 @@ public abstract class AbstractWebThreadPoolService1x extends AbstractWebThreadPo
public Integer getPort() {
return getContainer().getPort();
}
@Override
public boolean isContainerStarted() {
try {
EmbeddedServletContainer container = getContainer();
Field field = ReflectionUtils.findField(EmbeddedServletContainer.class, STARTED_FIELD_NAME);
ReflectionUtils.makeAccessible(field);
return (boolean) ReflectionUtils.getField(field, container);
} catch (Throwable th) {
log.error("Failed to get isStarted flag.", th);
return false;
}
}
}

@ -78,8 +78,9 @@ public class BootstrapConfigProperties implements BootstrapPropertiesInterface {
/**
* web config
* @since 1.5.0
*/
private WebThreadPoolProperties web;
private WebExecutorProperties web;
/**
* Notify platforms.

@ -17,6 +17,7 @@
package cn.hippo4j.config.springboot.starter.config;
import cn.hippo4j.adapter.web.WebThreadPoolService;
import cn.hippo4j.common.api.ThreadPoolCheckAlarm;
import cn.hippo4j.common.api.ThreadPoolConfigChange;
import cn.hippo4j.common.config.ApplicationContextHolder;
@ -34,11 +35,7 @@ import cn.hippo4j.core.enable.MarkerConfiguration;
import cn.hippo4j.core.handler.DynamicThreadPoolBannerHandler;
import cn.hippo4j.message.api.NotifyConfigBuilder;
import cn.hippo4j.message.config.MessageConfiguration;
import cn.hippo4j.message.service.AlarmControlHandler;
import cn.hippo4j.message.service.DefaultThreadPoolCheckAlarmHandler;
import cn.hippo4j.message.service.DefaultThreadPoolConfigChangeHandler;
import cn.hippo4j.message.service.Hippo4jBaseSendMessageService;
import cn.hippo4j.message.service.Hippo4jSendMessageService;
import cn.hippo4j.message.service.*;
import cn.hippo4j.springboot.starter.adapter.web.WebAdapterConfiguration;
import lombok.AllArgsConstructor;
import org.springframework.beans.factory.ObjectProvider;
@ -82,8 +79,8 @@ public class DynamicThreadPoolAutoConfiguration {
}
@Bean
public NotifyConfigBuilder notifyConfigBuilder(AlarmControlHandler alarmControlHandler) {
return new ConfigModeNotifyConfigBuilder(alarmControlHandler, bootstrapConfigProperties);
public NotifyConfigBuilder notifyConfigBuilder(AlarmControlHandler alarmControlHandler, WebThreadPoolService webThreadPoolService) {
return new ConfigModeNotifyConfigBuilder(alarmControlHandler, bootstrapConfigProperties, webThreadPoolService);
}
@Bean
@ -94,10 +91,16 @@ public class DynamicThreadPoolAutoConfiguration {
@Bean
@ConditionalOnMissingBean
public ThreadPoolConfigChange defaultThreadPoolConfigChangeHandler(Hippo4jSendMessageService hippo4jSendMessageService) {
public DefaultThreadPoolConfigChangeHandler defaultThreadPoolConfigChangeHandler(Hippo4jSendMessageService hippo4jSendMessageService) {
return new DefaultThreadPoolConfigChangeHandler(hippo4jSendMessageService);
}
@Bean
@ConditionalOnMissingBean
public WebThreadPoolConfigChangeHandler webThreadPoolConfigChangeHandler(Hippo4jSendMessageService hippo4jSendMessageService) {
return new WebThreadPoolConfigChangeHandler(hippo4jSendMessageService);
}
@Bean
public DynamicThreadPoolPostProcessor dynamicThreadPoolPostProcessor(ApplicationContextHolder hippo4JApplicationContextHolder) {
return new DynamicThreadPoolPostProcessor(bootstrapConfigProperties);
@ -110,7 +113,7 @@ public class DynamicThreadPoolAutoConfiguration {
@Bean
@SuppressWarnings("all")
public DynamicThreadPoolRefreshListener hippo4jExecutorsListener(ThreadPoolConfigChange threadPoolConfigChange,
public DynamicThreadPoolRefreshListener hippo4jExecutorsListener(DefaultThreadPoolConfigChangeHandler threadPoolConfigChange,
ConfigModeNotifyConfigBuilder configModeNotifyConfigBuilder,
Hippo4jBaseSendMessageService hippoBaseSendMessageService) {
return new DynamicThreadPoolRefreshListener(threadPoolConfigChange, configModeNotifyConfigBuilder, hippoBaseSendMessageService);
@ -128,7 +131,7 @@ public class DynamicThreadPoolAutoConfiguration {
@Bean
@SuppressWarnings("all")
public WebExecutorRefreshListener hippo4jWebExecutorListener(ThreadPoolConfigChange threadPoolConfigChange) {
public WebExecutorRefreshListener hippo4jWebExecutorListener(WebThreadPoolConfigChangeHandler threadPoolConfigChange) {
return new WebExecutorRefreshListener(threadPoolConfigChange);
}

@ -31,7 +31,7 @@ import lombok.experimental.Accessors;
@NoArgsConstructor
@AllArgsConstructor
@Accessors(chain = true)
public class ExecutorProperties {
public class ExecutorProperties implements IExecutorProperties {
/**
* Thread pool id

@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.config.springboot.starter.config;
/**
* Interface for thread pool configuration.
*/
public interface IExecutorProperties {
/**
* Core pool size
*/
Integer getCorePoolSize();
/**
* Maximum pool size
*/
Integer getMaximumPoolSize();
/**
* Keep alive time
*/
Long getKeepAliveTime();
/**
* Notify configs
*/
DynamicThreadPoolNotifyProperties getNotify();
}

@ -20,10 +20,10 @@ package cn.hippo4j.config.springboot.starter.config;
import lombok.Data;
/**
* Web thread pool properties.
* Web thread pool executor properties.
*/
@Data
public class WebThreadPoolProperties {
public class WebExecutorProperties implements IExecutorProperties {
/**
* Core pool size
@ -49,4 +49,9 @@ public class WebThreadPoolProperties {
* these propertied is enabled?
*/
private Boolean enable = true;
/**
* Notify config
*/
private DynamicThreadPoolNotifyProperties notify;
}

@ -17,11 +17,10 @@
package cn.hippo4j.config.springboot.starter.notify;
import cn.hippo4j.adapter.web.WebThreadPoolService;
import cn.hippo4j.common.toolkit.CollectionUtil;
import cn.hippo4j.common.toolkit.StringUtil;
import cn.hippo4j.config.springboot.starter.config.BootstrapConfigProperties;
import cn.hippo4j.config.springboot.starter.config.ExecutorProperties;
import cn.hippo4j.config.springboot.starter.config.NotifyPlatformProperties;
import cn.hippo4j.config.springboot.starter.config.*;
import cn.hippo4j.message.api.NotifyConfigBuilder;
import cn.hippo4j.message.dto.NotifyConfigDTO;
import cn.hippo4j.message.service.AlarmControlHandler;
@ -42,24 +41,39 @@ public class ConfigModeNotifyConfigBuilder implements NotifyConfigBuilder {
private final BootstrapConfigProperties configProperties;
private final WebThreadPoolService webThreadPoolService;
@Override
public Map<String, List<NotifyConfigDTO>> buildNotify() {
Map<String, List<NotifyConfigDTO>> resultMap = new HashMap<>();
boolean globalAlarm = Optional.ofNullable(configProperties.getDefaultExecutor()).map(each -> each.getAlarm()).orElse(true);
boolean globalAlarm = Optional.ofNullable(configProperties.getDefaultExecutor())
.map(ExecutorProperties::getAlarm)
.orElse(true);
List<ExecutorProperties> executors = configProperties.getExecutors();
if (CollectionUtil.isEmpty(executors)) {
log.warn("Failed to build notify, executors configuration is empty.");
return resultMap;
}
List<ExecutorProperties> actual = executors.stream().filter(each -> Optional.ofNullable(each.getAlarm()).orElse(false)).collect(Collectors.toList());
List<ExecutorProperties> actual = executors.stream()
.filter(each -> Optional.ofNullable(each.getAlarm())
.orElse(false))
.collect(Collectors.toList());
if (!globalAlarm && CollectionUtil.isEmpty(actual)) {
return resultMap;
}
for (ExecutorProperties executorProperties : executors) {
Map<String, List<NotifyConfigDTO>> buildSingleNotifyConfig = buildSingleNotifyConfig(executorProperties);
Map<String, List<NotifyConfigDTO>> buildSingleNotifyConfig =
buildSingleNotifyConfig(executorProperties.getThreadPoolId(), executorProperties);
initCacheAndLock(buildSingleNotifyConfig);
resultMap.putAll(buildSingleNotifyConfig);
}
// register notify config for web
WebExecutorProperties webProperties = configProperties.getWeb();
Map<String, List<NotifyConfigDTO>> webSingleNotifyConfigMap =
buildSingleNotifyConfig(webThreadPoolService.getWebContainerType().name(), webProperties);
initCacheAndLock(webSingleNotifyConfigMap);
resultMap.putAll(webSingleNotifyConfigMap);
return resultMap;
}
@ -69,9 +83,8 @@ public class ConfigModeNotifyConfigBuilder implements NotifyConfigBuilder {
* @param executorProperties
* @return
*/
public Map<String, List<NotifyConfigDTO>> buildSingleNotifyConfig(ExecutorProperties executorProperties) {
public Map<String, List<NotifyConfigDTO>> buildSingleNotifyConfig(String threadPoolId, IExecutorProperties executorProperties) {
Map<String, List<NotifyConfigDTO>> resultMap = new HashMap<>();
String threadPoolId = executorProperties.getThreadPoolId();
String alarmBuildKey = threadPoolId + "+ALARM";
List<NotifyConfigDTO> alarmNotifyConfigs = new ArrayList<>();
List<NotifyPlatformProperties> notifyPlatforms = configProperties.getNotifyPlatforms();
@ -82,10 +95,7 @@ public class ConfigModeNotifyConfigBuilder implements NotifyConfigBuilder {
notifyConfig.setType("ALARM");
notifyConfig.setSecret(platformProperties.getSecret());
notifyConfig.setSecretKey(getToken(platformProperties));
int interval = Optional.ofNullable(executorProperties.getNotify())
.map(each -> each.getInterval())
.orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(each -> each.getNotify()).map(each -> each.getInterval()).orElse(5));
notifyConfig.setInterval(interval);
notifyConfig.setInterval(buildInterval(executorProperties));
notifyConfig.setReceives(buildReceive(executorProperties));
alarmNotifyConfigs.add(notifyConfig);
}
@ -113,12 +123,21 @@ public class ConfigModeNotifyConfigBuilder implements NotifyConfigBuilder {
.forEach(each -> alarmControlHandler.initCacheAndLock(each.getTpId(), each.getPlatform(), each.getInterval())));
}
private String buildReceive(ExecutorProperties executorProperties) {
String receives = Optional.ofNullable(configProperties.getDefaultExecutor()).map(each -> each.getNotify()).map(each -> each.getReceives()).orElse("");
if (executorProperties.getNotify() != null && StringUtil.isNotEmpty(executorProperties.getNotify().getReceives())) {
receives = executorProperties.getNotify().getReceives();
}
return receives;
private int buildInterval(IExecutorProperties executorProperties) {
return Optional.ofNullable(executorProperties.getNotify())
.map(DynamicThreadPoolNotifyProperties::getInterval)
.orElse(Optional.ofNullable(configProperties.getDefaultExecutor())
.map(ExecutorProperties::getNotify)
.map(DynamicThreadPoolNotifyProperties::getInterval)
.orElse(5));
}
private String buildReceive(IExecutorProperties executorProperties) {
return Optional.ofNullable(executorProperties.getNotify())
.map(DynamicThreadPoolNotifyProperties::getReceives)
.orElse(Optional.ofNullable(configProperties.getDefaultExecutor())
.map(ExecutorProperties::getNotify)
.map(DynamicThreadPoolNotifyProperties::getReceives).orElse(""));
}
private String getToken(NotifyPlatformProperties platformProperties) {

@ -30,6 +30,7 @@ import cn.hippo4j.config.springboot.starter.support.GlobalCoreThreadPoolManage;
import cn.hippo4j.core.executor.DynamicThreadPoolExecutor;
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.message.dto.NotifyConfigDTO;
import cn.hippo4j.message.enums.NotifyTypeEnum;
import cn.hippo4j.message.request.ChangeParameterNotifyRequest;
import cn.hippo4j.message.service.GlobalNotifyAlarmManage;
import cn.hippo4j.message.service.Hippo4jBaseSendMessageService;
@ -166,7 +167,8 @@ public class DynamicThreadPoolRefreshListener extends AbstractRefreshListener<Ex
boolean checkNotifyConfig = false;
boolean checkNotifyAlarm = false;
List<String> changeKeys = new ArrayList<>();
Map<String, List<NotifyConfigDTO>> newDynamicThreadPoolNotifyMap = configModeNotifyConfigBuilder.buildSingleNotifyConfig(executorProperties);
Map<String, List<NotifyConfigDTO>> newDynamicThreadPoolNotifyMap =
configModeNotifyConfigBuilder.buildSingleNotifyConfig(executorProperties.getThreadPoolId(), executorProperties);
Map<String, List<NotifyConfigDTO>> notifyConfigs = hippo4jBaseSendMessageService.getNotifyConfigs();
if (CollectionUtil.isNotEmpty(notifyConfigs)) {
for (Map.Entry<String, List<NotifyConfigDTO>> each : newDynamicThreadPoolNotifyMap.entrySet()) {

@ -48,7 +48,7 @@ public class PlatformsRefreshListener extends AbstractRefreshListener<ExecutorPr
if (wrapper != null && !wrapper.isInitFlag()) {
Hippo4jBaseSendMessageService sendMessageService = ApplicationContextHolder.getBean(Hippo4jBaseSendMessageService.class);
ConfigModeNotifyConfigBuilder configBuilder = ApplicationContextHolder.getBean(ConfigModeNotifyConfigBuilder.class);
Map<String, List<NotifyConfigDTO>> notifyConfig = configBuilder.buildSingleNotifyConfig(executorProperties);
Map<String, List<NotifyConfigDTO>> notifyConfig = configBuilder.buildSingleNotifyConfig(threadPoolId, executorProperties);
sendMessageService.putPlatform(notifyConfig);
wrapper.setInitFlag(Boolean.TRUE);
}

@ -19,19 +19,19 @@ package cn.hippo4j.config.springboot.starter.refresher.event;
import cn.hippo4j.adapter.web.WebThreadPoolHandlerChoose;
import cn.hippo4j.adapter.web.WebThreadPoolService;
import cn.hippo4j.common.api.NotifyRequest;
import cn.hippo4j.common.api.ThreadPoolConfigChange;
import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.common.model.ThreadPoolParameter;
import cn.hippo4j.common.model.ThreadPoolParameterInfo;
import cn.hippo4j.config.springboot.starter.config.BootstrapConfigProperties;
import cn.hippo4j.config.springboot.starter.config.WebThreadPoolProperties;
import cn.hippo4j.config.springboot.starter.config.WebExecutorProperties;
import cn.hippo4j.message.enums.NotifyTypeEnum;
import cn.hippo4j.message.request.ChangeParameterNotifyRequest;
import cn.hippo4j.message.request.WebChangeParameterNotifyRequest;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.annotation.Order;
import java.util.Objects;
import java.util.Optional;
import static cn.hippo4j.config.springboot.starter.refresher.event.Hippo4jConfigDynamicRefreshEventOrder.WEB_EXECUTOR_LISTENER;
@ -41,7 +41,7 @@ import static cn.hippo4j.config.springboot.starter.refresher.event.Hippo4jConfig
@Slf4j
@Order(WEB_EXECUTOR_LISTENER)
@SuppressWarnings("all")
public class WebExecutorRefreshListener extends AbstractRefreshListener<WebThreadPoolProperties> {
public class WebExecutorRefreshListener extends AbstractRefreshListener<WebExecutorProperties> {
private final ThreadPoolConfigChange configChange;
@ -50,7 +50,7 @@ public class WebExecutorRefreshListener extends AbstractRefreshListener<WebThrea
}
@Override
public String getNodes(WebThreadPoolProperties properties) {
public String getNodes(WebExecutorProperties properties) {
return properties.getNodes();
}
@ -79,9 +79,10 @@ public class WebExecutorRefreshListener extends AbstractRefreshListener<WebThrea
}
if (!Objects.equals(beforeParameter.getCoreSize(), nowParameter.getCoreSize())
|| !Objects.equals(beforeParameter.getMaxSize(), nowParameter.getMaxSize())
|| !Objects.equals(beforeParameter.getKeepAliveTime(), nowParameter.getKeepAliveTime())) {
|| !Objects.equals(beforeParameter.getKeepAliveTime(), nowParameter.getKeepAliveTime())
|| !webThreadPoolService.isContainerStarted()) {
webThreadPoolService.updateWebThreadPool(nowParameter);
configChange.sendPoolConfigChange(buildChangeRequest(beforeParameter, nowParameter));
configChange.sendPoolConfigChange(buildChangeRequest(beforeParameter, nowParameter, webThreadPoolService));
}
}
} catch (Exception ex) {
@ -90,24 +91,27 @@ public class WebExecutorRefreshListener extends AbstractRefreshListener<WebThrea
}
/**
* Constructing a request for thread pool parameter change notification
* Constructing a request for web thread pool parameter change notification
* @param before
* @param now
* @return
*/
private ChangeParameterNotifyRequest buildChangeRequest(ThreadPoolParameter before, ThreadPoolParameter now) {
return ChangeParameterNotifyRequest.builder()
private WebChangeParameterNotifyRequest buildChangeRequest(ThreadPoolParameter before, ThreadPoolParameter now,
WebThreadPoolService webThreadPoolService) {
WebChangeParameterNotifyRequest changeNotifyRequest = WebChangeParameterNotifyRequest.builder()
.beforeCorePoolSize(before.getCoreSize())
.nowCorePoolSize(now.getCoreSize())
.beforeMaximumPoolSize(before.getMaxSize())
.nowMaximumPoolSize(now.getMaxSize())
.beforeKeepAliveTime(before.getKeepAliveTime())
.nowKeepAliveTime(now.getKeepAliveTime()).build();
changeNotifyRequest.setThreadPoolId(webThreadPoolService.getWebContainerType().name());
return changeNotifyRequest;
}
private ThreadPoolParameterInfo buildWebPoolParameter(BootstrapConfigProperties bindableCoreProperties) {
ThreadPoolParameterInfo threadPoolParameterInfo = null;
WebThreadPoolProperties webThreadPoolProperties = bindableCoreProperties.getWeb();
WebExecutorProperties webThreadPoolProperties = bindableCoreProperties.getWeb();
if (webThreadPoolProperties != null && webThreadPoolProperties.getEnable() && match(webThreadPoolProperties)) {
threadPoolParameterInfo = ThreadPoolParameterInfo.builder()

Loading…
Cancel
Save