feat : adapter thread pool init, including nacos, zookeeper, etcd and apollo(#609)

pull/716/head
pizihao 3 years ago
parent bc30b89be9
commit 36096c5357

@ -41,6 +41,16 @@ public class ApolloRefresherHandler extends AbstractConfigThreadPoolDynamicRefre
@Value(APOLLO_PROPERTY)
private String namespace;
@Override
public String getProperties() {
String[] apolloNamespaces = this.namespace.split(",");
this.namespace = apolloNamespaces[0];
String copyNamespace = this.namespace.replaceAll("." + bootstrapConfigProperties.getConfigFileType().getValue(), "");
ConfigFileFormat configFileFormat = ConfigFileFormat.fromString(bootstrapConfigProperties.getConfigFileType().getValue());
ConfigFile configFile = ConfigService.getConfigFile(copyNamespace, configFileFormat);
return configFile.getContent();
}
@Override
public void afterPropertiesSet() {
String[] apolloNamespaces = this.namespace.split(",");

@ -52,22 +52,25 @@ public class EtcdRefresherHandler extends AbstractConfigThreadPoolDynamicRefresh
private static final String KEY = "key";
@Override
public String getProperties() throws Exception {
Map<String, String> etcd = bootstrapConfigProperties.getEtcd();
Charset charset = StringUtil.isBlank(etcd.get(CHARSET)) ? StandardCharsets.UTF_8 : Charset.forName(etcd.get(CHARSET));
initClient(etcd, charset);
String key = etcd.get(KEY);
GetResponse getResponse = client.getKVClient().get(ByteSequence.from(key, charset)).get();
KeyValue keyValue = getResponse.getKvs().get(0);
return Objects.isNull(keyValue) ? null : keyValue.getValue().toString(charset);
}
@Override
public void afterPropertiesSet() throws Exception {
Map<String, String> etcd = bootstrapConfigProperties.getEtcd();
String user = etcd.get(USER);
String password = etcd.get(PASSWORD);
String endpoints = etcd.get(ENDPOINTS);
String authority = etcd.get(AUTHORITY);
String key = etcd.get(KEY);
Charset charset = StringUtil.isBlank(etcd.get(CHARSET)) ? StandardCharsets.UTF_8 : Charset.forName(etcd.get(CHARSET));
ClientBuilder clientBuilder = Client.builder().endpoints(endpoints.split(","));
// todo
if (Objects.isNull(client)) {
client = StringUtil.isAllNotEmpty(user, password) ? clientBuilder.user(ByteSequence.from(user, charset))
.password(ByteSequence.from(password, charset)).authority(authority)
.build() : clientBuilder.build();
}
initClient(etcd, charset);
// todo Currently only supports json
GetResponse getResponse = client.getKVClient().get(ByteSequence.from(key, charset)).get();
KeyValue keyValue = getResponse.getKvs().get(0);
@ -100,4 +103,25 @@ public class EtcdRefresherHandler extends AbstractConfigThreadPoolDynamicRefresh
}
});
}
/**
* if client is null, init it
*
* @param etcd etcd configuration item
* @param charset charset
*/
private void initClient(Map<String, String> etcd, Charset charset) {
// todo
if (Objects.isNull(client)) {
String user = etcd.get(USER);
String password = etcd.get(PASSWORD);
String authority = etcd.get(AUTHORITY);
String endpoints = etcd.get(ENDPOINTS);
ClientBuilder clientBuilder = Client.builder().endpoints(endpoints.split(","));
client = StringUtil.isAllNotEmpty(user, password) ? clientBuilder.user(ByteSequence.from(user, charset))
.password(ByteSequence.from(password, charset)).authority(authority)
.build() : clientBuilder.build();
}
}
}

