Dynamic thread pool console function changes (#447) (#453)

pull/467/head
小马哥 2 years ago committed by GitHub
parent b34c84f97d
commit f241b74c9e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -54,10 +54,10 @@ public class ConfigController {
@GetMapping
public Result<ConfigInfoBase> detailConfigInfo(
@RequestParam("tpId") String tpId,
@RequestParam("itemId") String itemId,
@RequestParam("namespace") String namespace,
@RequestParam(value = "instanceId", required = false) String instanceId) {
@RequestParam("tpId") String tpId,
@RequestParam("itemId") String itemId,
@RequestParam("namespace") String namespace,
@RequestParam(value = "instanceId", required = false) String instanceId) {
ConfigAllInfo configAllInfo = configService.findConfigRecentInfo(tpId, itemId, namespace, instanceId);
return Results.success(configAllInfo);
}
@ -65,7 +65,7 @@ public class ConfigController {
@PostMapping
public Result<Boolean> publishConfig(@RequestParam(value = "identify", required = false) String identify,
@RequestBody ConfigAllInfo config) {
configService.insertOrUpdate(identify, config);
configService.insertOrUpdate(identify, true, config);
return Results.success(true);
}

@ -22,6 +22,7 @@ import cn.hippo4j.common.constant.Constants;
import cn.hippo4j.common.design.observer.AbstractSubjectCenter;
import cn.hippo4j.common.design.observer.Observer;
import cn.hippo4j.common.design.observer.ObserverMessage;
import cn.hippo4j.common.toolkit.CollectionUtil;
import cn.hippo4j.common.toolkit.JSONUtil;
import cn.hippo4j.common.toolkit.Md5Util;
import cn.hippo4j.config.event.LocalDataChangeEvent;
@ -32,17 +33,19 @@ import cn.hippo4j.config.service.biz.ConfigService;
import cn.hippo4j.config.toolkit.MapUtil;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StringUtils;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import static cn.hippo4j.common.constant.Constants.GROUP_KEY_DELIMITER;
import static cn.hippo4j.common.constant.Constants.GROUP_KEY_DELIMITER_TRANSLATION;
/**
* Config cache service.
*/
@ -84,7 +87,7 @@ public class ConfigCacheService {
if (CONFIG_SERVICE == null) {
CONFIG_SERVICE = ApplicationContextHolder.getBean(ConfigService.class);
}
String[] params = groupKey.split("\\+");
String[] params = groupKey.split(GROUP_KEY_DELIMITER_TRANSLATION);
ConfigAllInfo config = CONFIG_SERVICE.findConfigRecentInfo(params);
if (config != null && StrUtil.isNotBlank(config.getTpId())) {
cacheItem = new CacheItem(groupKey, config);
@ -98,7 +101,7 @@ public class ConfigCacheService {
if (CONFIG_SERVICE == null) {
CONFIG_SERVICE = ApplicationContextHolder.getBean(ConfigService.class);
}
String[] params = groupKey.split("\\+");
String[] params = groupKey.split(GROUP_KEY_DELIMITER_TRANSLATION);
ConfigAllInfo config = CONFIG_SERVICE.findConfigRecentInfo(params);
if (config == null || StringUtils.isEmpty(config.getTpId())) {
String errorMessage = String.format("config is null. tpId :: %s, itemId :: %s, tenantId :: %s", params[0], params[1], params[2]);
@ -111,7 +114,7 @@ public class ConfigCacheService {
CacheItem cache = makeSure(groupKey, identify);
if (cache.md5 == null || !cache.md5.equals(md5)) {
cache.md5 = md5;
String[] params = groupKey.split("\\+");
String[] params = groupKey.split(GROUP_KEY_DELIMITER_TRANSLATION);
ConfigAllInfo config = CONFIG_SERVICE.findConfigRecentInfo(params);
cache.configAllInfo = config;
cache.lastModifiedTs = System.currentTimeMillis();
@ -145,6 +148,22 @@ public class ConfigCacheService {
return total.get();
}
public static List<String> getIdentifyList(String tenantId, String itemId, String threadPoolId) {
List<String> identifyList = null;
String buildKey = Joiner.on(GROUP_KEY_DELIMITER).join(Lists.newArrayList(threadPoolId, itemId, tenantId));
List<String> keys = MapUtil.parseMapForFilter(CLIENT_CONFIG_CACHE, buildKey);
if (CollectionUtil.isNotEmpty(keys)) {
identifyList = new ArrayList(keys.size());
for (String each : keys) {
String[] keyArray = each.split(GROUP_KEY_DELIMITER_TRANSLATION);
if (keyArray != null && keyArray.length > 2) {
identifyList.add(keyArray[3]);
}
}
}
return identifyList;
}
/**
* Remove config cache.
*

@ -49,8 +49,8 @@ public interface ConfigService {
* Insert or update.
*
* @param identify
* @param isChangeNotice
* @param configAllInfo
*/
void insertOrUpdate(String identify, ConfigAllInfo configAllInfo);
void insertOrUpdate(String identify, boolean isChangeNotice, ConfigAllInfo configAllInfo);
}

@ -27,6 +27,7 @@ import cn.hippo4j.config.mapper.ConfigInstanceMapper;
import cn.hippo4j.config.model.ConfigAllInfo;
import cn.hippo4j.config.model.ConfigInfoBase;
import cn.hippo4j.config.model.ConfigInstanceInfo;
import cn.hippo4j.config.service.ConfigCacheService;
import cn.hippo4j.config.service.ConfigChangePublisher;
import cn.hippo4j.config.service.biz.ConfigService;
import cn.hippo4j.config.toolkit.BeanUtil;
@ -71,7 +72,6 @@ public class ConfigServiceImpl implements ConfigService {
.eq(StrUtil.isNotBlank(tpId), ConfigAllInfo::getTpId, tpId)
.eq(StrUtil.isNotBlank(itemId), ConfigAllInfo::getItemId, itemId)
.eq(StrUtil.isNotBlank(tenantId), ConfigAllInfo::getTenantId, tenantId);
ConfigAllInfo configAllInfo = configInfoMapper.selectOne(wrapper);
return configAllInfo;
}
@ -80,7 +80,6 @@ public class ConfigServiceImpl implements ConfigService {
public ConfigAllInfo findConfigRecentInfo(String... params) {
ConfigAllInfo resultConfig;
ConfigAllInfo configInstance = null;
String instanceId = params[3];
if (StrUtil.isNotBlank(instanceId)) {
LambdaQueryWrapper<ConfigInstanceInfo> instanceQueryWrapper = Wrappers.lambdaQuery(ConfigInstanceInfo.class)
@ -90,7 +89,6 @@ public class ConfigServiceImpl implements ConfigService {
.eq(ConfigInstanceInfo::getInstanceId, params[3])
.orderByDesc(ConfigInstanceInfo::getGmtCreate)
.last("LIMIT 1");
ConfigInstanceInfo instanceInfo = configInstanceMapper.selectOne(instanceQueryWrapper);
if (instanceInfo != null) {
String content = instanceInfo.getContent();
@ -100,7 +98,6 @@ public class ConfigServiceImpl implements ConfigService {
configInstance.setMd5(Md5Util.getTpContentMd5(configInstance));
}
}
ConfigAllInfo configAllInfo = findConfigAllInfo(params[0], params[1], params[2]);
if (configAllInfo == null && configInstance == null) {
throw new ServiceException("Thread pool configuration is not defined.");
@ -115,29 +112,27 @@ public class ConfigServiceImpl implements ConfigService {
resultConfig = configAllInfo;
}
}
return resultConfig;
}
@Override
public void insertOrUpdate(String identify, ConfigAllInfo configInfo) {
public void insertOrUpdate(String identify, boolean isChangeNotice, ConfigAllInfo configInfo) {
verification(identify);
LambdaQueryWrapper<ConfigAllInfo> queryWrapper = Wrappers.lambdaQuery(ConfigAllInfo.class)
.eq(ConfigAllInfo::getTenantId, configInfo.getTenantId())
.eq(ConfigInfoBase::getItemId, configInfo.getItemId())
.eq(ConfigInfoBase::getTpId, configInfo.getTpId());
ConfigAllInfo existConfig = configInfoMapper.selectOne(queryWrapper);
ConfigServiceImpl configService = ApplicationContextHolder.getBean(this.getClass());
configInfo.setCapacity(getQueueCapacityByType(configInfo));
ConditionUtil
.condition(
existConfig == null,
() -> configService.addConfigInfo(configInfo),
() -> configService.updateConfigInfo(identify, configInfo));
ConfigChangePublisher.notifyConfigChange(new LocalDataChangeEvent(identify, ContentUtil.getGroupKey(configInfo)));
() -> configService.updateConfigInfo(identify, isChangeNotice, configInfo));
if (isChangeNotice) {
ConfigChangePublisher.notifyConfigChange(new LocalDataChangeEvent(identify, ContentUtil.getGroupKey(configInfo)));
}
}
private void verification(String identify) {
@ -150,7 +145,6 @@ public class ConfigServiceImpl implements ConfigService {
public Long addConfigInfo(ConfigAllInfo config) {
config.setContent(ContentUtil.getPoolContent(config));
config.setMd5(Md5Util.getTpContentMd5(config));
try {
// 当前为单体应用, 后续支持集群部署时切换分布式锁.
synchronized (ConfigService.class) {
@ -159,7 +153,6 @@ public class ConfigServiceImpl implements ConfigService {
.eq(ConfigAllInfo::getTpId, config.getTpId())
.eq(ConfigAllInfo::getDelFlag, DelEnum.NORMAL.getIntCode()));
Assert.isNull(configAllInfo, "线程池配置已存在.");
if (SqlHelper.retBool(configInfoMapper.insert(config))) {
return config.getId();
}
@ -168,30 +161,36 @@ public class ConfigServiceImpl implements ConfigService {
log.error("[db-error] message :: {}", ex.getMessage(), ex);
throw ex;
}
return null;
}
@LogRecord(bizNo = "{{#config.itemId}}_{{#config.tpId}}", category = "THREAD_POOL_UPDATE", success = "核心线程: {{#config.coreSize}}, 最大线程: {{#config.maxSize}}, 队列类型: {{#config.queueType}}, 队列容量: {{#config.capacity}}, 拒绝策略: {{#config.rejectedType}}", detail = "{{#config.toString()}}")
public void updateConfigInfo(String identify, ConfigAllInfo config) {
public void updateConfigInfo(String identify, boolean isChangeNotice, ConfigAllInfo config) {
LambdaUpdateWrapper<ConfigAllInfo> wrapper = Wrappers.lambdaUpdate(ConfigAllInfo.class)
.eq(ConfigAllInfo::getTpId, config.getTpId())
.eq(ConfigAllInfo::getItemId, config.getItemId())
.eq(ConfigAllInfo::getTenantId, config.getTenantId());
config.setGmtCreate(null);
config.setContent(ContentUtil.getPoolContent(config));
config.setMd5(Md5Util.getTpContentMd5(config));
try {
// 创建线程池配置实例临时配置, 也可以当作历史配置, 不过针对的是单节点
if (StrUtil.isNotBlank(identify)) {
if (StringUtil.isNotBlank(identify)) {
ConfigInstanceInfo instanceInfo = BeanUtil.convert(config, ConfigInstanceInfo.class);
instanceInfo.setInstanceId(identify);
configInstanceMapper.insert(instanceInfo);
return;
} else if (StringUtil.isEmpty(identify) && isChangeNotice) {
List<String> identifyList = ConfigCacheService.getIdentifyList(config.getTenantId(), config.getItemId(), config.getTpId());
if (CollectionUtil.isNotEmpty(identifyList)) {
for (String each : identifyList) {
ConfigInstanceInfo instanceInfo = BeanUtil.convert(config, ConfigInstanceInfo.class);
instanceInfo.setInstanceId(each);
configInstanceMapper.insert(instanceInfo);
}
}
return;
}
configInfoMapper.update(config, wrapper);
} catch (Exception ex) {
log.error("[db-error] message :: {}", ex.getMessage(), ex);
@ -217,14 +216,11 @@ public class ConfigServiceImpl implements ConfigService {
queueCapacity = config.getCapacity();
break;
}
List<Integer> queueTypes = Stream.of(1, 2, 3, 6, 9).collect(Collectors.toList());
boolean setDefaultFlag = queueTypes.contains(config.getQueueType()) && (config.getCapacity() == null || Objects.equals(config.getCapacity(), 0));
if (setDefaultFlag) {
queueCapacity = 1024;
}
return queueCapacity;
}
}

@ -80,7 +80,7 @@ public class ThreadPoolServiceImpl implements ThreadPoolService {
@Override
public void saveOrUpdateThreadPoolConfig(String identify, ThreadPoolSaveOrUpdateReqDTO reqDTO) {
configService.insertOrUpdate(identify, BeanUtil.convert(reqDTO, ConfigAllInfo.class));
configService.insertOrUpdate(identify, false, BeanUtil.convert(reqDTO, ConfigAllInfo.class));
}
@LogRecord(bizNo = "{{#reqDTO.itemId}}_{{#reqDTO.tpId}}", category = "THREAD_POOL_DELETE", success = "删除线程池: {{#reqDTO.tpId}}", detail = "{{#reqDTO.toString()}}")
@ -98,7 +98,6 @@ public class ThreadPoolServiceImpl implements ThreadPoolService {
ConfigAllInfo configAllInfo = configInfoMapper.selectById(id);
configAllInfo.setIsAlarm(isAlarm);
// TODO: 是否报警变更, 虽然通知了客户端, 但是并没有在客户端实时生效, 需要考虑一个好的场景思路
configService.insertOrUpdate(null, configAllInfo);
configService.insertOrUpdate(null, false, configAllInfo);
}
}

Loading…
Cancel
Save