Add thread pool automatic registration function & log printing optimization

pull/494/head
chen.ma 2 years ago
parent 4863f8ef47
commit ef93ab435d

@ -68,6 +68,8 @@ public class Constants {
public static final String REGISTER_ADAPTER_PATH = REGISTER_ADAPTER_BASE_PATH + "/register"; public static final String REGISTER_ADAPTER_PATH = REGISTER_ADAPTER_BASE_PATH + "/register";
public static final String REGISTER_DYNAMIC_THREAD_POOL_PATH = CONFIG_CONTROLLER_PATH + "/register";
public static final String HEALTH_CHECK_PATH = BASE_PATH + "/health/check"; public static final String HEALTH_CHECK_PATH = BASE_PATH + "/health/check";
public static final String PROBE_MODIFY_REQUEST = "Listening-Configs"; public static final String PROBE_MODIFY_REQUEST = "Listening-Configs";

@ -38,33 +38,33 @@ public class ThreadPoolParameterInfo implements ThreadPoolParameter, Serializabl
private static final long serialVersionUID = -7123935122108553864L; private static final long serialVersionUID = -7123935122108553864L;
/** /**
* tenantId * Tenant id
*/ */
private String tenantId; private String tenantId;
/** /**
* itemId * Item id
*/ */
private String itemId; private String itemId;
/** /**
* tpId * Thread-pool id
*/ */
private String tpId; private String tpId;
/** /**
* content * Content
*/ */
private String content; private String content;
/** /**
* coreSize * Core size
*/ */
@Deprecated @Deprecated
private Integer coreSize; private Integer coreSize;
/** /**
* maxSize * Max size
*/ */
@Deprecated @Deprecated
private Integer maxSize; private Integer maxSize;
@ -80,42 +80,42 @@ public class ThreadPoolParameterInfo implements ThreadPoolParameter, Serializabl
private Integer maximumPoolSize; private Integer maximumPoolSize;
/** /**
* queueType * Queue type
*/ */
private Integer queueType; private Integer queueType;
/** /**
* capacity * Capacity
*/ */
private Integer capacity; private Integer capacity;
/** /**
* keepAliveTime * Keep alive time
*/ */
private Integer keepAliveTime; private Integer keepAliveTime;
/** /**
* rejectedType * Rejected type
*/ */
private Integer rejectedType; private Integer rejectedType;
/** /**
* isAlarm * Is alarm
*/ */
private Integer isAlarm; private Integer isAlarm;
/** /**
* capacityAlarm * Capacity alarm
*/ */
private Integer capacityAlarm; private Integer capacityAlarm;
/** /**
* livenessAlarm * Liveness alarm
*/ */
private Integer livenessAlarm; private Integer livenessAlarm;
/** /**
* allowCoreThreadTimeOut * Allow core thread timeout
*/ */
private Integer allowCoreThreadTimeOut; private Integer allowCoreThreadTimeOut;

@ -0,0 +1,77 @@
package cn.hippo4j.common.model.register;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* Dynamic thread-pool register parameter.
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class DynamicThreadPoolRegisterParameter {
/**
* Thread-pool id
* Empty or empty strings are not allowed, and `+` signs are not allowed
*/
private String threadPoolId;
/**
* Content
*/
private String content;
/**
* Core pool size
*/
private Integer corePoolSize;
/**
* Maximum pool size
*/
private Integer maximumPoolSize;
/**
* Queue type
*/
private Integer queueType;
/**
* Capacity
*/
private Integer capacity;
/**
* Keep alive time
*/
private Integer keepAliveTime;
/**
* Rejected type
*/
private Integer rejectedType;
/**
* Is alarm
*/
private Integer isAlarm;
/**
* Capacity alarm
*/
private Integer capacityAlarm;
/**
* Liveness alarm
*/
private Integer livenessAlarm;
/**
* Allow core thread timeout
*/
private Integer allowCoreThreadTimeOut;
}

@ -0,0 +1,45 @@
package cn.hippo4j.common.model.register;
import com.fasterxml.jackson.annotation.JsonIgnore;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.concurrent.ThreadPoolExecutor;
/**
* Dynamic thread-pool register wrapper.
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class DynamicThreadPoolRegisterWrapper {
/**
* Tenant id
*/
private String tenantId;
/**
* Item id
*/
private String itemId;
/**
* Update if exists
*/
private Boolean updateIfExists = Boolean.TRUE;
/**
* Dynamic thread-pool executor
*/
@JsonIgnore
private ThreadPoolExecutor dynamicThreadPoolExecutor;
/**
* Dynamic thread-pool register parameter
*/
private DynamicThreadPoolRegisterParameter dynamicThreadPoolRegisterParameter;
}

