* add etcd

* add etcd

* add etcd

* add etcd

* add etcd

* delete test
pull/636/head
weihubeats 2 years ago committed by GitHub
parent 41b970a15f
commit 36239e8a4b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -0,0 +1,49 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>hippo4j-example</artifactId>
<groupId>cn.hippo4j</groupId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>hippo4j-config-etcd-spring-boot-starter-example</artifactId>
<properties>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</dependency>
<dependency>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-config-spring-boot-starter</artifactId>
<version>${revision}</version>
</dependency>
<dependency>
<groupId>io.etcd</groupId>
<artifactId>jetcd-core</artifactId>
<version>${jetcd.version}</version>
</dependency>
<dependency>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-example-core</artifactId>
<version>${revision}</version>
</dependency>
</dependencies>
</project>

@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.example.config.etcd;
import cn.hippo4j.core.enable.EnableDynamicThreadPool;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
*@author : wh
*@date : 2022/9/2 19:06
*@description:
*/
@EnableDynamicThreadPool
@SpringBootApplication(scanBasePackages = "cn.hippo4j.example.config")
public class ConfigEtcdExampleApplication {
public static void main(String[] args) {
SpringApplication.run(ConfigEtcdExampleApplication.class, args);
}
}

@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.example.config.etcd.config;
import java.util.concurrent.ThreadPoolExecutor;
import cn.hippo4j.core.executor.DynamicThreadPool;
import cn.hippo4j.core.executor.support.ThreadPoolBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
*@author : wh
*@date : 2022/9/2 19:26
*@description:
*/
@Configuration
public class ThreadPoolConfig {
@Bean
@DynamicThreadPool
public ThreadPoolExecutor messageConsumeDynamicExecutor() {
String threadPoolId = "message-consume";
ThreadPoolExecutor messageConsumeDynamicExecutor = ThreadPoolBuilder.builder()
.threadFactory(threadPoolId)
.threadPoolId(threadPoolId)
.dynamicPool()
.build();
return messageConsumeDynamicExecutor;
}
}

@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.example.config.etcd.controller;
import java.util.concurrent.ThreadPoolExecutor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
*@author : wh
*@date : 2022/9/2 19:18
*@description:
*/
@RestController
@RequestMapping
public class TestController {
@Autowired
private ThreadPoolExecutor messageConsumeDynamicExecutor;
@GetMapping("test")
public void test() {
System.out.println(messageConsumeDynamicExecutor.getMaximumPoolSize());
}
}

@ -0,0 +1,23 @@
server.port=8888
spring.application.name=etcd
spring.dynamic.thread-pool.etcd.endpoints = http://127.0.0.1:2379
spring.dynamic.thread-pool.etcd.key = /thread
spring.dynamic.thread-pool.executors[0].thread-pool-id=message-consume
spring.dynamic.thread-pool.executors[0].core-pool-size=3
spring.dynamic.thread-pool.executors[0].maximum-pool-size=4
spring.dynamic.thread-pool.executors[0].queue-capacity=4
spring.dynamic.thread-pool.executors[0].execute-time-out=1000
spring.dynamic.thread-pool.executors[0].blocking-queue=LinkedBlockingQueue
spring.dynamic.thread-pool.executors[0].rejected-handler=AbortPolicy
spring.dynamic.thread-pool.executors[0].keep-alive-time=1000
spring.dynamic.thread-pool.executors[0].allow-core-thread-time-out=true
spring.dynamic.thread-pool.executors[0].thread-name-prefix=message-consume
spring.dynamic.thread-pool.executors[0].alarm=true
spring.dynamic.thread-pool.executors[0].active-alarm=80
spring.dynamic.thread-pool.executors[0].capacity-alarm=80
spring.dynamic.thread-pool.executors[0].notify.interval=8
spring.dynamic.thread-pool.executors[0].notify.receives=111
spring.dynamic.thread-pool.notify-platforms[0].platform=WECHAT
spring.dynamic.thread-pool.notify-platforms[0].secret-key=ac0426a5-c712-474c-9bff-72b8b8f5caff

@ -22,7 +22,8 @@
<module>hippo4j-spring-boot-starter-adapter-spring-cloud-stream-rabbitmq-example</module>
<module>hippo4j-spring-boot-starter-adapter-spring-cloud-stream-rocketmq-example</module>
<module>hippo4j-spring-boot-starter-adapter-rocketmq-example</module>
</modules>
<module>hippo4j-config-etcd-spring-boot-starter-example</module>
</modules>
<properties>
<maven.deploy.skip>true</maven.deploy.skip>

