Merge pull request #494 from mabaiwan/develop

Add thread pool automatic registration function
pull/499/head
小马哥 2 years ago committed by GitHub
commit 92ede063d9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -68,6 +68,8 @@ public class Constants {
public static final String REGISTER_ADAPTER_PATH = REGISTER_ADAPTER_BASE_PATH + "/register";
public static final String REGISTER_DYNAMIC_THREAD_POOL_PATH = CONFIG_CONTROLLER_PATH + "/register";
public static final String HEALTH_CHECK_PATH = BASE_PATH + "/health/check";
public static final String PROBE_MODIFY_REQUEST = "Listening-Configs";

@ -38,33 +38,33 @@ public class ThreadPoolParameterInfo implements ThreadPoolParameter, Serializabl
private static final long serialVersionUID = -7123935122108553864L;
/**
* tenantId
* Tenant id
*/
private String tenantId;
/**
* itemId
* Item id
*/
private String itemId;
/**
* tpId
* Thread-pool id
*/
private String tpId;
/**
* content
* Content
*/
private String content;
/**
* coreSize
* Core size
*/
@Deprecated
private Integer coreSize;
/**
* maxSize
* Max size
*/
@Deprecated
private Integer maxSize;
@ -80,42 +80,42 @@ public class ThreadPoolParameterInfo implements ThreadPoolParameter, Serializabl
private Integer maximumPoolSize;
/**
* queueType
* Queue type
*/
private Integer queueType;
/**
* capacity
* Capacity
*/
private Integer capacity;
/**
* keepAliveTime
* Keep alive time
*/
private Integer keepAliveTime;
/**
* rejectedType
* Rejected type
*/
private Integer rejectedType;
/**
* isAlarm
* Is alarm
*/
private Integer isAlarm;
/**
* capacityAlarm
* Capacity alarm
*/
private Integer capacityAlarm;
/**
* livenessAlarm
* Liveness alarm
*/
private Integer livenessAlarm;
/**
* allowCoreThreadTimeOut
* Allow core thread timeout
*/
private Integer allowCoreThreadTimeOut;

@ -0,0 +1,77 @@
package cn.hippo4j.common.model.register;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* Dynamic thread-pool register parameter.
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class DynamicThreadPoolRegisterParameter {
/**
* Thread-pool id
* Empty or empty strings are not allowed, and `+` signs are not allowed
*/
private String threadPoolId;
/**
* Content
*/
private String content;
/**
* Core pool size
*/
private Integer corePoolSize;
/**
* Maximum pool size
*/
private Integer maximumPoolSize;
/**
* Queue type
*/
private Integer queueType;
/**
* Capacity
*/
private Integer capacity;
/**
* Keep alive time
*/
private Integer keepAliveTime;
/**
* Rejected type
*/
private Integer rejectedType;
/**
* Is alarm
*/
private Integer isAlarm;
/**
* Capacity alarm
*/
private Integer capacityAlarm;
/**
* Liveness alarm
*/
private Integer livenessAlarm;
/**
* Allow core thread timeout
*/
private Integer allowCoreThreadTimeOut;
}

@ -0,0 +1,45 @@
package cn.hippo4j.common.model.register;
import com.fasterxml.jackson.annotation.JsonIgnore;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.concurrent.ThreadPoolExecutor;
/**
* Dynamic thread-pool register wrapper.
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class DynamicThreadPoolRegisterWrapper {
/**
* Tenant id
*/
private String tenantId;
/**
* Item id
*/
private String itemId;
/**
* Update if exists
*/
private Boolean updateIfExists = Boolean.TRUE;
/**
* Dynamic thread-pool executor
*/
@JsonIgnore
private ThreadPoolExecutor dynamicThreadPoolExecutor;
/**
* Dynamic thread-pool register parameter
*/
private DynamicThreadPoolRegisterParameter dynamicThreadPoolRegisterParameter;
}