@ -17,15 +17,16 @@
package cn.hippo4j.config.controller; package cn.hippo4j.config.controller;
import cn.hippo4j.common.constant.Constants;
import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterWrapper;
import cn.hippo4j.common.web.base.Result;
import cn.hippo4j.common.web.base.Results;
import cn.hippo4j.config.model.ConfigAllInfo; import cn.hippo4j.config.model.ConfigAllInfo;
import cn.hippo4j.config.model.ConfigInfoBase; import cn.hippo4j.config.model.ConfigInfoBase;
import cn.hippo4j.config.service.ConfigCacheService; import cn.hippo4j.config.service.ConfigCacheService;
import cn.hippo4j.config.service.ConfigServletInner; import cn.hippo4j.config.service.ConfigServletInner;
import cn.hippo4j.config.toolkit.Md5ConfigUtil;
import cn.hippo4j.config.service.biz.ConfigService; import cn.hippo4j.config.service.biz.ConfigService;
import cn.hippo4j.common.constant.Constants; import cn.hippo4j.config.toolkit.Md5ConfigUtil;
import cn.hippo4j.common.web.base.Result;
import cn.hippo4j.common.web.base.Results;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.SneakyThrows; import lombok.SneakyThrows;
@ -91,4 +92,10 @@ public class ConfigController {
} }
return Results.success(); return Results.success();
} }
@PostMapping("/register")
public Result register(@RequestBody DynamicThreadPoolRegisterWrapper registerWrapper) {
configService.register(registerWrapper);
return Results.success();
}
} }

@ -19,6 +19,7 @@ package cn.hippo4j.config.model;
import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableId;
import com.fasterxml.jackson.annotation.JsonAlias;
import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonIgnore;
import lombok.Data; import lombok.Data;
@ -56,11 +57,13 @@ public class ConfigInfoBase implements Serializable {
/** /**
* coreSize * coreSize
*/ */
@JsonAlias("corePoolSize")
private Integer coreSize; private Integer coreSize;
/** /**
* maxSize * maxSize
*/ */
@JsonAlias("maximumPoolSize")
private Integer maxSize; private Integer maxSize;
/** /**

@ -17,6 +17,7 @@
package cn.hippo4j.config.service.biz; package cn.hippo4j.config.service.biz;
import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterWrapper;
import cn.hippo4j.config.model.ConfigAllInfo; import cn.hippo4j.config.model.ConfigAllInfo;
/** /**
@ -50,4 +51,11 @@ public interface ConfigService {
* @param configAllInfo * @param configAllInfo
*/ */
void insertOrUpdate(String identify, boolean isChangeNotice, ConfigAllInfo configAllInfo); void insertOrUpdate(String identify, boolean isChangeNotice, ConfigAllInfo configAllInfo);
/**
* Register.
*
* @param registerWrapper
*/
void register(DynamicThreadPoolRegisterWrapper registerWrapper);
} }

