diff --git a/hippo4j-common/src/main/java/cn/hippo4j/common/constant/Constants.java b/hippo4j-common/src/main/java/cn/hippo4j/common/constant/Constants.java index da6ccc9f..fb80954e 100644 --- a/hippo4j-common/src/main/java/cn/hippo4j/common/constant/Constants.java +++ b/hippo4j-common/src/main/java/cn/hippo4j/common/constant/Constants.java @@ -68,6 +68,8 @@ public class Constants { public static final String REGISTER_ADAPTER_PATH = REGISTER_ADAPTER_BASE_PATH + "/register"; + public static final String REGISTER_DYNAMIC_THREAD_POOL_PATH = CONFIG_CONTROLLER_PATH + "/register"; + public static final String HEALTH_CHECK_PATH = BASE_PATH + "/health/check"; public static final String PROBE_MODIFY_REQUEST = "Listening-Configs"; diff --git a/hippo4j-common/src/main/java/cn/hippo4j/common/model/ThreadPoolParameterInfo.java b/hippo4j-common/src/main/java/cn/hippo4j/common/model/ThreadPoolParameterInfo.java index 175db3dc..8a6a5ef8 100644 --- a/hippo4j-common/src/main/java/cn/hippo4j/common/model/ThreadPoolParameterInfo.java +++ b/hippo4j-common/src/main/java/cn/hippo4j/common/model/ThreadPoolParameterInfo.java @@ -38,33 +38,33 @@ public class ThreadPoolParameterInfo implements ThreadPoolParameter, Serializabl private static final long serialVersionUID = -7123935122108553864L; /** - * tenantId + * Tenant id */ private String tenantId; /** - * itemId + * Item id */ private String itemId; /** - * tpId + * Thread-pool id */ private String tpId; /** - * content + * Content */ private String content; /** - * coreSize + * Core size */ @Deprecated private Integer coreSize; /** - * maxSize + * Max size */ @Deprecated private Integer maxSize; @@ -80,42 +80,42 @@ public class ThreadPoolParameterInfo implements ThreadPoolParameter, Serializabl private Integer maximumPoolSize; /** - * queueType + * Queue type */ private Integer queueType; /** - * capacity + * Capacity */ private Integer capacity; /** - * keepAliveTime + * Keep alive time */ private Integer keepAliveTime; /** - * rejectedType + * Rejected type */ private Integer rejectedType; /** - * isAlarm + * Is alarm */ private Integer isAlarm; /** - * capacityAlarm + * Capacity alarm */ private Integer capacityAlarm; /** - * livenessAlarm + * Liveness alarm */ private Integer livenessAlarm; /** - * allowCoreThreadTimeOut + * Allow core thread timeout */ private Integer allowCoreThreadTimeOut; diff --git a/hippo4j-common/src/main/java/cn/hippo4j/common/model/register/DynamicThreadPoolRegisterParameter.java b/hippo4j-common/src/main/java/cn/hippo4j/common/model/register/DynamicThreadPoolRegisterParameter.java new file mode 100644 index 00000000..4989e027 --- /dev/null +++ b/hippo4j-common/src/main/java/cn/hippo4j/common/model/register/DynamicThreadPoolRegisterParameter.java @@ -0,0 +1,77 @@ +package cn.hippo4j.common.model.register; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * Dynamic thread-pool register parameter. + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class DynamicThreadPoolRegisterParameter { + + /** + * Thread-pool id + * Empty or empty strings are not allowed, and `+` signs are not allowed + */ + private String threadPoolId; + + /** + * Content + */ + private String content; + + /** + * Core pool size + */ + private Integer corePoolSize; + + /** + * Maximum pool size + */ + private Integer maximumPoolSize; + + /** + * Queue type + */ + private Integer queueType; + + /** + * Capacity + */ + private Integer capacity; + + /** + * Keep alive time + */ + private Integer keepAliveTime; + + /** + * Rejected type + */ + private Integer rejectedType; + + /** + * Is alarm + */ + private Integer isAlarm; + + /** + * Capacity alarm + */ + private Integer capacityAlarm; + + /** + * Liveness alarm + */ + private Integer livenessAlarm; + + /** + * Allow core thread timeout + */ + private Integer allowCoreThreadTimeOut; +} diff --git a/hippo4j-common/src/main/java/cn/hippo4j/common/model/register/DynamicThreadPoolRegisterWrapper.java b/hippo4j-common/src/main/java/cn/hippo4j/common/model/register/DynamicThreadPoolRegisterWrapper.java new file mode 100644 index 00000000..7703d267 --- /dev/null +++ b/hippo4j-common/src/main/java/cn/hippo4j/common/model/register/DynamicThreadPoolRegisterWrapper.java @@ -0,0 +1,45 @@ +package cn.hippo4j.common.model.register; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.concurrent.ThreadPoolExecutor; + +/** + * Dynamic thread-pool register wrapper. + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class DynamicThreadPoolRegisterWrapper { + + /** + * Tenant id + */ + private String tenantId; + + /** + * Item id + */ + private String itemId; + + /** + * Update if exists + */ + private Boolean updateIfExists = Boolean.TRUE; + + /** + * Dynamic thread-pool executor + */ + @JsonIgnore + private ThreadPoolExecutor dynamicThreadPoolExecutor; + + /** + * Dynamic thread-pool register parameter + */ + private DynamicThreadPoolRegisterParameter dynamicThreadPoolRegisterParameter; +} diff --git a/hippo4j-config/src/main/java/cn/hippo4j/config/controller/ConfigController.java b/hippo4j-config/src/main/java/cn/hippo4j/config/controller/ConfigController.java index b2f03cba..5c0fe84f 100644 --- a/hippo4j-config/src/main/java/cn/hippo4j/config/controller/ConfigController.java +++ b/hippo4j-config/src/main/java/cn/hippo4j/config/controller/ConfigController.java @@ -17,15 +17,16 @@ package cn.hippo4j.config.controller; +import cn.hippo4j.common.constant.Constants; +import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterWrapper; +import cn.hippo4j.common.web.base.Result; +import cn.hippo4j.common.web.base.Results; import cn.hippo4j.config.model.ConfigAllInfo; import cn.hippo4j.config.model.ConfigInfoBase; import cn.hippo4j.config.service.ConfigCacheService; import cn.hippo4j.config.service.ConfigServletInner; -import cn.hippo4j.config.toolkit.Md5ConfigUtil; import cn.hippo4j.config.service.biz.ConfigService; -import cn.hippo4j.common.constant.Constants; -import cn.hippo4j.common.web.base.Result; -import cn.hippo4j.common.web.base.Results; +import cn.hippo4j.config.toolkit.Md5ConfigUtil; import cn.hutool.core.util.StrUtil; import lombok.AllArgsConstructor; import lombok.SneakyThrows; @@ -91,4 +92,10 @@ public class ConfigController { } return Results.success(); } + + @PostMapping("/register") + public Result register(@RequestBody DynamicThreadPoolRegisterWrapper registerWrapper) { + configService.register(registerWrapper); + return Results.success(); + } } diff --git a/hippo4j-config/src/main/java/cn/hippo4j/config/model/ConfigInfoBase.java b/hippo4j-config/src/main/java/cn/hippo4j/config/model/ConfigInfoBase.java index b1a98e67..60eddf3d 100644 --- a/hippo4j-config/src/main/java/cn/hippo4j/config/model/ConfigInfoBase.java +++ b/hippo4j-config/src/main/java/cn/hippo4j/config/model/ConfigInfoBase.java @@ -19,6 +19,7 @@ package cn.hippo4j.config.model; import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.TableId; +import com.fasterxml.jackson.annotation.JsonAlias; import com.fasterxml.jackson.annotation.JsonIgnore; import lombok.Data; @@ -56,11 +57,13 @@ public class ConfigInfoBase implements Serializable { /** * coreSize */ + @JsonAlias("corePoolSize") private Integer coreSize; /** * maxSize */ + @JsonAlias("maximumPoolSize") private Integer maxSize; /** diff --git a/hippo4j-config/src/main/java/cn/hippo4j/config/notify/DefaultPublisher.java b/hippo4j-config/src/main/java/cn/hippo4j/config/notify/DefaultPublisher.java index 06e66d78..6c2860ff 100644 --- a/hippo4j-config/src/main/java/cn/hippo4j/config/notify/DefaultPublisher.java +++ b/hippo4j-config/src/main/java/cn/hippo4j/config/notify/DefaultPublisher.java @@ -111,7 +111,7 @@ public class DefaultPublisher extends Thread implements EventPublisher { public boolean publish(AbstractEvent event) { boolean success = this.queue.offer(event); if (!success) { - log.warn("Unable to plug in due to interruption, synchronize sending time, event : {}", event); + log.warn("Unable to plug in due to interruption, synchronize sending time, event: {}", event); receiveEvent(event); return true; } @@ -130,7 +130,7 @@ public class DefaultPublisher extends Thread implements EventPublisher { try { job.run(); } catch (Throwable e) { - log.error("Event callback exception : {}", e); + log.error("Event callback exception: {}", e); } } } diff --git a/hippo4j-config/src/main/java/cn/hippo4j/config/notify/NotifyCenter.java b/hippo4j-config/src/main/java/cn/hippo4j/config/notify/NotifyCenter.java index a0c30918..bd5d04fa 100644 --- a/hippo4j-config/src/main/java/cn/hippo4j/config/notify/NotifyCenter.java +++ b/hippo4j-config/src/main/java/cn/hippo4j/config/notify/NotifyCenter.java @@ -58,7 +58,7 @@ public class NotifyCenter { publisher.init(cls, buffer); return publisher; } catch (Throwable ex) { - log.error("Service class newInstance has error : {}", ex); + log.error("Service class newInstance has error: {}", ex); throw new RuntimeException(ex); } }; @@ -98,7 +98,7 @@ public class NotifyCenter { try { return publishEvent(event.getClass(), event); } catch (Throwable ex) { - log.error("There was an exception to the message publishing : {}", ex); + log.error("There was an exception to the message publishing: {}", ex); return false; } } diff --git a/hippo4j-config/src/main/java/cn/hippo4j/config/service/ConfigCacheService.java b/hippo4j-config/src/main/java/cn/hippo4j/config/service/ConfigCacheService.java index b0385991..194e1811 100644 --- a/hippo4j-config/src/main/java/cn/hippo4j/config/service/ConfigCacheService.java +++ b/hippo4j-config/src/main/java/cn/hippo4j/config/service/ConfigCacheService.java @@ -178,7 +178,7 @@ public class ConfigCacheService { List identificationList = MapUtil.parseMapForFilter(CLIENT_CONFIG_CACHE, coarse); for (String cacheMapKey : identificationList) { Map removeCacheItem = CLIENT_CONFIG_CACHE.remove(cacheMapKey); - log.info("Remove invalidated config cache. config info : {}", JSONUtil.toJSONString(removeCacheItem)); + log.info("Remove invalidated config cache. config info: {}", JSONUtil.toJSONString(removeCacheItem)); } } @@ -189,7 +189,7 @@ public class ConfigCacheService { @Override public void accept(ObserverMessage observerMessage) { - log.info("Clean up the configuration cache. Key : {}", observerMessage.message()); + log.info("Clean up the configuration cache. Key: {}", observerMessage.message()); coarseRemove(observerMessage.message()); } } diff --git a/hippo4j-config/src/main/java/cn/hippo4j/config/service/biz/ConfigService.java b/hippo4j-config/src/main/java/cn/hippo4j/config/service/biz/ConfigService.java index a12b1225..e42efa93 100644 --- a/hippo4j-config/src/main/java/cn/hippo4j/config/service/biz/ConfigService.java +++ b/hippo4j-config/src/main/java/cn/hippo4j/config/service/biz/ConfigService.java @@ -17,6 +17,7 @@ package cn.hippo4j.config.service.biz; +import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterWrapper; import cn.hippo4j.config.model.ConfigAllInfo; /** @@ -50,4 +51,11 @@ public interface ConfigService { * @param configAllInfo */ void insertOrUpdate(String identify, boolean isChangeNotice, ConfigAllInfo configAllInfo); + + /** + * Register. + * + * @param registerWrapper + */ + void register(DynamicThreadPoolRegisterWrapper registerWrapper); } diff --git a/hippo4j-config/src/main/java/cn/hippo4j/config/service/biz/impl/ConfigServiceImpl.java b/hippo4j-config/src/main/java/cn/hippo4j/config/service/biz/impl/ConfigServiceImpl.java index 1fda0ac4..636b105e 100644 --- a/hippo4j-config/src/main/java/cn/hippo4j/config/service/biz/impl/ConfigServiceImpl.java +++ b/hippo4j-config/src/main/java/cn/hippo4j/config/service/biz/impl/ConfigServiceImpl.java @@ -19,6 +19,8 @@ package cn.hippo4j.config.service.biz.impl; import cn.hippo4j.common.config.ApplicationContextHolder; import cn.hippo4j.common.enums.DelEnum; +import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterParameter; +import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterWrapper; import cn.hippo4j.common.toolkit.*; import cn.hippo4j.common.web.exception.ServiceException; import cn.hippo4j.config.event.LocalDataChangeEvent; @@ -31,7 +33,9 @@ import cn.hippo4j.config.model.LogRecordInfo; import cn.hippo4j.config.service.ConfigCacheService; import cn.hippo4j.config.service.ConfigChangePublisher; import cn.hippo4j.config.service.biz.ConfigService; +import cn.hippo4j.config.service.biz.ItemService; import cn.hippo4j.config.service.biz.OperationLogService; +import cn.hippo4j.config.service.biz.TenantService; import cn.hippo4j.config.toolkit.BeanUtil; import cn.hutool.core.util.StrUtil; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; @@ -133,6 +137,26 @@ public class ConfigServiceImpl implements ConfigService { } } + @Override + public void register(DynamicThreadPoolRegisterWrapper registerWrapper) { + DynamicThreadPoolRegisterParameter registerParameter = registerWrapper.getDynamicThreadPoolRegisterParameter(); + ConfigAllInfo configAllInfo = JSONUtil.parseObject(JSONUtil.toJSONString(registerParameter), ConfigAllInfo.class); + configAllInfo.setTenantId(registerWrapper.getTenantId()); + configAllInfo.setItemId(registerWrapper.getItemId()); + configAllInfo.setTpId(registerParameter.getThreadPoolId()); + TenantService tenantService = ApplicationContextHolder.getBean(TenantService.class); + ItemService itemService = ApplicationContextHolder.getBean(ItemService.class); + Assert.isTrue(tenantService.getTenantByTenantId(registerWrapper.getTenantId()) != null, "Tenant does not exist"); + Assert.isTrue(itemService.queryItemById(registerWrapper.getTenantId(), registerWrapper.getItemId()) != null, "Item does not exist"); + ConfigAllInfo existConfigAllInfo = findConfigAllInfo(configAllInfo.getTpId(), registerWrapper.getItemId(), registerWrapper.getTenantId()); + if (existConfigAllInfo == null) { + addConfigInfo(configAllInfo); + } else if (registerWrapper.getUpdateIfExists()) { + ConfigServiceImpl configService = ApplicationContextHolder.getBean(this.getClass()); + configService.updateConfigInfo(null, false, configAllInfo); + } + } + private void verification(String identify) { if (StringUtil.isNotBlank(identify)) { Map content = getContent(identify); @@ -156,7 +180,7 @@ public class ConfigServiceImpl implements ConfigService { } } } catch (Exception ex) { - log.error("[db-error] message : {}", ex.getMessage(), ex); + log.error("[db-error] message: {}", ex.getMessage(), ex); throw ex; } return null; @@ -192,7 +216,7 @@ public class ConfigServiceImpl implements ConfigService { } configInfoMapper.update(config, wrapper); } catch (Exception ex) { - log.error("[db-error] message : {}", ex.getMessage(), ex); + log.error("[db-error] message: {}", ex.getMessage(), ex); throw ex; } } diff --git a/hippo4j-config/src/main/java/cn/hippo4j/config/service/handler/ClientCloseHookRemoveConfigCache.java b/hippo4j-config/src/main/java/cn/hippo4j/config/service/handler/ClientCloseHookRemoveConfigCache.java index 34e81cc7..d67ab517 100644 --- a/hippo4j-config/src/main/java/cn/hippo4j/config/service/handler/ClientCloseHookRemoveConfigCache.java +++ b/hippo4j-config/src/main/java/cn/hippo4j/config/service/handler/ClientCloseHookRemoveConfigCache.java @@ -33,7 +33,7 @@ public class ClientCloseHookRemoveConfigCache implements ClientCloseHookExecute @Override public void closeHook(ClientCloseHookReq requestParam) { - log.info("Remove Config Cache, Execute client hook function. Request : {}", JSONUtil.toJSONString(requestParam)); + log.info("Remove Config Cache, Execute client hook function. Request: {}", JSONUtil.toJSONString(requestParam)); try { String groupKey = requestParam.getGroupKey(); if (StrUtil.isNotBlank(groupKey)) { diff --git a/hippo4j-config/src/main/java/cn/hippo4j/config/service/handler/RemoveThreadPoolAdapterCache.java b/hippo4j-config/src/main/java/cn/hippo4j/config/service/handler/RemoveThreadPoolAdapterCache.java index 7b01d631..c2819fd4 100644 --- a/hippo4j-config/src/main/java/cn/hippo4j/config/service/handler/RemoveThreadPoolAdapterCache.java +++ b/hippo4j-config/src/main/java/cn/hippo4j/config/service/handler/RemoveThreadPoolAdapterCache.java @@ -32,7 +32,7 @@ public class RemoveThreadPoolAdapterCache implements ClientCloseHookExecute { @Override public void closeHook(ClientCloseHookReq requestParam) { - log.info("Remove thread-pool adapter cache, Execute client hook function. Req : {}", JSONUtil.toJSONString(requestParam)); + log.info("Remove thread-pool adapter cache, Execute client hook function. Req: {}", JSONUtil.toJSONString(requestParam)); ThreadPoolAdapterService.remove(requestParam.getInstanceId()); } } diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPoolWrapper.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPoolWrapper.java index a83f1c62..a8827549 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPoolWrapper.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPoolWrapper.java @@ -19,7 +19,10 @@ package cn.hippo4j.core.executor; import cn.hippo4j.core.executor.support.AbstractDynamicExecutorSupport; import cn.hippo4j.core.executor.support.CommonDynamicThreadPool; +import lombok.AllArgsConstructor; +import lombok.Builder; import lombok.Data; +import lombok.NoArgsConstructor; import org.springframework.beans.factory.DisposableBean; import java.util.concurrent.Callable; @@ -30,6 +33,9 @@ import java.util.concurrent.ThreadPoolExecutor; * Dynamic thread-pool wrapper. */ @Data +@Builder +@NoArgsConstructor +@AllArgsConstructor public class DynamicThreadPoolWrapper implements DisposableBean { private String tenantId, itemId, threadPoolId; diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/DynamicThreadPoolService.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/DynamicThreadPoolService.java new file mode 100644 index 00000000..38cdbdeb --- /dev/null +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/DynamicThreadPoolService.java @@ -0,0 +1,16 @@ +package cn.hippo4j.core.executor.support; + +import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterWrapper; + +/** + * Dynamic thread-pool service. + */ +public interface DynamicThreadPoolService { + + /** + * Registering dynamic thread pools at runtime. + * + * @param registerWrapper + */ + void registerDynamicThreadPool(DynamicThreadPoolRegisterWrapper registerWrapper); +} diff --git a/hippo4j-discovery/src/main/java/cn/hippo4j/discovery/controller/ApplicationController.java b/hippo4j-discovery/src/main/java/cn/hippo4j/discovery/controller/ApplicationController.java index ebad6e7c..a72b197a 100644 --- a/hippo4j-discovery/src/main/java/cn/hippo4j/discovery/controller/ApplicationController.java +++ b/hippo4j-discovery/src/main/java/cn/hippo4j/discovery/controller/ApplicationController.java @@ -58,7 +58,7 @@ public class ApplicationController { public Result renew(@RequestBody InstanceInfo.InstanceRenew instanceRenew) { boolean isSuccess = instanceRegistry.renew(instanceRenew); if (!isSuccess) { - log.warn("Not Found (Renew) : {} - {}", instanceRenew.getAppName(), instanceRenew.getInstanceId()); + log.warn("Not Found (Renew): {} - {}", instanceRenew.getAppName(), instanceRenew.getInstanceId()); return Results.failure(ErrorCodeEnum.NOT_FOUND); } return Results.success(); diff --git a/hippo4j-discovery/src/main/java/cn/hippo4j/discovery/core/BaseInstanceRegistry.java b/hippo4j-discovery/src/main/java/cn/hippo4j/discovery/core/BaseInstanceRegistry.java index 1160f476..33dcdf3c 100644 --- a/hippo4j-discovery/src/main/java/cn/hippo4j/discovery/core/BaseInstanceRegistry.java +++ b/hippo4j-discovery/src/main/java/cn/hippo4j/discovery/core/BaseInstanceRegistry.java @@ -113,15 +113,15 @@ public class BaseInstanceRegistry implements InstanceRegistry { String instanceId = info.getInstanceId(); Map> leaseMap = registry.get(appName); if (CollectionUtils.isEmpty(leaseMap)) { - log.warn("Failed to remove unhealthy node, no application found : {}", appName); + log.warn("Failed to remove unhealthy node, no application found: {}", appName); return; } Lease remove = leaseMap.remove(instanceId); if (remove == null) { - log.warn("Failed to remove unhealthy node, no instance found : {}", instanceId); + log.warn("Failed to remove unhealthy node, no instance found: {}", instanceId); return; } - log.info("Remove unhealthy node, node ID : {}", instanceId); + log.info("Remove unhealthy node, node ID: {}", instanceId); } public void evict(long additionalLeaseMs) { @@ -150,7 +150,7 @@ public class BaseInstanceRegistry implements InstanceRegistry { if (!CollectionUtils.isEmpty(registerMap)) { registerMap.remove(id); AbstractSubjectCenter.notify(AbstractSubjectCenter.SubjectType.CLEAR_CONFIG_CACHE, () -> identify); - log.info("Clean up unhealthy nodes. Node id : {}", id); + log.info("Clean up unhealthy nodes. Node id: {}", id); } return true; } diff --git a/hippo4j-example/hippo4j-example-core/src/main/java/cn/hippo4j/example/core/inittest/TaskDecoratorTest.java b/hippo4j-example/hippo4j-example-core/src/main/java/cn/hippo4j/example/core/inittest/TaskDecoratorTest.java index efdad813..3de161f1 100644 --- a/hippo4j-example/hippo4j-example-core/src/main/java/cn/hippo4j/example/core/inittest/TaskDecoratorTest.java +++ b/hippo4j-example/hippo4j-example-core/src/main/java/cn/hippo4j/example/core/inittest/TaskDecoratorTest.java @@ -71,7 +71,7 @@ public class TaskDecoratorTest { * 此处打印不为空, taskDecorator 即为生效. * taskDecorator 配置查看 {@link ThreadPoolConfig#messageConsumeDynamicThreadPool()} */ - log.info("通过 taskDecorator MDC 传递上下文 : {}", MDC.get(PLACEHOLDER)); + log.info("通过 taskDecorator MDC 传递上下文: {}", MDC.get(PLACEHOLDER)); }); }); } diff --git a/hippo4j-server/src/main/java/cn/hippo4j/server/listener/StartingApplicationListener.java b/hippo4j-server/src/main/java/cn/hippo4j/server/listener/StartingApplicationListener.java index f7002f0c..f5510477 100644 --- a/hippo4j-server/src/main/java/cn/hippo4j/server/listener/StartingApplicationListener.java +++ b/hippo4j-server/src/main/java/cn/hippo4j/server/listener/StartingApplicationListener.java @@ -70,7 +70,7 @@ public class StartingApplicationListener implements Hippo4JApplicationListener { @Override public void failed(ConfigurableApplicationContext context, Throwable exception) { - log.error("Startup errors : {}", exception); + log.error("Startup errors: {}", exception); closeExecutor(); context.close(); log.error("Hippo4J failed to start, please see {} for more details.", diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/config/DynamicThreadPoolAutoConfiguration.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/config/DynamicThreadPoolAutoConfiguration.java index 49c6195d..24f40176 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/config/DynamicThreadPoolAutoConfiguration.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/config/DynamicThreadPoolAutoConfiguration.java @@ -20,11 +20,13 @@ package cn.hippo4j.springboot.starter.config; import cn.hippo4j.adapter.base.ThreadPoolAdapterBeanContainer; import cn.hippo4j.adapter.web.WebThreadPoolHandlerChoose; import cn.hippo4j.common.api.ThreadDetailState; +import cn.hippo4j.common.api.ThreadPoolDynamicRefresh; import cn.hippo4j.common.config.ApplicationContextHolder; import cn.hippo4j.core.config.UtilAutoConfiguration; import cn.hippo4j.core.enable.MarkerConfiguration; import cn.hippo4j.core.executor.ThreadPoolNotifyAlarmHandler; import cn.hippo4j.core.executor.state.ThreadPoolRunStateHandler; +import cn.hippo4j.core.executor.support.DynamicThreadPoolService; import cn.hippo4j.core.handler.DynamicThreadPoolBannerHandler; import cn.hippo4j.core.toolkit.IdentifyUtil; import cn.hippo4j.core.toolkit.inet.InetUtils; @@ -85,24 +87,28 @@ public class DynamicThreadPoolAutoConfiguration { } @Bean - @SuppressWarnings("all") - public ConfigService configService(HttpAgent httpAgent, InetUtils hippo4JInetUtils, ServerHealthCheck serverHealthCheck) { + public ClientWorker hippo4jClientWorker(HttpAgent httpAgent, + InetUtils hippo4JInetUtils, + ServerHealthCheck serverHealthCheck) { String identify = IdentifyUtil.generate(environment, hippo4JInetUtils); - return new ThreadPoolConfigService(httpAgent, identify, serverHealthCheck); + return new ClientWorker(httpAgent, identify, serverHealthCheck); } @Bean - public ThreadPoolOperation threadPoolOperation(ConfigService configService) { - return new ThreadPoolOperation(properties, configService); + @SuppressWarnings("all") + public DynamicThreadPoolService dynamicThreadPoolConfigService(HttpAgent httpAgent, + ClientWorker clientWorker, + ServerHealthCheck serverHealthCheck, + DynamicThreadPoolSubscribeConfig dynamicThreadPoolSubscribeConfig) { + return new DynamicThreadPoolConfigService(httpAgent, clientWorker, properties, dynamicThreadPoolSubscribeConfig); } @Bean @SuppressWarnings("all") public DynamicThreadPoolPostProcessor threadPoolBeanPostProcessor(HttpAgent httpAgent, - ThreadPoolOperation threadPoolOperation, ApplicationContextHolder hippo4JApplicationContextHolder, - ServerThreadPoolDynamicRefresh threadPoolDynamicRefresh) { - return new DynamicThreadPoolPostProcessor(properties, httpAgent, threadPoolOperation, threadPoolDynamicRefresh); + DynamicThreadPoolSubscribeConfig dynamicThreadPoolSubscribeConfig) { + return new DynamicThreadPoolPostProcessor(properties, httpAgent, dynamicThreadPoolSubscribeConfig); } @Bean @@ -125,7 +131,8 @@ public class DynamicThreadPoolAutoConfiguration { } @Bean - public ReportingEventExecutor reportingEventExecutor(BootstrapProperties properties, MessageSender messageSender, + public ReportingEventExecutor reportingEventExecutor(BootstrapProperties properties, + MessageSender messageSender, ServerHealthCheck serverHealthCheck) { return new ReportingEventExecutor(properties, messageSender, serverHealthCheck); } @@ -165,7 +172,8 @@ public class DynamicThreadPoolAutoConfiguration { @Bean @SuppressWarnings("all") - public ThreadPoolAdapterRegister threadPoolAdapterRegister(HttpAgent httpAgent, InetUtils hippo4JInetUtils) { + public ThreadPoolAdapterRegister threadPoolAdapterRegister(HttpAgent httpAgent, + InetUtils hippo4JInetUtils) { return new ThreadPoolAdapterRegister(httpAgent, properties, environment, hippo4JInetUtils); } @@ -177,10 +185,16 @@ public class DynamicThreadPoolAutoConfiguration { } @Bean - public ServerThreadPoolDynamicRefresh threadPoolDynamicRefresh(ThreadPoolNotifyAlarmHandler threadPoolNotifyAlarmHandler) { + public ThreadPoolDynamicRefresh threadPoolDynamicRefresh(ThreadPoolNotifyAlarmHandler threadPoolNotifyAlarmHandler) { return new ServerThreadPoolDynamicRefresh(threadPoolNotifyAlarmHandler); } + @Bean + public DynamicThreadPoolSubscribeConfig dynamicThreadPoolSubscribeConfig(ThreadPoolDynamicRefresh threadPoolDynamicRefresh, + ClientWorker clientWorker) { + return new DynamicThreadPoolSubscribeConfig(threadPoolDynamicRefresh, clientWorker, properties); + } + @Bean public ThreadPoolNotifyAlarmHandler threadPoolNotifyAlarmHandler(Hippo4jSendMessageService hippoSendMessageService) { return new ThreadPoolNotifyAlarmHandler(hippoSendMessageService); diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/DynamicThreadPoolConfigService.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/DynamicThreadPoolConfigService.java new file mode 100644 index 00000000..d7db55ce --- /dev/null +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/DynamicThreadPoolConfigService.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.springboot.starter.core; + +import cn.hippo4j.common.model.ThreadPoolParameterInfo; +import cn.hippo4j.common.toolkit.Assert; +import cn.hippo4j.common.toolkit.JSONUtil; +import cn.hippo4j.common.web.base.Result; +import cn.hippo4j.core.executor.DynamicThreadPoolWrapper; +import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage; +import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterParameter; +import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterWrapper; +import cn.hippo4j.core.executor.support.DynamicThreadPoolService; +import cn.hippo4j.springboot.starter.config.BootstrapProperties; +import cn.hippo4j.springboot.starter.event.ApplicationCompleteEvent; +import cn.hippo4j.springboot.starter.remote.HttpAgent; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.context.ApplicationListener; + +import static cn.hippo4j.common.constant.Constants.REGISTER_DYNAMIC_THREAD_POOL_PATH; + +/** + * Dynamic thread-pool config service. + */ +@Slf4j +@RequiredArgsConstructor +public class DynamicThreadPoolConfigService implements DynamicThreadPoolService, ApplicationListener { + + private final HttpAgent httpAgent; + + private final ClientWorker clientWorker; + + private final BootstrapProperties properties; + + private final DynamicThreadPoolSubscribeConfig dynamicThreadPoolSubscribeConfig; + + @Override + public void onApplicationEvent(ApplicationCompleteEvent event) { + clientWorker.notifyApplicationComplete(); + } + + @Override + public void registerDynamicThreadPool(DynamicThreadPoolRegisterWrapper registerWrapper) { + DynamicThreadPoolRegisterParameter registerParameter = registerWrapper.getDynamicThreadPoolRegisterParameter(); + checkThreadPoolParameter(registerParameter); + ThreadPoolParameterInfo parameter = JSONUtil.parseObject(JSONUtil.toJSONString(registerParameter), ThreadPoolParameterInfo.class); + String threadPoolId = registerParameter.getThreadPoolId(); + Result registerResult; + try { + failDynamicThreadPoolRegisterWrapper(registerWrapper); + registerResult = httpAgent.httpPost(REGISTER_DYNAMIC_THREAD_POOL_PATH, registerWrapper); + } catch (Throwable ex) { + log.error("Failed to dynamically register thread pool: {}", threadPoolId, ex); + throw ex; + } + if (registerResult.isSuccess()) { + DynamicThreadPoolWrapper dynamicThreadPoolWrapper = DynamicThreadPoolWrapper.builder() + .threadPoolId(threadPoolId) + .executor(registerWrapper.getDynamicThreadPoolExecutor()) + .build(); + GlobalThreadPoolManage.register(threadPoolId, parameter, dynamicThreadPoolWrapper); + dynamicThreadPoolSubscribeConfig.subscribeConfig(threadPoolId); + clientWorker.addCacheDataIfAbsent(properties.getNamespace(), properties.getItemId(), threadPoolId); + } + } + + private void checkThreadPoolParameter(DynamicThreadPoolRegisterParameter registerParameter) { + Assert.isTrue(!registerParameter.getThreadPoolId().contains("+"), "The thread pool contains sensitive characters."); + } + + private void failDynamicThreadPoolRegisterWrapper(DynamicThreadPoolRegisterWrapper registerWrapper) { + registerWrapper.setTenantId(properties.getNamespace()); + registerWrapper.setItemId(properties.getItemId()); + } +} diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/DynamicThreadPoolPostProcessor.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/DynamicThreadPoolPostProcessor.java index 06a73452..9d6c2fa4 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/DynamicThreadPoolPostProcessor.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/DynamicThreadPoolPostProcessor.java @@ -44,7 +44,6 @@ import java.util.HashMap; import java.util.Map; import java.util.Objects; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -61,9 +60,7 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor { private final HttpAgent httpAgent; - private final ThreadPoolOperation threadPoolOperation; - - private final ServerThreadPoolDynamicRefresh serverThreadPoolDynamicRefresh; + private final DynamicThreadPoolSubscribeConfig dynamicThreadPoolSubscribeConfig; @Override public Object postProcessBeforeInitialization(Object bean, String beanName) { @@ -174,25 +171,14 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor { return newDynamicThreadPoolExecutor; } - private final ExecutorService configRefreshExecutorService = ThreadPoolBuilder.builder() - .corePoolSize(1) - .maxPoolNum(2) - .keepAliveTime(2000) - .timeUnit(TimeUnit.MILLISECONDS) - .workQueue(QueueTypeEnum.SYNCHRONOUS_QUEUE) - .allowCoreThreadTimeOut(true) - .threadFactory("client.dynamic.threadPool.change.config") - .rejected(new ThreadPoolExecutor.AbortPolicy()) - .build(); - /** * Client dynamic thread pool subscription server configuration. * - * @param dynamicThreadPoolWrap + * @param dynamicThreadPoolWrapper */ - protected void subscribeConfig(DynamicThreadPoolWrapper dynamicThreadPoolWrap) { - if (dynamicThreadPoolWrap.isSubscribeFlag()) { - threadPoolOperation.subscribeConfig(dynamicThreadPoolWrap.getThreadPoolId(), configRefreshExecutorService, config -> serverThreadPoolDynamicRefresh.dynamicRefresh(config)); + protected void subscribeConfig(DynamicThreadPoolWrapper dynamicThreadPoolWrapper) { + if (dynamicThreadPoolWrapper.isSubscribeFlag()) { + dynamicThreadPoolSubscribeConfig.subscribeConfig(dynamicThreadPoolWrapper.getThreadPoolId()); } } } diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/DynamicThreadPoolSubscribeConfig.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/DynamicThreadPoolSubscribeConfig.java new file mode 100644 index 00000000..a2649d6e --- /dev/null +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/DynamicThreadPoolSubscribeConfig.java @@ -0,0 +1,57 @@ +package cn.hippo4j.springboot.starter.core; + +import cn.hippo4j.common.api.ThreadPoolDynamicRefresh; +import cn.hippo4j.core.executor.support.QueueTypeEnum; +import cn.hippo4j.core.executor.support.ThreadPoolBuilder; +import cn.hippo4j.springboot.starter.config.BootstrapProperties; +import lombok.RequiredArgsConstructor; + +import java.util.Arrays; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * Dynamic thread-pool subscribe config. + */ +@RequiredArgsConstructor +public class DynamicThreadPoolSubscribeConfig { + + private final ThreadPoolDynamicRefresh threadPoolDynamicRefresh; + + private final ClientWorker clientWorker; + + private final BootstrapProperties properties; + + private final ExecutorService configRefreshExecutorService = ThreadPoolBuilder.builder() + .corePoolSize(1) + .maxPoolNum(2) + .keepAliveTime(2000) + .timeUnit(TimeUnit.MILLISECONDS) + .workQueue(QueueTypeEnum.SYNCHRONOUS_QUEUE) + .allowCoreThreadTimeOut(true) + .threadFactory("client.dynamic.threadPool.change.config") + .rejected(new ThreadPoolExecutor.AbortPolicy()) + .build(); + + public void subscribeConfig(String threadPoolId) { + subscribeConfig(threadPoolId, config -> threadPoolDynamicRefresh.dynamicRefresh(config)); + } + + public void subscribeConfig(String threadPoolId, ThreadPoolSubscribeCallback threadPoolSubscribeCallback) { + Listener configListener = new Listener() { + + @Override + public void receiveConfigInfo(String config) { + threadPoolSubscribeCallback.callback(config); + } + + @Override + public Executor getExecutor() { + return configRefreshExecutorService; + } + }; + clientWorker.addTenantListeners(properties.getNamespace(), properties.getItemId(), threadPoolId, Arrays.asList(configListener)); + } +} diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/ThreadPoolConfigService.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/ThreadPoolConfigService.java deleted file mode 100644 index df4da53a..00000000 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/ThreadPoolConfigService.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cn.hippo4j.springboot.starter.core; - -import cn.hippo4j.springboot.starter.event.ApplicationCompleteEvent; -import cn.hippo4j.springboot.starter.remote.HttpAgent; -import cn.hippo4j.springboot.starter.remote.ServerHealthCheck; -import org.springframework.context.ApplicationListener; - -import java.util.Arrays; - -/** - * Thread-pool config service. - */ -public class ThreadPoolConfigService implements ConfigService, ApplicationListener { - - private final ClientWorker clientWorker; - - private final ServerHealthCheck serverHealthCheck; - - public ThreadPoolConfigService(HttpAgent httpAgent, String identify, ServerHealthCheck serverHealthCheck) { - this.serverHealthCheck = serverHealthCheck; - this.clientWorker = new ClientWorker(httpAgent, identify, serverHealthCheck); - } - - @Override - public void addListener(String tenantId, String itemId, String threadPoolId, Listener listener) { - clientWorker.addTenantListeners(tenantId, itemId, threadPoolId, Arrays.asList(listener)); - } - - @Override - public String getServerStatus() { - if (serverHealthCheck.isHealthStatus()) { - return "UP"; - } else { - return "DOWN"; - } - } - - @Override - public void onApplicationEvent(ApplicationCompleteEvent event) { - clientWorker.notifyApplicationComplete(); - } -} diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/ThreadPoolOperation.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/ThreadPoolOperation.java deleted file mode 100644 index 5bb0512b..00000000 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/ThreadPoolOperation.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cn.hippo4j.springboot.starter.core; - -import cn.hippo4j.springboot.starter.config.BootstrapProperties; - -import java.util.concurrent.Executor; - -/** - * Thread-pool operation. - */ -public class ThreadPoolOperation { - - private final ConfigService configService; - - private final BootstrapProperties properties; - - public ThreadPoolOperation(BootstrapProperties properties, ConfigService configService) { - this.properties = properties; - this.configService = configService; - } - - public Listener subscribeConfig(String threadPoolId, Executor executor, ThreadPoolSubscribeCallback threadPoolSubscribeCallback) { - Listener configListener = new Listener() { - - @Override - public void receiveConfigInfo(String config) { - threadPoolSubscribeCallback.callback(config); - } - - @Override - public Executor getExecutor() { - return executor; - } - }; - configService.addListener(properties.getNamespace(), properties.getItemId(), threadPoolId, configListener); - return configListener; - } -} diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/toolkit/HttpClientUtil.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/toolkit/HttpClientUtil.java index cd395cb2..184c77c9 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/toolkit/HttpClientUtil.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/toolkit/HttpClientUtil.java @@ -121,7 +121,7 @@ public class HttpClientUtil { try { return doPost(url, body); } catch (Exception e) { - log.error("httpPost 调用失败. {} message : {}", url, e.getMessage()); + log.error("httpPost 调用失败. {} message: {}", url, e.getMessage()); throw e; } }