@ -17,15 +17,16 @@
package cn.hippo4j.config.controller;
import cn.hippo4j.common.constant.Constants;
import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterWrapper;
import cn.hippo4j.common.web.base.Result;
import cn.hippo4j.common.web.base.Results;
import cn.hippo4j.config.model.ConfigAllInfo;
import cn.hippo4j.config.model.ConfigInfoBase;
import cn.hippo4j.config.service.ConfigCacheService;
import cn.hippo4j.config.service.ConfigServletInner;
import cn.hippo4j.config.toolkit.Md5ConfigUtil;
import cn.hippo4j.config.service.biz.ConfigService;
import cn.hippo4j.common.constant.Constants;
import cn.hippo4j.common.web.base.Result;
import cn.hippo4j.common.web.base.Results;
import cn.hippo4j.config.toolkit.Md5ConfigUtil;
import cn.hutool.core.util.StrUtil;
import lombok.AllArgsConstructor;
import lombok.SneakyThrows;
@ -91,4 +92,10 @@ public class ConfigController {
}
return Results.success();
}
@PostMapping("/register")
public Result register(@RequestBody DynamicThreadPoolRegisterWrapper registerWrapper) {
configService.register(registerWrapper);
return Results.success();
}
}

@ -19,6 +19,7 @@ package cn.hippo4j.config.model;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.fasterxml.jackson.annotation.JsonAlias;
import com.fasterxml.jackson.annotation.JsonIgnore;
import lombok.Data;
@ -56,11 +57,13 @@ public class ConfigInfoBase implements Serializable {
/**
* coreSize
*/
@JsonAlias("corePoolSize")
private Integer coreSize;
/**
* maxSize
*/
@JsonAlias("maximumPoolSize")
private Integer maxSize;
/**

@ -111,7 +111,7 @@ public class DefaultPublisher extends Thread implements EventPublisher {
public boolean publish(AbstractEvent event) {
boolean success = this.queue.offer(event);
if (!success) {
log.warn("Unable to plug in due to interruption, synchronize sending time, event : {}", event);
log.warn("Unable to plug in due to interruption, synchronize sending time, event: {}", event);
receiveEvent(event);
return true;
}
@ -130,7 +130,7 @@ public class DefaultPublisher extends Thread implements EventPublisher {
try {
job.run();
} catch (Throwable e) {
log.error("Event callback exception : {}", e);
log.error("Event callback exception: {}", e);
}
}
}

@ -58,7 +58,7 @@ public class NotifyCenter {
publisher.init(cls, buffer);
return publisher;
} catch (Throwable ex) {
log.error("Service class newInstance has error : {}", ex);
log.error("Service class newInstance has error: {}", ex);
throw new RuntimeException(ex);
}
};
@ -98,7 +98,7 @@ public class NotifyCenter {
try {
return publishEvent(event.getClass(), event);
} catch (Throwable ex) {
log.error("There was an exception to the message publishing : {}", ex);
log.error("There was an exception to the message publishing: {}", ex);
return false;
}
}

@ -178,7 +178,7 @@ public class ConfigCacheService {
List<String> identificationList = MapUtil.parseMapForFilter(CLIENT_CONFIG_CACHE, coarse);
for (String cacheMapKey : identificationList) {
Map<String, CacheItem> removeCacheItem = CLIENT_CONFIG_CACHE.remove(cacheMapKey);
log.info("Remove invalidated config cache. config info : {}", JSONUtil.toJSONString(removeCacheItem));
log.info("Remove invalidated config cache. config info: {}", JSONUtil.toJSONString(removeCacheItem));
}
}
@ -189,7 +189,7 @@ public class ConfigCacheService {
@Override
public void accept(ObserverMessage<String> observerMessage) {
log.info("Clean up the configuration cache. Key : {}", observerMessage.message());
log.info("Clean up the configuration cache. Key: {}", observerMessage.message());
coarseRemove(observerMessage.message());
}
}

@ -17,6 +17,7 @@
package cn.hippo4j.config.service.biz;
import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterWrapper;
import cn.hippo4j.config.model.ConfigAllInfo;
/**
@ -50,4 +51,11 @@ public interface ConfigService {
* @param configAllInfo
*/
void insertOrUpdate(String identify, boolean isChangeNotice, ConfigAllInfo configAllInfo);
/**
* Register.
*
* @param registerWrapper
*/
void register(DynamicThreadPoolRegisterWrapper registerWrapper);
}

