mirror of https://github.com/longtai-cn/hippo4j
parent
2ccf9aa7de
commit
2cacdc31e5
@ -0,0 +1,41 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<parent>
|
||||
<groupId>io.dynamic-thread-pool</groupId>
|
||||
<artifactId>dtp-parent</artifactId>
|
||||
<version>${revision}</version>
|
||||
</parent>
|
||||
|
||||
<artifactId>dtp-example</artifactId>
|
||||
<version>0.0.1-SNAPSHOT</version>
|
||||
|
||||
<name>dtp-example</name>
|
||||
<description>Demo project for Spring Boot</description>
|
||||
|
||||
<properties>
|
||||
<java.version>1.8</java.version>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-test</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>io.dynamic-thread-pool</groupId>
|
||||
<artifactId>dtp-spring-boot-starter</artifactId>
|
||||
<version>${revision}</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
</project>
|
@ -0,0 +1,13 @@
|
||||
package io.dtp.example;
|
||||
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
|
||||
@SpringBootApplication
|
||||
public class ExampleApplication {
|
||||
|
||||
public static void main(String[] args) {
|
||||
SpringApplication.run(ExampleApplication.class, args);
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,21 @@
|
||||
package io.dtp.example.config;
|
||||
|
||||
import io.dtp.starter.wrap.DynamicThreadPoolWrap;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
/**
|
||||
* 线程池配置
|
||||
*
|
||||
* @author chen.ma
|
||||
* @date 2021/6/20 17:16
|
||||
*/
|
||||
@Configuration
|
||||
public class ThreadPoolConfig {
|
||||
|
||||
@Bean
|
||||
public DynamicThreadPoolWrap messageCenterConsumeThreadPool() {
|
||||
return new DynamicThreadPoolWrap("message-consume");
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,13 @@
|
||||
package io.dtp.example;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
|
||||
@SpringBootTest
|
||||
class ExampleApplicationTests {
|
||||
|
||||
@Test
|
||||
void contextLoads() {
|
||||
}
|
||||
|
||||
}
|
@ -1,4 +1,4 @@
|
||||
package io.ruyi.server;
|
||||
package io.dtp.server;
|
||||
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
@ -0,0 +1,18 @@
|
||||
package io.dtp.server.constant;
|
||||
|
||||
/**
|
||||
* 服务端常量类
|
||||
*
|
||||
* @author chen.ma
|
||||
* @date 2021/6/20 13:54
|
||||
*/
|
||||
public class Constants {
|
||||
|
||||
public static final String BASE_PATH = "/v1/cs";
|
||||
|
||||
public static final String CONFIG_CONTROLLER_PATH = BASE_PATH + "/configs";
|
||||
|
||||
public static final String LISTENING_CONFIGS = "Listening-Configs";
|
||||
|
||||
public static final String ENCODE = "UTF-8";
|
||||
}
|
@ -0,0 +1,50 @@
|
||||
package io.dtp.server.controller;
|
||||
|
||||
import io.dtp.server.constant.Constants;
|
||||
import io.dtp.server.model.ConfigInfoBase;
|
||||
import io.dtp.server.service.ConfigService;
|
||||
import lombok.SneakyThrows;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.util.StringUtils;
|
||||
import org.springframework.web.bind.annotation.*;
|
||||
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
import java.net.URLDecoder;
|
||||
|
||||
/**
|
||||
* 服务端配置控制器
|
||||
*
|
||||
* @author chen.ma
|
||||
* @date 2021/6/20 13:53
|
||||
*/
|
||||
@RestController
|
||||
@RequestMapping(Constants.CONFIG_CONTROLLER_PATH)
|
||||
public class ConfigController {
|
||||
|
||||
@Autowired
|
||||
private ConfigService configService;
|
||||
|
||||
@GetMapping
|
||||
public ConfigInfoBase detailConfigInfo(
|
||||
@RequestParam("tdId") String tdId,
|
||||
@RequestParam("itemId") String itemId,
|
||||
@RequestParam(value = "tenant", required = false, defaultValue = "") String tenant) {
|
||||
|
||||
return configService.findConfigAllInfo(tdId, itemId, tenant);
|
||||
}
|
||||
|
||||
@SneakyThrows
|
||||
@PostMapping("/listener")
|
||||
public void listener(HttpServletRequest request, HttpServletResponse response) {
|
||||
request.setAttribute("org.apache.catalina.ASYNC_SUPPORTED", true);
|
||||
|
||||
String probeModify = request.getParameter(Constants.LISTENING_CONFIGS);
|
||||
if (StringUtils.isEmpty(probeModify)) {
|
||||
throw new IllegalArgumentException("invalid probeModify");
|
||||
}
|
||||
|
||||
probeModify = URLDecoder.decode(probeModify, Constants.ENCODE);
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,36 @@
|
||||
package io.dtp.server.mapper;
|
||||
|
||||
import io.dtp.server.model.ConfigAllInfo;
|
||||
import org.springframework.jdbc.core.RowMapper;
|
||||
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.SQLException;
|
||||
|
||||
/**
|
||||
* ConfigAllInfoRowMapper
|
||||
*
|
||||
* @author chen.ma
|
||||
* @date 2021/6/20 15:57
|
||||
*/
|
||||
|
||||
public final class RowMapperManager {
|
||||
|
||||
public static final ConfigAllInfoRowMapper CONFIG_ALL_INFO_ROW_MAPPER = new ConfigAllInfoRowMapper();
|
||||
|
||||
public static class ConfigAllInfoRowMapper implements RowMapper<ConfigAllInfo> {
|
||||
|
||||
public ConfigAllInfo mapRow(ResultSet rs, int rowNum) throws SQLException {
|
||||
ConfigAllInfo configAllInfo = new ConfigAllInfo();
|
||||
configAllInfo.setDataId(rs.getString("data_id"));
|
||||
configAllInfo.setGroupId(rs.getString("group_id"));
|
||||
configAllInfo.setTenant(rs.getString("tenant_id"));
|
||||
configAllInfo.setContent(rs.getString("content"));
|
||||
configAllInfo.setMd5(rs.getString("md5"));
|
||||
configAllInfo.setCreateTime(rs.getTimestamp("gmt_modified").getTime());
|
||||
configAllInfo.setModifyTime(rs.getTimestamp("gmt_modified").getTime());
|
||||
return configAllInfo;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -0,0 +1,23 @@
|
||||
package io.dtp.server.model;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
/**
|
||||
* 配置全部信息
|
||||
*
|
||||
* @author chen.ma
|
||||
* @date 2021/6/20 15:14
|
||||
*/
|
||||
@Data
|
||||
public class ConfigAllInfo extends ConfigInfo {
|
||||
|
||||
private static final long serialVersionUID = -2417394244017463665L;
|
||||
|
||||
private String createUser;
|
||||
|
||||
private String desc;
|
||||
|
||||
private Long createTime;
|
||||
|
||||
private Long modifyTime;
|
||||
}
|
@ -0,0 +1,17 @@
|
||||
package io.dtp.server.model;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
/**
|
||||
* 配置信息
|
||||
*
|
||||
* @author chen.ma
|
||||
* @date 2021/6/20 15:59
|
||||
*/
|
||||
@Data
|
||||
public class ConfigInfo extends ConfigInfoBase {
|
||||
|
||||
private static final long serialVersionUID = -3504960832191834675L;
|
||||
|
||||
private String tenant;
|
||||
}
|
@ -0,0 +1,37 @@
|
||||
package io.dtp.server.model;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
/**
|
||||
* 基础配置信息
|
||||
*
|
||||
* @author chen.ma
|
||||
* @date 2021/6/20 14:05
|
||||
*/
|
||||
@Data
|
||||
public class ConfigInfoBase implements Serializable {
|
||||
|
||||
private static final long serialVersionUID = -1892597426099265730L;
|
||||
|
||||
/**
|
||||
* DataId
|
||||
*/
|
||||
private String dataId;
|
||||
|
||||
/**
|
||||
* GroupId
|
||||
*/
|
||||
private String groupId;
|
||||
|
||||
/**
|
||||
* 内容
|
||||
*/
|
||||
private String content;
|
||||
|
||||
/**
|
||||
* MD5
|
||||
*/
|
||||
private String md5;
|
||||
}
|
@ -0,0 +1,22 @@
|
||||
package io.dtp.server.service;
|
||||
|
||||
import io.dtp.server.model.ConfigAllInfo;
|
||||
|
||||
/**
|
||||
* 服务端配置接口
|
||||
*
|
||||
* @author chen.ma
|
||||
* @date 2021/6/20 15:18
|
||||
*/
|
||||
public interface ConfigService {
|
||||
|
||||
/**
|
||||
* 查询配置全部信息
|
||||
*
|
||||
* @param tpId tpId
|
||||
* @param itemId itemId
|
||||
* @param tenant tenant
|
||||
* @return 全部配置信息
|
||||
*/
|
||||
ConfigAllInfo findConfigAllInfo(String tpId, String itemId, String tenant);
|
||||
}
|
@ -0,0 +1,30 @@
|
||||
package io.dtp.server.service.impl;
|
||||
|
||||
import io.dtp.server.mapper.RowMapperManager;
|
||||
import io.dtp.server.model.ConfigAllInfo;
|
||||
import io.dtp.server.service.ConfigService;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
/**
|
||||
* 服务端配置接口实现
|
||||
*
|
||||
* @author chen.ma
|
||||
* @date 2021/6/20 15:21
|
||||
*/
|
||||
@Service
|
||||
public class ConfigServiceImpl implements ConfigService {
|
||||
|
||||
@Autowired
|
||||
private JdbcTemplate jdbcTemplate;
|
||||
|
||||
public ConfigAllInfo findConfigAllInfo(String tpId, String itemId, String tenant) {
|
||||
ConfigAllInfo configAllInfo = jdbcTemplate.queryForObject(
|
||||
"select * from config_info where tp_id = ? and item_id = ? and tenant_id = ?",
|
||||
new Object[]{tpId, itemId, tenant},
|
||||
RowMapperManager.CONFIG_ALL_INFO_ROW_MAPPER);
|
||||
|
||||
return configAllInfo;
|
||||
}
|
||||
}
|
@ -0,0 +1,9 @@
|
||||
server:
|
||||
port: 6691
|
||||
|
||||
spring:
|
||||
datasource:
|
||||
driver-class-name: com.mysql.jdbc.Driver
|
||||
url: jdbc:mysql://localhost:3306/dynamic-thread-pool
|
||||
username: root
|
||||
password: root
|
@ -0,0 +1,33 @@
|
||||
HELP.md
|
||||
target/
|
||||
!.mvn/wrapper/maven-wrapper.jar
|
||||
!**/src/main/**/target/
|
||||
!**/src/test/**/target/
|
||||
|
||||
### STS ###
|
||||
.apt_generated
|
||||
.classpath
|
||||
.factorypath
|
||||
.project
|
||||
.settings
|
||||
.springBeans
|
||||
.sts4-cache
|
||||
|
||||
### IntelliJ IDEA ###
|
||||
.idea
|
||||
*.iws
|
||||
*.iml
|
||||
*.ipr
|
||||
|
||||
### NetBeans ###
|
||||
/nbproject/private/
|
||||
/nbbuild/
|
||||
/dist/
|
||||
/nbdist/
|
||||
/.nb-gradle/
|
||||
build/
|
||||
!**/src/main/**/build/
|
||||
!**/src/test/**/build/
|
||||
|
||||
### VS Code ###
|
||||
.vscode/
|
@ -0,0 +1,30 @@
|
||||
package io.dtp.starter.common;
|
||||
|
||||
import java.util.concurrent.*;
|
||||
|
||||
/**
|
||||
* 公共线程池生产者
|
||||
*
|
||||
* @author chen.ma
|
||||
* @date 2021/6/16 22:35
|
||||
*/
|
||||
public class CommonThreadPool {
|
||||
|
||||
public static ThreadPoolExecutor getInstance(String threadPoolId) {
|
||||
TimeUnit unit = TimeUnit.SECONDS;
|
||||
BlockingQueue workQueue = new LinkedBlockingQueue(512);
|
||||
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(
|
||||
3,
|
||||
5,
|
||||
10000,
|
||||
unit,
|
||||
workQueue,
|
||||
r -> {
|
||||
Thread thread = new Thread(r);
|
||||
thread.setDaemon(false);
|
||||
thread.setName(threadPoolId);
|
||||
return thread;
|
||||
});
|
||||
return poolExecutor;
|
||||
}
|
||||
}
|
@ -0,0 +1,18 @@
|
||||
package io.dtp.starter.common;
|
||||
|
||||
/**
|
||||
* Constants
|
||||
*
|
||||
* @author chen.ma
|
||||
* @date 2021/6/16 23:01
|
||||
*/
|
||||
public class Constants {
|
||||
|
||||
public static final String DEFAULT_GROUP = "DEFAULT_GROUP";
|
||||
|
||||
public static final String DATA_ID = "dataId";
|
||||
|
||||
public static final String GROUP_ID = "group";
|
||||
|
||||
public static final String DEFAULT_NAMESPACE_ID = "public";
|
||||
}
|
@ -0,0 +1,58 @@
|
||||
package io.dtp.starter.config;
|
||||
|
||||
import org.springframework.beans.BeansException;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.context.ApplicationContextAware;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* 非 Spring 环境下获取 IOC 容器对象
|
||||
*
|
||||
* @author chen.ma
|
||||
* @date 2021/6/20 18:49
|
||||
*/
|
||||
public class ApplicationContextHolder implements ApplicationContextAware {
|
||||
|
||||
private static ApplicationContext CONTEXT;
|
||||
|
||||
@Override
|
||||
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
|
||||
ApplicationContextHolder.CONTEXT = applicationContext;
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据类型获取 IOC 容器 Bean
|
||||
*
|
||||
* @param clazz
|
||||
* @param <T>
|
||||
* @return
|
||||
*/
|
||||
public static <T> T getBean(Class<T> clazz) {
|
||||
return CONTEXT.getBean(clazz);
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据名称 & 类型获取 IOC 容器 Bean
|
||||
*
|
||||
* @param name
|
||||
* @param clazz
|
||||
* @param <T>
|
||||
* @return
|
||||
*/
|
||||
public static <T> T getBean(String name, Class<T> clazz) {
|
||||
return CONTEXT.getBean(name, clazz);
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据类型获取一组 IOC 容器 Bean
|
||||
*
|
||||
* @param clazz
|
||||
* @param <T>
|
||||
* @return
|
||||
*/
|
||||
public static <T> Map<String, T> getBeansOfType(Class<T> clazz) {
|
||||
return CONTEXT.getBeansOfType(clazz);
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,25 @@
|
||||
package io.dtp.starter.config;
|
||||
|
||||
import io.dtp.starter.core.ThreadPoolRunListener;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
/**
|
||||
* 公共配置
|
||||
*
|
||||
* @author chen.ma
|
||||
* @date 2021/6/20 18:52
|
||||
*/
|
||||
@Configuration
|
||||
public class CommonConfiguration {
|
||||
|
||||
@Bean
|
||||
public ApplicationContextHolder applicationContextHolder() {
|
||||
return new ApplicationContextHolder();
|
||||
}
|
||||
|
||||
@Bean
|
||||
public ThreadPoolRunListener threadPoolRunListener() {
|
||||
return new ThreadPoolRunListener();
|
||||
}
|
||||
}
|
@ -0,0 +1,69 @@
|
||||
package io.dtp.starter.config;
|
||||
|
||||
import lombok.SneakyThrows;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import okhttp3.OkHttpClient;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
import javax.net.ssl.SSLContext;
|
||||
import javax.net.ssl.SSLSocketFactory;
|
||||
import javax.net.ssl.TrustManager;
|
||||
import javax.net.ssl.X509TrustManager;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* OkHttp3 bean
|
||||
*
|
||||
* @author chen.ma
|
||||
* @date 2021/6/10 13:28
|
||||
*/
|
||||
@Slf4j
|
||||
@Configuration
|
||||
public class OkHttpClientConfig {
|
||||
|
||||
/**
|
||||
* 配置 OkHttpClient Bean
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
@Bean
|
||||
public OkHttpClient okHttpClient() {
|
||||
OkHttpClient.Builder build = new OkHttpClient.Builder();
|
||||
build.connectTimeout(10, TimeUnit.SECONDS)
|
||||
.readTimeout(30, TimeUnit.SECONDS)
|
||||
.build();
|
||||
supportHttps(build);
|
||||
return build.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* 支持 Https
|
||||
*
|
||||
* @param builder
|
||||
*/
|
||||
@SneakyThrows
|
||||
private void supportHttps(OkHttpClient.Builder builder) {
|
||||
final TrustManager[] trustAllCerts = new TrustManager[]{new X509TrustManager() {
|
||||
@Override
|
||||
public void checkClientTrusted(java.security.cert.X509Certificate[] chain, String authType) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void checkServerTrusted(java.security.cert.X509Certificate[] chain, String authType) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public java.security.cert.X509Certificate[] getAcceptedIssuers() {
|
||||
return new java.security.cert.X509Certificate[]{};
|
||||
}
|
||||
}};
|
||||
|
||||
final SSLContext sslContext = SSLContext.getInstance("SSL");
|
||||
sslContext.init(null, trustAllCerts, new java.security.SecureRandom());
|
||||
final SSLSocketFactory sslSocketFactory = sslContext.getSocketFactory();
|
||||
builder.sslSocketFactory(sslSocketFactory, (X509TrustManager) trustAllCerts[0]);
|
||||
builder.hostnameVerifier((hostname, session) -> true);
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,29 @@
|
||||
package io.dtp.starter.core;
|
||||
|
||||
import io.dtp.starter.wrap.DynamicThreadPoolWrap;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
/**
|
||||
* 线程池全局管理
|
||||
*
|
||||
* @author chen.ma
|
||||
* @date 2021/6/20 15:57
|
||||
*/
|
||||
public class GlobalThreadPoolManage {
|
||||
|
||||
private static final Map<String, DynamicThreadPoolWrap> EXECUTOR_MAP = new ConcurrentHashMap();
|
||||
|
||||
public static DynamicThreadPoolWrap getExecutorService(String name) {
|
||||
return EXECUTOR_MAP.get(name);
|
||||
}
|
||||
|
||||
public static void register(String name, DynamicThreadPoolWrap executor) {
|
||||
EXECUTOR_MAP.put(name, executor);
|
||||
}
|
||||
|
||||
public static void remove(String name) {
|
||||
EXECUTOR_MAP.remove(name);
|
||||
}
|
||||
}
|
@ -0,0 +1,32 @@
|
||||
package io.dtp.starter.core;
|
||||
|
||||
import cn.hutool.core.util.ReflectUtil;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
|
||||
/**
|
||||
* 可调整大小的阻塞队列
|
||||
*
|
||||
* @author chen.ma
|
||||
* @date 2021/6/20 14:24
|
||||
*/
|
||||
@Slf4j
|
||||
public class ResizableCapacityLinkedBlockIngQueue extends LinkedBlockingQueue {
|
||||
|
||||
public ResizableCapacityLinkedBlockIngQueue(int capacity) {
|
||||
super(capacity);
|
||||
}
|
||||
|
||||
public boolean setCapacity(Integer capacity) {
|
||||
boolean successFlag = true;
|
||||
try {
|
||||
ReflectUtil.setFieldValue(this, "capacity", capacity);
|
||||
} catch (Exception ex) {
|
||||
// ignore
|
||||
log.error("动态修改阻塞队列大小失败.", ex);
|
||||
successFlag = false;
|
||||
}
|
||||
return successFlag;
|
||||
}
|
||||
}
|
@ -0,0 +1,65 @@
|
||||
package io.dtp.starter.core;
|
||||
|
||||
import io.dtp.starter.common.CommonThreadPool;
|
||||
import io.dtp.starter.config.ApplicationContextHolder;
|
||||
import io.dtp.starter.model.PoolParameterInfo;
|
||||
import io.dtp.starter.toolkit.BlockingQueueUtil;
|
||||
import io.dtp.starter.toolkit.HttpClientUtil;
|
||||
import io.dtp.starter.wrap.DynamicThreadPoolWrap;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.ApplicationArguments;
|
||||
import org.springframework.boot.ApplicationRunner;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* 线程池启动监听
|
||||
*
|
||||
* @author chen.ma
|
||||
* @date 2021/6/20 16:34
|
||||
*/
|
||||
public class ThreadPoolRunListener implements ApplicationRunner {
|
||||
|
||||
@Autowired
|
||||
private HttpClientUtil httpClientUtil;
|
||||
|
||||
@Override
|
||||
public void run(ApplicationArguments args) throws Exception {
|
||||
Map<String, DynamicThreadPoolWrap> executorMap =
|
||||
ApplicationContextHolder.getBeansOfType(DynamicThreadPoolWrap.class);
|
||||
|
||||
executorMap.forEach((key, val) -> {
|
||||
|
||||
Map<String, Object> queryStrMap = new HashMap(16);
|
||||
queryStrMap.put("tdId", val.getTpId());
|
||||
queryStrMap.put("itemId", val.getItemId());
|
||||
queryStrMap.put("tenant", val.getTenant());
|
||||
|
||||
PoolParameterInfo ppi = httpClientUtil.restApiGet(buildUrl(), queryStrMap, PoolParameterInfo.class);
|
||||
if (ppi != null) {
|
||||
// 使用相关参数创建线程池
|
||||
TimeUnit unit = TimeUnit.SECONDS;
|
||||
BlockingQueue workQueue = BlockingQueueUtil.createBlockingQueue(ppi.getQueueType(), ppi.getCapacity());
|
||||
ThreadPoolExecutor resultTpe = new ThreadPoolExecutor(ppi.getCoreSize(), ppi.getMaxSize(), ppi.getKeepAliveTime(), unit, workQueue);
|
||||
val.setPool(resultTpe);
|
||||
} else if (val.getPool() == null) {
|
||||
val.setPool(CommonThreadPool.getInstance(val.getTpId()));
|
||||
}
|
||||
|
||||
GlobalThreadPoolManage.register(buildOnlyId(val), val);
|
||||
});
|
||||
}
|
||||
|
||||
private String buildUrl() {
|
||||
return "http://127.0.0.1/v1/cs/configs";
|
||||
}
|
||||
|
||||
private String buildOnlyId(DynamicThreadPoolWrap poolWrap) {
|
||||
return poolWrap.getTenant() + "_" + poolWrap.getItemId() + "_" + poolWrap.getTpId();
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,73 @@
|
||||
package io.dtp.starter.model;
|
||||
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import lombok.Data;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
/**
|
||||
* 线程池参数
|
||||
*
|
||||
* @author chen.ma
|
||||
* @date 2021/6/16 23:18
|
||||
*/
|
||||
@Data
|
||||
public class PoolParameterInfo implements Serializable {
|
||||
|
||||
private static final long serialVersionUID = -7123935122108553864L;
|
||||
|
||||
/**
|
||||
* 租户 Or 命名空间
|
||||
*/
|
||||
private String tenant;
|
||||
|
||||
/**
|
||||
* 项目 Id
|
||||
*/
|
||||
private String itemId;
|
||||
|
||||
/**
|
||||
* 线程池 Id
|
||||
*/
|
||||
private String tpId;
|
||||
|
||||
/**
|
||||
* 内容
|
||||
*/
|
||||
private String content;
|
||||
|
||||
/**
|
||||
* 核心线程数
|
||||
*/
|
||||
private Integer coreSize;
|
||||
|
||||
/**
|
||||
* 最大线程数
|
||||
*/
|
||||
private Integer maxSize;
|
||||
|
||||
/**
|
||||
* 队列类型
|
||||
*/
|
||||
private Integer queueType;
|
||||
|
||||
/**
|
||||
* 队列长度
|
||||
*/
|
||||
private Integer capacity;
|
||||
|
||||
/**
|
||||
* 线程存活时长
|
||||
*/
|
||||
private Integer keepAliveTime;
|
||||
|
||||
public void setContent(String content) {
|
||||
JSONObject poolParam = JSON.parseObject(content);
|
||||
this.coreSize = poolParam.getInteger("coreSize");
|
||||
this.maxSize = poolParam.getInteger("maxSize");
|
||||
this.capacity = poolParam.getInteger("capacity");
|
||||
this.queueType = poolParam.getInteger("queueType");
|
||||
this.keepAliveTime = poolParam.getInteger("keepAliveTime");
|
||||
}
|
||||
}
|
@ -0,0 +1,35 @@
|
||||
package io.dtp.starter.monitor;
|
||||
|
||||
import io.dtp.starter.core.GlobalThreadPoolManage;
|
||||
import io.dtp.starter.core.ResizableCapacityLinkedBlockIngQueue;
|
||||
import io.dtp.starter.wrap.DynamicThreadPoolWrap;
|
||||
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* 线程池动态监听
|
||||
*
|
||||
* @author chen.ma
|
||||
* @date 2021/6/20 15:51
|
||||
*/
|
||||
public class ThreadPoolDynamicMonitor {
|
||||
|
||||
public void dynamicPool(String threadPoolName, Integer coreSize, Integer maxSize, Integer capacity, Long keepAliveTime) {
|
||||
DynamicThreadPoolWrap wrap = GlobalThreadPoolManage.getExecutorService(threadPoolName);
|
||||
ThreadPoolExecutor executor = wrap.getPool();
|
||||
if (coreSize != null) {
|
||||
executor.setCorePoolSize(coreSize);
|
||||
}
|
||||
if (maxSize != null) {
|
||||
executor.setMaximumPoolSize(maxSize);
|
||||
}
|
||||
if (capacity != null) {
|
||||
ResizableCapacityLinkedBlockIngQueue queue = (ResizableCapacityLinkedBlockIngQueue) executor.getQueue();
|
||||
queue.setCapacity(capacity);
|
||||
}
|
||||
if (keepAliveTime != null) {
|
||||
executor.setKeepAliveTime(keepAliveTime, TimeUnit.SECONDS);
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,33 @@
|
||||
package io.dtp.starter.toolkit;
|
||||
|
||||
import io.dtp.starter.core.ResizableCapacityLinkedBlockIngQueue;
|
||||
|
||||
import java.util.concurrent.ArrayBlockingQueue;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
|
||||
/**
|
||||
* 阻塞队列工具类
|
||||
*
|
||||
* @author chen.ma
|
||||
* @date 2021/6/20 16:50
|
||||
*/
|
||||
public class BlockingQueueUtil {
|
||||
|
||||
public static BlockingQueue createBlockingQueue(Integer type, Integer capacity) {
|
||||
BlockingQueue blockingQueue = null;
|
||||
switch (type) {
|
||||
case 1:
|
||||
blockingQueue = new ArrayBlockingQueue(capacity);
|
||||
break;
|
||||
case 2:
|
||||
blockingQueue = new LinkedBlockingQueue(capacity);
|
||||
break;
|
||||
case 3:
|
||||
blockingQueue = new ResizableCapacityLinkedBlockIngQueue(capacity);
|
||||
default:
|
||||
break;
|
||||
}
|
||||
return blockingQueue;
|
||||
}
|
||||
}
|
@ -0,0 +1,173 @@
|
||||
package io.dtp.starter.toolkit;
|
||||
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import lombok.SneakyThrows;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import okhttp3.*;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Http 客户端工具类
|
||||
*
|
||||
* @author chen.ma
|
||||
* @date 2021/6/10 13:30
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
public class HttpClientUtil {
|
||||
|
||||
@Autowired
|
||||
private OkHttpClient okHttpClient;
|
||||
|
||||
private MediaType jsonMediaType = MediaType.parse("application/json; charset=utf-8");
|
||||
|
||||
private static int HTTP_OK_CODE = 200;
|
||||
|
||||
/**
|
||||
* Get 请求
|
||||
*
|
||||
* @param url
|
||||
* @return
|
||||
*/
|
||||
@SneakyThrows
|
||||
public String get(String url) {
|
||||
try {
|
||||
return new String(doGet(url), "utf-8");
|
||||
} catch (Exception e) {
|
||||
log.error("httpGet 调用失败. {}", url, e);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get 请求, 支持添加查询字符串
|
||||
*
|
||||
* @param url
|
||||
* @param queryString 查询字符串
|
||||
* @return
|
||||
*/
|
||||
public String get(String url, Map<String, Object> queryString) {
|
||||
String fullUrl = buildUrl(url, queryString);
|
||||
return get(fullUrl);
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取 Json 后直接反序列化
|
||||
*
|
||||
* @param url
|
||||
* @param clazz
|
||||
* @return
|
||||
*/
|
||||
public <T> T restApiGet(String url, Class<T> clazz) {
|
||||
String resp = get(url);
|
||||
return JSON.parseObject(resp, clazz);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get 请求, 支持查询字符串
|
||||
*
|
||||
* @param url
|
||||
* @param queryString
|
||||
* @param clazz
|
||||
* @param <T>
|
||||
* @return
|
||||
*/
|
||||
public <T> T restApiGet(String url, Map<String, Object> queryString, Class<T> clazz) {
|
||||
String fullUrl = buildUrl(url, queryString);
|
||||
String resp = get(fullUrl);
|
||||
return JSON.parseObject(resp, clazz);
|
||||
}
|
||||
|
||||
/**
|
||||
* Rest 接口 Post 调用
|
||||
*
|
||||
* @param url
|
||||
* @param body
|
||||
* @return
|
||||
*/
|
||||
public String restApiPost(String url, Object body) {
|
||||
try {
|
||||
return doPost(url, body);
|
||||
} catch (Exception e) {
|
||||
log.error("httpPost 调用失败. {}", url, e);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Rest 接口 Post 调用
|
||||
* 对返回值直接反序列化
|
||||
*
|
||||
* @param url
|
||||
* @param body
|
||||
* @return
|
||||
*/
|
||||
public <T> T restApiPost(String url, Object body, Class<T> clazz) {
|
||||
String resp = restApiPost(url, body);
|
||||
return JSON.parseObject(resp, clazz);
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据查询字符串构造完整的 Url
|
||||
*
|
||||
* @param url
|
||||
* @param queryString
|
||||
* @return
|
||||
*/
|
||||
public String buildUrl(String url, Map<String, Object> queryString) {
|
||||
if (null == queryString) {
|
||||
return url;
|
||||
}
|
||||
|
||||
StringBuilder builder = new StringBuilder(url);
|
||||
boolean isFirst = true;
|
||||
|
||||
for (Map.Entry<String, Object> entry : queryString.entrySet()) {
|
||||
String key = entry.getKey();
|
||||
if (key != null && entry.getValue() != null) {
|
||||
if (isFirst) {
|
||||
isFirst = false;
|
||||
builder.append("?");
|
||||
} else {
|
||||
builder.append("&");
|
||||
}
|
||||
builder.append(key)
|
||||
.append("=")
|
||||
.append(queryString.get(key));
|
||||
}
|
||||
}
|
||||
|
||||
return builder.toString();
|
||||
}
|
||||
|
||||
@SneakyThrows
|
||||
private String doPost(String url, Object body) {
|
||||
String jsonBody = JSON.toJSONString(body);
|
||||
RequestBody requestBody = RequestBody.create(jsonMediaType, jsonBody);
|
||||
Request request = new Request.Builder()
|
||||
.url(url)
|
||||
.post(requestBody)
|
||||
.build();
|
||||
Response resp = okHttpClient.newCall(request).execute();
|
||||
if (resp.code() != HTTP_OK_CODE) {
|
||||
String msg = String.format("HttpPost 响应 code 异常. [code] %s [url] %s [body] %s", resp.code(), url, jsonBody);
|
||||
throw new RuntimeException(msg);
|
||||
}
|
||||
return resp.body().string();
|
||||
}
|
||||
|
||||
@SneakyThrows
|
||||
private byte[] doGet(String url) {
|
||||
Request request = new Request.Builder().get().url(url).build();
|
||||
Response resp = okHttpClient.newCall(request).execute();
|
||||
if (resp.code() != HTTP_OK_CODE) {
|
||||
String msg = String.format("HttpGet 响应 code 异常. [code] %s [url] %s", resp.code(), url);
|
||||
throw new RuntimeException(msg);
|
||||
}
|
||||
return resp.body().bytes();
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,77 @@
|
||||
package io.dtp.starter.wrap;
|
||||
|
||||
import io.dtp.starter.common.CommonThreadPool;
|
||||
import lombok.Data;
|
||||
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
|
||||
/**
|
||||
* 线程池包装
|
||||
*
|
||||
* @author chen.ma
|
||||
* @date 2021/6/20 16:55
|
||||
*/
|
||||
@Data
|
||||
public class DynamicThreadPoolWrap {
|
||||
|
||||
private String tenant;
|
||||
|
||||
private String itemId;
|
||||
|
||||
private String tpId;
|
||||
|
||||
private ThreadPoolExecutor pool;
|
||||
|
||||
/**
|
||||
* 首选服务端线程池, 为空使用默认线程池 {@link CommonThreadPool#getInstance(String)}
|
||||
*
|
||||
* @param threadPoolId
|
||||
*/
|
||||
public DynamicThreadPoolWrap(String threadPoolId) {
|
||||
this.tpId = threadPoolId;
|
||||
}
|
||||
|
||||
/**
|
||||
* 首选服务端线程池, 为空使用 threadPoolExecutor
|
||||
*
|
||||
* @param threadPoolId
|
||||
* @param threadPoolExecutor
|
||||
*/
|
||||
public DynamicThreadPoolWrap(String threadPoolId, ThreadPoolExecutor threadPoolExecutor) {
|
||||
this.tpId = threadPoolId;
|
||||
this.pool = threadPoolExecutor;
|
||||
}
|
||||
|
||||
/**
|
||||
* 提交任务
|
||||
*
|
||||
* @param command
|
||||
*/
|
||||
public void execute(Runnable command) {
|
||||
pool.execute(command);
|
||||
}
|
||||
|
||||
/**
|
||||
* 提交任务
|
||||
*
|
||||
* @param task
|
||||
* @return
|
||||
*/
|
||||
public Future<?> submit(Runnable task) {
|
||||
return pool.submit(task);
|
||||
}
|
||||
|
||||
/**
|
||||
* 提交任务
|
||||
*
|
||||
* @param task
|
||||
* @param <T>
|
||||
* @return
|
||||
*/
|
||||
public <T> Future<T> submit(Callable<T> task) {
|
||||
return pool.submit(task);
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,2 @@
|
||||
org.springframework.boot.autoconfigure.EnableAutoConfiguration=io.dtp.starter.config.CommonConfiguration, \
|
||||
io.dtp.starter.config.OkHttpClientConfig
|
@ -0,0 +1,72 @@
|
||||
package io.dtp.starter.test;
|
||||
|
||||
import io.dtp.starter.core.ResizableCapacityLinkedBlockIngQueue;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* 可调整大小的阻塞队列单元测试
|
||||
*
|
||||
* @author chen.ma
|
||||
* @date 2021/6/20 14:45
|
||||
*/
|
||||
@Slf4j
|
||||
public class ResizableCapacityLinkedBlockIngQueueTest {
|
||||
|
||||
public static void main(String[] args) throws InterruptedException {
|
||||
ResizableCapacityLinkedBlockIngQueue blockIngQueue = new ResizableCapacityLinkedBlockIngQueue(5);
|
||||
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1,
|
||||
3,
|
||||
1024,
|
||||
TimeUnit.SECONDS,
|
||||
blockIngQueue);
|
||||
|
||||
/*Runnable runnable = new Runnable() {
|
||||
@SneakyThrows
|
||||
public void run() {
|
||||
Thread.sleep(10000);
|
||||
}
|
||||
};
|
||||
for (int i = 0; i <; i++) {
|
||||
threadPoolExecutor.execute(runnable);
|
||||
}*/
|
||||
|
||||
print(threadPoolExecutor);
|
||||
|
||||
Thread.sleep(1000);
|
||||
blockIngQueue.setCapacity(1000);
|
||||
print(threadPoolExecutor);
|
||||
|
||||
}
|
||||
|
||||
private static void print(ThreadPoolExecutor executor) {
|
||||
LinkedBlockingQueue queue = (LinkedBlockingQueue) executor.getQueue();
|
||||
|
||||
log.info("核心线程数 :: {}," +
|
||||
" 活动线程数 :: {}," +
|
||||
" 最大线程数 :: {}," +
|
||||
" 线程池活跃度 :: {}," +
|
||||
" 任务完成数 :: {}," +
|
||||
" 队列大小 :: {}," +
|
||||
" 当前排队线程数 :: {}," +
|
||||
" 队列剩余大小 :: {}," +
|
||||
" 队列使用度 :: {}",
|
||||
executor.getCorePoolSize(),
|
||||
executor.getActiveCount(),
|
||||
executor.getMaximumPoolSize(),
|
||||
divide(executor.getActiveCount(), executor.getMaximumPoolSize()),
|
||||
executor.getCompletedTaskCount(),
|
||||
(queue.size() + queue.remainingCapacity()),
|
||||
queue.size(),
|
||||
queue.remainingCapacity(),
|
||||
divide(queue.size(), queue.size() + queue.remainingCapacity()));
|
||||
|
||||
}
|
||||
|
||||
private static String divide(int num1, int num2) {
|
||||
return String.format("%1.2f%%", Double.parseDouble(num1 + "") / Double.parseDouble(num2 + "") * 100);
|
||||
}
|
||||
}
|
Loading…
Reference in new issue