Implement the ability to notify changes to the web container thread pool under the Config mode. (#1133)

* Add notification logic after modifying web thread pool parameters.

* Refactor the notification module to enable notification capability for the web container.

* Add application profile enum.

* Revert the bugfix changes and split them into issue #1134.

* Revert @Order annotation in AbstractWebThreadPoolService.

* fix profile enum name with test environment.

* Fix: remove unused imports.

* Delete ProfileEnum class.

* Modify the way of obtaining the ID of the web thread pool.

* Move the IExecutorProperties class to the common module.

* Narrow the scope of @SuppressWarnings annotation usage.
pull/1154/head
yanrongzhen 2 years ago committed by GitHub
parent ffaa4fbd4d
commit f54d915dec
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -18,20 +18,10 @@
package cn.hippo4j.adapter.web.jetty; package cn.hippo4j.adapter.web.jetty;
import cn.hippo4j.adapter.web.DefaultAbstractWebThreadPoolService; import cn.hippo4j.adapter.web.DefaultAbstractWebThreadPoolService;
import cn.hippo4j.adapter.web.IWebThreadPoolHandlerSupport;
import cn.hippo4j.common.constant.ChangeThreadPoolConstants;
import cn.hippo4j.common.enums.WebContainerEnum;
import cn.hippo4j.common.model.ThreadPoolBaseInfo;
import cn.hippo4j.common.model.ThreadPoolParameter;
import cn.hippo4j.common.model.ThreadPoolParameterInfo;
import cn.hippo4j.common.model.ThreadPoolRunStateInfo;
import cn.hippo4j.common.toolkit.ReflectUtil;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.springframework.boot.web.embedded.jetty.JettyWebServer; import org.springframework.boot.web.embedded.jetty.JettyWebServer;
import org.springframework.boot.web.server.WebServer; import org.springframework.boot.web.server.WebServer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
/** /**

@ -91,7 +91,7 @@ public class TomcatWebThreadPoolHandlerSupport implements IWebThreadPoolHandlerS
long keepAliveTime = tomcatThreadPoolExecutor.getKeepAliveTime(TimeUnit.SECONDS); long keepAliveTime = tomcatThreadPoolExecutor.getKeepAliveTime(TimeUnit.SECONDS);
parameterInfo.setCoreSize(minThreads); parameterInfo.setCoreSize(minThreads);
parameterInfo.setMaxSize(maxThreads); parameterInfo.setMaxSize(maxThreads);
parameterInfo.setKeepAliveTime((int) keepAliveTime); parameterInfo.setKeepAliveTime(keepAliveTime);
} catch (Exception ex) { } catch (Exception ex) {
log.error("Failed to get the tomcat thread pool parameter.", ex); log.error("Failed to get the tomcat thread pool parameter.", ex);
} }

@ -22,12 +22,6 @@ import java.util.Objects;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import cn.hippo4j.adapter.web.DefaultAbstractWebThreadPoolService; import cn.hippo4j.adapter.web.DefaultAbstractWebThreadPoolService;
import cn.hippo4j.adapter.web.IWebThreadPoolHandlerSupport;
import cn.hippo4j.common.enums.WebContainerEnum;
import cn.hippo4j.common.model.ThreadPoolBaseInfo;
import cn.hippo4j.common.model.ThreadPoolParameter;
import cn.hippo4j.common.model.ThreadPoolParameterInfo;
import cn.hippo4j.common.model.ThreadPoolRunStateInfo;
import io.undertow.Undertow; import io.undertow.Undertow;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;

@ -82,7 +82,7 @@ public class UndertowWebThreadPoolHandlerSupport implements IWebThreadPoolHandle
XnioWorker xnioWorker = (XnioWorker) executor; XnioWorker xnioWorker = (XnioWorker) executor;
int minThreads = xnioWorker.getOption(Options.WORKER_TASK_CORE_THREADS); int minThreads = xnioWorker.getOption(Options.WORKER_TASK_CORE_THREADS);
int maxThreads = xnioWorker.getOption(Options.WORKER_TASK_MAX_THREADS); int maxThreads = xnioWorker.getOption(Options.WORKER_TASK_MAX_THREADS);
int keepAliveTime = xnioWorker.getOption(Options.WORKER_TASK_KEEPALIVE); long keepAliveTime = xnioWorker.getOption(Options.WORKER_TASK_KEEPALIVE);
parameterInfo.setCoreSize(minThreads); parameterInfo.setCoreSize(minThreads);
parameterInfo.setMaxSize(maxThreads); parameterInfo.setMaxSize(maxThreads);
parameterInfo.setKeepAliveTime(keepAliveTime); parameterInfo.setKeepAliveTime(keepAliveTime);
@ -135,7 +135,7 @@ public class UndertowWebThreadPoolHandlerSupport implements IWebThreadPoolHandle
XnioWorker xnioWorker = (XnioWorker) executor; XnioWorker xnioWorker = (XnioWorker) executor;
Integer coreSize = threadPoolParameterInfo.corePoolSizeAdapt(); Integer coreSize = threadPoolParameterInfo.corePoolSizeAdapt();
Integer maxSize = threadPoolParameterInfo.maximumPoolSizeAdapt(); Integer maxSize = threadPoolParameterInfo.maximumPoolSizeAdapt();
Integer keepAliveTime = threadPoolParameterInfo.getKeepAliveTime(); int keepAliveTime = threadPoolParameterInfo.getKeepAliveTime().intValue();
int originalCoreSize = xnioWorker.getOption(Options.WORKER_TASK_CORE_THREADS); int originalCoreSize = xnioWorker.getOption(Options.WORKER_TASK_CORE_THREADS);
int originalMaximumPoolSize = xnioWorker.getOption(Options.WORKER_TASK_MAX_THREADS); int originalMaximumPoolSize = xnioWorker.getOption(Options.WORKER_TASK_MAX_THREADS);
int originalKeepAliveTime = xnioWorker.getOption(Options.WORKER_TASK_KEEPALIVE); int originalKeepAliveTime = xnioWorker.getOption(Options.WORKER_TASK_KEEPALIVE);

@ -15,7 +15,7 @@
* limitations under the License. * limitations under the License.
*/ */
package cn.hippo4j.config.springboot.starter.config; package cn.hippo4j.common.api;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.Data; import lombok.Data;
@ -27,7 +27,7 @@ import lombok.NoArgsConstructor;
@Data @Data
@NoArgsConstructor @NoArgsConstructor
@AllArgsConstructor @AllArgsConstructor
public class DynamicThreadPoolNotifyProperties { public class ExecutorNotifyProperties {
/** /**
* Thread pool run alarm interval. unit: s * Thread pool run alarm interval. unit: s

@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.common.api;
/**
* Interface for thread pool configuration.
*/
public interface IExecutorProperties {
/**
* Thread pool id
*/
String getThreadPoolId();
/**
* Core pool size
*/
Integer getCorePoolSize();
/**
* Maximum pool size
*/
Integer getMaximumPoolSize();
/**
* Keep alive time
*/
Long getKeepAliveTime();
/**
* Notify configs
*/
ExecutorNotifyProperties getNotify();
}

@ -76,7 +76,7 @@ public interface ThreadPoolParameter {
* *
* @return * @return
*/ */
Integer getKeepAliveTime(); Long getKeepAliveTime();
/** /**
* Get execute time out * Get execute time out

@ -94,7 +94,7 @@ public class ThreadPoolParameterInfo implements ThreadPoolParameter, Serializabl
/** /**
* Keep alive time * Keep alive time
*/ */
private Integer keepAliveTime; private Long keepAliveTime;
/** /**
* Execute time out * Execute time out

@ -29,7 +29,7 @@ public class ContentUtilTest {
":1,\"capacityAlarm\":80,\"livenessAlarm\":80,\"allowCoreThreadTimeOut\":1}"; ":1,\"capacityAlarm\":80,\"livenessAlarm\":80,\"allowCoreThreadTimeOut\":1}";
ThreadPoolParameterInfo threadPoolParameterInfo = ThreadPoolParameterInfo.builder().tenantId("prescription") ThreadPoolParameterInfo threadPoolParameterInfo = ThreadPoolParameterInfo.builder().tenantId("prescription")
.itemId("dynamic-threadpool-example").tpId("message-consume").content("描述信息").corePoolSize(1) .itemId("dynamic-threadpool-example").tpId("message-consume").content("描述信息").corePoolSize(1)
.maximumPoolSize(2).queueType(1).capacity(4).keepAliveTime(513).executeTimeOut(null).rejectedType(4) .maximumPoolSize(2).queueType(1).capacity(4).keepAliveTime(513L).executeTimeOut(null).rejectedType(4)
.isAlarm(1).capacityAlarm(80).livenessAlarm(80).allowCoreThreadTimeOut(1).build(); .isAlarm(1).capacityAlarm(80).livenessAlarm(80).allowCoreThreadTimeOut(1).build();
Assert.isTrue(testText.equals(ContentUtil.getPoolContent(threadPoolParameterInfo))); Assert.isTrue(testText.equals(ContentUtil.getPoolContent(threadPoolParameterInfo)));
} }

@ -53,7 +53,7 @@ public class Md5UtilTest {
String md5Result = "ef5ea7cb47377fb9fb85a7125e76715d"; String md5Result = "ef5ea7cb47377fb9fb85a7125e76715d";
ThreadPoolParameterInfo threadPoolParameterInfo = ThreadPoolParameterInfo.builder().tenantId("prescription") ThreadPoolParameterInfo threadPoolParameterInfo = ThreadPoolParameterInfo.builder().tenantId("prescription")
.itemId("dynamic-threadpool-example").tpId("message-consume").content("描述信息").corePoolSize(1) .itemId("dynamic-threadpool-example").tpId("message-consume").content("描述信息").corePoolSize(1)
.maximumPoolSize(2).queueType(1).capacity(4).keepAliveTime(513).executeTimeOut(null).rejectedType(4) .maximumPoolSize(2).queueType(1).capacity(4).keepAliveTime(513L).executeTimeOut(null).rejectedType(4)
.isAlarm(1).capacityAlarm(80).livenessAlarm(80).allowCoreThreadTimeOut(1).build(); .isAlarm(1).capacityAlarm(80).livenessAlarm(80).allowCoreThreadTimeOut(1).build();
Assert.isTrue(md5Result.equals(Md5Util.getTpContentMd5(threadPoolParameterInfo))); Assert.isTrue(md5Result.equals(Md5Util.getTpContentMd5(threadPoolParameterInfo)));
} }

@ -30,7 +30,7 @@ public class SingletonTest {
Assert.assertEquals("hippo4j", Singleton.get("userName")); Assert.assertEquals("hippo4j", Singleton.get("userName"));
ThreadPoolParameterInfo threadPoolParameterInfo = ThreadPoolParameterInfo.builder().tenantId("prescription") ThreadPoolParameterInfo threadPoolParameterInfo = ThreadPoolParameterInfo.builder().tenantId("prescription")
.itemId("dynamic-threadpool-example").tpId("message-consume").content("描述信息").corePoolSize(1) .itemId("dynamic-threadpool-example").tpId("message-consume").content("描述信息").corePoolSize(1)
.maximumPoolSize(2).queueType(1).capacity(4).keepAliveTime(513).executeTimeOut(null).rejectedType(4) .maximumPoolSize(2).queueType(1).capacity(4).keepAliveTime(513L).executeTimeOut(null).rejectedType(4)
.isAlarm(1).capacityAlarm(80).livenessAlarm(80).allowCoreThreadTimeOut(1).build(); .isAlarm(1).capacityAlarm(80).livenessAlarm(80).allowCoreThreadTimeOut(1).build();
Singleton.put(threadPoolParameterInfo); Singleton.put(threadPoolParameterInfo);
Assert.assertEquals(threadPoolParameterInfo, Singleton.get(ThreadPoolParameterInfo.class.getName())); Assert.assertEquals(threadPoolParameterInfo, Singleton.get(ThreadPoolParameterInfo.class.getName()));

@ -44,7 +44,7 @@ import static cn.hippo4j.message.platform.constant.LarkAlarmConstants.*;
*/ */
@Slf4j @Slf4j
@RequiredArgsConstructor @RequiredArgsConstructor
public class LarkSendMessageHandler implements SendMessageHandler<AlarmNotifyRequest, ChangeParameterNotifyRequest> { public class LarkSendMessageHandler implements SendMessageHandler {
@Override @Override
public String getType() { public String getType() {

@ -32,7 +32,7 @@ import java.util.Objects;
/** /**
* Abstract robot send message handler. * Abstract robot send message handler.
*/ */
public abstract class AbstractRobotSendMessageHandler implements SendMessageHandler<AlarmNotifyRequest, ChangeParameterNotifyRequest> { public abstract class AbstractRobotSendMessageHandler implements SendMessageHandler {
/** /**
* Build message actual content. * Build message actual content.

@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.message.request;
import cn.hippo4j.message.request.base.BaseNotifyRequest;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* Change parameter notify request for web thread pool.
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class WebChangeParameterNotifyRequest extends BaseNotifyRequest {
private String active;
private String appName;
private String identify;
private Integer beforeCorePoolSize;
private Integer nowCorePoolSize;
private Integer beforeMaximumPoolSize;
private Integer nowMaximumPoolSize;
private Long beforeKeepAliveTime;
private Long nowKeepAliveTime;
}

@ -18,6 +18,7 @@
package cn.hippo4j.message.request.base; package cn.hippo4j.message.request.base;
import cn.hippo4j.common.api.NotifyRequest; import cn.hippo4j.common.api.NotifyRequest;
import cn.hippo4j.message.enums.NotifyTypeEnum;
import lombok.Data; import lombok.Data;
/** /**

@ -25,6 +25,7 @@ import cn.hippo4j.message.dto.NotifyConfigDTO;
import cn.hippo4j.message.enums.NotifyTypeEnum; import cn.hippo4j.message.enums.NotifyTypeEnum;
import cn.hippo4j.message.request.AlarmNotifyRequest; import cn.hippo4j.message.request.AlarmNotifyRequest;
import cn.hippo4j.message.request.ChangeParameterNotifyRequest; import cn.hippo4j.message.request.ChangeParameterNotifyRequest;
import cn.hippo4j.message.request.WebChangeParameterNotifyRequest;
import lombok.Getter; import lombok.Getter;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -53,11 +54,7 @@ public class Hippo4jBaseSendMessageService implements Hippo4jSendMessageService,
@Override @Override
public void sendAlarmMessage(NotifyTypeEnum typeEnum, AlarmNotifyRequest alarmNotifyRequest) { public void sendAlarmMessage(NotifyTypeEnum typeEnum, AlarmNotifyRequest alarmNotifyRequest) {
String threadPoolId = alarmNotifyRequest.getThreadPoolId(); String threadPoolId = alarmNotifyRequest.getThreadPoolId();
String buildKey = new StringBuilder() String buildKey = generateAlarmKey(threadPoolId);
.append(threadPoolId)
.append("+")
.append("ALARM")
.toString();
List<NotifyConfigDTO> notifyList = notifyConfigs.get(buildKey); List<NotifyConfigDTO> notifyList = notifyConfigs.get(buildKey);
if (CollectionUtil.isEmpty(notifyList)) { if (CollectionUtil.isEmpty(notifyList)) {
return; return;
@ -82,11 +79,7 @@ public class Hippo4jBaseSendMessageService implements Hippo4jSendMessageService,
@Override @Override
public void sendChangeMessage(ChangeParameterNotifyRequest changeParameterNotifyRequest) { public void sendChangeMessage(ChangeParameterNotifyRequest changeParameterNotifyRequest) {
String threadPoolId = changeParameterNotifyRequest.getThreadPoolId(); String threadPoolId = changeParameterNotifyRequest.getThreadPoolId();
String buildKey = new StringBuilder() String buildKey = generateConfigKey(threadPoolId);
.append(threadPoolId)
.append("+")
.append("CONFIG")
.toString();
List<NotifyConfigDTO> notifyList = notifyConfigs.get(buildKey); List<NotifyConfigDTO> notifyList = notifyConfigs.get(buildKey);
if (CollectionUtil.isEmpty(notifyList)) { if (CollectionUtil.isEmpty(notifyList)) {
log.warn("[{}] Please configure alarm notification on the server.", threadPoolId); log.warn("[{}] Please configure alarm notification on the server.", threadPoolId);
@ -106,6 +99,45 @@ public class Hippo4jBaseSendMessageService implements Hippo4jSendMessageService,
}); });
} }
@Override
public void sendChangeMessage(WebChangeParameterNotifyRequest webChangeParameterNotifyRequest) {
String threadPoolId = webChangeParameterNotifyRequest.getThreadPoolId();
String buildKey = generateConfigKey(threadPoolId);
List<NotifyConfigDTO> notifyList = notifyConfigs.get(buildKey);
if (CollectionUtil.isEmpty(notifyList)) {
log.warn("[{}] Please configure alarm notification on the server.", threadPoolId);
return;
}
notifyList.forEach(each -> {
try {
SendMessageHandler messageHandler = sendMessageHandlers.get(each.getPlatform());
if (messageHandler == null) {
log.warn("[{}] Please configure alarm notification on the server.", threadPoolId);
return;
}
messageHandler.sendWebChangeMessage(each, webChangeParameterNotifyRequest);
} catch (Exception ex) {
log.warn("Failed to send thread pool change notification. key: [{}]", threadPoolId, ex);
}
});
}
private String generateConfigKey(String threadPoolId) {
return new StringBuilder()
.append(threadPoolId)
.append("+")
.append("CONFIG")
.toString();
}
private String generateAlarmKey(String threadPoolId) {
return new StringBuilder()
.append(threadPoolId)
.append("+")
.append("ALARM")
.toString();
}
/** /**
* Is send alarm. * Is send alarm.
* *

@ -20,6 +20,7 @@ package cn.hippo4j.message.service;
import cn.hippo4j.message.enums.NotifyTypeEnum; import cn.hippo4j.message.enums.NotifyTypeEnum;
import cn.hippo4j.message.request.AlarmNotifyRequest; import cn.hippo4j.message.request.AlarmNotifyRequest;
import cn.hippo4j.message.request.ChangeParameterNotifyRequest; import cn.hippo4j.message.request.ChangeParameterNotifyRequest;
import cn.hippo4j.message.request.WebChangeParameterNotifyRequest;
/** /**
* Hippo-4j send message service. * Hippo-4j send message service.
@ -40,4 +41,11 @@ public interface Hippo4jSendMessageService {
* @param changeParameterNotifyRequest change parameter notify request * @param changeParameterNotifyRequest change parameter notify request
*/ */
void sendChangeMessage(ChangeParameterNotifyRequest changeParameterNotifyRequest); void sendChangeMessage(ChangeParameterNotifyRequest changeParameterNotifyRequest);
/**
* Send web thread pool parameter change notification.
*
* @param webChangeParameterNotifyRequest change parameter notify request
*/
void sendChangeMessage(WebChangeParameterNotifyRequest webChangeParameterNotifyRequest);
} }

@ -19,11 +19,14 @@ package cn.hippo4j.message.service;
import cn.hippo4j.message.dto.NotifyConfigDTO; import cn.hippo4j.message.dto.NotifyConfigDTO;
import cn.hippo4j.common.api.NotifyRequest; import cn.hippo4j.common.api.NotifyRequest;
import cn.hippo4j.message.request.AlarmNotifyRequest;
import cn.hippo4j.message.request.ChangeParameterNotifyRequest;
import cn.hippo4j.message.request.WebChangeParameterNotifyRequest;
/** /**
* Send message handler. * Send message handler.
*/ */
public interface SendMessageHandler<T extends NotifyRequest, R extends NotifyRequest> { public interface SendMessageHandler {
/** /**
* Get the message send type. * Get the message send type.
@ -38,7 +41,7 @@ public interface SendMessageHandler<T extends NotifyRequest, R extends NotifyReq
* @param notifyConfig notify config * @param notifyConfig notify config
* @param alarmNotifyRequest alarm notify request * @param alarmNotifyRequest alarm notify request
*/ */
void sendAlarmMessage(NotifyConfigDTO notifyConfig, T alarmNotifyRequest); void sendAlarmMessage(NotifyConfigDTO notifyConfig, AlarmNotifyRequest alarmNotifyRequest);
/** /**
* Send change message. * Send change message.
@ -46,5 +49,15 @@ public interface SendMessageHandler<T extends NotifyRequest, R extends NotifyReq
* @param notifyConfig notify config * @param notifyConfig notify config
* @param changeParameterNotifyRequest change parameter notify request * @param changeParameterNotifyRequest change parameter notify request
*/ */
void sendChangeMessage(NotifyConfigDTO notifyConfig, R changeParameterNotifyRequest); void sendChangeMessage(NotifyConfigDTO notifyConfig, ChangeParameterNotifyRequest changeParameterNotifyRequest);
/**
* Send web change message.
*
* @param notifyConfig notify config
* @param changeParameterNotifyRequest parameter notify request
*/
default void sendWebChangeMessage(NotifyConfigDTO notifyConfig, WebChangeParameterNotifyRequest changeParameterNotifyRequest) throws IllegalAccessException {
throw new IllegalAccessException("Please implement this method before using it.");
}
} }

@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.message.service;
import cn.hippo4j.common.api.ThreadPoolConfigChange;
import cn.hippo4j.common.toolkit.StringUtil;
import cn.hippo4j.core.toolkit.IdentifyUtil;
import cn.hippo4j.message.request.ChangeParameterNotifyRequest;
import cn.hippo4j.message.request.WebChangeParameterNotifyRequest;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
/**
* Web thread-pool config change handler.
*/
@RequiredArgsConstructor
@Slf4j
public class WebThreadPoolConfigChangeHandler implements ThreadPoolConfigChange<WebChangeParameterNotifyRequest> {
@Value("${spring.profiles.active:UNKNOWN}")
private String active;
@Value("${spring.dynamic.thread-pool.item-id:}")
private String itemId;
@Value("${spring.application.name:UNKNOWN}")
private String applicationName;
private final Hippo4jSendMessageService hippo4jSendMessageService;
/**
* Send pool config change message for web.
*
* @param requestParam change parameter notify request
*/
@Override
public void sendPoolConfigChange(WebChangeParameterNotifyRequest requestParam) {
try {
requestParam.setActive(active.toUpperCase());
String appName = StringUtil.isBlank(itemId) ? applicationName : itemId;
requestParam.setAppName(appName);
requestParam.setIdentify(IdentifyUtil.getIdentify());
hippo4jSendMessageService.sendChangeMessage(requestParam);
} catch (Throwable th) {
log.error("send web thread pool config change message failed.", th);
}
}
}

@ -81,7 +81,7 @@ public class ConfigInfoBase implements Serializable {
/** /**
* keepAliveTime * keepAliveTime
*/ */
private Integer keepAliveTime; private Long keepAliveTime;
/** /**
* executeTimeOut * executeTimeOut

@ -17,14 +17,7 @@
package cn.hippo4j.config.springboot1x.starter.web.jetty; package cn.hippo4j.config.springboot1x.starter.web.jetty;
import cn.hippo4j.adapter.web.IWebThreadPoolHandlerSupport;
import cn.hippo4j.adapter.web.jetty.JettyWebThreadPoolHandlerSupport; import cn.hippo4j.adapter.web.jetty.JettyWebThreadPoolHandlerSupport;
import cn.hippo4j.common.enums.WebContainerEnum;
import cn.hippo4j.common.model.ThreadPoolBaseInfo;
import cn.hippo4j.common.model.ThreadPoolParameter;
import cn.hippo4j.common.model.ThreadPoolParameterInfo;
import cn.hippo4j.common.model.ThreadPoolRunStateInfo;
import cn.hippo4j.common.web.exception.ServiceException;
import cn.hippo4j.config.springboot1x.starter.web.AbstractWebThreadPoolService1x; import cn.hippo4j.config.springboot1x.starter.web.AbstractWebThreadPoolService1x;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.context.embedded.jetty.JettyEmbeddedServletContainer; import org.springframework.boot.context.embedded.jetty.JettyEmbeddedServletContainer;

@ -78,8 +78,9 @@ public class BootstrapConfigProperties implements BootstrapPropertiesInterface {
/** /**
* web config * web config
* @since 1.5.0
*/ */
private WebThreadPoolProperties web; private WebExecutorProperties web;
/** /**
* Notify platforms. * Notify platforms.

@ -17,9 +17,11 @@
package cn.hippo4j.config.springboot.starter.config; package cn.hippo4j.config.springboot.starter.config;
import cn.hippo4j.adapter.web.WebThreadPoolService;
import cn.hippo4j.common.api.ThreadPoolCheckAlarm; import cn.hippo4j.common.api.ThreadPoolCheckAlarm;
import cn.hippo4j.common.api.ThreadPoolConfigChange; import cn.hippo4j.common.api.ThreadPoolConfigChange;
import cn.hippo4j.common.config.ApplicationContextHolder; import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.common.toolkit.StringUtil;
import cn.hippo4j.config.springboot.starter.monitor.ThreadPoolMonitorExecutor; import cn.hippo4j.config.springboot.starter.monitor.ThreadPoolMonitorExecutor;
import cn.hippo4j.config.springboot.starter.notify.ConfigModeNotifyConfigBuilder; import cn.hippo4j.config.springboot.starter.notify.ConfigModeNotifyConfigBuilder;
import cn.hippo4j.config.springboot.starter.refresher.event.AdapterExecutorsRefreshListener; import cn.hippo4j.config.springboot.starter.refresher.event.AdapterExecutorsRefreshListener;
@ -34,11 +36,7 @@ import cn.hippo4j.core.enable.MarkerConfiguration;
import cn.hippo4j.core.handler.DynamicThreadPoolBannerHandler; import cn.hippo4j.core.handler.DynamicThreadPoolBannerHandler;
import cn.hippo4j.message.api.NotifyConfigBuilder; import cn.hippo4j.message.api.NotifyConfigBuilder;
import cn.hippo4j.message.config.MessageConfiguration; import cn.hippo4j.message.config.MessageConfiguration;
import cn.hippo4j.message.service.AlarmControlHandler; import cn.hippo4j.message.service.*;
import cn.hippo4j.message.service.DefaultThreadPoolCheckAlarmHandler;
import cn.hippo4j.message.service.DefaultThreadPoolConfigChangeHandler;
import cn.hippo4j.message.service.Hippo4jBaseSendMessageService;
import cn.hippo4j.message.service.Hippo4jSendMessageService;
import cn.hippo4j.springboot.starter.adapter.web.WebAdapterConfiguration; import cn.hippo4j.springboot.starter.adapter.web.WebAdapterConfiguration;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import org.springframework.beans.factory.ObjectProvider; import org.springframework.beans.factory.ObjectProvider;
@ -54,6 +52,8 @@ import org.springframework.context.annotation.Import;
import org.springframework.core.Ordered; import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order; import org.springframework.core.annotation.Order;
import java.util.Optional;
/** /**
* Dynamic thread-pool auto-configuration. * Dynamic thread-pool auto-configuration.
* *
@ -82,8 +82,8 @@ public class DynamicThreadPoolAutoConfiguration {
} }
@Bean @Bean
public NotifyConfigBuilder notifyConfigBuilder(AlarmControlHandler alarmControlHandler) { public NotifyConfigBuilder notifyConfigBuilder(AlarmControlHandler alarmControlHandler, WebThreadPoolService webThreadPoolService) {
return new ConfigModeNotifyConfigBuilder(alarmControlHandler, bootstrapConfigProperties); return new ConfigModeNotifyConfigBuilder(alarmControlHandler, bootstrapConfigProperties, webThreadPoolService);
} }
@Bean @Bean
@ -94,10 +94,21 @@ public class DynamicThreadPoolAutoConfiguration {
@Bean @Bean
@ConditionalOnMissingBean @ConditionalOnMissingBean
public ThreadPoolConfigChange defaultThreadPoolConfigChangeHandler(Hippo4jSendMessageService hippo4jSendMessageService) { public DefaultThreadPoolConfigChangeHandler defaultThreadPoolConfigChangeHandler(Hippo4jSendMessageService hippo4jSendMessageService) {
return new DefaultThreadPoolConfigChangeHandler(hippo4jSendMessageService); return new DefaultThreadPoolConfigChangeHandler(hippo4jSendMessageService);
} }
@Bean
@ConditionalOnMissingBean
public WebThreadPoolConfigChangeHandler webThreadPoolConfigChangeHandler(BootstrapConfigProperties bootstrapConfigProperties,
WebThreadPoolService webThreadPoolService,
Hippo4jSendMessageService hippo4jSendMessageService) {
if (bootstrapConfigProperties.getWeb() != null && StringUtil.isBlank(bootstrapConfigProperties.getWeb().getThreadPoolId())) {
bootstrapConfigProperties.getWeb().setThreadPoolId(webThreadPoolService.getWebContainerType().name());
}
return new WebThreadPoolConfigChangeHandler(hippo4jSendMessageService);
}
@Bean @Bean
public DynamicThreadPoolPostProcessor dynamicThreadPoolPostProcessor(ApplicationContextHolder hippo4JApplicationContextHolder) { public DynamicThreadPoolPostProcessor dynamicThreadPoolPostProcessor(ApplicationContextHolder hippo4JApplicationContextHolder) {
return new DynamicThreadPoolPostProcessor(bootstrapConfigProperties); return new DynamicThreadPoolPostProcessor(bootstrapConfigProperties);
@ -110,7 +121,7 @@ public class DynamicThreadPoolAutoConfiguration {
@Bean @Bean
@SuppressWarnings("all") @SuppressWarnings("all")
public DynamicThreadPoolRefreshListener hippo4jExecutorsListener(ThreadPoolConfigChange threadPoolConfigChange, public DynamicThreadPoolRefreshListener hippo4jExecutorsListener(DefaultThreadPoolConfigChangeHandler threadPoolConfigChange,
ConfigModeNotifyConfigBuilder configModeNotifyConfigBuilder, ConfigModeNotifyConfigBuilder configModeNotifyConfigBuilder,
Hippo4jBaseSendMessageService hippoBaseSendMessageService) { Hippo4jBaseSendMessageService hippoBaseSendMessageService) {
return new DynamicThreadPoolRefreshListener(threadPoolConfigChange, configModeNotifyConfigBuilder, hippoBaseSendMessageService); return new DynamicThreadPoolRefreshListener(threadPoolConfigChange, configModeNotifyConfigBuilder, hippoBaseSendMessageService);
@ -127,8 +138,9 @@ public class DynamicThreadPoolAutoConfiguration {
} }
@Bean @Bean
public WebExecutorRefreshListener hippo4jWebExecutorListener() { @SuppressWarnings("all")
return new WebExecutorRefreshListener(); public WebExecutorRefreshListener hippo4jWebExecutorListener(WebThreadPoolConfigChangeHandler threadPoolConfigChange) {
return new WebExecutorRefreshListener(threadPoolConfigChange);
} }
@Bean @Bean

@ -17,6 +17,8 @@
package cn.hippo4j.config.springboot.starter.config; package cn.hippo4j.config.springboot.starter.config;
import cn.hippo4j.common.api.ExecutorNotifyProperties;
import cn.hippo4j.common.api.IExecutorProperties;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.Builder; import lombok.Builder;
import lombok.Data; import lombok.Data;
@ -31,7 +33,7 @@ import lombok.experimental.Accessors;
@NoArgsConstructor @NoArgsConstructor
@AllArgsConstructor @AllArgsConstructor
@Accessors(chain = true) @Accessors(chain = true)
public class ExecutorProperties { public class ExecutorProperties implements IExecutorProperties {
/** /**
* Thread pool id * Thread pool id
@ -101,7 +103,7 @@ public class ExecutorProperties {
/** /**
* Notify * Notify
*/ */
private DynamicThreadPoolNotifyProperties notify; private ExecutorNotifyProperties notify;
/** /**
* Nodes, application startup is not affect, change properties is effect * Nodes, application startup is not affect, change properties is effect

@ -17,13 +17,20 @@
package cn.hippo4j.config.springboot.starter.config; package cn.hippo4j.config.springboot.starter.config;
import cn.hippo4j.common.api.ExecutorNotifyProperties;
import cn.hippo4j.common.api.IExecutorProperties;
import lombok.Data; import lombok.Data;
/** /**
* Web thread pool properties. * Web thread pool executor properties.
*/ */
@Data @Data
public class WebThreadPoolProperties { public class WebExecutorProperties implements IExecutorProperties {
/**
* Thread pool id
*/
private String threadPoolId;
/** /**
* Core pool size * Core pool size
@ -38,7 +45,7 @@ public class WebThreadPoolProperties {
/** /**
* Keep alive time * Keep alive time
*/ */
private Integer keepAliveTime; private Long keepAliveTime;
/** /**
* Nodes, application startup is not affect, change properties is effect * Nodes, application startup is not affect, change properties is effect
@ -49,4 +56,9 @@ public class WebThreadPoolProperties {
* these propertied is enabled? * these propertied is enabled?
*/ */
private Boolean enable = true; private Boolean enable = true;
/**
* Notify config
*/
private ExecutorNotifyProperties notify;
} }

@ -17,11 +17,12 @@
package cn.hippo4j.config.springboot.starter.notify; package cn.hippo4j.config.springboot.starter.notify;
import cn.hippo4j.adapter.web.WebThreadPoolService;
import cn.hippo4j.common.api.ExecutorNotifyProperties;
import cn.hippo4j.common.api.IExecutorProperties;
import cn.hippo4j.common.toolkit.CollectionUtil; import cn.hippo4j.common.toolkit.CollectionUtil;
import cn.hippo4j.common.toolkit.StringUtil; import cn.hippo4j.common.toolkit.StringUtil;
import cn.hippo4j.config.springboot.starter.config.BootstrapConfigProperties; import cn.hippo4j.config.springboot.starter.config.*;
import cn.hippo4j.config.springboot.starter.config.ExecutorProperties;
import cn.hippo4j.config.springboot.starter.config.NotifyPlatformProperties;
import cn.hippo4j.message.api.NotifyConfigBuilder; import cn.hippo4j.message.api.NotifyConfigBuilder;
import cn.hippo4j.message.dto.NotifyConfigDTO; import cn.hippo4j.message.dto.NotifyConfigDTO;
import cn.hippo4j.message.service.AlarmControlHandler; import cn.hippo4j.message.service.AlarmControlHandler;
@ -42,16 +43,23 @@ public class ConfigModeNotifyConfigBuilder implements NotifyConfigBuilder {
private final BootstrapConfigProperties configProperties; private final BootstrapConfigProperties configProperties;
private final WebThreadPoolService webThreadPoolService;
@Override @Override
public Map<String, List<NotifyConfigDTO>> buildNotify() { public Map<String, List<NotifyConfigDTO>> buildNotify() {
Map<String, List<NotifyConfigDTO>> resultMap = new HashMap<>(); Map<String, List<NotifyConfigDTO>> resultMap = new HashMap<>();
boolean globalAlarm = Optional.ofNullable(configProperties.getDefaultExecutor()).map(each -> each.getAlarm()).orElse(true); boolean globalAlarm = Optional.ofNullable(configProperties.getDefaultExecutor())
.map(ExecutorProperties::getAlarm)
.orElse(true);
List<ExecutorProperties> executors = configProperties.getExecutors(); List<ExecutorProperties> executors = configProperties.getExecutors();
if (CollectionUtil.isEmpty(executors)) { if (CollectionUtil.isEmpty(executors)) {
log.warn("Failed to build notify, executors configuration is empty."); log.warn("Failed to build notify, executors configuration is empty.");
return resultMap; return resultMap;
} }
List<ExecutorProperties> actual = executors.stream().filter(each -> Optional.ofNullable(each.getAlarm()).orElse(false)).collect(Collectors.toList()); List<ExecutorProperties> actual = executors.stream()
.filter(each -> Optional.ofNullable(each.getAlarm())
.orElse(false))
.collect(Collectors.toList());
if (!globalAlarm && CollectionUtil.isEmpty(actual)) { if (!globalAlarm && CollectionUtil.isEmpty(actual)) {
return resultMap; return resultMap;
} }
@ -60,6 +68,16 @@ public class ConfigModeNotifyConfigBuilder implements NotifyConfigBuilder {
initCacheAndLock(buildSingleNotifyConfig); initCacheAndLock(buildSingleNotifyConfig);
resultMap.putAll(buildSingleNotifyConfig); resultMap.putAll(buildSingleNotifyConfig);
} }
// register notify config for web
WebExecutorProperties webProperties = configProperties.getWeb();
if (StringUtil.isBlank(webProperties.getThreadPoolId())) {
webProperties.setThreadPoolId(webThreadPoolService.getWebContainerType().name());
}
Map<String, List<NotifyConfigDTO>> webSingleNotifyConfigMap =
buildSingleNotifyConfig(webProperties);
initCacheAndLock(webSingleNotifyConfigMap);
resultMap.putAll(webSingleNotifyConfigMap);
return resultMap; return resultMap;
} }
@ -69,9 +87,9 @@ public class ConfigModeNotifyConfigBuilder implements NotifyConfigBuilder {
* @param executorProperties * @param executorProperties
* @return * @return
*/ */
public Map<String, List<NotifyConfigDTO>> buildSingleNotifyConfig(ExecutorProperties executorProperties) { public Map<String, List<NotifyConfigDTO>> buildSingleNotifyConfig(IExecutorProperties executorProperties) {
Map<String, List<NotifyConfigDTO>> resultMap = new HashMap<>();
String threadPoolId = executorProperties.getThreadPoolId(); String threadPoolId = executorProperties.getThreadPoolId();
Map<String, List<NotifyConfigDTO>> resultMap = new HashMap<>();
String alarmBuildKey = threadPoolId + "+ALARM"; String alarmBuildKey = threadPoolId + "+ALARM";
List<NotifyConfigDTO> alarmNotifyConfigs = new ArrayList<>(); List<NotifyConfigDTO> alarmNotifyConfigs = new ArrayList<>();
List<NotifyPlatformProperties> notifyPlatforms = configProperties.getNotifyPlatforms(); List<NotifyPlatformProperties> notifyPlatforms = configProperties.getNotifyPlatforms();
@ -82,10 +100,7 @@ public class ConfigModeNotifyConfigBuilder implements NotifyConfigBuilder {
notifyConfig.setType("ALARM"); notifyConfig.setType("ALARM");
notifyConfig.setSecret(platformProperties.getSecret()); notifyConfig.setSecret(platformProperties.getSecret());
notifyConfig.setSecretKey(getToken(platformProperties)); notifyConfig.setSecretKey(getToken(platformProperties));
int interval = Optional.ofNullable(executorProperties.getNotify()) notifyConfig.setInterval(buildInterval(executorProperties));
.map(each -> each.getInterval())
.orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(each -> each.getNotify()).map(each -> each.getInterval()).orElse(5));
notifyConfig.setInterval(interval);
notifyConfig.setReceives(buildReceive(executorProperties)); notifyConfig.setReceives(buildReceive(executorProperties));
alarmNotifyConfigs.add(notifyConfig); alarmNotifyConfigs.add(notifyConfig);
} }
@ -113,12 +128,21 @@ public class ConfigModeNotifyConfigBuilder implements NotifyConfigBuilder {
.forEach(each -> alarmControlHandler.initCacheAndLock(each.getTpId(), each.getPlatform(), each.getInterval()))); .forEach(each -> alarmControlHandler.initCacheAndLock(each.getTpId(), each.getPlatform(), each.getInterval())));
} }
private String buildReceive(ExecutorProperties executorProperties) { private int buildInterval(IExecutorProperties executorProperties) {
String receives = Optional.ofNullable(configProperties.getDefaultExecutor()).map(each -> each.getNotify()).map(each -> each.getReceives()).orElse(""); return Optional.ofNullable(executorProperties.getNotify())
if (executorProperties.getNotify() != null && StringUtil.isNotEmpty(executorProperties.getNotify().getReceives())) { .map(ExecutorNotifyProperties::getInterval)
receives = executorProperties.getNotify().getReceives(); .orElse(Optional.ofNullable(configProperties.getDefaultExecutor())
} .map(ExecutorProperties::getNotify)
return receives; .map(ExecutorNotifyProperties::getInterval)
.orElse(5));
}
private String buildReceive(IExecutorProperties executorProperties) {
return Optional.ofNullable(executorProperties.getNotify())
.map(ExecutorNotifyProperties::getReceives)
.orElse(Optional.ofNullable(configProperties.getDefaultExecutor())
.map(ExecutorProperties::getNotify)
.map(ExecutorNotifyProperties::getReceives).orElse(""));
} }
private String getToken(NotifyPlatformProperties platformProperties) { private String getToken(NotifyPlatformProperties platformProperties) {

@ -30,6 +30,7 @@ import cn.hippo4j.config.springboot.starter.support.GlobalCoreThreadPoolManage;
import cn.hippo4j.core.executor.DynamicThreadPoolExecutor; import cn.hippo4j.core.executor.DynamicThreadPoolExecutor;
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage; import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.message.dto.NotifyConfigDTO; import cn.hippo4j.message.dto.NotifyConfigDTO;
import cn.hippo4j.message.enums.NotifyTypeEnum;
import cn.hippo4j.message.request.ChangeParameterNotifyRequest; import cn.hippo4j.message.request.ChangeParameterNotifyRequest;
import cn.hippo4j.message.service.GlobalNotifyAlarmManage; import cn.hippo4j.message.service.GlobalNotifyAlarmManage;
import cn.hippo4j.message.service.Hippo4jBaseSendMessageService; import cn.hippo4j.message.service.Hippo4jBaseSendMessageService;
@ -166,7 +167,8 @@ public class DynamicThreadPoolRefreshListener extends AbstractRefreshListener<Ex
boolean checkNotifyConfig = false; boolean checkNotifyConfig = false;
boolean checkNotifyAlarm = false; boolean checkNotifyAlarm = false;
List<String> changeKeys = new ArrayList<>(); List<String> changeKeys = new ArrayList<>();
Map<String, List<NotifyConfigDTO>> newDynamicThreadPoolNotifyMap = configModeNotifyConfigBuilder.buildSingleNotifyConfig(executorProperties); Map<String, List<NotifyConfigDTO>> newDynamicThreadPoolNotifyMap =
configModeNotifyConfigBuilder.buildSingleNotifyConfig(executorProperties);
Map<String, List<NotifyConfigDTO>> notifyConfigs = hippo4jBaseSendMessageService.getNotifyConfigs(); Map<String, List<NotifyConfigDTO>> notifyConfigs = hippo4jBaseSendMessageService.getNotifyConfigs();
if (CollectionUtil.isNotEmpty(notifyConfigs)) { if (CollectionUtil.isNotEmpty(notifyConfigs)) {
for (Map.Entry<String, List<NotifyConfigDTO>> each : newDynamicThreadPoolNotifyMap.entrySet()) { for (Map.Entry<String, List<NotifyConfigDTO>> each : newDynamicThreadPoolNotifyMap.entrySet()) {

@ -19,16 +19,17 @@ package cn.hippo4j.config.springboot.starter.refresher.event;
import cn.hippo4j.adapter.web.WebThreadPoolHandlerChoose; import cn.hippo4j.adapter.web.WebThreadPoolHandlerChoose;
import cn.hippo4j.adapter.web.WebThreadPoolService; import cn.hippo4j.adapter.web.WebThreadPoolService;
import cn.hippo4j.common.api.ThreadPoolConfigChange;
import cn.hippo4j.common.config.ApplicationContextHolder; import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.common.model.ThreadPoolParameter; import cn.hippo4j.common.model.ThreadPoolParameter;
import cn.hippo4j.common.model.ThreadPoolParameterInfo; import cn.hippo4j.common.model.ThreadPoolParameterInfo;
import cn.hippo4j.config.springboot.starter.config.BootstrapConfigProperties; import cn.hippo4j.config.springboot.starter.config.BootstrapConfigProperties;
import cn.hippo4j.config.springboot.starter.config.WebThreadPoolProperties; import cn.hippo4j.config.springboot.starter.config.WebExecutorProperties;
import cn.hippo4j.message.request.WebChangeParameterNotifyRequest;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.core.annotation.Order; import org.springframework.core.annotation.Order;
import java.util.Objects; import java.util.Objects;
import java.util.Optional;
import static cn.hippo4j.config.springboot.starter.refresher.event.Hippo4jConfigDynamicRefreshEventOrder.WEB_EXECUTOR_LISTENER; import static cn.hippo4j.config.springboot.starter.refresher.event.Hippo4jConfigDynamicRefreshEventOrder.WEB_EXECUTOR_LISTENER;
@ -37,10 +38,17 @@ import static cn.hippo4j.config.springboot.starter.refresher.event.Hippo4jConfig
*/ */
@Slf4j @Slf4j
@Order(WEB_EXECUTOR_LISTENER) @Order(WEB_EXECUTOR_LISTENER)
public class WebExecutorRefreshListener extends AbstractRefreshListener<WebThreadPoolProperties> { @SuppressWarnings("rawtypes")
public class WebExecutorRefreshListener extends AbstractRefreshListener<WebExecutorProperties> {
private final ThreadPoolConfigChange configChange;
public WebExecutorRefreshListener(ThreadPoolConfigChange configChange) {
this.configChange = configChange;
}
@Override @Override
public String getNodes(WebThreadPoolProperties properties) { public String getNodes(WebExecutorProperties properties) {
return properties.getNodes(); return properties.getNodes();
} }
@ -71,6 +79,7 @@ public class WebExecutorRefreshListener extends AbstractRefreshListener<WebThrea
|| !Objects.equals(beforeParameter.getMaxSize(), nowParameter.getMaxSize()) || !Objects.equals(beforeParameter.getMaxSize(), nowParameter.getMaxSize())
|| !Objects.equals(beforeParameter.getKeepAliveTime(), nowParameter.getKeepAliveTime())) { || !Objects.equals(beforeParameter.getKeepAliveTime(), nowParameter.getKeepAliveTime())) {
webThreadPoolService.updateWebThreadPool(nowParameter); webThreadPoolService.updateWebThreadPool(nowParameter);
configChange.sendPoolConfigChange(buildChangeRequest(beforeParameter, nowParameter, webThreadPoolService));
} }
} }
} catch (Exception ex) { } catch (Exception ex) {
@ -78,9 +87,28 @@ public class WebExecutorRefreshListener extends AbstractRefreshListener<WebThrea
} }
} }
/**
* Constructing a request for web thread pool parameter change notification
* @param before
* @param now
* @return
*/
private WebChangeParameterNotifyRequest buildChangeRequest(ThreadPoolParameter before, ThreadPoolParameter now,
WebThreadPoolService webThreadPoolService) {
WebChangeParameterNotifyRequest changeNotifyRequest = WebChangeParameterNotifyRequest.builder()
.beforeCorePoolSize(before.getCoreSize())
.nowCorePoolSize(now.getCoreSize())
.beforeMaximumPoolSize(before.getMaxSize())
.nowMaximumPoolSize(now.getMaxSize())
.beforeKeepAliveTime(before.getKeepAliveTime())
.nowKeepAliveTime(now.getKeepAliveTime()).build();
changeNotifyRequest.setThreadPoolId(webThreadPoolService.getWebContainerType().name());
return changeNotifyRequest;
}
private ThreadPoolParameterInfo buildWebPoolParameter(BootstrapConfigProperties bindableCoreProperties) { private ThreadPoolParameterInfo buildWebPoolParameter(BootstrapConfigProperties bindableCoreProperties) {
ThreadPoolParameterInfo threadPoolParameterInfo = null; ThreadPoolParameterInfo threadPoolParameterInfo = null;
WebThreadPoolProperties webThreadPoolProperties = bindableCoreProperties.getWeb(); WebExecutorProperties webThreadPoolProperties = bindableCoreProperties.getWeb();
if (webThreadPoolProperties != null && webThreadPoolProperties.getEnable() && match(webThreadPoolProperties)) { if (webThreadPoolProperties != null && webThreadPoolProperties.getEnable() && match(webThreadPoolProperties)) {
threadPoolParameterInfo = ThreadPoolParameterInfo.builder() threadPoolParameterInfo = ThreadPoolParameterInfo.builder()

@ -23,7 +23,7 @@ import cn.hippo4j.common.executor.support.RejectedPolicyTypeEnum;
import cn.hippo4j.common.toolkit.ReflectUtil; import cn.hippo4j.common.toolkit.ReflectUtil;
import cn.hippo4j.common.toolkit.StringUtil; import cn.hippo4j.common.toolkit.StringUtil;
import cn.hippo4j.config.springboot.starter.config.BootstrapConfigProperties; import cn.hippo4j.config.springboot.starter.config.BootstrapConfigProperties;
import cn.hippo4j.config.springboot.starter.config.DynamicThreadPoolNotifyProperties; import cn.hippo4j.common.api.ExecutorNotifyProperties;
import cn.hippo4j.config.springboot.starter.config.ExecutorProperties; import cn.hippo4j.config.springboot.starter.config.ExecutorProperties;
import cn.hippo4j.core.executor.DynamicThreadPool; import cn.hippo4j.core.executor.DynamicThreadPool;
import cn.hippo4j.core.executor.DynamicThreadPoolExecutor; import cn.hippo4j.core.executor.DynamicThreadPoolExecutor;
@ -229,7 +229,7 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
* @return thread-pool notify alarm * @return thread-pool notify alarm
*/ */
private ThreadPoolNotifyAlarm buildThreadPoolNotifyAlarm(ExecutorProperties executorProperties) { private ThreadPoolNotifyAlarm buildThreadPoolNotifyAlarm(ExecutorProperties executorProperties) {
DynamicThreadPoolNotifyProperties notify = Optional.ofNullable(executorProperties).map(ExecutorProperties::getNotify).orElse(null); ExecutorNotifyProperties notify = Optional.ofNullable(executorProperties).map(ExecutorProperties::getNotify).orElse(null);
boolean isAlarm = Optional.ofNullable(executorProperties.getAlarm()) boolean isAlarm = Optional.ofNullable(executorProperties.getAlarm())
.orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(ExecutorProperties::getAlarm).orElse(true)); .orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(ExecutorProperties::getAlarm).orElse(true));
int activeAlarm = Optional.ofNullable(executorProperties.getActiveAlarm()) int activeAlarm = Optional.ofNullable(executorProperties.getActiveAlarm())
@ -237,11 +237,11 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
int capacityAlarm = Optional.ofNullable(executorProperties.getCapacityAlarm()) int capacityAlarm = Optional.ofNullable(executorProperties.getCapacityAlarm())
.orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(ExecutorProperties::getCapacityAlarm).orElse(80)); .orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(ExecutorProperties::getCapacityAlarm).orElse(80));
int interval = Optional.ofNullable(notify) int interval = Optional.ofNullable(notify)
.map(DynamicThreadPoolNotifyProperties::getInterval) .map(ExecutorNotifyProperties::getInterval)
.orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(ExecutorProperties::getNotify).map(DynamicThreadPoolNotifyProperties::getInterval).orElse(5)); .orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(ExecutorProperties::getNotify).map(ExecutorNotifyProperties::getInterval).orElse(5));
String receive = Optional.ofNullable(notify) String receive = Optional.ofNullable(notify)
.map(DynamicThreadPoolNotifyProperties::getReceives) .map(ExecutorNotifyProperties::getReceives)
.orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(ExecutorProperties::getNotify).map(DynamicThreadPoolNotifyProperties::getReceives).orElse("")); .orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(ExecutorProperties::getNotify).map(ExecutorNotifyProperties::getReceives).orElse(""));
ThreadPoolNotifyAlarm threadPoolNotifyAlarm = new ThreadPoolNotifyAlarm(isAlarm, activeAlarm, capacityAlarm); ThreadPoolNotifyAlarm threadPoolNotifyAlarm = new ThreadPoolNotifyAlarm(isAlarm, activeAlarm, capacityAlarm);
threadPoolNotifyAlarm.setInterval(interval); threadPoolNotifyAlarm.setInterval(interval);
threadPoolNotifyAlarm.setReceives(receive); threadPoolNotifyAlarm.setReceives(receive);

Loading…
Cancel
Save