@ -19,6 +19,8 @@ package cn.hippo4j.config.service.biz.impl;
import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.common.enums.DelEnum;
import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterParameter;
import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterWrapper;
import cn.hippo4j.common.toolkit.*;
import cn.hippo4j.common.web.exception.ServiceException;
import cn.hippo4j.config.event.LocalDataChangeEvent;
@ -31,7 +33,9 @@ import cn.hippo4j.config.model.LogRecordInfo;
import cn.hippo4j.config.service.ConfigCacheService;
import cn.hippo4j.config.service.ConfigChangePublisher;
import cn.hippo4j.config.service.biz.ConfigService;
import cn.hippo4j.config.service.biz.ItemService;
import cn.hippo4j.config.service.biz.OperationLogService;
import cn.hippo4j.config.service.biz.TenantService;
import cn.hippo4j.config.toolkit.BeanUtil;
import cn.hutool.core.util.StrUtil;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
@ -133,6 +137,26 @@ public class ConfigServiceImpl implements ConfigService {
}
}
@Override
public void register(DynamicThreadPoolRegisterWrapper registerWrapper) {
DynamicThreadPoolRegisterParameter registerParameter = registerWrapper.getDynamicThreadPoolRegisterParameter();
ConfigAllInfo configAllInfo = JSONUtil.parseObject(JSONUtil.toJSONString(registerParameter), ConfigAllInfo.class);
configAllInfo.setTenantId(registerWrapper.getTenantId());
configAllInfo.setItemId(registerWrapper.getItemId());
configAllInfo.setTpId(registerParameter.getThreadPoolId());
TenantService tenantService = ApplicationContextHolder.getBean(TenantService.class);
ItemService itemService = ApplicationContextHolder.getBean(ItemService.class);
Assert.isTrue(tenantService.getTenantByTenantId(registerWrapper.getTenantId()) != null, "Tenant does not exist");
Assert.isTrue(itemService.queryItemById(registerWrapper.getTenantId(), registerWrapper.getItemId()) != null, "Item does not exist");
ConfigAllInfo existConfigAllInfo = findConfigAllInfo(configAllInfo.getTpId(), registerWrapper.getItemId(), registerWrapper.getTenantId());
if (existConfigAllInfo == null) {
addConfigInfo(configAllInfo);
} else if (registerWrapper.getUpdateIfExists()) {
ConfigServiceImpl configService = ApplicationContextHolder.getBean(this.getClass());
configService.updateConfigInfo(null, false, configAllInfo);
}
}
private void verification(String identify) {
if (StringUtil.isNotBlank(identify)) {
Map content = getContent(identify);
@ -156,7 +180,7 @@ public class ConfigServiceImpl implements ConfigService {
}
}
} catch (Exception ex) {
log.error("[db-error] message : {}", ex.getMessage(), ex);
log.error("[db-error] message: {}", ex.getMessage(), ex);
throw ex;
}
return null;
@ -192,7 +216,7 @@ public class ConfigServiceImpl implements ConfigService {
}
configInfoMapper.update(config, wrapper);
} catch (Exception ex) {
log.error("[db-error] message : {}", ex.getMessage(), ex);
log.error("[db-error] message: {}", ex.getMessage(), ex);
throw ex;
}
}

