Adapter 初始化覆盖核心参数 (#716)

* feat : Unify get the IP and port of the current instance(#609)

* feat : add enable prop for this(#609)

* feat : add ThreadPoolInitRefresh for init(#609)

* feat : adapter thread pool init, including nacos, zookeeper, etcd and apollo(#609)
pull/718/head
pizihao 2 years ago committed by GitHub
parent 7dc30efc53
commit a7f2bb7521
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.adapter.web;
import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.common.model.WebIpAndPortInfo;
import cn.hippo4j.common.toolkit.Assert;
import cn.hippo4j.common.toolkit.StringUtil;
import cn.hippo4j.core.toolkit.inet.InetUtils;
import org.springframework.boot.web.server.WebServer;
import java.util.Arrays;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
/**
* Ip and port Holder
*/
public class WebIpAndPortHolder {
/**
* Application ip and application post
*/
protected static AtomicReference<WebIpAndPortInfo> webIpAndPort = new AtomicReference<>();
public static final String ALL = "*";
protected static final String SEPARATOR = ",";
private WebIpAndPortHolder() {
}
protected static void initIpAndPort() {
webIpAndPort.compareAndSet(null, getWebIpAndPortInfo());
}
private static WebIpAndPortInfo getWebIpAndPortInfo() {
InetUtils inetUtils = ApplicationContextHolder.getBean(InetUtils.class);
InetUtils.HostInfo loopBackHostInfo = inetUtils.findFirstNonLoopBackHostInfo();
Assert.notNull(loopBackHostInfo, "Unable to get the application IP address");
String ip = loopBackHostInfo.getIpAddress();
WebThreadPoolHandlerChoose webThreadPoolHandlerChoose = ApplicationContextHolder.getBean(WebThreadPoolHandlerChoose.class);
WebThreadPoolService webThreadPoolService = webThreadPoolHandlerChoose.choose();
// When get the port at startup, can get the message: "port xxx was already in use" or use two ports
WebServer webServer = webThreadPoolService.getWebServer();
String port = String.valueOf(webServer.getPort());
return new WebIpAndPortInfo(ip, port);
}
/**
* get WebIpAndPortInfo, If it is null, initialize it
*
* @return WebIpAndPortInfo
*/
public static WebIpAndPortInfo getWebIpAndPort() {
if (webIpAndPort.get() == null) {
initIpAndPort();
}
return WebIpAndPortHolder.webIpAndPort.get();
}
/**
* Check the new properties and instance IP and port
*
* @param nodes nodes in properties
* @return Whether it meets the conditions
*/
public static boolean check(String nodes) {
WebIpAndPortInfo webIpAndPort = WebIpAndPortHolder.getWebIpAndPort();
if (StringUtil.isEmpty(nodes) || ALL.equals(nodes)) {
return true;
}
String[] splitNodes = nodes.split(SEPARATOR);
return Arrays.stream(splitNodes)
.distinct()
.map(WebIpAndPortInfo::build)
.filter(Objects::nonNull)
.anyMatch(each -> each.check(webIpAndPort.getIpSegment(), webIpAndPort.getPort()));
}
}

@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.common.api;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
/**
* when init thread-pool dynamic refresh.
*/
public interface ThreadPoolInitRefresh extends ApplicationRunner {
/**
* Initializes the thread pool after system startup
*
* @param context new properties
*/
void initRefresh(String context);
/**
* get from the Configuration center
*
* @return new properties
* @throws Exception exception
*/
String getProperties() throws Exception;
@Override
default void run(ApplicationArguments args) throws Exception {
String properties = getProperties();
if (properties == null) {
return;
}
initRefresh(properties);
}
}

@ -49,4 +49,9 @@ public class AdapterExecutorProperties {
* Nodes, application startup is not affect, change properties is effect
*/
private String nodes;
/**
* these propertied is enabled?
*/
private Boolean enable = true;
}

@ -44,4 +44,9 @@ public class WebThreadPoolProperties {
* Nodes, application startup is not affect, change properties is effect
*/
private String nodes;
/**
* these propertied is enabled?
*/
private Boolean enable = true;
}

@ -18,6 +18,7 @@
package cn.hippo4j.config.springboot.starter.refresher;
import cn.hippo4j.common.api.ThreadPoolDynamicRefresh;
import cn.hippo4j.common.api.ThreadPoolInitRefresh;
import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.common.toolkit.CollectionUtil;
import cn.hippo4j.config.springboot.starter.config.BootstrapConfigProperties;
@ -37,7 +38,11 @@ import java.util.concurrent.ExecutorService;
*/
@Slf4j
@RequiredArgsConstructor
public abstract class AbstractConfigThreadPoolDynamicRefresh implements ThreadPoolDynamicRefresh, InitializingBean {
public abstract class AbstractConfigThreadPoolDynamicRefresh
implements
ThreadPoolDynamicRefresh,
ThreadPoolInitRefresh,
InitializingBean {
protected final BootstrapConfigProperties bootstrapConfigProperties;
@ -47,6 +52,11 @@ public abstract class AbstractConfigThreadPoolDynamicRefresh implements ThreadPo
bootstrapConfigProperties = ApplicationContextHolder.getBean(BootstrapConfigProperties.class);
}
@Override
public void initRefresh(String context) {
dynamicRefresh(context);
}
@Override
public void dynamicRefresh(String configContent) {
dynamicRefresh(configContent, null);

@ -41,6 +41,16 @@ public class ApolloRefresherHandler extends AbstractConfigThreadPoolDynamicRefre
@Value(APOLLO_PROPERTY)
private String namespace;
@Override
public String getProperties() {
String[] apolloNamespaces = this.namespace.split(",");
this.namespace = apolloNamespaces[0];
String copyNamespace = this.namespace.replaceAll("." + bootstrapConfigProperties.getConfigFileType().getValue(), "");
ConfigFileFormat configFileFormat = ConfigFileFormat.fromString(bootstrapConfigProperties.getConfigFileType().getValue());
ConfigFile configFile = ConfigService.getConfigFile(copyNamespace, configFileFormat);
return configFile.getContent();
}
@Override
public void afterPropertiesSet() {
String[] apolloNamespaces = this.namespace.split(",");

@ -52,22 +52,25 @@ public class EtcdRefresherHandler extends AbstractConfigThreadPoolDynamicRefresh
private static final String KEY = "key";
@Override
public String getProperties() throws Exception {
Map<String, String> etcd = bootstrapConfigProperties.getEtcd();
Charset charset = StringUtil.isBlank(etcd.get(CHARSET)) ? StandardCharsets.UTF_8 : Charset.forName(etcd.get(CHARSET));
initClient(etcd, charset);
String key = etcd.get(KEY);
GetResponse getResponse = client.getKVClient().get(ByteSequence.from(key, charset)).get();
KeyValue keyValue = getResponse.getKvs().get(0);
return Objects.isNull(keyValue) ? null : keyValue.getValue().toString(charset);
}
@Override
public void afterPropertiesSet() throws Exception {
Map<String, String> etcd = bootstrapConfigProperties.getEtcd();
String user = etcd.get(USER);
String password = etcd.get(PASSWORD);
String endpoints = etcd.get(ENDPOINTS);
String authority = etcd.get(AUTHORITY);
String key = etcd.get(KEY);
Charset charset = StringUtil.isBlank(etcd.get(CHARSET)) ? StandardCharsets.UTF_8 : Charset.forName(etcd.get(CHARSET));
ClientBuilder clientBuilder = Client.builder().endpoints(endpoints.split(","));
// todo
if (Objects.isNull(client)) {
client = StringUtil.isAllNotEmpty(user, password) ? clientBuilder.user(ByteSequence.from(user, charset))
.password(ByteSequence.from(password, charset)).authority(authority)
.build() : clientBuilder.build();
}
initClient(etcd, charset);
// todo Currently only supports json
GetResponse getResponse = client.getKVClient().get(ByteSequence.from(key, charset)).get();
KeyValue keyValue = getResponse.getKvs().get(0);
@ -100,4 +103,25 @@ public class EtcdRefresherHandler extends AbstractConfigThreadPoolDynamicRefresh
}
});
}
/**
* if client is null, init it
*
* @param etcd etcd configuration item
* @param charset charset
*/
private void initClient(Map<String, String> etcd, Charset charset) {
// todo
if (Objects.isNull(client)) {
String user = etcd.get(USER);
String password = etcd.get(PASSWORD);
String authority = etcd.get(AUTHORITY);
String endpoints = etcd.get(ENDPOINTS);
ClientBuilder clientBuilder = Client.builder().endpoints(endpoints.split(","));
client = StringUtil.isAllNotEmpty(user, password) ? clientBuilder.user(ByteSequence.from(user, charset))
.password(ByteSequence.from(password, charset)).authority(authority)
.build() : clientBuilder.build();
}
}
}

@ -31,17 +31,28 @@ import java.util.concurrent.Executor;
@Slf4j
public class NacosCloudRefresherHandler extends AbstractConfigThreadPoolDynamicRefresh {
static final String DATA_ID = "data-id";
static final String GROUP = "group";
private final NacosConfigManager nacosConfigManager;
public NacosCloudRefresherHandler() {
nacosConfigManager = ApplicationContextHolder.getBean(NacosConfigManager.class);
}
@Override
public String getProperties() throws Exception {
Map<String, String> nacosConfig = bootstrapConfigProperties.getNacos();
String dataId = nacosConfig.get(DATA_ID);
String group = nacosConfig.get(GROUP);
return nacosConfigManager.getConfigService().getConfig(dataId, group, 5000L);
}
@Override
public void afterPropertiesSet() throws Exception {
Map<String, String> nacosConfig = bootstrapConfigProperties.getNacos();
nacosConfigManager.getConfigService().addListener(nacosConfig.get("data-id"),
nacosConfig.get("group"), new Listener() {
nacosConfigManager.getConfigService().addListener(nacosConfig.get(DATA_ID),
nacosConfig.get(GROUP), new Listener() {
@Override
public Executor getExecutor() {
@ -53,6 +64,6 @@ public class NacosCloudRefresherHandler extends AbstractConfigThreadPoolDynamicR
dynamicRefresh(configInfo);
}
});
log.info("Dynamic thread pool refresher, add nacos cloud listener success. data-id: {}, group: {}", nacosConfig.get("data-id"), nacosConfig.get("group"));
log.info("Dynamic thread pool refresher, add nacos cloud listener success. data-id: {}, group: {}", nacosConfig.get(DATA_ID), nacosConfig.get(GROUP));
}
}

@ -32,6 +32,9 @@ import java.util.concurrent.Executor;
@Slf4j
public class NacosRefresherHandler extends AbstractConfigThreadPoolDynamicRefresh {
static final String DATA_ID = "data-id";
static final String GROUP = "group";
@NacosInjected
private ConfigService configService;
@ -39,11 +42,19 @@ public class NacosRefresherHandler extends AbstractConfigThreadPoolDynamicRefres
super(bootstrapConfigProperties);
}
@Override
public String getProperties() throws Exception {
Map<String, String> nacosConfig = bootstrapConfigProperties.getNacos();
String dataId = nacosConfig.get(DATA_ID);
String group = nacosConfig.get(GROUP);
return configService.getConfig(dataId, group, Long.MAX_VALUE);
}
@Override
public void afterPropertiesSet() throws Exception {
Map<String, String> nacosConfig = bootstrapConfigProperties.getNacos();
configService.addListener(nacosConfig.get("data-id"), nacosConfig.get("group"),
configService.addListener(nacosConfig.get(DATA_ID), nacosConfig.get(GROUP),
new Listener() {
@Override
@ -56,6 +67,6 @@ public class NacosRefresherHandler extends AbstractConfigThreadPoolDynamicRefres
dynamicRefresh(configInfo);
}
});
log.info("Dynamic thread pool refresher, add nacos listener success. data-id: {}, group: {}", nacosConfig.get("data-id"), nacosConfig.get("group"));
log.info("Dynamic thread pool refresher, add nacos listener success. data-id: {}, group: {}", nacosConfig.get(DATA_ID), nacosConfig.get(GROUP));
}
}

@ -41,15 +41,31 @@ import java.util.Map;
@Slf4j
public class ZookeeperRefresherHandler extends AbstractConfigThreadPoolDynamicRefresh {
static final String ZK_CONNECT_STR = "zk-connect-str";
static final String ROOT_NODE = "root-node";
static final String CONFIG_VERSION = "config-version";
static final String NODE = "node";
private CuratorFramework curatorFramework;
@Override
public String getProperties() {
Map<String, String> zkConfigs = bootstrapConfigProperties.getZookeeper();
String nodePath = ZKPaths.makePath(ZKPaths.makePath(zkConfigs.get(ROOT_NODE),
zkConfigs.get(CONFIG_VERSION)), zkConfigs.get(NODE));
return nodePathResolver(nodePath);
}
@Override
public void afterPropertiesSet() {
Map<String, String> zkConfigs = bootstrapConfigProperties.getZookeeper();
curatorFramework = CuratorFrameworkFactory.newClient(zkConfigs.get("zk-connect-str"),
curatorFramework = CuratorFrameworkFactory.newClient(zkConfigs.get(ZK_CONNECT_STR),
new ExponentialBackoffRetry(1000, 3));
String nodePath = ZKPaths.makePath(ZKPaths.makePath(zkConfigs.get("root-node"),
zkConfigs.get("config-version")), zkConfigs.get("node"));
String nodePath = ZKPaths.makePath(ZKPaths.makePath(zkConfigs.get(ROOT_NODE),
zkConfigs.get(CONFIG_VERSION)), zkConfigs.get(NODE));
final ConnectionStateListener connectionStateListener = (client, newState) -> {
if (newState == ConnectionState.CONNECTED) {
loadNode(nodePath);
@ -81,6 +97,20 @@ public class ZookeeperRefresherHandler extends AbstractConfigThreadPoolDynamicRe
* @param nodePath zk config node path.
*/
public void loadNode(String nodePath) {
String content = nodePathResolver(nodePath);
if (content != null) {
dynamicRefresh(content);
registerNotifyAlarmManage();
}
}
/**
* resolver for zk config
*
* @param nodePath zk config node path
* @return resolver result
*/
private String nodePathResolver(String nodePath) {
try {
final GetChildrenBuilder childrenBuilder = curatorFramework.getChildren();
final List<String> children = childrenBuilder.watched().forPath(nodePath);
@ -97,10 +127,10 @@ public class ZookeeperRefresherHandler extends AbstractConfigThreadPoolDynamicRe
}
content.append(nodeName).append("=").append(value).append("\n");
});
dynamicRefresh(content.toString());
registerNotifyAlarmManage();
return content.toString();
} catch (Exception ex) {
log.error("Load zookeeper node error, nodePath is: {}", nodePath, ex);
return null;
}
}

@ -17,55 +17,15 @@
package cn.hippo4j.config.springboot.starter.refresher.event;
import cn.hippo4j.adapter.web.WebThreadPoolHandlerChoose;
import cn.hippo4j.adapter.web.WebThreadPoolService;
import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.common.model.WebIpAndPortInfo;
import cn.hippo4j.common.toolkit.Assert;
import cn.hippo4j.common.toolkit.StringUtil;
import cn.hippo4j.core.toolkit.inet.InetUtils;
import cn.hippo4j.adapter.web.WebIpAndPortHolder;
import lombok.extern.slf4j.Slf4j;
import java.util.Arrays;
import java.util.Objects;
/**
* Refresh listener abstract base class.
*/
@Slf4j
public abstract class AbstractRefreshListener<M> implements RefreshListener<Hippo4jConfigDynamicRefreshEvent, M> {
protected static final String ALL = "*";
protected static final String SEPARATOR = ",";
/**
* Application ip and application post
*/
protected static volatile WebIpAndPortInfo webIpAndPort;
protected void initIpAndPort() {
if (webIpAndPort == null) {
synchronized (AbstractRefreshListener.class) {
if (webIpAndPort == null) {
webIpAndPort = getWebIpAndPortInfo();
}
}
}
}
private WebIpAndPortInfo getWebIpAndPortInfo() {
InetUtils inetUtils = ApplicationContextHolder.getBean(InetUtils.class);
InetUtils.HostInfo loopBackHostInfo = inetUtils.findFirstNonLoopBackHostInfo();
Assert.notNull(loopBackHostInfo, "Unable to get the application IP address");
String ip = loopBackHostInfo.getIpAddress();
WebThreadPoolHandlerChoose webThreadPoolHandlerChoose = ApplicationContextHolder.getBean(WebThreadPoolHandlerChoose.class);
WebThreadPoolService webThreadPoolService = webThreadPoolHandlerChoose.choose();
// When get the port at startup, can get the message: "port xxx was already in use" or use two ports
String port = String.valueOf(webThreadPoolService.getWebServer().getPort());
return new WebIpAndPortInfo(ip, port);
}
/**
* Matching nodes<br>
* nodes is ip + port.Get 'nodes' in the new Properties,Compare this with the ip + port of Application.<br>
@ -82,19 +42,8 @@ public abstract class AbstractRefreshListener<M> implements RefreshListener<Hipp
*/
@Override
public boolean match(M properties) {
if (webIpAndPort == null) {
initIpAndPort();
}
String nodes = getNodes(properties);
if (StringUtil.isEmpty(nodes) || ALL.equals(nodes)) {
return true;
}
String[] splitNodes = nodes.split(SEPARATOR);
return Arrays.stream(splitNodes)
.distinct()
.map(WebIpAndPortInfo::build)
.filter(Objects::nonNull)
.anyMatch(each -> each.check(webIpAndPort.getIpSegment(), webIpAndPort.getPort()));
return WebIpAndPortHolder.check(nodes);
}
/**
@ -104,6 +53,6 @@ public abstract class AbstractRefreshListener<M> implements RefreshListener<Hipp
* @return nodes in properties
*/
protected String getNodes(M properties) {
return ALL;
return WebIpAndPortHolder.ALL;
}
}

@ -56,7 +56,7 @@ public class AdapterExecutorsRefreshListener extends AbstractRefreshListener<Ada
for (AdapterExecutorProperties each : adapterExecutors) {
String buildKey = each.getMark() + IDENTIFY_SLICER_SYMBOL + each.getThreadPoolKey();
AdapterExecutorProperties adapterExecutorProperties = DynamicThreadPoolAdapterRegister.ADAPTER_EXECUTORS_MAP.get(buildKey);
if (adapterExecutorProperties == null || !match(adapterExecutorProperties)) {
if (adapterExecutorProperties == null || !adapterExecutorProperties.getEnable() || !match(adapterExecutorProperties)) {
continue;
}
if (!Objects.equals(adapterExecutorProperties.getCorePoolSize(), each.getCorePoolSize())

@ -79,7 +79,7 @@ public class WebExecutorRefreshListener extends AbstractRefreshListener<WebThrea
} else if (bindableCoreProperties.getJetty() != null) {
webThreadPoolProperties = bindableCoreProperties.getJetty();
}
if (webThreadPoolProperties != null && match(webThreadPoolProperties)) {
if (webThreadPoolProperties != null && webThreadPoolProperties.getEnable() && match(webThreadPoolProperties)) {
threadPoolParameterInfo = ThreadPoolParameterInfo.builder()
.coreSize(webThreadPoolProperties.getCorePoolSize())
.maximumPoolSize(webThreadPoolProperties.getMaximumPoolSize())

Loading…
Cancel
Save