@ -31,17 +31,28 @@ import java.util.concurrent.Executor;
@Slf4j
public class NacosCloudRefresherHandler extends AbstractConfigThreadPoolDynamicRefresh {
static final String DATA_ID = "data-id";
static final String GROUP = "group";
private final NacosConfigManager nacosConfigManager;
public NacosCloudRefresherHandler() {
nacosConfigManager = ApplicationContextHolder.getBean(NacosConfigManager.class);
}
@Override
public String getProperties() throws Exception {
Map<String, String> nacosConfig = bootstrapConfigProperties.getNacos();
String dataId = nacosConfig.get(DATA_ID);
String group = nacosConfig.get(GROUP);
return nacosConfigManager.getConfigService().getConfig(dataId, group, 5000L);
}
@Override
public void afterPropertiesSet() throws Exception {
Map<String, String> nacosConfig = bootstrapConfigProperties.getNacos();
nacosConfigManager.getConfigService().addListener(nacosConfig.get("data-id"),
nacosConfig.get("group"), new Listener() {
nacosConfigManager.getConfigService().addListener(nacosConfig.get(DATA_ID),
nacosConfig.get(GROUP), new Listener() {
@Override
public Executor getExecutor() {
@ -53,6 +64,6 @@ public class NacosCloudRefresherHandler extends AbstractConfigThreadPoolDynamicR
dynamicRefresh(configInfo);
}
});
log.info("Dynamic thread pool refresher, add nacos cloud listener success. data-id: {}, group: {}", nacosConfig.get("data-id"), nacosConfig.get("group"));
log.info("Dynamic thread pool refresher, add nacos cloud listener success. data-id: {}, group: {}", nacosConfig.get(DATA_ID), nacosConfig.get(GROUP));
}
}

@ -32,6 +32,9 @@ import java.util.concurrent.Executor;
@Slf4j
public class NacosRefresherHandler extends AbstractConfigThreadPoolDynamicRefresh {
static final String DATA_ID = "data-id";
static final String GROUP = "group";
@NacosInjected
private ConfigService configService;
@ -39,11 +42,19 @@ public class NacosRefresherHandler extends AbstractConfigThreadPoolDynamicRefres
super(bootstrapConfigProperties);
}
@Override
public String getProperties() throws Exception {
Map<String, String> nacosConfig = bootstrapConfigProperties.getNacos();
String dataId = nacosConfig.get(DATA_ID);
String group = nacosConfig.get(GROUP);
return configService.getConfig(dataId, group, Long.MAX_VALUE);
}
@Override
public void afterPropertiesSet() throws Exception {
Map<String, String> nacosConfig = bootstrapConfigProperties.getNacos();
configService.addListener(nacosConfig.get("data-id"), nacosConfig.get("group"),
configService.addListener(nacosConfig.get(DATA_ID), nacosConfig.get(GROUP),
new Listener() {
@Override
@ -56,6 +67,6 @@ public class NacosRefresherHandler extends AbstractConfigThreadPoolDynamicRefres
dynamicRefresh(configInfo);
}
});
log.info("Dynamic thread pool refresher, add nacos listener success. data-id: {}, group: {}", nacosConfig.get("data-id"), nacosConfig.get("group"));
log.info("Dynamic thread pool refresher, add nacos listener success. data-id: {}, group: {}", nacosConfig.get(DATA_ID), nacosConfig.get(GROUP));
}
}

@ -41,15 +41,31 @@ import java.util.Map;
@Slf4j
public class ZookeeperRefresherHandler extends AbstractConfigThreadPoolDynamicRefresh {
static final String ZK_CONNECT_STR = "zk-connect-str";
static final String ROOT_NODE = "root-node";
static final String CONFIG_VERSION = "config-version";
static final String NODE = "node";
private CuratorFramework curatorFramework;
@Override
public String getProperties() {
Map<String, String> zkConfigs = bootstrapConfigProperties.getZookeeper();
String nodePath = ZKPaths.makePath(ZKPaths.makePath(zkConfigs.get(ROOT_NODE),
zkConfigs.get(CONFIG_VERSION)), zkConfigs.get(NODE));
return nodePathResolver(nodePath);
}
@Override
public void afterPropertiesSet() {
Map<String, String> zkConfigs = bootstrapConfigProperties.getZookeeper();
curatorFramework = CuratorFrameworkFactory.newClient(zkConfigs.get("zk-connect-str"),
curatorFramework = CuratorFrameworkFactory.newClient(zkConfigs.get(ZK_CONNECT_STR),
new ExponentialBackoffRetry(1000, 3));
String nodePath = ZKPaths.makePath(ZKPaths.makePath(zkConfigs.get("root-node"),
zkConfigs.get("config-version")), zkConfigs.get("node"));
String nodePath = ZKPaths.makePath(ZKPaths.makePath(zkConfigs.get(ROOT_NODE),
zkConfigs.get(CONFIG_VERSION)), zkConfigs.get(NODE));
final ConnectionStateListener connectionStateListener = (client, newState) -> {
if (newState == ConnectionState.CONNECTED) {
loadNode(nodePath);
@ -81,6 +97,20 @@ public class ZookeeperRefresherHandler extends AbstractConfigThreadPoolDynamicRe
* @param nodePath zk config node path.
*/
public void loadNode(String nodePath) {
String content = nodePathResolver(nodePath);
if (content != null) {
dynamicRefresh(content);
registerNotifyAlarmManage();
}
}
/**
* resolver for zk config
*
* @param nodePath zk config node path
* @return resolver result
*/
private String nodePathResolver(String nodePath) {
try {
final GetChildrenBuilder childrenBuilder = curatorFramework.getChildren();
final List<String> children = childrenBuilder.watched().forPath(nodePath);
@ -97,10 +127,10 @@ public class ZookeeperRefresherHandler extends AbstractConfigThreadPoolDynamicRe
}
content.append(nodeName).append("=").append(value).append("\n");
});
dynamicRefresh(content.toString());
registerNotifyAlarmManage();
return content.toString();
} catch (Exception ex) {
log.error("Load zookeeper node error, nodePath is: {}", nodePath, ex);
return null;
}
}

@ -79,7 +79,7 @@ public class WebExecutorRefreshListener extends AbstractRefreshListener<WebThrea
} else if (bindableCoreProperties.getJetty() != null) {
webThreadPoolProperties = bindableCoreProperties.getJetty();
}
if (webThreadPoolProperties != null && match(webThreadPoolProperties)) {
if (webThreadPoolProperties != null && webThreadPoolProperties.getEnable() && match(webThreadPoolProperties)) {
threadPoolParameterInfo = ThreadPoolParameterInfo.builder()
.coreSize(webThreadPoolProperties.getCorePoolSize())
.maximumPoolSize(webThreadPoolProperties.getMaximumPoolSize())

Loading…
Cancel
Save