Add notification logic after modifying web thread pool parameters.

pull/1133/head
yanrongzhen 3 years ago
parent 6df52eb796
commit 0550dbc3a1

@ -18,20 +18,10 @@
package cn.hippo4j.adapter.web.jetty;
import cn.hippo4j.adapter.web.DefaultAbstractWebThreadPoolService;
import cn.hippo4j.adapter.web.IWebThreadPoolHandlerSupport;
import cn.hippo4j.common.constant.ChangeThreadPoolConstants;
import cn.hippo4j.common.enums.WebContainerEnum;
import cn.hippo4j.common.model.ThreadPoolBaseInfo;
import cn.hippo4j.common.model.ThreadPoolParameter;
import cn.hippo4j.common.model.ThreadPoolParameterInfo;
import cn.hippo4j.common.model.ThreadPoolRunStateInfo;
import cn.hippo4j.common.toolkit.ReflectUtil;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.springframework.boot.web.embedded.jetty.JettyWebServer;
import org.springframework.boot.web.server.WebServer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
/**

@ -91,7 +91,7 @@ public class TomcatWebThreadPoolHandlerSupport implements IWebThreadPoolHandlerS
long keepAliveTime = tomcatThreadPoolExecutor.getKeepAliveTime(TimeUnit.SECONDS);
parameterInfo.setCoreSize(minThreads);
parameterInfo.setMaxSize(maxThreads);
parameterInfo.setKeepAliveTime((int) keepAliveTime);
parameterInfo.setKeepAliveTime(keepAliveTime);
} catch (Exception ex) {
log.error("Failed to get the tomcat thread pool parameter.", ex);
}

@ -22,12 +22,6 @@ import java.util.Objects;
import java.util.concurrent.Executor;
import cn.hippo4j.adapter.web.DefaultAbstractWebThreadPoolService;
import cn.hippo4j.adapter.web.IWebThreadPoolHandlerSupport;
import cn.hippo4j.common.enums.WebContainerEnum;
import cn.hippo4j.common.model.ThreadPoolBaseInfo;
import cn.hippo4j.common.model.ThreadPoolParameter;
import cn.hippo4j.common.model.ThreadPoolParameterInfo;
import cn.hippo4j.common.model.ThreadPoolRunStateInfo;
import io.undertow.Undertow;
import lombok.extern.slf4j.Slf4j;

@ -82,7 +82,7 @@ public class UndertowWebThreadPoolHandlerSupport implements IWebThreadPoolHandle
XnioWorker xnioWorker = (XnioWorker) executor;
int minThreads = xnioWorker.getOption(Options.WORKER_TASK_CORE_THREADS);
int maxThreads = xnioWorker.getOption(Options.WORKER_TASK_MAX_THREADS);
int keepAliveTime = xnioWorker.getOption(Options.WORKER_TASK_KEEPALIVE);
long keepAliveTime = xnioWorker.getOption(Options.WORKER_TASK_KEEPALIVE);
parameterInfo.setCoreSize(minThreads);
parameterInfo.setMaxSize(maxThreads);
parameterInfo.setKeepAliveTime(keepAliveTime);
@ -135,7 +135,7 @@ public class UndertowWebThreadPoolHandlerSupport implements IWebThreadPoolHandle
XnioWorker xnioWorker = (XnioWorker) executor;
Integer coreSize = threadPoolParameterInfo.corePoolSizeAdapt();
Integer maxSize = threadPoolParameterInfo.maximumPoolSizeAdapt();
Integer keepAliveTime = threadPoolParameterInfo.getKeepAliveTime();
int keepAliveTime = threadPoolParameterInfo.getKeepAliveTime().intValue();
int originalCoreSize = xnioWorker.getOption(Options.WORKER_TASK_CORE_THREADS);
int originalMaximumPoolSize = xnioWorker.getOption(Options.WORKER_TASK_MAX_THREADS);
int originalKeepAliveTime = xnioWorker.getOption(Options.WORKER_TASK_KEEPALIVE);

@ -76,7 +76,7 @@ public interface ThreadPoolParameter {
*
* @return
*/
Integer getKeepAliveTime();
Long getKeepAliveTime();
/**
* Get execute time out

@ -94,7 +94,7 @@ public class ThreadPoolParameterInfo implements ThreadPoolParameter, Serializabl
/**
* Keep alive time
*/
private Integer keepAliveTime;
private Long keepAliveTime;
/**
* Execute time out

@ -29,7 +29,7 @@ public class ContentUtilTest {
":1,\"capacityAlarm\":80,\"livenessAlarm\":80,\"allowCoreThreadTimeOut\":1}";
ThreadPoolParameterInfo threadPoolParameterInfo = ThreadPoolParameterInfo.builder().tenantId("prescription")
.itemId("dynamic-threadpool-example").tpId("message-consume").content("描述信息").corePoolSize(1)
.maximumPoolSize(2).queueType(1).capacity(4).keepAliveTime(513).executeTimeOut(null).rejectedType(4)
.maximumPoolSize(2).queueType(1).capacity(4).keepAliveTime(513L).executeTimeOut(null).rejectedType(4)
.isAlarm(1).capacityAlarm(80).livenessAlarm(80).allowCoreThreadTimeOut(1).build();
Assert.isTrue(testText.equals(ContentUtil.getPoolContent(threadPoolParameterInfo)));
}

@ -53,7 +53,7 @@ public class Md5UtilTest {
String md5Result = "ef5ea7cb47377fb9fb85a7125e76715d";
ThreadPoolParameterInfo threadPoolParameterInfo = ThreadPoolParameterInfo.builder().tenantId("prescription")
.itemId("dynamic-threadpool-example").tpId("message-consume").content("描述信息").corePoolSize(1)
.maximumPoolSize(2).queueType(1).capacity(4).keepAliveTime(513).executeTimeOut(null).rejectedType(4)
.maximumPoolSize(2).queueType(1).capacity(4).keepAliveTime(513L).executeTimeOut(null).rejectedType(4)
.isAlarm(1).capacityAlarm(80).livenessAlarm(80).allowCoreThreadTimeOut(1).build();
Assert.isTrue(md5Result.equals(Md5Util.getTpContentMd5(threadPoolParameterInfo)));
}

@ -30,7 +30,7 @@ public class SingletonTest {
Assert.assertEquals("hippo4j", Singleton.get("userName"));
ThreadPoolParameterInfo threadPoolParameterInfo = ThreadPoolParameterInfo.builder().tenantId("prescription")
.itemId("dynamic-threadpool-example").tpId("message-consume").content("描述信息").corePoolSize(1)
.maximumPoolSize(2).queueType(1).capacity(4).keepAliveTime(513).executeTimeOut(null).rejectedType(4)
.maximumPoolSize(2).queueType(1).capacity(4).keepAliveTime(513L).executeTimeOut(null).rejectedType(4)
.isAlarm(1).capacityAlarm(80).livenessAlarm(80).allowCoreThreadTimeOut(1).build();
Singleton.put(threadPoolParameterInfo);
Assert.assertEquals(threadPoolParameterInfo, Singleton.get(ThreadPoolParameterInfo.class.getName()));

@ -81,7 +81,7 @@ public class ConfigInfoBase implements Serializable {
/**
* keepAliveTime
*/
private Integer keepAliveTime;
private Long keepAliveTime;
/**
* executeTimeOut

@ -17,14 +17,7 @@
package cn.hippo4j.config.springboot1x.starter.web.jetty;
import cn.hippo4j.adapter.web.IWebThreadPoolHandlerSupport;
import cn.hippo4j.adapter.web.jetty.JettyWebThreadPoolHandlerSupport;
import cn.hippo4j.common.enums.WebContainerEnum;
import cn.hippo4j.common.model.ThreadPoolBaseInfo;
import cn.hippo4j.common.model.ThreadPoolParameter;
import cn.hippo4j.common.model.ThreadPoolParameterInfo;
import cn.hippo4j.common.model.ThreadPoolRunStateInfo;
import cn.hippo4j.common.web.exception.ServiceException;
import cn.hippo4j.config.springboot1x.starter.web.AbstractWebThreadPoolService1x;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.context.embedded.jetty.JettyEmbeddedServletContainer;

@ -127,8 +127,9 @@ public class DynamicThreadPoolAutoConfiguration {
}
@Bean
public WebExecutorRefreshListener hippo4jWebExecutorListener() {
return new WebExecutorRefreshListener();
@SuppressWarnings("all")
public WebExecutorRefreshListener hippo4jWebExecutorListener(ThreadPoolConfigChange threadPoolConfigChange) {
return new WebExecutorRefreshListener(threadPoolConfigChange);
}
@Bean

@ -38,7 +38,7 @@ public class WebThreadPoolProperties {
/**
* Keep alive time
*/
private Integer keepAliveTime;
private Long keepAliveTime;
/**
* Nodes, application startup is not affect, change properties is effect

@ -19,11 +19,14 @@ package cn.hippo4j.config.springboot.starter.refresher.event;
import cn.hippo4j.adapter.web.WebThreadPoolHandlerChoose;
import cn.hippo4j.adapter.web.WebThreadPoolService;
import cn.hippo4j.common.api.NotifyRequest;
import cn.hippo4j.common.api.ThreadPoolConfigChange;
import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.common.model.ThreadPoolParameter;
import cn.hippo4j.common.model.ThreadPoolParameterInfo;
import cn.hippo4j.config.springboot.starter.config.BootstrapConfigProperties;
import cn.hippo4j.config.springboot.starter.config.WebThreadPoolProperties;
import cn.hippo4j.message.request.ChangeParameterNotifyRequest;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.annotation.Order;
@ -37,8 +40,15 @@ import static cn.hippo4j.config.springboot.starter.refresher.event.Hippo4jConfig
*/
@Slf4j
@Order(WEB_EXECUTOR_LISTENER)
@SuppressWarnings("all")
public class WebExecutorRefreshListener extends AbstractRefreshListener<WebThreadPoolProperties> {
private final ThreadPoolConfigChange configChange;
public WebExecutorRefreshListener(ThreadPoolConfigChange configChange) {
this.configChange = configChange;
}
@Override
public String getNodes(WebThreadPoolProperties properties) {
return properties.getNodes();
@ -71,6 +81,7 @@ public class WebExecutorRefreshListener extends AbstractRefreshListener<WebThrea
|| !Objects.equals(beforeParameter.getMaxSize(), nowParameter.getMaxSize())
|| !Objects.equals(beforeParameter.getKeepAliveTime(), nowParameter.getKeepAliveTime())) {
webThreadPoolService.updateWebThreadPool(nowParameter);
configChange.sendPoolConfigChange(buildChangeRequest(beforeParameter, nowParameter));
}
}
} catch (Exception ex) {
@ -78,6 +89,22 @@ public class WebExecutorRefreshListener extends AbstractRefreshListener<WebThrea
}
}
/**
* Constructing a request for thread pool parameter change notification
* @param before
* @param now
* @return
*/
private ChangeParameterNotifyRequest buildChangeRequest(ThreadPoolParameter before, ThreadPoolParameter now) {
return ChangeParameterNotifyRequest.builder()
.beforeCorePoolSize(before.getCoreSize())
.nowCorePoolSize(now.getCoreSize())
.beforeMaximumPoolSize(before.getMaxSize())
.nowMaximumPoolSize(now.getMaxSize())
.beforeKeepAliveTime(before.getKeepAliveTime())
.nowKeepAliveTime(now.getKeepAliveTime()).build();
}
private ThreadPoolParameterInfo buildWebPoolParameter(BootstrapConfigProperties bindableCoreProperties) {
ThreadPoolParameterInfo threadPoolParameterInfo = null;
WebThreadPoolProperties webThreadPoolProperties = bindableCoreProperties.getWeb();

@ -24,6 +24,7 @@ import cn.hippo4j.adapter.web.tomcat.TomcatWebThreadPoolHandlerAdapt;
import cn.hippo4j.adapter.web.undertow.DefaultUndertowWebThreadPoolHandler;
import cn.hippo4j.adapter.web.WebThreadPoolRunStateHandler;
import cn.hippo4j.adapter.web.undertow.UndertowWebThreadPoolHandlerAdapt;
import cn.hippo4j.common.api.ThreadPoolConfigChange;
import io.undertow.Undertow;
import org.apache.catalina.startup.Tomcat;
import org.apache.coyote.UpgradeProtocol;

Loading…
Cancel
Save