From 36239e8a4b53f5faf6a1b5ab0446b692e886a85a Mon Sep 17 00:00:00 2001 From: weihubeats Date: Fri, 2 Sep 2022 21:16:15 +0800 Subject: [PATCH] Etcd (#634) * add etcd * add etcd * add etcd * add etcd * add etcd * delete test --- .../pom.xml | 49 ++++++ .../etcd/ConfigEtcdExampleApplication.java | 37 +++++ .../config/etcd/config/ThreadPoolConfig.java | 52 ++++++ .../etcd/controller/TestController.java | 46 ++++++ .../src/main/resources/application.properties | 23 +++ hippo4j-example/pom.xml | 3 +- .../config/BootstrapConfigProperties.java | 3 + .../config/ConfigHandlerConfiguration.java | 2 +- ...ynamicThreadPoolCoreAutoConfiguration.java | 12 +- .../refresher/EtcdRefresherHandler.java | 152 +++++++++--------- 10 files changed, 293 insertions(+), 86 deletions(-) create mode 100644 hippo4j-example/hippo4j-config-etcd-spring-boot-starter-example/pom.xml create mode 100644 hippo4j-example/hippo4j-config-etcd-spring-boot-starter-example/src/main/java/cn/hippo4j/example/config/etcd/ConfigEtcdExampleApplication.java create mode 100644 hippo4j-example/hippo4j-config-etcd-spring-boot-starter-example/src/main/java/cn/hippo4j/example/config/etcd/config/ThreadPoolConfig.java create mode 100644 hippo4j-example/hippo4j-config-etcd-spring-boot-starter-example/src/main/java/cn/hippo4j/example/config/etcd/controller/TestController.java create mode 100644 hippo4j-example/hippo4j-config-etcd-spring-boot-starter-example/src/main/resources/application.properties diff --git a/hippo4j-example/hippo4j-config-etcd-spring-boot-starter-example/pom.xml b/hippo4j-example/hippo4j-config-etcd-spring-boot-starter-example/pom.xml new file mode 100644 index 00000000..8ac150a8 --- /dev/null +++ b/hippo4j-example/hippo4j-config-etcd-spring-boot-starter-example/pom.xml @@ -0,0 +1,49 @@ + + + + hippo4j-example + cn.hippo4j + ${revision} + + 4.0.0 + + hippo4j-config-etcd-spring-boot-starter-example + + + 11 + 11 + + + + + org.springframework.boot + spring-boot-starter-web + + + + org.springframework.boot + spring-boot-starter-logging + + + + cn.hippo4j + hippo4j-config-spring-boot-starter + ${revision} + + + + io.etcd + jetcd-core + ${jetcd.version} + + + + cn.hippo4j + hippo4j-example-core + ${revision} + + + + \ No newline at end of file diff --git a/hippo4j-example/hippo4j-config-etcd-spring-boot-starter-example/src/main/java/cn/hippo4j/example/config/etcd/ConfigEtcdExampleApplication.java b/hippo4j-example/hippo4j-config-etcd-spring-boot-starter-example/src/main/java/cn/hippo4j/example/config/etcd/ConfigEtcdExampleApplication.java new file mode 100644 index 00000000..ad793091 --- /dev/null +++ b/hippo4j-example/hippo4j-config-etcd-spring-boot-starter-example/src/main/java/cn/hippo4j/example/config/etcd/ConfigEtcdExampleApplication.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.example.config.etcd; + +import cn.hippo4j.core.enable.EnableDynamicThreadPool; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +/** + *@author : wh + *@date : 2022/9/2 19:06 + *@description: + */ +@EnableDynamicThreadPool +@SpringBootApplication(scanBasePackages = "cn.hippo4j.example.config") +public class ConfigEtcdExampleApplication { + + public static void main(String[] args) { + SpringApplication.run(ConfigEtcdExampleApplication.class, args); + } +} diff --git a/hippo4j-example/hippo4j-config-etcd-spring-boot-starter-example/src/main/java/cn/hippo4j/example/config/etcd/config/ThreadPoolConfig.java b/hippo4j-example/hippo4j-config-etcd-spring-boot-starter-example/src/main/java/cn/hippo4j/example/config/etcd/config/ThreadPoolConfig.java new file mode 100644 index 00000000..5a57c743 --- /dev/null +++ b/hippo4j-example/hippo4j-config-etcd-spring-boot-starter-example/src/main/java/cn/hippo4j/example/config/etcd/config/ThreadPoolConfig.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.example.config.etcd.config; + +import java.util.concurrent.ThreadPoolExecutor; + +import cn.hippo4j.core.executor.DynamicThreadPool; +import cn.hippo4j.core.executor.support.ThreadPoolBuilder; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + *@author : wh + *@date : 2022/9/2 19:26 + *@description: + */ +@Configuration +public class ThreadPoolConfig { + + + @Bean + @DynamicThreadPool + public ThreadPoolExecutor messageConsumeDynamicExecutor() { + String threadPoolId = "message-consume"; + ThreadPoolExecutor messageConsumeDynamicExecutor = ThreadPoolBuilder.builder() + .threadFactory(threadPoolId) + .threadPoolId(threadPoolId) + .dynamicPool() + .build(); + return messageConsumeDynamicExecutor; + } + + + + +} diff --git a/hippo4j-example/hippo4j-config-etcd-spring-boot-starter-example/src/main/java/cn/hippo4j/example/config/etcd/controller/TestController.java b/hippo4j-example/hippo4j-config-etcd-spring-boot-starter-example/src/main/java/cn/hippo4j/example/config/etcd/controller/TestController.java new file mode 100644 index 00000000..947bb610 --- /dev/null +++ b/hippo4j-example/hippo4j-config-etcd-spring-boot-starter-example/src/main/java/cn/hippo4j/example/config/etcd/controller/TestController.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.example.config.etcd.controller; + +import java.util.concurrent.ThreadPoolExecutor; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +/** + *@author : wh + *@date : 2022/9/2 19:18 + *@description: + */ +@RestController +@RequestMapping +public class TestController { + + + @Autowired + private ThreadPoolExecutor messageConsumeDynamicExecutor; + + + @GetMapping("test") + public void test() { + System.out.println(messageConsumeDynamicExecutor.getMaximumPoolSize()); + } + +} diff --git a/hippo4j-example/hippo4j-config-etcd-spring-boot-starter-example/src/main/resources/application.properties b/hippo4j-example/hippo4j-config-etcd-spring-boot-starter-example/src/main/resources/application.properties new file mode 100644 index 00000000..d5c19eca --- /dev/null +++ b/hippo4j-example/hippo4j-config-etcd-spring-boot-starter-example/src/main/resources/application.properties @@ -0,0 +1,23 @@ +server.port=8888 + +spring.application.name=etcd +spring.dynamic.thread-pool.etcd.endpoints = http://127.0.0.1:2379 +spring.dynamic.thread-pool.etcd.key = /thread +spring.dynamic.thread-pool.executors[0].thread-pool-id=message-consume +spring.dynamic.thread-pool.executors[0].core-pool-size=3 +spring.dynamic.thread-pool.executors[0].maximum-pool-size=4 +spring.dynamic.thread-pool.executors[0].queue-capacity=4 +spring.dynamic.thread-pool.executors[0].execute-time-out=1000 +spring.dynamic.thread-pool.executors[0].blocking-queue=LinkedBlockingQueue +spring.dynamic.thread-pool.executors[0].rejected-handler=AbortPolicy +spring.dynamic.thread-pool.executors[0].keep-alive-time=1000 +spring.dynamic.thread-pool.executors[0].allow-core-thread-time-out=true +spring.dynamic.thread-pool.executors[0].thread-name-prefix=message-consume +spring.dynamic.thread-pool.executors[0].alarm=true +spring.dynamic.thread-pool.executors[0].active-alarm=80 +spring.dynamic.thread-pool.executors[0].capacity-alarm=80 +spring.dynamic.thread-pool.executors[0].notify.interval=8 +spring.dynamic.thread-pool.executors[0].notify.receives=111 + +spring.dynamic.thread-pool.notify-platforms[0].platform=WECHAT +spring.dynamic.thread-pool.notify-platforms[0].secret-key=ac0426a5-c712-474c-9bff-72b8b8f5caff \ No newline at end of file diff --git a/hippo4j-example/pom.xml b/hippo4j-example/pom.xml index a8159637..55448c1c 100644 --- a/hippo4j-example/pom.xml +++ b/hippo4j-example/pom.xml @@ -22,7 +22,8 @@ hippo4j-spring-boot-starter-adapter-spring-cloud-stream-rabbitmq-example hippo4j-spring-boot-starter-adapter-spring-cloud-stream-rocketmq-example hippo4j-spring-boot-starter-adapter-rocketmq-example - + hippo4j-config-etcd-spring-boot-starter-example + true diff --git a/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/config/springboot/starter/config/BootstrapConfigProperties.java b/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/config/springboot/starter/config/BootstrapConfigProperties.java index 8a534f69..d939e371 100644 --- a/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/config/springboot/starter/config/BootstrapConfigProperties.java +++ b/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/config/springboot/starter/config/BootstrapConfigProperties.java @@ -20,6 +20,9 @@ package cn.hippo4j.config.springboot.starter.config; import java.util.List; import java.util.Map; +import java.util.List; +import java.util.Map; + import cn.hippo4j.core.config.BootstrapPropertiesInterface; import cn.hippo4j.config.springboot.starter.parser.ConfigFileTypeEnum; import lombok.Getter; diff --git a/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/config/springboot/starter/config/ConfigHandlerConfiguration.java b/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/config/springboot/starter/config/ConfigHandlerConfiguration.java index 0685a0a3..bff13549 100644 --- a/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/config/springboot/starter/config/ConfigHandlerConfiguration.java +++ b/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/config/springboot/starter/config/ConfigHandlerConfiguration.java @@ -47,7 +47,7 @@ public class ConfigHandlerConfiguration { private static final String ZOOKEEPER_CONNECT_STR_KEY = "zookeeper.zk-connect-str"; - private static final String ETCD = "endpoints"; + private static final String ETCD = "etcd.endpoints"; @RequiredArgsConstructor @ConditionalOnClass(ConfigService.class) diff --git a/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/config/springboot/starter/config/DynamicThreadPoolCoreAutoConfiguration.java b/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/config/springboot/starter/config/DynamicThreadPoolCoreAutoConfiguration.java index fd8ce8f5..8818ab3e 100644 --- a/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/config/springboot/starter/config/DynamicThreadPoolCoreAutoConfiguration.java +++ b/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/config/springboot/starter/config/DynamicThreadPoolCoreAutoConfiguration.java @@ -18,10 +18,6 @@ package cn.hippo4j.config.springboot.starter.config; import cn.hippo4j.common.config.ApplicationContextHolder; -import cn.hippo4j.core.config.UtilAutoConfiguration; -import cn.hippo4j.core.enable.MarkerConfiguration; -import cn.hippo4j.core.executor.ThreadPoolNotifyAlarmHandler; -import cn.hippo4j.core.handler.DynamicThreadPoolBannerHandler; import cn.hippo4j.config.springboot.starter.monitor.DynamicThreadPoolMonitorExecutor; import cn.hippo4j.config.springboot.starter.notify.CoreNotifyConfigBuilder; import cn.hippo4j.config.springboot.starter.refresher.event.AdapterExecutorsRefreshListener; @@ -31,12 +27,17 @@ import cn.hippo4j.config.springboot.starter.refresher.event.WebExecutorRefreshLi import cn.hippo4j.config.springboot.starter.support.DynamicThreadPoolAdapterRegister; import cn.hippo4j.config.springboot.starter.support.DynamicThreadPoolConfigService; import cn.hippo4j.config.springboot.starter.support.DynamicThreadPoolPostProcessor; +import cn.hippo4j.core.config.UtilAutoConfiguration; +import cn.hippo4j.core.enable.MarkerConfiguration; +import cn.hippo4j.core.executor.ThreadPoolNotifyAlarmHandler; +import cn.hippo4j.core.handler.DynamicThreadPoolBannerHandler; import cn.hippo4j.message.api.NotifyConfigBuilder; import cn.hippo4j.message.config.MessageConfiguration; import cn.hippo4j.message.service.AlarmControlHandler; import cn.hippo4j.message.service.Hippo4jBaseSendMessageService; import cn.hippo4j.message.service.Hippo4jSendMessageService; import lombok.AllArgsConstructor; + import org.springframework.boot.autoconfigure.ImportAutoConfiguration; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; @@ -59,7 +60,8 @@ import org.springframework.core.annotation.Order; @ConditionalOnProperty(prefix = BootstrapConfigProperties.PREFIX, value = "enable", matchIfMissing = true, havingValue = "true") @Import({ ConfigHandlerConfiguration.EmbeddedNacos.class, ConfigHandlerConfiguration.EmbeddedNacosCloud.class, - ConfigHandlerConfiguration.EmbeddedApollo.class, ConfigHandlerConfiguration.EmbeddedZookeeper.class + ConfigHandlerConfiguration.EmbeddedApollo.class, ConfigHandlerConfiguration.EmbeddedZookeeper.class, + ConfigHandlerConfiguration.EmbeddedEtcd.class }) public class DynamicThreadPoolCoreAutoConfiguration { diff --git a/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/config/springboot/starter/refresher/EtcdRefresherHandler.java b/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/config/springboot/starter/refresher/EtcdRefresherHandler.java index 991d8b17..e99eb463 100644 --- a/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/config/springboot/starter/refresher/EtcdRefresherHandler.java +++ b/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/config/springboot/starter/refresher/EtcdRefresherHandler.java @@ -29,13 +29,12 @@ import io.etcd.jetcd.Client; import io.etcd.jetcd.ClientBuilder; import io.etcd.jetcd.KeyValue; import io.etcd.jetcd.Watch; +import io.etcd.jetcd.kv.GetResponse; import io.etcd.jetcd.watch.WatchEvent; import io.etcd.jetcd.watch.WatchResponse; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; -import org.springframework.context.ApplicationContextAware; /** *@author : wh @@ -43,80 +42,75 @@ import org.springframework.context.ApplicationContextAware; *@description: */ @Slf4j -public class EtcdRefresherHandler extends AbstractCoreThreadPoolDynamicRefresh implements ApplicationContextAware { - - private ApplicationContext applicationContext; - - private Client client; - - private static final String ENDPOINTS = "endpoints"; - - private static final String USER = "user"; - - private static final String PASSWORD = "password"; - - private static final String CHARSET = "charset"; - - private static final String AUTHORITY = "authority"; - - private static final String KEY = "key"; - - @Override - public void afterPropertiesSet() throws Exception { - Map etcd = bootstrapConfigProperties.getEtcd(); - String user = etcd.get(USER); - String password = etcd.get(PASSWORD); - String endpoints = etcd.get(ENDPOINTS); - String authority = etcd.get(AUTHORITY); - String key = etcd.get(KEY); - Charset charset = StringUtil.isBlank(etcd.get(CHARSET)) ? StandardCharsets.UTF_8 : Charset.forName(etcd.get(CHARSET)); - - ClientBuilder clientBuilder = Client.builder().endpoints(endpoints.split(",")); - - client = applicationContext.getBean(Client.class); - if (Objects.isNull(client)) { - client = StringUtil.isAllNotEmpty(user, password) ? clientBuilder.user(ByteSequence.from(user, charset)) - .password(ByteSequence.from(password, charset)).authority(authority) - .build() : clientBuilder.build(); - } - - // todo Currently only supports json - KeyValue keyValue = client.getKVClient().get(ByteSequence.from(key, charset)).get().getKvs().get(0); - if (Objects.isNull(keyValue)) { - return; - } - - client.getWatchClient().watch(ByteSequence.from(key, charset), new Watch.Listener() { - - @Override - public void onNext(WatchResponse response) { - WatchEvent watchEvent = response.getEvents().get(0); - WatchEvent.EventType eventType = watchEvent.getEventType(); - // todo Currently only supports json - if (Objects.equals(eventType, WatchEvent.EventType.PUT)) { - KeyValue keyValue1 = watchEvent.getKeyValue(); - String value = keyValue1.getValue().toString(charset); - Map map = JSONUtil.parseObject(value, Map.class); - dynamicRefresh(keyValue1.getKey().toString(charset), map); - } - - } - - @Override - public void onError(Throwable throwable) { - log.error("dynamic thread pool etcd config watcher exception ", throwable); - } - - @Override - public void onCompleted() { - log.info("dynamic thread pool etcd config key refreshed, config key {}", key); - } - }); - - } - - @Override - public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { - this.applicationContext = applicationContext; - } -} +public class EtcdRefresherHandler extends AbstractCoreThreadPoolDynamicRefresh { + + private ApplicationContext applicationContext; + + private Client client; + + private static final String ENDPOINTS = "endpoints"; + + private static final String USER = "user"; + + private static final String PASSWORD = "password"; + + private static final String CHARSET = "charset"; + + private static final String AUTHORITY = "authority"; + + private static final String KEY = "key"; + + @Override + public void afterPropertiesSet() throws Exception { + Map etcd = bootstrapConfigProperties.getEtcd(); + String user = etcd.get(USER); + String password = etcd.get(PASSWORD); + String endpoints = etcd.get(ENDPOINTS); + String authority = etcd.get(AUTHORITY); + String key = etcd.get(KEY); + Charset charset = StringUtil.isBlank(etcd.get(CHARSET)) ? StandardCharsets.UTF_8 : Charset.forName(etcd.get(CHARSET)); + + ClientBuilder clientBuilder = Client.builder().endpoints(endpoints.split(",")); + //todo + if (Objects.isNull(client)) { + client = StringUtil.isAllNotEmpty(user, password) ? clientBuilder.user(ByteSequence.from(user, charset)) + .password(ByteSequence.from(password, charset)).authority(authority) + .build() : clientBuilder.build(); + } + + // todo Currently only supports json + GetResponse getResponse = client.getKVClient().get(ByteSequence.from(key, charset)).get(); + KeyValue keyValue = getResponse.getKvs().get(0); + if (Objects.isNull(keyValue)) { + return; + } + client.getWatchClient().watch(ByteSequence.from(key, charset), new Watch.Listener() { + + @Override + public void onNext(WatchResponse response) { + WatchEvent watchEvent = response.getEvents().get(0); + WatchEvent.EventType eventType = watchEvent.getEventType(); + // todo Currently only supports json + if (Objects.equals(eventType, WatchEvent.EventType.PUT)) { + KeyValue keyValue1 = watchEvent.getKeyValue(); + String value = keyValue1.getValue().toString(charset); + Map map = JSONUtil.parseObject(value, Map.class); + dynamicRefresh(keyValue1.getKey().toString(charset), map); + } + + } + + @Override + public void onError(Throwable throwable) { + log.error("dynamic thread pool etcd config watcher exception ", throwable); + } + + @Override + public void onCompleted() { + log.info("dynamic thread pool etcd config key refreshed, config key {}", key); + } + }); + + } + +} \ No newline at end of file