Merge pull request #145 from Redick01/#143

#143 support zookeeper config center
pull/146/head
龙台 Long Tai 3 years ago committed by GitHub
commit 7cf7baaa9f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -0,0 +1,18 @@
package cn.hippo4j.example.core.zookeeper;
import cn.hippo4j.core.enable.EnableDynamicThreadPool;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* @author Redick01
* @date 2022/3/14 20:40
*/
@EnableDynamicThreadPool
@SpringBootApplication(scanBasePackages = "cn.hippo4j.example.core")
public class Hippo4jCoreZookeeperExampleApplication {
public static void main(String[] args) {
SpringApplication.run(Hippo4jCoreZookeeperExampleApplication.class, args);
}
}

@ -0,0 +1,19 @@
server:
port: 8888
spring:
application:
name: dynamic-threadpool-zookeeper-example
dynamic:
thread-pool:
config-file-type: properties
zookeeper:
zk-connect-str: 127.0.0.1:2181
config-version: 1.0.0
root-node: /configserver/userproject
node: zookeeper-demo
#logging:
# level:
# root: DEBUG

@ -0,0 +1,40 @@
# Export from zookeeper configuration group: [/configserver/userproject] - [1.0.0] - [zookeeper-demo].
spring.application.name=dynamic-threadpool-zookeeper-example
spring.dynamic.thread-pool.banner=true
spring.dynamic.thread-pool.check-state-interval=5
spring.dynamic.thread-pool.collect=true
spring.dynamic.thread-pool.config-file-type=properties
spring.dynamic.thread-pool.enable=true
spring.dynamic.thread-pool.executors[0].allow-core-thread-time-out=true
spring.dynamic.thread-pool.executors[0].blocking-queue=LinkedBlockingQueue
spring.dynamic.thread-pool.executors[0].core-pool-size=2
spring.dynamic.thread-pool.executors[0].execute-time-out=1000
spring.dynamic.thread-pool.executors[0].keep-alive-time=6691
spring.dynamic.thread-pool.executors[0].maximum-pool-size=4
spring.dynamic.thread-pool.executors[0].notify.active-alarm=80
spring.dynamic.thread-pool.executors[0].notify.capacity-alarm=80
spring.dynamic.thread-pool.executors[0].notify.interval=8
spring.dynamic.thread-pool.executors[0].notify.is-alarm=true
spring.dynamic.thread-pool.executors[0].notify.receives.DING=177****6993
spring.dynamic.thread-pool.executors[0].queue-capacity=100
spring.dynamic.thread-pool.executors[0].rejected-handler=AbortPolicy
spring.dynamic.thread-pool.executors[0].thread-name-prefix=message-consume
spring.dynamic.thread-pool.executors[0].thread-pool-id=message-consume
spring.dynamic.thread-pool.executors[1].allow-core-thread-time-out=true
spring.dynamic.thread-pool.executors[1].blocking-queue=LinkedBlockingQueue
spring.dynamic.thread-pool.executors[1].core-pool-size=1
spring.dynamic.thread-pool.executors[1].execute-time-out=1000
spring.dynamic.thread-pool.executors[1].keep-alive-time=6691
spring.dynamic.thread-pool.executors[1].maximum-pool-size=1
spring.dynamic.thread-pool.executors[1].notify.active-alarm=80
spring.dynamic.thread-pool.executors[1].notify.capacity-alarm=80
spring.dynamic.thread-pool.executors[1].notify.interval=8
spring.dynamic.thread-pool.executors[1].notify.is-alarm=true
spring.dynamic.thread-pool.executors[1].notify.receives.DING=177****6993
spring.dynamic.thread-pool.executors[1].queue-capacity=1
spring.dynamic.thread-pool.executors[1].rejected-handler=AbortPolicy
spring.dynamic.thread-pool.executors[1].thread-name-prefix=message-produce
spring.dynamic.thread-pool.executors[1].thread-pool-id=message-produce
spring.dynamic.thread-pool.notify-platforms[0].platform=DING
spring.dynamic.thread-pool.notify-platforms[0].secret-key=aab197577f6d8cc3aa8b52ee38adb6e16a46642a9c4986f5e45ca6946fdcea6f