@ -19,6 +19,8 @@ package cn.hippo4j.config.service.biz.impl;
import cn.hippo4j.common.config.ApplicationContextHolder; import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.common.enums.DelEnum; import cn.hippo4j.common.enums.DelEnum;
import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterParameter;
import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterWrapper;
import cn.hippo4j.common.toolkit.*; import cn.hippo4j.common.toolkit.*;
import cn.hippo4j.common.web.exception.ServiceException; import cn.hippo4j.common.web.exception.ServiceException;
import cn.hippo4j.config.event.LocalDataChangeEvent; import cn.hippo4j.config.event.LocalDataChangeEvent;
@ -31,7 +33,9 @@ import cn.hippo4j.config.model.LogRecordInfo;
import cn.hippo4j.config.service.ConfigCacheService; import cn.hippo4j.config.service.ConfigCacheService;
import cn.hippo4j.config.service.ConfigChangePublisher; import cn.hippo4j.config.service.ConfigChangePublisher;
import cn.hippo4j.config.service.biz.ConfigService; import cn.hippo4j.config.service.biz.ConfigService;
import cn.hippo4j.config.service.biz.ItemService;
import cn.hippo4j.config.service.biz.OperationLogService; import cn.hippo4j.config.service.biz.OperationLogService;
import cn.hippo4j.config.service.biz.TenantService;
import cn.hippo4j.config.toolkit.BeanUtil; import cn.hippo4j.config.toolkit.BeanUtil;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
@ -133,6 +137,26 @@ public class ConfigServiceImpl implements ConfigService {
} }
} }
@Override
public void register(DynamicThreadPoolRegisterWrapper registerWrapper) {
DynamicThreadPoolRegisterParameter registerParameter = registerWrapper.getDynamicThreadPoolRegisterParameter();
ConfigAllInfo configAllInfo = JSONUtil.parseObject(JSONUtil.toJSONString(registerParameter), ConfigAllInfo.class);
configAllInfo.setTenantId(registerWrapper.getTenantId());
configAllInfo.setItemId(registerWrapper.getItemId());
configAllInfo.setTpId(registerParameter.getThreadPoolId());
TenantService tenantService = ApplicationContextHolder.getBean(TenantService.class);
ItemService itemService = ApplicationContextHolder.getBean(ItemService.class);
Assert.isTrue(tenantService.getTenantByTenantId(registerWrapper.getTenantId()) != null, "Tenant does not exist");
Assert.isTrue(itemService.queryItemById(registerWrapper.getTenantId(), registerWrapper.getItemId()) != null, "Item does not exist");
ConfigAllInfo existConfigAllInfo = findConfigAllInfo(configAllInfo.getTpId(), registerWrapper.getItemId(), registerWrapper.getTenantId());
if (existConfigAllInfo == null) {
addConfigInfo(configAllInfo);
} else if (registerWrapper.getUpdateIfExists()) {
ConfigServiceImpl configService = ApplicationContextHolder.getBean(this.getClass());
configService.updateConfigInfo(null, false, configAllInfo);
}
}
private void verification(String identify) { private void verification(String identify) {
if (StringUtil.isNotBlank(identify)) { if (StringUtil.isNotBlank(identify)) {
Map content = getContent(identify); Map content = getContent(identify);

@ -19,7 +19,10 @@ package cn.hippo4j.core.executor;
import cn.hippo4j.core.executor.support.AbstractDynamicExecutorSupport; import cn.hippo4j.core.executor.support.AbstractDynamicExecutorSupport;
import cn.hippo4j.core.executor.support.CommonDynamicThreadPool; import cn.hippo4j.core.executor.support.CommonDynamicThreadPool;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data; import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.DisposableBean;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
@ -30,6 +33,9 @@ import java.util.concurrent.ThreadPoolExecutor;
* Dynamic thread-pool wrapper. * Dynamic thread-pool wrapper.
*/ */
@Data @Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class DynamicThreadPoolWrapper implements DisposableBean { public class DynamicThreadPoolWrapper implements DisposableBean {
private String tenantId, itemId, threadPoolId; private String tenantId, itemId, threadPoolId;

@ -0,0 +1,16 @@
package cn.hippo4j.core.executor.support;
import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterWrapper;
/**
* Dynamic thread-pool service.
*/
public interface DynamicThreadPoolService {
/**
* Registering dynamic thread pools at runtime.
*
* @param registerWrapper
*/
void registerDynamicThreadPool(DynamicThreadPoolRegisterWrapper registerWrapper);
}

@ -20,11 +20,13 @@ package cn.hippo4j.springboot.starter.config;
import cn.hippo4j.adapter.base.ThreadPoolAdapterBeanContainer; import cn.hippo4j.adapter.base.ThreadPoolAdapterBeanContainer;
import cn.hippo4j.adapter.web.WebThreadPoolHandlerChoose; import cn.hippo4j.adapter.web.WebThreadPoolHandlerChoose;
import cn.hippo4j.common.api.ThreadDetailState; import cn.hippo4j.common.api.ThreadDetailState;
import cn.hippo4j.common.api.ThreadPoolDynamicRefresh;
import cn.hippo4j.common.config.ApplicationContextHolder; import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.core.config.UtilAutoConfiguration; import cn.hippo4j.core.config.UtilAutoConfiguration;
import cn.hippo4j.core.enable.MarkerConfiguration; import cn.hippo4j.core.enable.MarkerConfiguration;
import cn.hippo4j.core.executor.ThreadPoolNotifyAlarmHandler; import cn.hippo4j.core.executor.ThreadPoolNotifyAlarmHandler;
import cn.hippo4j.core.executor.state.ThreadPoolRunStateHandler; import cn.hippo4j.core.executor.state.ThreadPoolRunStateHandler;
import cn.hippo4j.core.executor.support.DynamicThreadPoolService;
import cn.hippo4j.core.handler.DynamicThreadPoolBannerHandler; import cn.hippo4j.core.handler.DynamicThreadPoolBannerHandler;
import cn.hippo4j.core.toolkit.IdentifyUtil; import cn.hippo4j.core.toolkit.IdentifyUtil;
import cn.hippo4j.core.toolkit.inet.InetUtils; import cn.hippo4j.core.toolkit.inet.InetUtils;
@ -85,24 +87,28 @@ public class DynamicThreadPoolAutoConfiguration {
} }
@Bean @Bean
@SuppressWarnings("all") public ClientWorker hippo4jClientWorker(HttpAgent httpAgent,
public ConfigService configService(HttpAgent httpAgent, InetUtils hippo4JInetUtils, ServerHealthCheck serverHealthCheck) { InetUtils hippo4JInetUtils,
ServerHealthCheck serverHealthCheck) {
String identify = IdentifyUtil.generate(environment, hippo4JInetUtils); String identify = IdentifyUtil.generate(environment, hippo4JInetUtils);
return new ThreadPoolConfigService(httpAgent, identify, serverHealthCheck); return new ClientWorker(httpAgent, identify, serverHealthCheck);
} }
@Bean @Bean
public ThreadPoolOperation threadPoolOperation(ConfigService configService) { @SuppressWarnings("all")
return new ThreadPoolOperation(properties, configService); public DynamicThreadPoolService dynamicThreadPoolConfigService(HttpAgent httpAgent,
ClientWorker clientWorker,
ServerHealthCheck serverHealthCheck,
DynamicThreadPoolSubscribeConfig dynamicThreadPoolSubscribeConfig) {
return new DynamicThreadPoolConfigService(httpAgent, clientWorker, properties, dynamicThreadPoolSubscribeConfig);
} }
@Bean @Bean
@SuppressWarnings("all") @SuppressWarnings("all")
public DynamicThreadPoolPostProcessor threadPoolBeanPostProcessor(HttpAgent httpAgent, public DynamicThreadPoolPostProcessor threadPoolBeanPostProcessor(HttpAgent httpAgent,
ThreadPoolOperation threadPoolOperation,
ApplicationContextHolder hippo4JApplicationContextHolder, ApplicationContextHolder hippo4JApplicationContextHolder,
ServerThreadPoolDynamicRefresh threadPoolDynamicRefresh) { DynamicThreadPoolSubscribeConfig dynamicThreadPoolSubscribeConfig) {
return new DynamicThreadPoolPostProcessor(properties, httpAgent, threadPoolOperation, threadPoolDynamicRefresh); return new DynamicThreadPoolPostProcessor(properties, httpAgent, dynamicThreadPoolSubscribeConfig);
} }
@Bean @Bean
@ -125,7 +131,8 @@ public class DynamicThreadPoolAutoConfiguration {
} }
@Bean @Bean
public ReportingEventExecutor reportingEventExecutor(BootstrapProperties properties, MessageSender messageSender, public ReportingEventExecutor reportingEventExecutor(BootstrapProperties properties,
MessageSender messageSender,
ServerHealthCheck serverHealthCheck) { ServerHealthCheck serverHealthCheck) {
return new ReportingEventExecutor(properties, messageSender, serverHealthCheck); return new ReportingEventExecutor(properties, messageSender, serverHealthCheck);
} }
@ -165,7 +172,8 @@ public class DynamicThreadPoolAutoConfiguration {
@Bean @Bean
@SuppressWarnings("all") @SuppressWarnings("all")
public ThreadPoolAdapterRegister threadPoolAdapterRegister(HttpAgent httpAgent, InetUtils hippo4JInetUtils) { public ThreadPoolAdapterRegister threadPoolAdapterRegister(HttpAgent httpAgent,
InetUtils hippo4JInetUtils) {
return new ThreadPoolAdapterRegister(httpAgent, properties, environment, hippo4JInetUtils); return new ThreadPoolAdapterRegister(httpAgent, properties, environment, hippo4JInetUtils);
} }
@ -177,10 +185,16 @@ public class DynamicThreadPoolAutoConfiguration {
} }
@Bean @Bean
public ServerThreadPoolDynamicRefresh threadPoolDynamicRefresh(ThreadPoolNotifyAlarmHandler threadPoolNotifyAlarmHandler) { public ThreadPoolDynamicRefresh threadPoolDynamicRefresh(ThreadPoolNotifyAlarmHandler threadPoolNotifyAlarmHandler) {
return new ServerThreadPoolDynamicRefresh(threadPoolNotifyAlarmHandler); return new ServerThreadPoolDynamicRefresh(threadPoolNotifyAlarmHandler);
} }
@Bean
public DynamicThreadPoolSubscribeConfig dynamicThreadPoolSubscribeConfig(ThreadPoolDynamicRefresh threadPoolDynamicRefresh,
ClientWorker clientWorker) {
return new DynamicThreadPoolSubscribeConfig(threadPoolDynamicRefresh, clientWorker, properties);
}
@Bean @Bean
public ThreadPoolNotifyAlarmHandler threadPoolNotifyAlarmHandler(Hippo4jSendMessageService hippoSendMessageService) { public ThreadPoolNotifyAlarmHandler threadPoolNotifyAlarmHandler(Hippo4jSendMessageService hippoSendMessageService) {
return new ThreadPoolNotifyAlarmHandler(hippoSendMessageService); return new ThreadPoolNotifyAlarmHandler(hippoSendMessageService);

@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.springboot.starter.core;
import cn.hippo4j.common.model.ThreadPoolParameterInfo;
import cn.hippo4j.common.toolkit.Assert;
import cn.hippo4j.common.toolkit.JSONUtil;
import cn.hippo4j.common.web.base.Result;
import cn.hippo4j.core.executor.DynamicThreadPoolWrapper;
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterParameter;
import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterWrapper;
import cn.hippo4j.core.executor.support.DynamicThreadPoolService;
import cn.hippo4j.springboot.starter.config.BootstrapProperties;
import cn.hippo4j.springboot.starter.event.ApplicationCompleteEvent;
import cn.hippo4j.springboot.starter.remote.HttpAgent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationListener;
import static cn.hippo4j.common.constant.Constants.REGISTER_DYNAMIC_THREAD_POOL_PATH;
/**
* Dynamic thread-pool config service.
*/
@Slf4j
@RequiredArgsConstructor
public class DynamicThreadPoolConfigService implements DynamicThreadPoolService, ApplicationListener<ApplicationCompleteEvent> {
private final HttpAgent httpAgent;
private final ClientWorker clientWorker;
private final BootstrapProperties properties;
private final DynamicThreadPoolSubscribeConfig dynamicThreadPoolSubscribeConfig;
@Override
public void onApplicationEvent(ApplicationCompleteEvent event) {
clientWorker.notifyApplicationComplete();
}
@Override
public void registerDynamicThreadPool(DynamicThreadPoolRegisterWrapper registerWrapper) {
DynamicThreadPoolRegisterParameter registerParameter = registerWrapper.getDynamicThreadPoolRegisterParameter();
checkThreadPoolParameter(registerParameter);
ThreadPoolParameterInfo parameter = JSONUtil.parseObject(JSONUtil.toJSONString(registerParameter), ThreadPoolParameterInfo.class);
String threadPoolId = registerParameter.getThreadPoolId();
Result registerResult;
try {
failDynamicThreadPoolRegisterWrapper(registerWrapper);
registerResult = httpAgent.httpPost(REGISTER_DYNAMIC_THREAD_POOL_PATH, registerWrapper);
} catch (Throwable ex) {
log.error("Failed to dynamically register thread pool: {}", threadPoolId, ex);
throw ex;
}
if (registerResult.isSuccess()) {
DynamicThreadPoolWrapper dynamicThreadPoolWrapper = DynamicThreadPoolWrapper.builder()
.threadPoolId(threadPoolId)
.executor(registerWrapper.getDynamicThreadPoolExecutor())
.build();
GlobalThreadPoolManage.register(threadPoolId, parameter, dynamicThreadPoolWrapper);
dynamicThreadPoolSubscribeConfig.subscribeConfig(threadPoolId);
clientWorker.addCacheDataIfAbsent(properties.getNamespace(), properties.getItemId(), threadPoolId);
}
}
private void checkThreadPoolParameter(DynamicThreadPoolRegisterParameter registerParameter) {
Assert.isTrue(!registerParameter.getThreadPoolId().contains("+"), "The thread pool contains sensitive characters.");
}
private void failDynamicThreadPoolRegisterWrapper(DynamicThreadPoolRegisterWrapper registerWrapper) {
registerWrapper.setTenantId(properties.getNamespace());
registerWrapper.setItemId(properties.getItemId());
}
}

@ -44,7 +44,6 @@ import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -61,9 +60,7 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
private final HttpAgent httpAgent; private final HttpAgent httpAgent;
private final ThreadPoolOperation threadPoolOperation; private final DynamicThreadPoolSubscribeConfig dynamicThreadPoolSubscribeConfig;
private final ServerThreadPoolDynamicRefresh serverThreadPoolDynamicRefresh;
@Override @Override
public Object postProcessBeforeInitialization(Object bean, String beanName) { public Object postProcessBeforeInitialization(Object bean, String beanName) {
@ -174,25 +171,14 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
return newDynamicThreadPoolExecutor; return newDynamicThreadPoolExecutor;
} }
private final ExecutorService configRefreshExecutorService = ThreadPoolBuilder.builder()
.corePoolSize(1)
.maxPoolNum(2)
.keepAliveTime(2000)
.timeUnit(TimeUnit.MILLISECONDS)
.workQueue(QueueTypeEnum.SYNCHRONOUS_QUEUE)
.allowCoreThreadTimeOut(true)
.threadFactory("client.dynamic.threadPool.change.config")
.rejected(new ThreadPoolExecutor.AbortPolicy())
.build();
/** /**
* Client dynamic thread pool subscription server configuration. * Client dynamic thread pool subscription server configuration.
* *
* @param dynamicThreadPoolWrap * @param dynamicThreadPoolWrapper
*/ */
protected void subscribeConfig(DynamicThreadPoolWrapper dynamicThreadPoolWrap) { protected void subscribeConfig(DynamicThreadPoolWrapper dynamicThreadPoolWrapper) {
if (dynamicThreadPoolWrap.isSubscribeFlag()) { if (dynamicThreadPoolWrapper.isSubscribeFlag()) {
threadPoolOperation.subscribeConfig(dynamicThreadPoolWrap.getThreadPoolId(), configRefreshExecutorService, config -> serverThreadPoolDynamicRefresh.dynamicRefresh(config)); dynamicThreadPoolSubscribeConfig.subscribeConfig(dynamicThreadPoolWrapper.getThreadPoolId());
} }
} }
} }

@ -0,0 +1,57 @@
package cn.hippo4j.springboot.starter.core;
import cn.hippo4j.common.api.ThreadPoolDynamicRefresh;
import cn.hippo4j.core.executor.support.QueueTypeEnum;
import cn.hippo4j.core.executor.support.ThreadPoolBuilder;
import cn.hippo4j.springboot.starter.config.BootstrapProperties;
import lombok.RequiredArgsConstructor;
import java.util.Arrays;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* Dynamic thread-pool subscribe config.
*/
@RequiredArgsConstructor
public class DynamicThreadPoolSubscribeConfig {
private final ThreadPoolDynamicRefresh threadPoolDynamicRefresh;
private final ClientWorker clientWorker;
private final BootstrapProperties properties;
private final ExecutorService configRefreshExecutorService = ThreadPoolBuilder.builder()
.corePoolSize(1)
.maxPoolNum(2)
.keepAliveTime(2000)
.timeUnit(TimeUnit.MILLISECONDS)
.workQueue(QueueTypeEnum.SYNCHRONOUS_QUEUE)
.allowCoreThreadTimeOut(true)
.threadFactory("client.dynamic.threadPool.change.config")
.rejected(new ThreadPoolExecutor.AbortPolicy())
.build();
public void subscribeConfig(String threadPoolId) {
subscribeConfig(threadPoolId, config -> threadPoolDynamicRefresh.dynamicRefresh(config));
}
public void subscribeConfig(String threadPoolId, ThreadPoolSubscribeCallback threadPoolSubscribeCallback) {
Listener configListener = new Listener() {
@Override
public void receiveConfigInfo(String config) {
threadPoolSubscribeCallback.callback(config);
}
@Override
public Executor getExecutor() {
return configRefreshExecutorService;
}
};
clientWorker.addTenantListeners(properties.getNamespace(), properties.getItemId(), threadPoolId, Arrays.asList(configListener));
}
}

@ -1,59 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.springboot.starter.core;
import cn.hippo4j.springboot.starter.event.ApplicationCompleteEvent;
import cn.hippo4j.springboot.starter.remote.HttpAgent;
import cn.hippo4j.springboot.starter.remote.ServerHealthCheck;
import org.springframework.context.ApplicationListener;
import java.util.Arrays;
/**
* Thread-pool config service.
*/
public class ThreadPoolConfigService implements ConfigService, ApplicationListener<ApplicationCompleteEvent> {
private final ClientWorker clientWorker;
private final ServerHealthCheck serverHealthCheck;
public ThreadPoolConfigService(HttpAgent httpAgent, String identify, ServerHealthCheck serverHealthCheck) {
this.serverHealthCheck = serverHealthCheck;
this.clientWorker = new ClientWorker(httpAgent, identify, serverHealthCheck);
}
@Override
public void addListener(String tenantId, String itemId, String threadPoolId, Listener listener) {
clientWorker.addTenantListeners(tenantId, itemId, threadPoolId, Arrays.asList(listener));
}
@Override
public String getServerStatus() {
if (serverHealthCheck.isHealthStatus()) {
return "UP";
} else {
return "DOWN";
}
}
@Override
public void onApplicationEvent(ApplicationCompleteEvent event) {
clientWorker.notifyApplicationComplete();
}
}

@ -1,54 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.springboot.starter.core;
import cn.hippo4j.springboot.starter.config.BootstrapProperties;
import java.util.concurrent.Executor;
/**
* Thread-pool operation.
*/
public class ThreadPoolOperation {
private final ConfigService configService;
private final BootstrapProperties properties;
public ThreadPoolOperation(BootstrapProperties properties, ConfigService configService) {
this.properties = properties;
this.configService = configService;
}
public Listener subscribeConfig(String threadPoolId, Executor executor, ThreadPoolSubscribeCallback threadPoolSubscribeCallback) {
Listener configListener = new Listener() {
@Override
public void receiveConfigInfo(String config) {
threadPoolSubscribeCallback.callback(config);
}
@Override
public Executor getExecutor() {
return executor;
}
};
configService.addListener(properties.getNamespace(), properties.getItemId(), threadPoolId, configListener);
return configListener;
}
}
Loading…
Cancel
Save