From 36096c535781834922c27957a49055e2aacdf6ae Mon Sep 17 00:00:00 2001 From: pizihao <2335715300@qq.com> Date: Fri, 23 Sep 2022 11:44:42 +0800 Subject: [PATCH] feat : adapter thread pool init, including nacos, zookeeper, etcd and apollo(#609) --- .../refresher/ApolloRefresherHandler.java | 10 ++++ .../refresher/EtcdRefresherHandler.java | 46 ++++++++++++++----- .../refresher/NacosCloudRefresherHandler.java | 17 +++++-- .../refresher/NacosRefresherHandler.java | 15 +++++- .../refresher/ZookeeperRefresherHandler.java | 40 ++++++++++++++-- .../event/WebExecutorRefreshListener.java | 2 +- 6 files changed, 108 insertions(+), 22 deletions(-) diff --git a/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/config/springboot/starter/refresher/ApolloRefresherHandler.java b/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/config/springboot/starter/refresher/ApolloRefresherHandler.java index bc4ce2e2..90425870 100644 --- a/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/config/springboot/starter/refresher/ApolloRefresherHandler.java +++ b/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/config/springboot/starter/refresher/ApolloRefresherHandler.java @@ -41,6 +41,16 @@ public class ApolloRefresherHandler extends AbstractConfigThreadPoolDynamicRefre @Value(APOLLO_PROPERTY) private String namespace; + @Override + public String getProperties() { + String[] apolloNamespaces = this.namespace.split(","); + this.namespace = apolloNamespaces[0]; + String copyNamespace = this.namespace.replaceAll("." + bootstrapConfigProperties.getConfigFileType().getValue(), ""); + ConfigFileFormat configFileFormat = ConfigFileFormat.fromString(bootstrapConfigProperties.getConfigFileType().getValue()); + ConfigFile configFile = ConfigService.getConfigFile(copyNamespace, configFileFormat); + return configFile.getContent(); + } + @Override public void afterPropertiesSet() { String[] apolloNamespaces = this.namespace.split(","); diff --git a/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/config/springboot/starter/refresher/EtcdRefresherHandler.java b/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/config/springboot/starter/refresher/EtcdRefresherHandler.java index 30df2d14..5f0cbe25 100644 --- a/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/config/springboot/starter/refresher/EtcdRefresherHandler.java +++ b/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/config/springboot/starter/refresher/EtcdRefresherHandler.java @@ -52,22 +52,25 @@ public class EtcdRefresherHandler extends AbstractConfigThreadPoolDynamicRefresh private static final String KEY = "key"; + @Override + public String getProperties() throws Exception { + Map etcd = bootstrapConfigProperties.getEtcd(); + Charset charset = StringUtil.isBlank(etcd.get(CHARSET)) ? StandardCharsets.UTF_8 : Charset.forName(etcd.get(CHARSET)); + initClient(etcd, charset); + + String key = etcd.get(KEY); + GetResponse getResponse = client.getKVClient().get(ByteSequence.from(key, charset)).get(); + KeyValue keyValue = getResponse.getKvs().get(0); + return Objects.isNull(keyValue) ? null : keyValue.getValue().toString(charset); + } + @Override public void afterPropertiesSet() throws Exception { Map etcd = bootstrapConfigProperties.getEtcd(); - String user = etcd.get(USER); - String password = etcd.get(PASSWORD); - String endpoints = etcd.get(ENDPOINTS); - String authority = etcd.get(AUTHORITY); String key = etcd.get(KEY); Charset charset = StringUtil.isBlank(etcd.get(CHARSET)) ? StandardCharsets.UTF_8 : Charset.forName(etcd.get(CHARSET)); - ClientBuilder clientBuilder = Client.builder().endpoints(endpoints.split(",")); - // todo - if (Objects.isNull(client)) { - client = StringUtil.isAllNotEmpty(user, password) ? clientBuilder.user(ByteSequence.from(user, charset)) - .password(ByteSequence.from(password, charset)).authority(authority) - .build() : clientBuilder.build(); - } + initClient(etcd, charset); + // todo Currently only supports json GetResponse getResponse = client.getKVClient().get(ByteSequence.from(key, charset)).get(); KeyValue keyValue = getResponse.getKvs().get(0); @@ -100,4 +103,25 @@ public class EtcdRefresherHandler extends AbstractConfigThreadPoolDynamicRefresh } }); } + + /** + * if client is null, init it + * + * @param etcd etcd configuration item + * @param charset charset + */ + private void initClient(Map etcd, Charset charset) { + // todo + if (Objects.isNull(client)) { + String user = etcd.get(USER); + String password = etcd.get(PASSWORD); + String authority = etcd.get(AUTHORITY); + String endpoints = etcd.get(ENDPOINTS); + ClientBuilder clientBuilder = Client.builder().endpoints(endpoints.split(",")); + client = StringUtil.isAllNotEmpty(user, password) ? clientBuilder.user(ByteSequence.from(user, charset)) + .password(ByteSequence.from(password, charset)).authority(authority) + .build() : clientBuilder.build(); + } + } + } \ No newline at end of file diff --git a/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/config/springboot/starter/refresher/NacosCloudRefresherHandler.java b/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/config/springboot/starter/refresher/NacosCloudRefresherHandler.java index 4433cfa4..6f328b1f 100644 --- a/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/config/springboot/starter/refresher/NacosCloudRefresherHandler.java +++ b/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/config/springboot/starter/refresher/NacosCloudRefresherHandler.java @@ -31,17 +31,28 @@ import java.util.concurrent.Executor; @Slf4j public class NacosCloudRefresherHandler extends AbstractConfigThreadPoolDynamicRefresh { + static final String DATA_ID = "data-id"; + static final String GROUP = "group"; + private final NacosConfigManager nacosConfigManager; public NacosCloudRefresherHandler() { nacosConfigManager = ApplicationContextHolder.getBean(NacosConfigManager.class); } + @Override + public String getProperties() throws Exception { + Map nacosConfig = bootstrapConfigProperties.getNacos(); + String dataId = nacosConfig.get(DATA_ID); + String group = nacosConfig.get(GROUP); + return nacosConfigManager.getConfigService().getConfig(dataId, group, 5000L); + } + @Override public void afterPropertiesSet() throws Exception { Map nacosConfig = bootstrapConfigProperties.getNacos(); - nacosConfigManager.getConfigService().addListener(nacosConfig.get("data-id"), - nacosConfig.get("group"), new Listener() { + nacosConfigManager.getConfigService().addListener(nacosConfig.get(DATA_ID), + nacosConfig.get(GROUP), new Listener() { @Override public Executor getExecutor() { @@ -53,6 +64,6 @@ public class NacosCloudRefresherHandler extends AbstractConfigThreadPoolDynamicR dynamicRefresh(configInfo); } }); - log.info("Dynamic thread pool refresher, add nacos cloud listener success. data-id: {}, group: {}", nacosConfig.get("data-id"), nacosConfig.get("group")); + log.info("Dynamic thread pool refresher, add nacos cloud listener success. data-id: {}, group: {}", nacosConfig.get(DATA_ID), nacosConfig.get(GROUP)); } } diff --git a/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/config/springboot/starter/refresher/NacosRefresherHandler.java b/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/config/springboot/starter/refresher/NacosRefresherHandler.java index fa5f7e91..ad052018 100644 --- a/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/config/springboot/starter/refresher/NacosRefresherHandler.java +++ b/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/config/springboot/starter/refresher/NacosRefresherHandler.java @@ -32,6 +32,9 @@ import java.util.concurrent.Executor; @Slf4j public class NacosRefresherHandler extends AbstractConfigThreadPoolDynamicRefresh { + static final String DATA_ID = "data-id"; + static final String GROUP = "group"; + @NacosInjected private ConfigService configService; @@ -39,11 +42,19 @@ public class NacosRefresherHandler extends AbstractConfigThreadPoolDynamicRefres super(bootstrapConfigProperties); } + @Override + public String getProperties() throws Exception { + Map nacosConfig = bootstrapConfigProperties.getNacos(); + String dataId = nacosConfig.get(DATA_ID); + String group = nacosConfig.get(GROUP); + return configService.getConfig(dataId, group, Long.MAX_VALUE); + } + @Override public void afterPropertiesSet() throws Exception { Map nacosConfig = bootstrapConfigProperties.getNacos(); - configService.addListener(nacosConfig.get("data-id"), nacosConfig.get("group"), + configService.addListener(nacosConfig.get(DATA_ID), nacosConfig.get(GROUP), new Listener() { @Override @@ -56,6 +67,6 @@ public class NacosRefresherHandler extends AbstractConfigThreadPoolDynamicRefres dynamicRefresh(configInfo); } }); - log.info("Dynamic thread pool refresher, add nacos listener success. data-id: {}, group: {}", nacosConfig.get("data-id"), nacosConfig.get("group")); + log.info("Dynamic thread pool refresher, add nacos listener success. data-id: {}, group: {}", nacosConfig.get(DATA_ID), nacosConfig.get(GROUP)); } } diff --git a/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/config/springboot/starter/refresher/ZookeeperRefresherHandler.java b/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/config/springboot/starter/refresher/ZookeeperRefresherHandler.java index 479bba79..fe886001 100644 --- a/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/config/springboot/starter/refresher/ZookeeperRefresherHandler.java +++ b/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/config/springboot/starter/refresher/ZookeeperRefresherHandler.java @@ -41,15 +41,31 @@ import java.util.Map; @Slf4j public class ZookeeperRefresherHandler extends AbstractConfigThreadPoolDynamicRefresh { + static final String ZK_CONNECT_STR = "zk-connect-str"; + + static final String ROOT_NODE = "root-node"; + + static final String CONFIG_VERSION = "config-version"; + + static final String NODE = "node"; + private CuratorFramework curatorFramework; + @Override + public String getProperties() { + Map zkConfigs = bootstrapConfigProperties.getZookeeper(); + String nodePath = ZKPaths.makePath(ZKPaths.makePath(zkConfigs.get(ROOT_NODE), + zkConfigs.get(CONFIG_VERSION)), zkConfigs.get(NODE)); + return nodePathResolver(nodePath); + } + @Override public void afterPropertiesSet() { Map zkConfigs = bootstrapConfigProperties.getZookeeper(); - curatorFramework = CuratorFrameworkFactory.newClient(zkConfigs.get("zk-connect-str"), + curatorFramework = CuratorFrameworkFactory.newClient(zkConfigs.get(ZK_CONNECT_STR), new ExponentialBackoffRetry(1000, 3)); - String nodePath = ZKPaths.makePath(ZKPaths.makePath(zkConfigs.get("root-node"), - zkConfigs.get("config-version")), zkConfigs.get("node")); + String nodePath = ZKPaths.makePath(ZKPaths.makePath(zkConfigs.get(ROOT_NODE), + zkConfigs.get(CONFIG_VERSION)), zkConfigs.get(NODE)); final ConnectionStateListener connectionStateListener = (client, newState) -> { if (newState == ConnectionState.CONNECTED) { loadNode(nodePath); @@ -81,6 +97,20 @@ public class ZookeeperRefresherHandler extends AbstractConfigThreadPoolDynamicRe * @param nodePath zk config node path. */ public void loadNode(String nodePath) { + String content = nodePathResolver(nodePath); + if (content != null) { + dynamicRefresh(content); + registerNotifyAlarmManage(); + } + } + + /** + * resolver for zk config + * + * @param nodePath zk config node path + * @return resolver result + */ + private String nodePathResolver(String nodePath) { try { final GetChildrenBuilder childrenBuilder = curatorFramework.getChildren(); final List children = childrenBuilder.watched().forPath(nodePath); @@ -97,10 +127,10 @@ public class ZookeeperRefresherHandler extends AbstractConfigThreadPoolDynamicRe } content.append(nodeName).append("=").append(value).append("\n"); }); - dynamicRefresh(content.toString()); - registerNotifyAlarmManage(); + return content.toString(); } catch (Exception ex) { log.error("Load zookeeper node error, nodePath is: {}", nodePath, ex); + return null; } } diff --git a/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/config/springboot/starter/refresher/event/WebExecutorRefreshListener.java b/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/config/springboot/starter/refresher/event/WebExecutorRefreshListener.java index f529bee7..1a438e1b 100644 --- a/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/config/springboot/starter/refresher/event/WebExecutorRefreshListener.java +++ b/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/config/springboot/starter/refresher/event/WebExecutorRefreshListener.java @@ -79,7 +79,7 @@ public class WebExecutorRefreshListener extends AbstractRefreshListener