@ -28,6 +28,7 @@
<module>hippo4j-core-nacos-spring-boot-starter-example</module>
<!-- 测试 Hippo4J-Core Apollo 配置中心 -->
<module>hippo4j-core-apollo-spring-boot-starter-example</module>
<module>hippo4j-core-zookeeper-spring-boot-starter-example</module>
</modules>
</project>

@ -65,6 +65,27 @@
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>5.1.0</version>
<scope>compile</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.guicedee.services</groupId>
<artifactId>commons-lang3</artifactId>
<version>1.2.1.1-jre17</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>16.0.1</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>

@ -57,6 +57,11 @@ public class BootstrapCoreProperties implements BootstrapPropertiesInterface {
*/
private Map<String, String> apollo;
/**
* Zookeeper config.
*/
private Map<String, String> zookeeper;
/**
* Tomcat thread pool config.
*/

@ -17,6 +17,7 @@ import cn.hippo4j.core.starter.notify.CoreNotifyConfigBuilder;
import cn.hippo4j.core.starter.refresher.ApolloRefresherHandler;
import cn.hippo4j.core.starter.refresher.NacosCloudRefresherHandler;
import cn.hippo4j.core.starter.refresher.NacosRefresherHandler;
import cn.hippo4j.core.starter.refresher.ZookeeperRefresherHandler;
import cn.hippo4j.core.starter.support.DynamicThreadPoolPostProcessor;
import lombok.AllArgsConstructor;
import org.springframework.boot.autoconfigure.ImportAutoConfiguration;
@ -52,6 +53,8 @@ public class DynamicThreadPoolCoreAutoConfiguration {
private static final String APOLLO_CONFIG_KEY = "com.ctrip.framework.apollo.ConfigService";
private static final String ZK_CONFIG_KEY = "org.apache.curator.framework.CuratorFramework";
@Bean
@Order(Ordered.HIGHEST_PRECEDENCE)
public ApplicationContextHolder hippo4JApplicationContextHolder() {
@ -121,4 +124,10 @@ public class DynamicThreadPoolCoreAutoConfiguration {
return new ApolloRefresherHandler(threadPoolNotifyAlarmHandler, bootstrapCoreProperties);
}
@Bean
@ConditionalOnClass(name = ZK_CONFIG_KEY)
public ZookeeperRefresherHandler zookeeperRefresher(ThreadPoolNotifyAlarmHandler threadPoolNotifyAlarmHandler,
BootstrapCoreProperties bootstrapCoreProperties) {
return new ZookeeperRefresherHandler(threadPoolNotifyAlarmHandler, bootstrapCoreProperties);
}
}

@ -32,10 +32,11 @@ public class CoreNotifyConfigBuilder implements NotifyConfigBuilder {
Map<String, List<NotifyConfigDTO>> resultMap = Maps.newHashMap();
List<ExecutorProperties> executors = bootstrapCoreProperties.getExecutors();
for (ExecutorProperties executor : executors) {
resultMap.putAll(buildSingleNotifyConfig(executor));
if (null !=executors) {
for (ExecutorProperties executor : executors) {
resultMap.putAll(buildSingleNotifyConfig(executor));
}
}
return resultMap;
}

@ -6,12 +6,14 @@ import cn.hippo4j.common.model.PoolParameter;
import cn.hippo4j.common.model.PoolParameterInfo;
import cn.hippo4j.common.notify.HippoBaseSendMessageService;
import cn.hippo4j.common.notify.NotifyConfigDTO;
import cn.hippo4j.common.notify.ThreadPoolNotifyAlarm;
import cn.hippo4j.common.notify.request.ChangeParameterNotifyRequest;
import cn.hippo4j.common.web.executor.WebThreadPoolHandlerChoose;
import cn.hippo4j.common.web.executor.WebThreadPoolService;
import cn.hippo4j.core.executor.DynamicThreadPoolExecutor;
import cn.hippo4j.core.executor.DynamicThreadPoolWrapper;
import cn.hippo4j.core.executor.ThreadPoolNotifyAlarmHandler;
import cn.hippo4j.core.executor.manage.GlobalNotifyAlarmManage;
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.core.executor.support.*;
import cn.hippo4j.core.proxy.RejectedProxyUtil;
@ -75,6 +77,24 @@ public abstract class AbstractCoreThreadPoolDynamicRefresh implements ThreadPool
refreshExecutors(bindableCoreProperties);
}
/**
* register notify alarm manage
*/
public void registerNotifyAlarmManage() {
bootstrapCoreProperties.getExecutors().forEach(executorProperties -> {
ThreadPoolNotifyAlarm threadPoolNotifyAlarm = new ThreadPoolNotifyAlarm(
executorProperties.getNotify().getIsAlarm(),
executorProperties.getNotify().getCapacityAlarm(),
executorProperties.getNotify().getActiveAlarm()
);
threadPoolNotifyAlarm.setInterval(executorProperties.getNotify().getInterval());
threadPoolNotifyAlarm.setReceives(executorProperties.getNotify().getReceives());
GlobalNotifyAlarmManage.put(executorProperties.getThreadPoolId(), threadPoolNotifyAlarm);
});
}
/**
* Refresh web executor.
*

@ -0,0 +1,95 @@
package cn.hippo4j.core.starter.refresher;
import cn.hippo4j.core.executor.ThreadPoolNotifyAlarmHandler;
import cn.hippo4j.core.starter.config.BootstrapCoreProperties;
import com.google.common.base.Charsets;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.CuratorListener;
import org.apache.curator.framework.api.GetChildrenBuilder;
import org.apache.curator.framework.api.GetDataBuilder;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.WatchedEvent;
import java.util.List;
import java.util.Map;
/**
* @author Redick01
* @date 2022/3/14 16:03
*/
@Slf4j
public class ZookeeperRefresherHandler extends AbstractCoreThreadPoolDynamicRefresh {
private CuratorFramework curatorFramework;
public ZookeeperRefresherHandler(ThreadPoolNotifyAlarmHandler threadPoolNotifyAlarmHandler, BootstrapCoreProperties bootstrapCoreProperties) {
super(threadPoolNotifyAlarmHandler, bootstrapCoreProperties);
}
@Override
public void afterPropertiesSet() {
Map<String, String> zkConfigs = bootstrapCoreProperties.getZookeeper();
curatorFramework = CuratorFrameworkFactory.newClient(zkConfigs.get("zk-connect-str"),
new ExponentialBackoffRetry(1000, 3));
String nodePath = ZKPaths.makePath(ZKPaths.makePath(zkConfigs.get("root-node"),
zkConfigs.get("config-version")), zkConfigs.get("node"));
final ConnectionStateListener connectionStateListener = (client, newState) -> {
if (newState == ConnectionState.CONNECTED) {
loadNode(nodePath);
} else if (newState == ConnectionState.RECONNECTED) {
loadNode(nodePath);
}};
final CuratorListener curatorListener = (client, curatorEvent) -> {
final WatchedEvent watchedEvent = curatorEvent.getWatchedEvent();
if (null != watchedEvent) {
switch (watchedEvent.getType()) {
case NodeChildrenChanged:
case NodeDataChanged:
loadNode(nodePath);
break;
default:
break;
}
}};
curatorFramework.getConnectionStateListenable().addListener(connectionStateListener);
curatorFramework.getCuratorListenable().addListener(curatorListener);
curatorFramework.start();
}
/**
* load config info and refresh.
* @param nodePath zk config node path.
*/
public void loadNode(String nodePath) {
try {
final GetChildrenBuilder childrenBuilder = curatorFramework.getChildren();
final List<String> children = childrenBuilder.watched().forPath(nodePath);
StringBuilder content = new StringBuilder();
children.forEach(c -> {
String n = ZKPaths.makePath(nodePath, c);
final String nodeName = ZKPaths.getNodeFromPath(n);
final GetDataBuilder data = curatorFramework.getData();
String value = "";
try {
value = new String(data.watched().forPath(n), Charsets.UTF_8);
} catch (Exception e) {
e.printStackTrace();
}
final Pair<String, String> keyValue = new ImmutablePair<>(nodeName, value);
content.append(keyValue.getKey()).append("=").append(keyValue.getValue()).append("\n");
});
dynamicRefresh(content.toString());
registerNotifyAlarmManage();
} catch (Exception e) {
log.error("load zk node error, nodePath is {}", nodePath, e);
}
}
}

@ -90,67 +90,68 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
protected ThreadPoolExecutor fillPoolAndRegister(DynamicThreadPoolWrapper dynamicThreadPoolWrap) {
String threadPoolId = dynamicThreadPoolWrap.getTpId();
ThreadPoolExecutor newDynamicPoolExecutor = dynamicThreadPoolWrap.getExecutor();
ExecutorProperties executorProperties = null;
if (null != bootstrapCoreProperties.getExecutors()) {
executorProperties = bootstrapCoreProperties.getExecutors()
.stream()
.filter(each -> Objects.equals(threadPoolId, each.getThreadPoolId()))
.findFirst()
.orElse(null);
if (executorProperties != null) {
try {
// 使用相关参数创建线程池
BlockingQueue workQueue = QueueTypeEnum.createBlockingQueue(executorProperties.getBlockingQueue(), executorProperties.getQueueCapacity());
String threadNamePrefix = executorProperties.getThreadNamePrefix();
newDynamicPoolExecutor = ThreadPoolBuilder.builder()
.dynamicPool()
.workQueue(workQueue)
.threadFactory(StringUtil.isNotBlank(threadNamePrefix) ? threadNamePrefix : threadPoolId)
.executeTimeOut(Optional.ofNullable(executorProperties.getExecuteTimeOut()).orElse(0L))
.poolThreadSize(executorProperties.getCorePoolSize(), executorProperties.getMaximumPoolSize())
.keepAliveTime(executorProperties.getKeepAliveTime(), TimeUnit.SECONDS)
.rejected(RejectedTypeEnum.createPolicy(executorProperties.getRejectedHandler()))
.allowCoreThreadTimeOut(executorProperties.getAllowCoreThreadTimeOut())
.build();
} catch (Exception ex) {
log.error("Failed to initialize thread pool configuration. error :: {}", ex);
} finally {
if (Objects.isNull(dynamicThreadPoolWrap.getExecutor())) {
dynamicThreadPoolWrap.setExecutor(CommonDynamicThreadPool.getInstance(threadPoolId));
}
ExecutorProperties executorProperties = bootstrapCoreProperties.getExecutors()
.stream()
.filter(each -> Objects.equals(threadPoolId, each.getThreadPoolId()))
.findFirst()
.orElse(null);
if (executorProperties != null) {
try {
// 使用相关参数创建线程池
BlockingQueue workQueue = QueueTypeEnum.createBlockingQueue(executorProperties.getBlockingQueue(), executorProperties.getQueueCapacity());
String threadNamePrefix = executorProperties.getThreadNamePrefix();
newDynamicPoolExecutor = ThreadPoolBuilder.builder()
.dynamicPool()
.workQueue(workQueue)
.threadFactory(StringUtil.isNotBlank(threadNamePrefix) ? threadNamePrefix : threadPoolId)
.executeTimeOut(Optional.ofNullable(executorProperties.getExecuteTimeOut()).orElse(0L))
.poolThreadSize(executorProperties.getCorePoolSize(), executorProperties.getMaximumPoolSize())
.keepAliveTime(executorProperties.getKeepAliveTime(), TimeUnit.SECONDS)
.rejected(RejectedTypeEnum.createPolicy(executorProperties.getRejectedHandler()))
.allowCoreThreadTimeOut(executorProperties.getAllowCoreThreadTimeOut())
.build();
} catch (Exception ex) {
log.error("Failed to initialize thread pool configuration. error :: {}", ex);
} finally {
if (Objects.isNull(dynamicThreadPoolWrap.getExecutor())) {
dynamicThreadPoolWrap.setExecutor(CommonDynamicThreadPool.getInstance(threadPoolId));
dynamicThreadPoolWrap.setInitFlag(Boolean.TRUE);
}
}
dynamicThreadPoolWrap.setInitFlag(Boolean.TRUE);
// 设置动态线程池增强参数
ThreadPoolNotifyAlarm notify = Optional.ofNullable(executorProperties)
.map(each -> each.getNotify())
.orElseGet(() -> {
ThreadPoolNotifyAlarm threadPoolNotifyAlarm = new ThreadPoolNotifyAlarm(true, 80, 80);
threadPoolNotifyAlarm.setInterval(2);
return threadPoolNotifyAlarm;
});
if (dynamicThreadPoolWrap.getExecutor() instanceof AbstractDynamicExecutorSupport) {
ThreadPoolNotifyAlarm threadPoolNotifyAlarm = new ThreadPoolNotifyAlarm(
notify.getIsAlarm(),
notify.getCapacityAlarm(),
notify.getActiveAlarm()
);
threadPoolNotifyAlarm.setInterval(notify.getInterval());
threadPoolNotifyAlarm.setReceives(notify.getReceives());
GlobalNotifyAlarmManage.put(threadPoolId, threadPoolNotifyAlarm);
TaskDecorator taskDecorator = ((DynamicThreadPoolExecutor) dynamicThreadPoolWrap.getExecutor()).getTaskDecorator();
((DynamicThreadPoolExecutor) newDynamicPoolExecutor).setTaskDecorator(taskDecorator);
long awaitTerminationMillis = ((DynamicThreadPoolExecutor) dynamicThreadPoolWrap.getExecutor()).awaitTerminationMillis;
boolean waitForTasksToCompleteOnShutdown = ((DynamicThreadPoolExecutor) dynamicThreadPoolWrap.getExecutor()).waitForTasksToCompleteOnShutdown;
((DynamicThreadPoolExecutor) newDynamicPoolExecutor).setSupportParam(awaitTerminationMillis, waitForTasksToCompleteOnShutdown);
}
}
// 设置动态线程池增强参数
ThreadPoolNotifyAlarm notify = Optional.ofNullable(executorProperties)
.map(each -> each.getNotify())
.orElseGet(() -> {
ThreadPoolNotifyAlarm threadPoolNotifyAlarm = new ThreadPoolNotifyAlarm(true, 80, 80);
threadPoolNotifyAlarm.setInterval(2);
return threadPoolNotifyAlarm;
});
if (dynamicThreadPoolWrap.getExecutor() instanceof AbstractDynamicExecutorSupport) {
ThreadPoolNotifyAlarm threadPoolNotifyAlarm = new ThreadPoolNotifyAlarm(
notify.getIsAlarm(),
notify.getCapacityAlarm(),
notify.getActiveAlarm()
);
threadPoolNotifyAlarm.setInterval(notify.getInterval());
threadPoolNotifyAlarm.setReceives(notify.getReceives());
GlobalNotifyAlarmManage.put(threadPoolId, threadPoolNotifyAlarm);
TaskDecorator taskDecorator = ((DynamicThreadPoolExecutor) dynamicThreadPoolWrap.getExecutor()).getTaskDecorator();
((DynamicThreadPoolExecutor) newDynamicPoolExecutor).setTaskDecorator(taskDecorator);
long awaitTerminationMillis = ((DynamicThreadPoolExecutor) dynamicThreadPoolWrap.getExecutor()).awaitTerminationMillis;
boolean waitForTasksToCompleteOnShutdown = ((DynamicThreadPoolExecutor) dynamicThreadPoolWrap.getExecutor()).waitForTasksToCompleteOnShutdown;
((DynamicThreadPoolExecutor) newDynamicPoolExecutor).setSupportParam(awaitTerminationMillis, waitForTasksToCompleteOnShutdown);
dynamicThreadPoolWrap.setExecutor(newDynamicPoolExecutor);
}
dynamicThreadPoolWrap.setExecutor(newDynamicPoolExecutor);
GlobalThreadPoolManage.registerPool(dynamicThreadPoolWrap.getTpId(), dynamicThreadPoolWrap);
GlobalCoreThreadPoolManage.register(
threadPoolId,

Loading…
Cancel
Save