diff --git a/ruyi-spring-boot-starter/.gitignore b/dtp-example/.gitignore similarity index 100% rename from ruyi-spring-boot-starter/.gitignore rename to dtp-example/.gitignore diff --git a/dtp-example/pom.xml b/dtp-example/pom.xml new file mode 100644 index 00000000..9fe4a08b --- /dev/null +++ b/dtp-example/pom.xml @@ -0,0 +1,41 @@ + + + 4.0.0 + + + io.dynamic-thread-pool + dtp-parent + ${revision} + + + dtp-example + 0.0.1-SNAPSHOT + + dtp-example + Demo project for Spring Boot + + + 1.8 + + + + + org.springframework.boot + spring-boot-starter + + + + org.springframework.boot + spring-boot-starter-test + test + + + + io.dynamic-thread-pool + dtp-spring-boot-starter + ${revision} + + + + diff --git a/dtp-example/src/main/java/io/dtp/example/ExampleApplication.java b/dtp-example/src/main/java/io/dtp/example/ExampleApplication.java new file mode 100644 index 00000000..a70a154d --- /dev/null +++ b/dtp-example/src/main/java/io/dtp/example/ExampleApplication.java @@ -0,0 +1,13 @@ +package io.dtp.example; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class ExampleApplication { + + public static void main(String[] args) { + SpringApplication.run(ExampleApplication.class, args); + } + +} diff --git a/dtp-example/src/main/java/io/dtp/example/config/ThreadPoolConfig.java b/dtp-example/src/main/java/io/dtp/example/config/ThreadPoolConfig.java new file mode 100644 index 00000000..98a82420 --- /dev/null +++ b/dtp-example/src/main/java/io/dtp/example/config/ThreadPoolConfig.java @@ -0,0 +1,21 @@ +package io.dtp.example.config; + +import io.dtp.starter.wrap.DynamicThreadPoolWrap; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * 线程池配置 + * + * @author chen.ma + * @date 2021/6/20 17:16 + */ +@Configuration +public class ThreadPoolConfig { + + @Bean + public DynamicThreadPoolWrap messageCenterConsumeThreadPool() { + return new DynamicThreadPoolWrap("message-consume"); + } + +} diff --git a/server/src/main/resources/application.properties b/dtp-example/src/main/resources/application.properties similarity index 100% rename from server/src/main/resources/application.properties rename to dtp-example/src/main/resources/application.properties diff --git a/dtp-example/src/test/java/io/dtp/example/ExampleApplicationTests.java b/dtp-example/src/test/java/io/dtp/example/ExampleApplicationTests.java new file mode 100644 index 00000000..bc5f3bca --- /dev/null +++ b/dtp-example/src/test/java/io/dtp/example/ExampleApplicationTests.java @@ -0,0 +1,13 @@ +package io.dtp.example; + +import org.junit.jupiter.api.Test; +import org.springframework.boot.test.context.SpringBootTest; + +@SpringBootTest +class ExampleApplicationTests { + + @Test + void contextLoads() { + } + +} diff --git a/server/.gitignore b/dtp-server/.gitignore similarity index 100% rename from server/.gitignore rename to dtp-server/.gitignore diff --git a/server/pom.xml b/dtp-server/pom.xml similarity index 69% rename from server/pom.xml rename to dtp-server/pom.xml index 68386670..2f15c6f4 100644 --- a/server/pom.xml +++ b/dtp-server/pom.xml @@ -4,12 +4,12 @@ 4.0.0 - io.ruyi - ruyi-parent + io.dynamic-thread-pool + dtp-parent ${revision} - server + dtp-server jar ${project.artifactId} @@ -25,6 +25,17 @@ org.springframework.boot spring-boot-starter-web + + + org.springframework.boot + spring-boot-starter-jdbc + + + + mysql + mysql-connector-java + runtime + diff --git a/server/src/main/java/io/ruyi/server/ServerApplication.java b/dtp-server/src/main/java/io/dtp/server/ServerApplication.java similarity index 92% rename from server/src/main/java/io/ruyi/server/ServerApplication.java rename to dtp-server/src/main/java/io/dtp/server/ServerApplication.java index 33a1d1f2..5e5f5076 100644 --- a/server/src/main/java/io/ruyi/server/ServerApplication.java +++ b/dtp-server/src/main/java/io/dtp/server/ServerApplication.java @@ -1,4 +1,4 @@ -package io.ruyi.server; +package io.dtp.server; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; diff --git a/dtp-server/src/main/java/io/dtp/server/constant/Constants.java b/dtp-server/src/main/java/io/dtp/server/constant/Constants.java new file mode 100644 index 00000000..7578a86a --- /dev/null +++ b/dtp-server/src/main/java/io/dtp/server/constant/Constants.java @@ -0,0 +1,18 @@ +package io.dtp.server.constant; + +/** + * 服务端常量类 + * + * @author chen.ma + * @date 2021/6/20 13:54 + */ +public class Constants { + + public static final String BASE_PATH = "/v1/cs"; + + public static final String CONFIG_CONTROLLER_PATH = BASE_PATH + "/configs"; + + public static final String LISTENING_CONFIGS = "Listening-Configs"; + + public static final String ENCODE = "UTF-8"; +} diff --git a/dtp-server/src/main/java/io/dtp/server/controller/ConfigController.java b/dtp-server/src/main/java/io/dtp/server/controller/ConfigController.java new file mode 100644 index 00000000..9d0b9575 --- /dev/null +++ b/dtp-server/src/main/java/io/dtp/server/controller/ConfigController.java @@ -0,0 +1,50 @@ +package io.dtp.server.controller; + +import io.dtp.server.constant.Constants; +import io.dtp.server.model.ConfigInfoBase; +import io.dtp.server.service.ConfigService; +import lombok.SneakyThrows; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.util.StringUtils; +import org.springframework.web.bind.annotation.*; + +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import java.net.URLDecoder; + +/** + * 服务端配置控制器 + * + * @author chen.ma + * @date 2021/6/20 13:53 + */ +@RestController +@RequestMapping(Constants.CONFIG_CONTROLLER_PATH) +public class ConfigController { + + @Autowired + private ConfigService configService; + + @GetMapping + public ConfigInfoBase detailConfigInfo( + @RequestParam("tdId") String tdId, + @RequestParam("itemId") String itemId, + @RequestParam(value = "tenant", required = false, defaultValue = "") String tenant) { + + return configService.findConfigAllInfo(tdId, itemId, tenant); + } + + @SneakyThrows + @PostMapping("/listener") + public void listener(HttpServletRequest request, HttpServletResponse response) { + request.setAttribute("org.apache.catalina.ASYNC_SUPPORTED", true); + + String probeModify = request.getParameter(Constants.LISTENING_CONFIGS); + if (StringUtils.isEmpty(probeModify)) { + throw new IllegalArgumentException("invalid probeModify"); + } + + probeModify = URLDecoder.decode(probeModify, Constants.ENCODE); + } + +} diff --git a/dtp-server/src/main/java/io/dtp/server/mapper/RowMapperManager.java b/dtp-server/src/main/java/io/dtp/server/mapper/RowMapperManager.java new file mode 100644 index 00000000..89f2d49a --- /dev/null +++ b/dtp-server/src/main/java/io/dtp/server/mapper/RowMapperManager.java @@ -0,0 +1,36 @@ +package io.dtp.server.mapper; + +import io.dtp.server.model.ConfigAllInfo; +import org.springframework.jdbc.core.RowMapper; + +import java.sql.ResultSet; +import java.sql.SQLException; + +/** + * ConfigAllInfoRowMapper + * + * @author chen.ma + * @date 2021/6/20 15:57 + */ + +public final class RowMapperManager { + + public static final ConfigAllInfoRowMapper CONFIG_ALL_INFO_ROW_MAPPER = new ConfigAllInfoRowMapper(); + + public static class ConfigAllInfoRowMapper implements RowMapper { + + public ConfigAllInfo mapRow(ResultSet rs, int rowNum) throws SQLException { + ConfigAllInfo configAllInfo = new ConfigAllInfo(); + configAllInfo.setDataId(rs.getString("data_id")); + configAllInfo.setGroupId(rs.getString("group_id")); + configAllInfo.setTenant(rs.getString("tenant_id")); + configAllInfo.setContent(rs.getString("content")); + configAllInfo.setMd5(rs.getString("md5")); + configAllInfo.setCreateTime(rs.getTimestamp("gmt_modified").getTime()); + configAllInfo.setModifyTime(rs.getTimestamp("gmt_modified").getTime()); + return configAllInfo; + } + } + +} + diff --git a/dtp-server/src/main/java/io/dtp/server/model/ConfigAllInfo.java b/dtp-server/src/main/java/io/dtp/server/model/ConfigAllInfo.java new file mode 100644 index 00000000..3f658163 --- /dev/null +++ b/dtp-server/src/main/java/io/dtp/server/model/ConfigAllInfo.java @@ -0,0 +1,23 @@ +package io.dtp.server.model; + +import lombok.Data; + +/** + * 配置全部信息 + * + * @author chen.ma + * @date 2021/6/20 15:14 + */ +@Data +public class ConfigAllInfo extends ConfigInfo { + + private static final long serialVersionUID = -2417394244017463665L; + + private String createUser; + + private String desc; + + private Long createTime; + + private Long modifyTime; +} diff --git a/dtp-server/src/main/java/io/dtp/server/model/ConfigInfo.java b/dtp-server/src/main/java/io/dtp/server/model/ConfigInfo.java new file mode 100644 index 00000000..68d72de9 --- /dev/null +++ b/dtp-server/src/main/java/io/dtp/server/model/ConfigInfo.java @@ -0,0 +1,17 @@ +package io.dtp.server.model; + +import lombok.Data; + +/** + * 配置信息 + * + * @author chen.ma + * @date 2021/6/20 15:59 + */ +@Data +public class ConfigInfo extends ConfigInfoBase { + + private static final long serialVersionUID = -3504960832191834675L; + + private String tenant; +} diff --git a/dtp-server/src/main/java/io/dtp/server/model/ConfigInfoBase.java b/dtp-server/src/main/java/io/dtp/server/model/ConfigInfoBase.java new file mode 100644 index 00000000..63f65484 --- /dev/null +++ b/dtp-server/src/main/java/io/dtp/server/model/ConfigInfoBase.java @@ -0,0 +1,37 @@ +package io.dtp.server.model; + +import lombok.Data; + +import java.io.Serializable; + +/** + * 基础配置信息 + * + * @author chen.ma + * @date 2021/6/20 14:05 + */ +@Data +public class ConfigInfoBase implements Serializable { + + private static final long serialVersionUID = -1892597426099265730L; + + /** + * DataId + */ + private String dataId; + + /** + * GroupId + */ + private String groupId; + + /** + * 内容 + */ + private String content; + + /** + * MD5 + */ + private String md5; +} diff --git a/dtp-server/src/main/java/io/dtp/server/service/ConfigService.java b/dtp-server/src/main/java/io/dtp/server/service/ConfigService.java new file mode 100644 index 00000000..1a39439b --- /dev/null +++ b/dtp-server/src/main/java/io/dtp/server/service/ConfigService.java @@ -0,0 +1,22 @@ +package io.dtp.server.service; + +import io.dtp.server.model.ConfigAllInfo; + +/** + * 服务端配置接口 + * + * @author chen.ma + * @date 2021/6/20 15:18 + */ +public interface ConfigService { + + /** + * 查询配置全部信息 + * + * @param tpId tpId + * @param itemId itemId + * @param tenant tenant + * @return 全部配置信息 + */ + ConfigAllInfo findConfigAllInfo(String tpId, String itemId, String tenant); +} diff --git a/dtp-server/src/main/java/io/dtp/server/service/impl/ConfigServiceImpl.java b/dtp-server/src/main/java/io/dtp/server/service/impl/ConfigServiceImpl.java new file mode 100644 index 00000000..2dfabc7d --- /dev/null +++ b/dtp-server/src/main/java/io/dtp/server/service/impl/ConfigServiceImpl.java @@ -0,0 +1,30 @@ +package io.dtp.server.service.impl; + +import io.dtp.server.mapper.RowMapperManager; +import io.dtp.server.model.ConfigAllInfo; +import io.dtp.server.service.ConfigService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.stereotype.Service; + +/** + * 服务端配置接口实现 + * + * @author chen.ma + * @date 2021/6/20 15:21 + */ +@Service +public class ConfigServiceImpl implements ConfigService { + + @Autowired + private JdbcTemplate jdbcTemplate; + + public ConfigAllInfo findConfigAllInfo(String tpId, String itemId, String tenant) { + ConfigAllInfo configAllInfo = jdbcTemplate.queryForObject( + "select * from config_info where tp_id = ? and item_id = ? and tenant_id = ?", + new Object[]{tpId, itemId, tenant}, + RowMapperManager.CONFIG_ALL_INFO_ROW_MAPPER); + + return configAllInfo; + } +} diff --git a/dtp-server/src/main/resources/application.yaml b/dtp-server/src/main/resources/application.yaml new file mode 100644 index 00000000..6cca881d --- /dev/null +++ b/dtp-server/src/main/resources/application.yaml @@ -0,0 +1,9 @@ +server: + port: 6691 + +spring: + datasource: + driver-class-name: com.mysql.jdbc.Driver + url: jdbc:mysql://localhost:3306/dynamic-thread-pool + username: root + password: root \ No newline at end of file diff --git a/dtp-spring-boot-starter/.gitignore b/dtp-spring-boot-starter/.gitignore new file mode 100644 index 00000000..549e00a2 --- /dev/null +++ b/dtp-spring-boot-starter/.gitignore @@ -0,0 +1,33 @@ +HELP.md +target/ +!.mvn/wrapper/maven-wrapper.jar +!**/src/main/**/target/ +!**/src/test/**/target/ + +### STS ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans +.sts4-cache + +### IntelliJ IDEA ### +.idea +*.iws +*.iml +*.ipr + +### NetBeans ### +/nbproject/private/ +/nbbuild/ +/dist/ +/nbdist/ +/.nb-gradle/ +build/ +!**/src/main/**/build/ +!**/src/test/**/build/ + +### VS Code ### +.vscode/ diff --git a/ruyi-spring-boot-starter/pom.xml b/dtp-spring-boot-starter/pom.xml similarity index 50% rename from ruyi-spring-boot-starter/pom.xml rename to dtp-spring-boot-starter/pom.xml index 32bd5465..04748ed2 100644 --- a/ruyi-spring-boot-starter/pom.xml +++ b/dtp-spring-boot-starter/pom.xml @@ -4,21 +4,41 @@ 4.0.0 - io.ruyi - ruyi-parent + io.dynamic-thread-pool + dtp-parent ${revision} - ruyi-spring-boot-starter + dtp-spring-boot-starter jar ${project.artifactId} ${project.artifactId} + + 1.8 + + org.springframework.boot spring-boot-starter + + + cn.hutool + hutool-core + + + + com.squareup.okhttp3 + logging-interceptor + true + + + + com.alibaba + fastjson + diff --git a/dtp-spring-boot-starter/src/main/java/io/dtp/starter/common/CommonThreadPool.java b/dtp-spring-boot-starter/src/main/java/io/dtp/starter/common/CommonThreadPool.java new file mode 100644 index 00000000..babba9fd --- /dev/null +++ b/dtp-spring-boot-starter/src/main/java/io/dtp/starter/common/CommonThreadPool.java @@ -0,0 +1,30 @@ +package io.dtp.starter.common; + +import java.util.concurrent.*; + +/** + * 公共线程池生产者 + * + * @author chen.ma + * @date 2021/6/16 22:35 + */ +public class CommonThreadPool { + + public static ThreadPoolExecutor getInstance(String threadPoolId) { + TimeUnit unit = TimeUnit.SECONDS; + BlockingQueue workQueue = new LinkedBlockingQueue(512); + ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor( + 3, + 5, + 10000, + unit, + workQueue, + r -> { + Thread thread = new Thread(r); + thread.setDaemon(false); + thread.setName(threadPoolId); + return thread; + }); + return poolExecutor; + } +} diff --git a/dtp-spring-boot-starter/src/main/java/io/dtp/starter/common/Constants.java b/dtp-spring-boot-starter/src/main/java/io/dtp/starter/common/Constants.java new file mode 100644 index 00000000..ac21325d --- /dev/null +++ b/dtp-spring-boot-starter/src/main/java/io/dtp/starter/common/Constants.java @@ -0,0 +1,18 @@ +package io.dtp.starter.common; + +/** + * Constants + * + * @author chen.ma + * @date 2021/6/16 23:01 + */ +public class Constants { + + public static final String DEFAULT_GROUP = "DEFAULT_GROUP"; + + public static final String DATA_ID = "dataId"; + + public static final String GROUP_ID = "group"; + + public static final String DEFAULT_NAMESPACE_ID = "public"; +} diff --git a/dtp-spring-boot-starter/src/main/java/io/dtp/starter/config/ApplicationContextHolder.java b/dtp-spring-boot-starter/src/main/java/io/dtp/starter/config/ApplicationContextHolder.java new file mode 100644 index 00000000..30efb925 --- /dev/null +++ b/dtp-spring-boot-starter/src/main/java/io/dtp/starter/config/ApplicationContextHolder.java @@ -0,0 +1,58 @@ +package io.dtp.starter.config; + +import org.springframework.beans.BeansException; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; + +import java.util.Map; + +/** + * 非 Spring 环境下获取 IOC 容器对象 + * + * @author chen.ma + * @date 2021/6/20 18:49 + */ +public class ApplicationContextHolder implements ApplicationContextAware { + + private static ApplicationContext CONTEXT; + + @Override + public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { + ApplicationContextHolder.CONTEXT = applicationContext; + } + + /** + * 根据类型获取 IOC 容器 Bean + * + * @param clazz + * @param + * @return + */ + public static T getBean(Class clazz) { + return CONTEXT.getBean(clazz); + } + + /** + * 根据名称 & 类型获取 IOC 容器 Bean + * + * @param name + * @param clazz + * @param + * @return + */ + public static T getBean(String name, Class clazz) { + return CONTEXT.getBean(name, clazz); + } + + /** + * 根据类型获取一组 IOC 容器 Bean + * + * @param clazz + * @param + * @return + */ + public static Map getBeansOfType(Class clazz) { + return CONTEXT.getBeansOfType(clazz); + } + +} diff --git a/dtp-spring-boot-starter/src/main/java/io/dtp/starter/config/CommonConfiguration.java b/dtp-spring-boot-starter/src/main/java/io/dtp/starter/config/CommonConfiguration.java new file mode 100644 index 00000000..8fa7955f --- /dev/null +++ b/dtp-spring-boot-starter/src/main/java/io/dtp/starter/config/CommonConfiguration.java @@ -0,0 +1,25 @@ +package io.dtp.starter.config; + +import io.dtp.starter.core.ThreadPoolRunListener; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * 公共配置 + * + * @author chen.ma + * @date 2021/6/20 18:52 + */ +@Configuration +public class CommonConfiguration { + + @Bean + public ApplicationContextHolder applicationContextHolder() { + return new ApplicationContextHolder(); + } + + @Bean + public ThreadPoolRunListener threadPoolRunListener() { + return new ThreadPoolRunListener(); + } +} diff --git a/dtp-spring-boot-starter/src/main/java/io/dtp/starter/config/OkHttpClientConfig.java b/dtp-spring-boot-starter/src/main/java/io/dtp/starter/config/OkHttpClientConfig.java new file mode 100644 index 00000000..88f58587 --- /dev/null +++ b/dtp-spring-boot-starter/src/main/java/io/dtp/starter/config/OkHttpClientConfig.java @@ -0,0 +1,69 @@ +package io.dtp.starter.config; + +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import okhttp3.OkHttpClient; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLSocketFactory; +import javax.net.ssl.TrustManager; +import javax.net.ssl.X509TrustManager; +import java.util.concurrent.TimeUnit; + +/** + * OkHttp3 bean + * + * @author chen.ma + * @date 2021/6/10 13:28 + */ +@Slf4j +@Configuration +public class OkHttpClientConfig { + + /** + * 配置 OkHttpClient Bean + * + * @return + */ + @Bean + public OkHttpClient okHttpClient() { + OkHttpClient.Builder build = new OkHttpClient.Builder(); + build.connectTimeout(10, TimeUnit.SECONDS) + .readTimeout(30, TimeUnit.SECONDS) + .build(); + supportHttps(build); + return build.build(); + } + + /** + * 支持 Https + * + * @param builder + */ + @SneakyThrows + private void supportHttps(OkHttpClient.Builder builder) { + final TrustManager[] trustAllCerts = new TrustManager[]{new X509TrustManager() { + @Override + public void checkClientTrusted(java.security.cert.X509Certificate[] chain, String authType) { + } + + @Override + public void checkServerTrusted(java.security.cert.X509Certificate[] chain, String authType) { + } + + @Override + public java.security.cert.X509Certificate[] getAcceptedIssuers() { + return new java.security.cert.X509Certificate[]{}; + } + }}; + + final SSLContext sslContext = SSLContext.getInstance("SSL"); + sslContext.init(null, trustAllCerts, new java.security.SecureRandom()); + final SSLSocketFactory sslSocketFactory = sslContext.getSocketFactory(); + builder.sslSocketFactory(sslSocketFactory, (X509TrustManager) trustAllCerts[0]); + builder.hostnameVerifier((hostname, session) -> true); + } + +} diff --git a/dtp-spring-boot-starter/src/main/java/io/dtp/starter/core/GlobalThreadPoolManage.java b/dtp-spring-boot-starter/src/main/java/io/dtp/starter/core/GlobalThreadPoolManage.java new file mode 100644 index 00000000..0c3c634d --- /dev/null +++ b/dtp-spring-boot-starter/src/main/java/io/dtp/starter/core/GlobalThreadPoolManage.java @@ -0,0 +1,29 @@ +package io.dtp.starter.core; + +import io.dtp.starter.wrap.DynamicThreadPoolWrap; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * 线程池全局管理 + * + * @author chen.ma + * @date 2021/6/20 15:57 + */ +public class GlobalThreadPoolManage { + + private static final Map EXECUTOR_MAP = new ConcurrentHashMap(); + + public static DynamicThreadPoolWrap getExecutorService(String name) { + return EXECUTOR_MAP.get(name); + } + + public static void register(String name, DynamicThreadPoolWrap executor) { + EXECUTOR_MAP.put(name, executor); + } + + public static void remove(String name) { + EXECUTOR_MAP.remove(name); + } +} \ No newline at end of file diff --git a/dtp-spring-boot-starter/src/main/java/io/dtp/starter/core/ResizableCapacityLinkedBlockIngQueue.java b/dtp-spring-boot-starter/src/main/java/io/dtp/starter/core/ResizableCapacityLinkedBlockIngQueue.java new file mode 100644 index 00000000..99811097 --- /dev/null +++ b/dtp-spring-boot-starter/src/main/java/io/dtp/starter/core/ResizableCapacityLinkedBlockIngQueue.java @@ -0,0 +1,32 @@ +package io.dtp.starter.core; + +import cn.hutool.core.util.ReflectUtil; +import lombok.extern.slf4j.Slf4j; + +import java.util.concurrent.LinkedBlockingQueue; + +/** + * 可调整大小的阻塞队列 + * + * @author chen.ma + * @date 2021/6/20 14:24 + */ +@Slf4j +public class ResizableCapacityLinkedBlockIngQueue extends LinkedBlockingQueue { + + public ResizableCapacityLinkedBlockIngQueue(int capacity) { + super(capacity); + } + + public boolean setCapacity(Integer capacity) { + boolean successFlag = true; + try { + ReflectUtil.setFieldValue(this, "capacity", capacity); + } catch (Exception ex) { + // ignore + log.error("动态修改阻塞队列大小失败.", ex); + successFlag = false; + } + return successFlag; + } +} diff --git a/dtp-spring-boot-starter/src/main/java/io/dtp/starter/core/ThreadPoolRunListener.java b/dtp-spring-boot-starter/src/main/java/io/dtp/starter/core/ThreadPoolRunListener.java new file mode 100644 index 00000000..ae75ae29 --- /dev/null +++ b/dtp-spring-boot-starter/src/main/java/io/dtp/starter/core/ThreadPoolRunListener.java @@ -0,0 +1,65 @@ +package io.dtp.starter.core; + +import io.dtp.starter.common.CommonThreadPool; +import io.dtp.starter.config.ApplicationContextHolder; +import io.dtp.starter.model.PoolParameterInfo; +import io.dtp.starter.toolkit.BlockingQueueUtil; +import io.dtp.starter.toolkit.HttpClientUtil; +import io.dtp.starter.wrap.DynamicThreadPoolWrap; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * 线程池启动监听 + * + * @author chen.ma + * @date 2021/6/20 16:34 + */ +public class ThreadPoolRunListener implements ApplicationRunner { + + @Autowired + private HttpClientUtil httpClientUtil; + + @Override + public void run(ApplicationArguments args) throws Exception { + Map executorMap = + ApplicationContextHolder.getBeansOfType(DynamicThreadPoolWrap.class); + + executorMap.forEach((key, val) -> { + + Map queryStrMap = new HashMap(16); + queryStrMap.put("tdId", val.getTpId()); + queryStrMap.put("itemId", val.getItemId()); + queryStrMap.put("tenant", val.getTenant()); + + PoolParameterInfo ppi = httpClientUtil.restApiGet(buildUrl(), queryStrMap, PoolParameterInfo.class); + if (ppi != null) { + // 使用相关参数创建线程池 + TimeUnit unit = TimeUnit.SECONDS; + BlockingQueue workQueue = BlockingQueueUtil.createBlockingQueue(ppi.getQueueType(), ppi.getCapacity()); + ThreadPoolExecutor resultTpe = new ThreadPoolExecutor(ppi.getCoreSize(), ppi.getMaxSize(), ppi.getKeepAliveTime(), unit, workQueue); + val.setPool(resultTpe); + } else if (val.getPool() == null) { + val.setPool(CommonThreadPool.getInstance(val.getTpId())); + } + + GlobalThreadPoolManage.register(buildOnlyId(val), val); + }); + } + + private String buildUrl() { + return "http://127.0.0.1/v1/cs/configs"; + } + + private String buildOnlyId(DynamicThreadPoolWrap poolWrap) { + return poolWrap.getTenant() + "_" + poolWrap.getItemId() + "_" + poolWrap.getTpId(); + } + +} diff --git a/dtp-spring-boot-starter/src/main/java/io/dtp/starter/model/PoolParameterInfo.java b/dtp-spring-boot-starter/src/main/java/io/dtp/starter/model/PoolParameterInfo.java new file mode 100644 index 00000000..9f6107e1 --- /dev/null +++ b/dtp-spring-boot-starter/src/main/java/io/dtp/starter/model/PoolParameterInfo.java @@ -0,0 +1,73 @@ +package io.dtp.starter.model; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import lombok.Data; + +import java.io.Serializable; + +/** + * 线程池参数 + * + * @author chen.ma + * @date 2021/6/16 23:18 + */ +@Data +public class PoolParameterInfo implements Serializable { + + private static final long serialVersionUID = -7123935122108553864L; + + /** + * 租户 Or 命名空间 + */ + private String tenant; + + /** + * 项目 Id + */ + private String itemId; + + /** + * 线程池 Id + */ + private String tpId; + + /** + * 内容 + */ + private String content; + + /** + * 核心线程数 + */ + private Integer coreSize; + + /** + * 最大线程数 + */ + private Integer maxSize; + + /** + * 队列类型 + */ + private Integer queueType; + + /** + * 队列长度 + */ + private Integer capacity; + + /** + * 线程存活时长 + */ + private Integer keepAliveTime; + + public void setContent(String content) { + JSONObject poolParam = JSON.parseObject(content); + this.coreSize = poolParam.getInteger("coreSize"); + this.maxSize = poolParam.getInteger("maxSize"); + this.capacity = poolParam.getInteger("capacity"); + this.queueType = poolParam.getInteger("queueType"); + this.keepAliveTime = poolParam.getInteger("keepAliveTime"); + } +} diff --git a/dtp-spring-boot-starter/src/main/java/io/dtp/starter/monitor/ThreadPoolDynamicMonitor.java b/dtp-spring-boot-starter/src/main/java/io/dtp/starter/monitor/ThreadPoolDynamicMonitor.java new file mode 100644 index 00000000..687f9dd6 --- /dev/null +++ b/dtp-spring-boot-starter/src/main/java/io/dtp/starter/monitor/ThreadPoolDynamicMonitor.java @@ -0,0 +1,35 @@ +package io.dtp.starter.monitor; + +import io.dtp.starter.core.GlobalThreadPoolManage; +import io.dtp.starter.core.ResizableCapacityLinkedBlockIngQueue; +import io.dtp.starter.wrap.DynamicThreadPoolWrap; + +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * 线程池动态监听 + * + * @author chen.ma + * @date 2021/6/20 15:51 + */ +public class ThreadPoolDynamicMonitor { + + public void dynamicPool(String threadPoolName, Integer coreSize, Integer maxSize, Integer capacity, Long keepAliveTime) { + DynamicThreadPoolWrap wrap = GlobalThreadPoolManage.getExecutorService(threadPoolName); + ThreadPoolExecutor executor = wrap.getPool(); + if (coreSize != null) { + executor.setCorePoolSize(coreSize); + } + if (maxSize != null) { + executor.setMaximumPoolSize(maxSize); + } + if (capacity != null) { + ResizableCapacityLinkedBlockIngQueue queue = (ResizableCapacityLinkedBlockIngQueue) executor.getQueue(); + queue.setCapacity(capacity); + } + if (keepAliveTime != null) { + executor.setKeepAliveTime(keepAliveTime, TimeUnit.SECONDS); + } + } +} diff --git a/dtp-spring-boot-starter/src/main/java/io/dtp/starter/toolkit/BlockingQueueUtil.java b/dtp-spring-boot-starter/src/main/java/io/dtp/starter/toolkit/BlockingQueueUtil.java new file mode 100644 index 00000000..54aa6d94 --- /dev/null +++ b/dtp-spring-boot-starter/src/main/java/io/dtp/starter/toolkit/BlockingQueueUtil.java @@ -0,0 +1,33 @@ +package io.dtp.starter.toolkit; + +import io.dtp.starter.core.ResizableCapacityLinkedBlockIngQueue; + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +/** + * 阻塞队列工具类 + * + * @author chen.ma + * @date 2021/6/20 16:50 + */ +public class BlockingQueueUtil { + + public static BlockingQueue createBlockingQueue(Integer type, Integer capacity) { + BlockingQueue blockingQueue = null; + switch (type) { + case 1: + blockingQueue = new ArrayBlockingQueue(capacity); + break; + case 2: + blockingQueue = new LinkedBlockingQueue(capacity); + break; + case 3: + blockingQueue = new ResizableCapacityLinkedBlockIngQueue(capacity); + default: + break; + } + return blockingQueue; + } +} diff --git a/dtp-spring-boot-starter/src/main/java/io/dtp/starter/toolkit/HttpClientUtil.java b/dtp-spring-boot-starter/src/main/java/io/dtp/starter/toolkit/HttpClientUtil.java new file mode 100644 index 00000000..265dadf3 --- /dev/null +++ b/dtp-spring-boot-starter/src/main/java/io/dtp/starter/toolkit/HttpClientUtil.java @@ -0,0 +1,173 @@ +package io.dtp.starter.toolkit; + +import com.alibaba.fastjson.JSON; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import okhttp3.*; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.util.Map; + +/** + * Http 客户端工具类 + * + * @author chen.ma + * @date 2021/6/10 13:30 + */ +@Slf4j +@Component +public class HttpClientUtil { + + @Autowired + private OkHttpClient okHttpClient; + + private MediaType jsonMediaType = MediaType.parse("application/json; charset=utf-8"); + + private static int HTTP_OK_CODE = 200; + + /** + * Get 请求 + * + * @param url + * @return + */ + @SneakyThrows + public String get(String url) { + try { + return new String(doGet(url), "utf-8"); + } catch (Exception e) { + log.error("httpGet 调用失败. {}", url, e); + throw e; + } + } + + /** + * Get 请求, 支持添加查询字符串 + * + * @param url + * @param queryString 查询字符串 + * @return + */ + public String get(String url, Map queryString) { + String fullUrl = buildUrl(url, queryString); + return get(fullUrl); + } + + /** + * 获取 Json 后直接反序列化 + * + * @param url + * @param clazz + * @return + */ + public T restApiGet(String url, Class clazz) { + String resp = get(url); + return JSON.parseObject(resp, clazz); + } + + /** + * Get 请求, 支持查询字符串 + * + * @param url + * @param queryString + * @param clazz + * @param + * @return + */ + public T restApiGet(String url, Map queryString, Class clazz) { + String fullUrl = buildUrl(url, queryString); + String resp = get(fullUrl); + return JSON.parseObject(resp, clazz); + } + + /** + * Rest 接口 Post 调用 + * + * @param url + * @param body + * @return + */ + public String restApiPost(String url, Object body) { + try { + return doPost(url, body); + } catch (Exception e) { + log.error("httpPost 调用失败. {}", url, e); + throw e; + } + } + + /** + * Rest 接口 Post 调用 + * 对返回值直接反序列化 + * + * @param url + * @param body + * @return + */ + public T restApiPost(String url, Object body, Class clazz) { + String resp = restApiPost(url, body); + return JSON.parseObject(resp, clazz); + } + + /** + * 根据查询字符串构造完整的 Url + * + * @param url + * @param queryString + * @return + */ + public String buildUrl(String url, Map queryString) { + if (null == queryString) { + return url; + } + + StringBuilder builder = new StringBuilder(url); + boolean isFirst = true; + + for (Map.Entry entry : queryString.entrySet()) { + String key = entry.getKey(); + if (key != null && entry.getValue() != null) { + if (isFirst) { + isFirst = false; + builder.append("?"); + } else { + builder.append("&"); + } + builder.append(key) + .append("=") + .append(queryString.get(key)); + } + } + + return builder.toString(); + } + + @SneakyThrows + private String doPost(String url, Object body) { + String jsonBody = JSON.toJSONString(body); + RequestBody requestBody = RequestBody.create(jsonMediaType, jsonBody); + Request request = new Request.Builder() + .url(url) + .post(requestBody) + .build(); + Response resp = okHttpClient.newCall(request).execute(); + if (resp.code() != HTTP_OK_CODE) { + String msg = String.format("HttpPost 响应 code 异常. [code] %s [url] %s [body] %s", resp.code(), url, jsonBody); + throw new RuntimeException(msg); + } + return resp.body().string(); + } + + @SneakyThrows + private byte[] doGet(String url) { + Request request = new Request.Builder().get().url(url).build(); + Response resp = okHttpClient.newCall(request).execute(); + if (resp.code() != HTTP_OK_CODE) { + String msg = String.format("HttpGet 响应 code 异常. [code] %s [url] %s", resp.code(), url); + throw new RuntimeException(msg); + } + return resp.body().bytes(); + } + +} diff --git a/dtp-spring-boot-starter/src/main/java/io/dtp/starter/wrap/DynamicThreadPoolWrap.java b/dtp-spring-boot-starter/src/main/java/io/dtp/starter/wrap/DynamicThreadPoolWrap.java new file mode 100644 index 00000000..47940e8a --- /dev/null +++ b/dtp-spring-boot-starter/src/main/java/io/dtp/starter/wrap/DynamicThreadPoolWrap.java @@ -0,0 +1,77 @@ +package io.dtp.starter.wrap; + +import io.dtp.starter.common.CommonThreadPool; +import lombok.Data; + +import java.util.concurrent.Callable; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadPoolExecutor; + +/** + * 线程池包装 + * + * @author chen.ma + * @date 2021/6/20 16:55 + */ +@Data +public class DynamicThreadPoolWrap { + + private String tenant; + + private String itemId; + + private String tpId; + + private ThreadPoolExecutor pool; + + /** + * 首选服务端线程池, 为空使用默认线程池 {@link CommonThreadPool#getInstance(String)} + * + * @param threadPoolId + */ + public DynamicThreadPoolWrap(String threadPoolId) { + this.tpId = threadPoolId; + } + + /** + * 首选服务端线程池, 为空使用 threadPoolExecutor + * + * @param threadPoolId + * @param threadPoolExecutor + */ + public DynamicThreadPoolWrap(String threadPoolId, ThreadPoolExecutor threadPoolExecutor) { + this.tpId = threadPoolId; + this.pool = threadPoolExecutor; + } + + /** + * 提交任务 + * + * @param command + */ + public void execute(Runnable command) { + pool.execute(command); + } + + /** + * 提交任务 + * + * @param task + * @return + */ + public Future submit(Runnable task) { + return pool.submit(task); + } + + /** + * 提交任务 + * + * @param task + * @param + * @return + */ + public Future submit(Callable task) { + return pool.submit(task); + } + +} diff --git a/dtp-spring-boot-starter/src/main/resources/META-INF/spring.factories b/dtp-spring-boot-starter/src/main/resources/META-INF/spring.factories new file mode 100644 index 00000000..60d8bf56 --- /dev/null +++ b/dtp-spring-boot-starter/src/main/resources/META-INF/spring.factories @@ -0,0 +1,2 @@ +org.springframework.boot.autoconfigure.EnableAutoConfiguration=io.dtp.starter.config.CommonConfiguration, \ + io.dtp.starter.config.OkHttpClientConfig \ No newline at end of file diff --git a/dtp-spring-boot-starter/src/test/java/io/dtp/starter/test/ResizableCapacityLinkedBlockIngQueueTest.java b/dtp-spring-boot-starter/src/test/java/io/dtp/starter/test/ResizableCapacityLinkedBlockIngQueueTest.java new file mode 100644 index 00000000..83524ec5 --- /dev/null +++ b/dtp-spring-boot-starter/src/test/java/io/dtp/starter/test/ResizableCapacityLinkedBlockIngQueueTest.java @@ -0,0 +1,72 @@ +package io.dtp.starter.test; + +import io.dtp.starter.core.ResizableCapacityLinkedBlockIngQueue; +import lombok.extern.slf4j.Slf4j; + +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * 可调整大小的阻塞队列单元测试 + * + * @author chen.ma + * @date 2021/6/20 14:45 + */ +@Slf4j +public class ResizableCapacityLinkedBlockIngQueueTest { + + public static void main(String[] args) throws InterruptedException { + ResizableCapacityLinkedBlockIngQueue blockIngQueue = new ResizableCapacityLinkedBlockIngQueue(5); + ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, + 3, + 1024, + TimeUnit.SECONDS, + blockIngQueue); + + /*Runnable runnable = new Runnable() { + @SneakyThrows + public void run() { + Thread.sleep(10000); + } + }; + for (int i = 0; i <; i++) { + threadPoolExecutor.execute(runnable); + }*/ + + print(threadPoolExecutor); + + Thread.sleep(1000); + blockIngQueue.setCapacity(1000); + print(threadPoolExecutor); + + } + + private static void print(ThreadPoolExecutor executor) { + LinkedBlockingQueue queue = (LinkedBlockingQueue) executor.getQueue(); + + log.info("核心线程数 :: {}," + + " 活动线程数 :: {}," + + " 最大线程数 :: {}," + + " 线程池活跃度 :: {}," + + " 任务完成数 :: {}," + + " 队列大小 :: {}," + + " 当前排队线程数 :: {}," + + " 队列剩余大小 :: {}," + + " 队列使用度 :: {}", + executor.getCorePoolSize(), + executor.getActiveCount(), + executor.getMaximumPoolSize(), + divide(executor.getActiveCount(), executor.getMaximumPoolSize()), + executor.getCompletedTaskCount(), + (queue.size() + queue.remainingCapacity()), + queue.size(), + queue.remainingCapacity(), + divide(queue.size(), queue.size() + queue.remainingCapacity())); + + } + + private static String divide(int num1, int num2) { + return String.format("%1.2f%%", Double.parseDouble(num1 + "") / Double.parseDouble(num2 + "") * 100); + } +} diff --git a/pom.xml b/pom.xml index fcf6d3e9..1de4d06e 100644 --- a/pom.xml +++ b/pom.xml @@ -3,24 +3,28 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> 4.0.0 - io.ruyi - ruyi-parent + io.dynamic-thread-pool + dtp-parent ${revision} pom - ruyi + ${project.artifactId} 动态线程池,附带监控报警功能 1.8 1.0.0-SNAPSHOT + 3.8.1 + 5.4.7 + 1.2.75 + 2.3.2.RELEASE - ruyi-spring-boot-starter - server + dtp-spring-boot-starter + dtp-server @@ -32,6 +36,25 @@ pom import + + + com.squareup.okhttp3 + logging-interceptor + ${okhttp3.version} + true + + + + com.alibaba + fastjson + ${fastjson.version} + + + + cn.hutool + hutool-core + ${hutool-core.version} + @@ -48,14 +71,6 @@ org.springframework.boot spring-boot-maven-plugin - - - - org.projectlombok - lombok - - -