@ -20,6 +20,9 @@ package cn.hippo4j.config.springboot.starter.config;
import java.util.List;
import java.util.Map;
import java.util.List;
import java.util.Map;
import cn.hippo4j.core.config.BootstrapPropertiesInterface;
import cn.hippo4j.config.springboot.starter.parser.ConfigFileTypeEnum;
import lombok.Getter;

@ -47,7 +47,7 @@ public class ConfigHandlerConfiguration {
private static final String ZOOKEEPER_CONNECT_STR_KEY = "zookeeper.zk-connect-str";
private static final String ETCD = "endpoints";
private static final String ETCD = "etcd.endpoints";
@RequiredArgsConstructor
@ConditionalOnClass(ConfigService.class)

@ -18,10 +18,6 @@
package cn.hippo4j.config.springboot.starter.config;
import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.core.config.UtilAutoConfiguration;
import cn.hippo4j.core.enable.MarkerConfiguration;
import cn.hippo4j.core.executor.ThreadPoolNotifyAlarmHandler;
import cn.hippo4j.core.handler.DynamicThreadPoolBannerHandler;
import cn.hippo4j.config.springboot.starter.monitor.DynamicThreadPoolMonitorExecutor;
import cn.hippo4j.config.springboot.starter.notify.CoreNotifyConfigBuilder;
import cn.hippo4j.config.springboot.starter.refresher.event.AdapterExecutorsRefreshListener;
@ -31,12 +27,17 @@ import cn.hippo4j.config.springboot.starter.refresher.event.WebExecutorRefreshLi
import cn.hippo4j.config.springboot.starter.support.DynamicThreadPoolAdapterRegister;
import cn.hippo4j.config.springboot.starter.support.DynamicThreadPoolConfigService;
import cn.hippo4j.config.springboot.starter.support.DynamicThreadPoolPostProcessor;
import cn.hippo4j.core.config.UtilAutoConfiguration;
import cn.hippo4j.core.enable.MarkerConfiguration;
import cn.hippo4j.core.executor.ThreadPoolNotifyAlarmHandler;
import cn.hippo4j.core.handler.DynamicThreadPoolBannerHandler;
import cn.hippo4j.message.api.NotifyConfigBuilder;
import cn.hippo4j.message.config.MessageConfiguration;
import cn.hippo4j.message.service.AlarmControlHandler;
import cn.hippo4j.message.service.Hippo4jBaseSendMessageService;
import cn.hippo4j.message.service.Hippo4jSendMessageService;
import lombok.AllArgsConstructor;
import org.springframework.boot.autoconfigure.ImportAutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
@ -59,7 +60,8 @@ import org.springframework.core.annotation.Order;
@ConditionalOnProperty(prefix = BootstrapConfigProperties.PREFIX, value = "enable", matchIfMissing = true, havingValue = "true")
@Import({
ConfigHandlerConfiguration.EmbeddedNacos.class, ConfigHandlerConfiguration.EmbeddedNacosCloud.class,
ConfigHandlerConfiguration.EmbeddedApollo.class, ConfigHandlerConfiguration.EmbeddedZookeeper.class
ConfigHandlerConfiguration.EmbeddedApollo.class, ConfigHandlerConfiguration.EmbeddedZookeeper.class,
ConfigHandlerConfiguration.EmbeddedEtcd.class
})
public class DynamicThreadPoolCoreAutoConfiguration {

@ -29,13 +29,12 @@ import io.etcd.jetcd.Client;
import io.etcd.jetcd.ClientBuilder;
import io.etcd.jetcd.KeyValue;
import io.etcd.jetcd.Watch;
import io.etcd.jetcd.kv.GetResponse;
import io.etcd.jetcd.watch.WatchEvent;
import io.etcd.jetcd.watch.WatchResponse;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
/**
*@author : wh
@ -43,80 +42,75 @@ import org.springframework.context.ApplicationContextAware;
*@description:
*/
@Slf4j
public class EtcdRefresherHandler extends AbstractCoreThreadPoolDynamicRefresh implements ApplicationContextAware {
private ApplicationContext applicationContext;
private Client client;
private static final String ENDPOINTS = "endpoints";
private static final String USER = "user";
private static final String PASSWORD = "password";
private static final String CHARSET = "charset";
private static final String AUTHORITY = "authority";
private static final String KEY = "key";
@Override
public void afterPropertiesSet() throws Exception {
Map<String, String> etcd = bootstrapConfigProperties.getEtcd();
String user = etcd.get(USER);
String password = etcd.get(PASSWORD);
String endpoints = etcd.get(ENDPOINTS);
String authority = etcd.get(AUTHORITY);
String key = etcd.get(KEY);
Charset charset = StringUtil.isBlank(etcd.get(CHARSET)) ? StandardCharsets.UTF_8 : Charset.forName(etcd.get(CHARSET));
ClientBuilder clientBuilder = Client.builder().endpoints(endpoints.split(","));
client = applicationContext.getBean(Client.class);
if (Objects.isNull(client)) {
client = StringUtil.isAllNotEmpty(user, password) ? clientBuilder.user(ByteSequence.from(user, charset))
.password(ByteSequence.from(password, charset)).authority(authority)
.build() : clientBuilder.build();
}
// todo Currently only supports json
KeyValue keyValue = client.getKVClient().get(ByteSequence.from(key, charset)).get().getKvs().get(0);
if (Objects.isNull(keyValue)) {
return;
}
client.getWatchClient().watch(ByteSequence.from(key, charset), new Watch.Listener() {
@Override
public void onNext(WatchResponse response) {
WatchEvent watchEvent = response.getEvents().get(0);
WatchEvent.EventType eventType = watchEvent.getEventType();
// todo Currently only supports json
if (Objects.equals(eventType, WatchEvent.EventType.PUT)) {
KeyValue keyValue1 = watchEvent.getKeyValue();
String value = keyValue1.getValue().toString(charset);
Map map = JSONUtil.parseObject(value, Map.class);
dynamicRefresh(keyValue1.getKey().toString(charset), map);
}
}
@Override
public void onError(Throwable throwable) {
log.error("dynamic thread pool etcd config watcher exception ", throwable);
}
@Override
public void onCompleted() {
log.info("dynamic thread pool etcd config key refreshed, config key {}", key);
}
});
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
}
public class EtcdRefresherHandler extends AbstractCoreThreadPoolDynamicRefresh {
private ApplicationContext applicationContext;
private Client client;
private static final String ENDPOINTS = "endpoints";
private static final String USER = "user";
private static final String PASSWORD = "password";
private static final String CHARSET = "charset";
private static final String AUTHORITY = "authority";
private static final String KEY = "key";
@Override
public void afterPropertiesSet() throws Exception {
Map<String, String> etcd = bootstrapConfigProperties.getEtcd();
String user = etcd.get(USER);
String password = etcd.get(PASSWORD);
String endpoints = etcd.get(ENDPOINTS);
String authority = etcd.get(AUTHORITY);
String key = etcd.get(KEY);
Charset charset = StringUtil.isBlank(etcd.get(CHARSET)) ? StandardCharsets.UTF_8 : Charset.forName(etcd.get(CHARSET));
ClientBuilder clientBuilder = Client.builder().endpoints(endpoints.split(","));
//todo
if (Objects.isNull(client)) {
client = StringUtil.isAllNotEmpty(user, password) ? clientBuilder.user(ByteSequence.from(user, charset))
.password(ByteSequence.from(password, charset)).authority(authority)
.build() : clientBuilder.build();
}
// todo Currently only supports json
GetResponse getResponse = client.getKVClient().get(ByteSequence.from(key, charset)).get();
KeyValue keyValue = getResponse.getKvs().get(0);
if (Objects.isNull(keyValue)) {
return;
}
client.getWatchClient().watch(ByteSequence.from(key, charset), new Watch.Listener() {
@Override
public void onNext(WatchResponse response) {
WatchEvent watchEvent = response.getEvents().get(0);
WatchEvent.EventType eventType = watchEvent.getEventType();
// todo Currently only supports json
if (Objects.equals(eventType, WatchEvent.EventType.PUT)) {
KeyValue keyValue1 = watchEvent.getKeyValue();
String value = keyValue1.getValue().toString(charset);
Map map = JSONUtil.parseObject(value, Map.class);
dynamicRefresh(keyValue1.getKey().toString(charset), map);
}
}
@Override
public void onError(Throwable throwable) {
log.error("dynamic thread pool etcd config watcher exception ", throwable);
}
@Override
public void onCompleted() {
log.info("dynamic thread pool etcd config key refreshed, config key {}", key);
}
});
}
}
Loading…
Cancel
Save