feat: filter nodes that do not require a dynamic refresh (#614)

pull/649/head
pizihao 3 years ago
parent 1b84a0e5e7
commit 70f0a92f1b

@ -44,4 +44,9 @@ public class AdapterExecutorProperties {
* Maximum pool size
*/
private Integer maximumPoolSize;
/**
* nodes
*/
private String nodes;
}

@ -102,4 +102,9 @@ public class ExecutorProperties {
* Notify
*/
private DynamicThreadPoolNotifyProperties notify;
/**
* nodes
*/
private String nodes;
}

@ -39,4 +39,9 @@ public class WebThreadPoolProperties {
* Keep alive time
*/
private Integer keepAliveTime;
/**
* nodes
*/
private String nodes;
}

@ -0,0 +1,114 @@
package cn.hippo4j.core.springboot.starter.refresher.event;
import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.common.toolkit.Assert;
import cn.hippo4j.common.toolkit.StringUtil;
import cn.hippo4j.core.toolkit.inet.InetUtils;
import lombok.Data;
import lombok.extern.log4j.Log4j2;
import org.springframework.core.env.Environment;
import java.util.Arrays;
import java.util.Objects;
/**
* Refresh listener abstract base class.
*/
@Log4j2
public abstract class AbstractRefreshListener<M> implements RefreshListener<Hippo4jConfigDynamicRefreshEvent, M> {
protected static final String PORT_PROPERTY = "local.server.port";
protected static final String ALL = "*";
/**
* separator
*/
protected static final String SEPARATOR = ",";
/**
* application ip
*/
protected final String ipAddress;
/**
* application post
*/
protected final String port;
AbstractRefreshListener() {
InetUtils inetUtils = ApplicationContextHolder.getBean(InetUtils.class);
InetUtils.HostInfo loopbackHostInfo = inetUtils.findFirstNonLoopbackHostInfo();
Assert.isNull(loopbackHostInfo, "Unable to get the application IP address");
ipAddress = loopbackHostInfo.getIpAddress();
Environment environment = ApplicationContextHolder.getInstance().getEnvironment();
Assert.isTrue(environment.containsProperty(PORT_PROPERTY), "Unable to get the application port");
port = environment.getProperty(PORT_PROPERTY);
}
/**
* Matching nodes<br>
* nodes is ip + port.Get 'nodes' in the new Properties,Compare this with the ip + port of Application.<br>
* Support prefix pattern matching.e.g: <br>
* <ul>
* <li>192.168.1.5:* -- Matches all ports of 192.168.1.5</li>
* <li>192.168.1.*:2009 -- Matches 2009 port of 192.168.1.*</li>
* <li>* -- all</li>
* <li>empty -- all</li>
* </ul>
* The format of ip + port is ip : port.
*
* @param properties new Properties
*/
@Override
public boolean match(M properties) {
return false;
}
/**
* check all
*
* @param nodes nodes
*/
protected boolean checkArray(String nodes) {
if (StringUtil.isEmpty(nodes) || ALL.equals(nodes)) {
return true;
}
String[] splitNodes = nodes.split(SEPARATOR);
return Arrays.stream(splitNodes)
.distinct()
.map(IpAndPort::new)
.map(i -> i.check(ipAddress, port))
.anyMatch(Boolean.FALSE::equals);
}
/**
* ip + port
*/
@Data
protected static class IpAndPort {
protected static final String COLON = ";";
private String ip;
private String port;
public IpAndPort(String node) {
String[] ipPort = node.split(COLON);
Assert.isTrue(ipPort.length != 2, "The IP address format is error:" + node);
ip = ipPort[0];
port = ipPort[1];
}
/**
* check
*
* @param ip application ip
* @param port application port
*/
public boolean check(String ip, String port) {
return Objects.equals(ip, this.ip) && Objects.equals(port, this.port);
}
}
}

@ -22,6 +22,7 @@ import cn.hippo4j.adapter.base.ThreadPoolAdapterParameter;
import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.common.toolkit.CollectionUtil;
import cn.hippo4j.core.springboot.starter.config.AdapterExecutorProperties;
import cn.hippo4j.core.springboot.starter.config.ExecutorProperties;
import cn.hutool.core.bean.BeanUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationListener;
@ -40,7 +41,13 @@ import static cn.hippo4j.core.springboot.starter.support.DynamicThreadPoolAdapte
*/
@Slf4j
@Order(ADAPTER_EXECUTORS_LISTENER)
public class AdapterExecutorsRefreshListener implements ApplicationListener<Hippo4jConfigDynamicRefreshEvent> {
public class AdapterExecutorsRefreshListener extends AbstractRefreshListener<AdapterExecutorProperties> {
@Override
public boolean match(AdapterExecutorProperties properties) {
String nodes = properties.getNodes();
return checkArray(nodes);
}
@Override
public void onApplicationEvent(Hippo4jConfigDynamicRefreshEvent event) {
@ -52,7 +59,7 @@ public class AdapterExecutorsRefreshListener implements ApplicationListener<Hipp
for (AdapterExecutorProperties each : adapterExecutors) {
String buildKey = each.getMark() + IDENTIFY_SLICER_SYMBOL + each.getThreadPoolKey();
AdapterExecutorProperties adapterExecutorProperties = ADAPTER_EXECUTORS_MAP.get(buildKey);
if (adapterExecutorProperties == null) {
if (adapterExecutorProperties == null || match(adapterExecutorProperties)) {
continue;
}
if (!Objects.equals(adapterExecutorProperties.getCorePoolSize(), each.getCorePoolSize())

@ -21,6 +21,7 @@ import cn.hippo4j.common.executor.support.BlockingQueueTypeEnum;
import cn.hippo4j.common.executor.support.RejectedPolicyTypeEnum;
import cn.hippo4j.common.executor.support.ResizableCapacityLinkedBlockingQueue;
import cn.hippo4j.common.toolkit.CollectionUtil;
import cn.hippo4j.common.toolkit.StringUtil;
import cn.hippo4j.core.executor.DynamicThreadPoolExecutor;
import cn.hippo4j.core.executor.ThreadPoolNotifyAlarmHandler;
import cn.hippo4j.core.executor.manage.GlobalNotifyAlarmManage;
@ -29,6 +30,7 @@ import cn.hippo4j.core.executor.support.AbstractDynamicExecutorSupport;
import cn.hippo4j.core.proxy.RejectedProxyUtil;
import cn.hippo4j.core.springboot.starter.config.BootstrapConfigProperties;
import cn.hippo4j.core.springboot.starter.config.ExecutorProperties;
import cn.hippo4j.core.springboot.starter.config.WebThreadPoolProperties;
import cn.hippo4j.core.springboot.starter.notify.CoreNotifyConfigBuilder;
import cn.hippo4j.core.springboot.starter.support.GlobalCoreThreadPoolManage;
import cn.hippo4j.message.dto.NotifyConfigDTO;
@ -60,7 +62,7 @@ import static cn.hippo4j.core.springboot.starter.refresher.event.Hippo4jConfigDy
@Slf4j
@RequiredArgsConstructor
@Order(EXECUTORS_LISTENER)
public class DynamicThreadPoolRefreshListener implements ApplicationListener<Hippo4jConfigDynamicRefreshEvent> {
public class DynamicThreadPoolRefreshListener extends AbstractRefreshListener<ExecutorProperties> {
private final ThreadPoolNotifyAlarmHandler threadPoolNotifyAlarmHandler;
@ -68,19 +70,25 @@ public class DynamicThreadPoolRefreshListener implements ApplicationListener<Hip
private final Hippo4jBaseSendMessageService hippo4jBaseSendMessageService;
@Override
public boolean match(ExecutorProperties properties) {
String nodes = properties.getNodes();
return checkArray(nodes);
}
@Override
public void onApplicationEvent(Hippo4jConfigDynamicRefreshEvent event) {
BootstrapConfigProperties bindableConfigProperties = event.getBootstrapConfigProperties();
List<ExecutorProperties> executors = bindableConfigProperties.getExecutors();
for (ExecutorProperties properties : executors) {
String threadPoolId = properties.getThreadPoolId();
/**
if (!match(properties) && !checkConsistency(threadPoolId, properties)) {
continue;
}
/*
* Check whether the notification configuration is consistent, this operation will not trigger the notification.
*/
checkNotifyConsistencyAndReplace(properties);
if (!checkConsistency(threadPoolId, properties)) {
continue;
}
dynamicRefreshPool(threadPoolId, properties);
ExecutorProperties beforeProperties = GlobalCoreThreadPoolManage.getProperties(properties.getThreadPoolId());
GlobalCoreThreadPoolManage.refresh(threadPoolId, failDefaultExecutorProperties(beforeProperties, properties));

@ -37,7 +37,7 @@ import static cn.hippo4j.core.springboot.starter.refresher.event.Hippo4jConfigDy
* Platforms refresh listener.
*/
@Order(PLATFORMS_LISTENER)
public class PlatformsRefreshListener implements ApplicationListener<Hippo4jConfigDynamicRefreshEvent> {
public class PlatformsRefreshListener extends AbstractRefreshListener<ExecutorProperties> {
@Override
public void onApplicationEvent(Hippo4jConfigDynamicRefreshEvent threadPoolDynamicRefreshEvent) {

@ -0,0 +1,14 @@
package cn.hippo4j.core.springboot.starter.refresher.event;
import cn.hippo4j.common.function.Matcher;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
/**
* Refresh listener interface.
* Tevent.
* Mproperties.
*/
public interface RefreshListener<T extends ApplicationEvent, M> extends ApplicationListener<T>, Matcher<M> {
}

@ -22,10 +22,10 @@ import cn.hippo4j.adapter.web.WebThreadPoolService;
import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.common.model.ThreadPoolParameter;
import cn.hippo4j.common.model.ThreadPoolParameterInfo;
import cn.hippo4j.common.toolkit.StringUtil;
import cn.hippo4j.core.springboot.starter.config.BootstrapConfigProperties;
import cn.hippo4j.core.springboot.starter.config.WebThreadPoolProperties;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationListener;
import org.springframework.core.annotation.Order;
import java.util.Objects;
@ -37,7 +37,13 @@ import static cn.hippo4j.core.springboot.starter.refresher.event.Hippo4jConfigDy
*/
@Slf4j
@Order(WEB_EXECUTOR_LISTENER)
public class WebExecutorRefreshListener implements ApplicationListener<Hippo4jConfigDynamicRefreshEvent> {
public class WebExecutorRefreshListener extends AbstractRefreshListener<WebThreadPoolProperties> {
@Override
public boolean match(WebThreadPoolProperties properties) {
String nodes = properties.getNodes();
return checkArray(nodes);
}
@Override
public void onApplicationEvent(Hippo4jConfigDynamicRefreshEvent threadPoolDynamicRefreshEvent) {
@ -75,7 +81,7 @@ public class WebExecutorRefreshListener implements ApplicationListener<Hippo4jCo
} else if (bindableCoreProperties.getJetty() != null) {
webThreadPoolProperties = bindableCoreProperties.getJetty();
}
if (webThreadPoolProperties != null) {
if (webThreadPoolProperties != null && match(webThreadPoolProperties)) {
threadPoolParameterInfo = ThreadPoolParameterInfo.builder()
.coreSize(webThreadPoolProperties.getCorePoolSize())
.maximumPoolSize(webThreadPoolProperties.getMaximumPoolSize())

Loading…
Cancel
Save