@ -33,7 +33,7 @@ public class ClientCloseHookRemoveConfigCache implements ClientCloseHookExecute
@Override
public void closeHook(ClientCloseHookReq requestParam) {
log.info("Remove Config Cache, Execute client hook function. Request : {}", JSONUtil.toJSONString(requestParam));
log.info("Remove Config Cache, Execute client hook function. Request: {}", JSONUtil.toJSONString(requestParam));
try {
String groupKey = requestParam.getGroupKey();
if (StrUtil.isNotBlank(groupKey)) {

@ -32,7 +32,7 @@ public class RemoveThreadPoolAdapterCache implements ClientCloseHookExecute {
@Override
public void closeHook(ClientCloseHookReq requestParam) {
log.info("Remove thread-pool adapter cache, Execute client hook function. Req : {}", JSONUtil.toJSONString(requestParam));
log.info("Remove thread-pool adapter cache, Execute client hook function. Req: {}", JSONUtil.toJSONString(requestParam));
ThreadPoolAdapterService.remove(requestParam.getInstanceId());
}
}

@ -19,7 +19,10 @@ package cn.hippo4j.core.executor;
import cn.hippo4j.core.executor.support.AbstractDynamicExecutorSupport;
import cn.hippo4j.core.executor.support.CommonDynamicThreadPool;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.beans.factory.DisposableBean;
import java.util.concurrent.Callable;
@ -30,6 +33,9 @@ import java.util.concurrent.ThreadPoolExecutor;
* Dynamic thread-pool wrapper.
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class DynamicThreadPoolWrapper implements DisposableBean {
private String tenantId, itemId, threadPoolId;

@ -0,0 +1,16 @@
package cn.hippo4j.core.executor.support;
import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterWrapper;
/**
* Dynamic thread-pool service.
*/
public interface DynamicThreadPoolService {
/**
* Registering dynamic thread pools at runtime.
*
* @param registerWrapper
*/
void registerDynamicThreadPool(DynamicThreadPoolRegisterWrapper registerWrapper);
}

@ -58,7 +58,7 @@ public class ApplicationController {
public Result<Void> renew(@RequestBody InstanceInfo.InstanceRenew instanceRenew) {
boolean isSuccess = instanceRegistry.renew(instanceRenew);
if (!isSuccess) {
log.warn("Not Found (Renew) : {} - {}", instanceRenew.getAppName(), instanceRenew.getInstanceId());
log.warn("Not Found (Renew): {} - {}", instanceRenew.getAppName(), instanceRenew.getInstanceId());
return Results.failure(ErrorCodeEnum.NOT_FOUND);
}
return Results.success();

@ -113,15 +113,15 @@ public class BaseInstanceRegistry implements InstanceRegistry<InstanceInfo> {
String instanceId = info.getInstanceId();
Map<String, Lease<InstanceInfo>> leaseMap = registry.get(appName);
if (CollectionUtils.isEmpty(leaseMap)) {
log.warn("Failed to remove unhealthy node, no application found : {}", appName);
log.warn("Failed to remove unhealthy node, no application found: {}", appName);
return;
}
Lease<InstanceInfo> remove = leaseMap.remove(instanceId);
if (remove == null) {
log.warn("Failed to remove unhealthy node, no instance found : {}", instanceId);
log.warn("Failed to remove unhealthy node, no instance found: {}", instanceId);
return;
}
log.info("Remove unhealthy node, node ID : {}", instanceId);
log.info("Remove unhealthy node, node ID: {}", instanceId);
}
public void evict(long additionalLeaseMs) {
@ -150,7 +150,7 @@ public class BaseInstanceRegistry implements InstanceRegistry<InstanceInfo> {
if (!CollectionUtils.isEmpty(registerMap)) {
registerMap.remove(id);
AbstractSubjectCenter.notify(AbstractSubjectCenter.SubjectType.CLEAR_CONFIG_CACHE, () -> identify);
log.info("Clean up unhealthy nodes. Node id : {}", id);
log.info("Clean up unhealthy nodes. Node id: {}", id);
}
return true;
}

@ -71,7 +71,7 @@ public class TaskDecoratorTest {
* , taskDecorator .
* taskDecorator {@link ThreadPoolConfig#messageConsumeDynamicThreadPool()}
*/
log.info("通过 taskDecorator MDC 传递上下文 : {}", MDC.get(PLACEHOLDER));
log.info("通过 taskDecorator MDC 传递上下文: {}", MDC.get(PLACEHOLDER));
});
});
}

@ -22,7 +22,7 @@ import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@EnableDynamicThreadPool
@SpringBootApplication(scanBasePackages = "cn.hippo4j.example.core")
@SpringBootApplication(scanBasePackages = {"cn.hippo4j.example.core", "cn.hippo4j.example.server"})
public class Hippo4JServerExampleApplication {
public static void main(String[] args) {

@ -0,0 +1,49 @@
package cn.hippo4j.example.server;
import cn.hippo4j.core.executor.support.ThreadPoolBuilder;
import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterParameter;
import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterWrapper;
import cn.hippo4j.core.executor.support.DynamicThreadPoolService;
import lombok.AllArgsConstructor;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import java.util.concurrent.ThreadPoolExecutor;
/**
* Register dynamic thread-pool test
*/
@Component
@AllArgsConstructor
public class RegisterDynamicThreadPoolTest implements ApplicationRunner {
private final DynamicThreadPoolService dynamicThreadPoolService;
@Override
public void run(ApplicationArguments args) throws Exception {
String threadPoolId = "register-dynamic-thread-pool";
DynamicThreadPoolRegisterParameter parameterInfo = new DynamicThreadPoolRegisterParameter();
parameterInfo.setThreadPoolId(threadPoolId);
parameterInfo.setCorePoolSize(2);
parameterInfo.setMaximumPoolSize(14);
parameterInfo.setQueueType(9);
parameterInfo.setCapacity(110);
parameterInfo.setKeepAliveTime(110);
parameterInfo.setRejectedType(2);
parameterInfo.setIsAlarm(0);
parameterInfo.setCapacityAlarm(90);
parameterInfo.setLivenessAlarm(90);
parameterInfo.setAllowCoreThreadTimeOut(0);
ThreadPoolExecutor threadPoolExecutor = ThreadPoolBuilder.builder()
.threadPoolId(threadPoolId)
.threadFactory(threadPoolId)
.dynamicPool()
.build();
DynamicThreadPoolRegisterWrapper registerWrapper = DynamicThreadPoolRegisterWrapper.builder()
.dynamicThreadPoolRegisterParameter(parameterInfo)
.dynamicThreadPoolExecutor(threadPoolExecutor)
.build();
dynamicThreadPoolService.registerDynamicThreadPool(registerWrapper);
}
}

@ -70,7 +70,7 @@ public class StartingApplicationListener implements Hippo4JApplicationListener {
@Override
public void failed(ConfigurableApplicationContext context, Throwable exception) {
log.error("Startup errors : {}", exception);
log.error("Startup errors: {}", exception);
closeExecutor();
context.close();
log.error("Hippo4J failed to start, please see {} for more details.",

@ -20,11 +20,13 @@ package cn.hippo4j.springboot.starter.config;
import cn.hippo4j.adapter.base.ThreadPoolAdapterBeanContainer;
import cn.hippo4j.adapter.web.WebThreadPoolHandlerChoose;
import cn.hippo4j.common.api.ThreadDetailState;
import cn.hippo4j.common.api.ThreadPoolDynamicRefresh;
import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.core.config.UtilAutoConfiguration;
import cn.hippo4j.core.enable.MarkerConfiguration;
import cn.hippo4j.core.executor.ThreadPoolNotifyAlarmHandler;
import cn.hippo4j.core.executor.state.ThreadPoolRunStateHandler;
import cn.hippo4j.core.executor.support.DynamicThreadPoolService;
import cn.hippo4j.core.handler.DynamicThreadPoolBannerHandler;
import cn.hippo4j.core.toolkit.IdentifyUtil;
import cn.hippo4j.core.toolkit.inet.InetUtils;
@ -85,24 +87,28 @@ public class DynamicThreadPoolAutoConfiguration {
}
@Bean
@SuppressWarnings("all")
public ConfigService configService(HttpAgent httpAgent, InetUtils hippo4JInetUtils, ServerHealthCheck serverHealthCheck) {
public ClientWorker hippo4jClientWorker(HttpAgent httpAgent,
InetUtils hippo4JInetUtils,
ServerHealthCheck serverHealthCheck) {
String identify = IdentifyUtil.generate(environment, hippo4JInetUtils);
return new ThreadPoolConfigService(httpAgent, identify, serverHealthCheck);
return new ClientWorker(httpAgent, identify, serverHealthCheck);
}
@Bean
public ThreadPoolOperation threadPoolOperation(ConfigService configService) {
return new ThreadPoolOperation(properties, configService);
@SuppressWarnings("all")
public DynamicThreadPoolService dynamicThreadPoolConfigService(HttpAgent httpAgent,
ClientWorker clientWorker,
ServerHealthCheck serverHealthCheck,
DynamicThreadPoolSubscribeConfig dynamicThreadPoolSubscribeConfig) {
return new DynamicThreadPoolConfigService(httpAgent, clientWorker, properties, dynamicThreadPoolSubscribeConfig);
}
@Bean
@SuppressWarnings("all")
public DynamicThreadPoolPostProcessor threadPoolBeanPostProcessor(HttpAgent httpAgent,
ThreadPoolOperation threadPoolOperation,
ApplicationContextHolder hippo4JApplicationContextHolder,
ServerThreadPoolDynamicRefresh threadPoolDynamicRefresh) {
return new DynamicThreadPoolPostProcessor(properties, httpAgent, threadPoolOperation, threadPoolDynamicRefresh);
DynamicThreadPoolSubscribeConfig dynamicThreadPoolSubscribeConfig) {
return new DynamicThreadPoolPostProcessor(properties, httpAgent, dynamicThreadPoolSubscribeConfig);
}
@Bean
@ -125,7 +131,8 @@ public class DynamicThreadPoolAutoConfiguration {
}
@Bean
public ReportingEventExecutor reportingEventExecutor(BootstrapProperties properties, MessageSender messageSender,
public ReportingEventExecutor reportingEventExecutor(BootstrapProperties properties,
MessageSender messageSender,
ServerHealthCheck serverHealthCheck) {
return new ReportingEventExecutor(properties, messageSender, serverHealthCheck);
}
@ -165,7 +172,8 @@ public class DynamicThreadPoolAutoConfiguration {
@Bean
@SuppressWarnings("all")
public ThreadPoolAdapterRegister threadPoolAdapterRegister(HttpAgent httpAgent, InetUtils hippo4JInetUtils) {
public ThreadPoolAdapterRegister threadPoolAdapterRegister(HttpAgent httpAgent,
InetUtils hippo4JInetUtils) {
return new ThreadPoolAdapterRegister(httpAgent, properties, environment, hippo4JInetUtils);
}
@ -177,10 +185,16 @@ public class DynamicThreadPoolAutoConfiguration {
}
@Bean
public ServerThreadPoolDynamicRefresh threadPoolDynamicRefresh(ThreadPoolNotifyAlarmHandler threadPoolNotifyAlarmHandler) {
public ThreadPoolDynamicRefresh threadPoolDynamicRefresh(ThreadPoolNotifyAlarmHandler threadPoolNotifyAlarmHandler) {
return new ServerThreadPoolDynamicRefresh(threadPoolNotifyAlarmHandler);
}
@Bean
public DynamicThreadPoolSubscribeConfig dynamicThreadPoolSubscribeConfig(ThreadPoolDynamicRefresh threadPoolDynamicRefresh,
ClientWorker clientWorker) {
return new DynamicThreadPoolSubscribeConfig(threadPoolDynamicRefresh, clientWorker, properties);
}
@Bean
public ThreadPoolNotifyAlarmHandler threadPoolNotifyAlarmHandler(Hippo4jSendMessageService hippoSendMessageService) {
return new ThreadPoolNotifyAlarmHandler(hippoSendMessageService);

@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.springboot.starter.core;
import cn.hippo4j.common.model.ThreadPoolParameterInfo;
import cn.hippo4j.common.toolkit.Assert;
import cn.hippo4j.common.toolkit.JSONUtil;
import cn.hippo4j.common.web.base.Result;
import cn.hippo4j.core.executor.DynamicThreadPoolWrapper;
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterParameter;
import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterWrapper;
import cn.hippo4j.core.executor.support.DynamicThreadPoolService;
import cn.hippo4j.springboot.starter.config.BootstrapProperties;
import cn.hippo4j.springboot.starter.event.ApplicationCompleteEvent;
import cn.hippo4j.springboot.starter.remote.HttpAgent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationListener;
import static cn.hippo4j.common.constant.Constants.REGISTER_DYNAMIC_THREAD_POOL_PATH;
/**
* Dynamic thread-pool config service.
*/
@Slf4j
@RequiredArgsConstructor
public class DynamicThreadPoolConfigService implements DynamicThreadPoolService, ApplicationListener<ApplicationCompleteEvent> {
private final HttpAgent httpAgent;
private final ClientWorker clientWorker;
private final BootstrapProperties properties;
private final DynamicThreadPoolSubscribeConfig dynamicThreadPoolSubscribeConfig;
@Override
public void onApplicationEvent(ApplicationCompleteEvent event) {
clientWorker.notifyApplicationComplete();
}
@Override
public void registerDynamicThreadPool(DynamicThreadPoolRegisterWrapper registerWrapper) {
DynamicThreadPoolRegisterParameter registerParameter = registerWrapper.getDynamicThreadPoolRegisterParameter();
checkThreadPoolParameter(registerParameter);
ThreadPoolParameterInfo parameter = JSONUtil.parseObject(JSONUtil.toJSONString(registerParameter), ThreadPoolParameterInfo.class);
String threadPoolId = registerParameter.getThreadPoolId();
Result registerResult;
try {
failDynamicThreadPoolRegisterWrapper(registerWrapper);
registerResult = httpAgent.httpPost(REGISTER_DYNAMIC_THREAD_POOL_PATH, registerWrapper);
} catch (Throwable ex) {
log.error("Failed to dynamically register thread pool: {}", threadPoolId, ex);
throw ex;
}
if (registerResult.isSuccess()) {
DynamicThreadPoolWrapper dynamicThreadPoolWrapper = DynamicThreadPoolWrapper.builder()
.threadPoolId(threadPoolId)
.executor(registerWrapper.getDynamicThreadPoolExecutor())
.build();
GlobalThreadPoolManage.register(threadPoolId, parameter, dynamicThreadPoolWrapper);
dynamicThreadPoolSubscribeConfig.subscribeConfig(threadPoolId);
clientWorker.addCacheDataIfAbsent(properties.getNamespace(), properties.getItemId(), threadPoolId);
}
}
private void checkThreadPoolParameter(DynamicThreadPoolRegisterParameter registerParameter) {
Assert.isTrue(!registerParameter.getThreadPoolId().contains("+"), "The thread pool contains sensitive characters.");
}
private void failDynamicThreadPoolRegisterWrapper(DynamicThreadPoolRegisterWrapper registerWrapper) {
registerWrapper.setTenantId(properties.getNamespace());
registerWrapper.setItemId(properties.getItemId());
}
}

@ -44,7 +44,6 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@ -61,9 +60,7 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
private final HttpAgent httpAgent;
private final ThreadPoolOperation threadPoolOperation;
private final ServerThreadPoolDynamicRefresh serverThreadPoolDynamicRefresh;
private final DynamicThreadPoolSubscribeConfig dynamicThreadPoolSubscribeConfig;
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) {
@ -174,25 +171,14 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
return newDynamicThreadPoolExecutor;
}
private final ExecutorService configRefreshExecutorService = ThreadPoolBuilder.builder()
.corePoolSize(1)
.maxPoolNum(2)
.keepAliveTime(2000)
.timeUnit(TimeUnit.MILLISECONDS)
.workQueue(QueueTypeEnum.SYNCHRONOUS_QUEUE)
.allowCoreThreadTimeOut(true)
.threadFactory("client.dynamic.threadPool.change.config")
.rejected(new ThreadPoolExecutor.AbortPolicy())
.build();
/**
* Client dynamic thread pool subscription server configuration.
*
* @param dynamicThreadPoolWrap
* @param dynamicThreadPoolWrapper
*/
protected void subscribeConfig(DynamicThreadPoolWrapper dynamicThreadPoolWrap) {
if (dynamicThreadPoolWrap.isSubscribeFlag()) {
threadPoolOperation.subscribeConfig(dynamicThreadPoolWrap.getThreadPoolId(), configRefreshExecutorService, config -> serverThreadPoolDynamicRefresh.dynamicRefresh(config));
protected void subscribeConfig(DynamicThreadPoolWrapper dynamicThreadPoolWrapper) {
if (dynamicThreadPoolWrapper.isSubscribeFlag()) {
dynamicThreadPoolSubscribeConfig.subscribeConfig(dynamicThreadPoolWrapper.getThreadPoolId());
}
}
}

@ -0,0 +1,57 @@
package cn.hippo4j.springboot.starter.core;
import cn.hippo4j.common.api.ThreadPoolDynamicRefresh;
import cn.hippo4j.core.executor.support.QueueTypeEnum;
import cn.hippo4j.core.executor.support.ThreadPoolBuilder;
import cn.hippo4j.springboot.starter.config.BootstrapProperties;
import lombok.RequiredArgsConstructor;
import java.util.Arrays;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* Dynamic thread-pool subscribe config.
*/
@RequiredArgsConstructor
public class DynamicThreadPoolSubscribeConfig {
private final ThreadPoolDynamicRefresh threadPoolDynamicRefresh;
private final ClientWorker clientWorker;
private final BootstrapProperties properties;
private final ExecutorService configRefreshExecutorService = ThreadPoolBuilder.builder()
.corePoolSize(1)
.maxPoolNum(2)
.keepAliveTime(2000)
.timeUnit(TimeUnit.MILLISECONDS)
.workQueue(QueueTypeEnum.SYNCHRONOUS_QUEUE)
.allowCoreThreadTimeOut(true)
.threadFactory("client.dynamic.threadPool.change.config")
.rejected(new ThreadPoolExecutor.AbortPolicy())
.build();
public void subscribeConfig(String threadPoolId) {
subscribeConfig(threadPoolId, config -> threadPoolDynamicRefresh.dynamicRefresh(config));
}
public void subscribeConfig(String threadPoolId, ThreadPoolSubscribeCallback threadPoolSubscribeCallback) {
Listener configListener = new Listener() {
@Override
public void receiveConfigInfo(String config) {
threadPoolSubscribeCallback.callback(config);
}
@Override
public Executor getExecutor() {
return configRefreshExecutorService;
}
};
clientWorker.addTenantListeners(properties.getNamespace(), properties.getItemId(), threadPoolId, Arrays.asList(configListener));
}
}

@ -1,59 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.springboot.starter.core;
import cn.hippo4j.springboot.starter.event.ApplicationCompleteEvent;
import cn.hippo4j.springboot.starter.remote.HttpAgent;
import cn.hippo4j.springboot.starter.remote.ServerHealthCheck;
import org.springframework.context.ApplicationListener;
import java.util.Arrays;
/**
* Thread-pool config service.
*/
public class ThreadPoolConfigService implements ConfigService, ApplicationListener<ApplicationCompleteEvent> {
private final ClientWorker clientWorker;
private final ServerHealthCheck serverHealthCheck;
public ThreadPoolConfigService(HttpAgent httpAgent, String identify, ServerHealthCheck serverHealthCheck) {
this.serverHealthCheck = serverHealthCheck;
this.clientWorker = new ClientWorker(httpAgent, identify, serverHealthCheck);
}
@Override
public void addListener(String tenantId, String itemId, String threadPoolId, Listener listener) {
clientWorker.addTenantListeners(tenantId, itemId, threadPoolId, Arrays.asList(listener));
}
@Override
public String getServerStatus() {
if (serverHealthCheck.isHealthStatus()) {
return "UP";
} else {
return "DOWN";
}
}
@Override
public void onApplicationEvent(ApplicationCompleteEvent event) {
clientWorker.notifyApplicationComplete();
}
}

@ -1,54 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.springboot.starter.core;
import cn.hippo4j.springboot.starter.config.BootstrapProperties;
import java.util.concurrent.Executor;
/**
* Thread-pool operation.
*/
public class ThreadPoolOperation {
private final ConfigService configService;
private final BootstrapProperties properties;
public ThreadPoolOperation(BootstrapProperties properties, ConfigService configService) {
this.properties = properties;
this.configService = configService;
}
public Listener subscribeConfig(String threadPoolId, Executor executor, ThreadPoolSubscribeCallback threadPoolSubscribeCallback) {
Listener configListener = new Listener() {
@Override
public void receiveConfigInfo(String config) {
threadPoolSubscribeCallback.callback(config);
}
@Override
public Executor getExecutor() {
return executor;
}
};
configService.addListener(properties.getNamespace(), properties.getItemId(), threadPoolId, configListener);
return configListener;
}
}

@ -121,7 +121,7 @@ public class HttpClientUtil {
try {
return doPost(url, body);
} catch (Exception e) {
log.error("httpPost 调用失败. {} message : {}", url, e.getMessage());
log.error("httpPost 调用失败. {} message: {}", url, e.getMessage());
throw e;
}
}

Loading…
Cancel
Save