diff --git a/hippo4j-config/src/main/java/cn/hippo4j/config/controller/ConfigController.java b/hippo4j-config/src/main/java/cn/hippo4j/config/controller/ConfigController.java index 0aaf9a84..05ebc159 100644 --- a/hippo4j-config/src/main/java/cn/hippo4j/config/controller/ConfigController.java +++ b/hippo4j-config/src/main/java/cn/hippo4j/config/controller/ConfigController.java @@ -54,10 +54,10 @@ public class ConfigController { @GetMapping public Result detailConfigInfo( - @RequestParam("tpId") String tpId, - @RequestParam("itemId") String itemId, - @RequestParam("namespace") String namespace, - @RequestParam(value = "instanceId", required = false) String instanceId) { + @RequestParam("tpId") String tpId, + @RequestParam("itemId") String itemId, + @RequestParam("namespace") String namespace, + @RequestParam(value = "instanceId", required = false) String instanceId) { ConfigAllInfo configAllInfo = configService.findConfigRecentInfo(tpId, itemId, namespace, instanceId); return Results.success(configAllInfo); } @@ -65,7 +65,7 @@ public class ConfigController { @PostMapping public Result publishConfig(@RequestParam(value = "identify", required = false) String identify, @RequestBody ConfigAllInfo config) { - configService.insertOrUpdate(identify, config); + configService.insertOrUpdate(identify, true, config); return Results.success(true); } diff --git a/hippo4j-config/src/main/java/cn/hippo4j/config/service/ConfigCacheService.java b/hippo4j-config/src/main/java/cn/hippo4j/config/service/ConfigCacheService.java index 1dabf5f7..c1b93c65 100644 --- a/hippo4j-config/src/main/java/cn/hippo4j/config/service/ConfigCacheService.java +++ b/hippo4j-config/src/main/java/cn/hippo4j/config/service/ConfigCacheService.java @@ -22,6 +22,7 @@ import cn.hippo4j.common.constant.Constants; import cn.hippo4j.common.design.observer.AbstractSubjectCenter; import cn.hippo4j.common.design.observer.Observer; import cn.hippo4j.common.design.observer.ObserverMessage; +import cn.hippo4j.common.toolkit.CollectionUtil; import cn.hippo4j.common.toolkit.JSONUtil; import cn.hippo4j.common.toolkit.Md5Util; import cn.hippo4j.config.event.LocalDataChangeEvent; @@ -32,17 +33,19 @@ import cn.hippo4j.config.service.biz.ConfigService; import cn.hippo4j.config.toolkit.MapUtil; import cn.hutool.core.collection.CollUtil; import cn.hutool.core.util.StrUtil; +import com.google.common.base.Joiner; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; import lombok.extern.slf4j.Slf4j; import org.springframework.util.StringUtils; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Optional; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; +import static cn.hippo4j.common.constant.Constants.GROUP_KEY_DELIMITER; +import static cn.hippo4j.common.constant.Constants.GROUP_KEY_DELIMITER_TRANSLATION; + /** * Config cache service. */ @@ -84,7 +87,7 @@ public class ConfigCacheService { if (CONFIG_SERVICE == null) { CONFIG_SERVICE = ApplicationContextHolder.getBean(ConfigService.class); } - String[] params = groupKey.split("\\+"); + String[] params = groupKey.split(GROUP_KEY_DELIMITER_TRANSLATION); ConfigAllInfo config = CONFIG_SERVICE.findConfigRecentInfo(params); if (config != null && StrUtil.isNotBlank(config.getTpId())) { cacheItem = new CacheItem(groupKey, config); @@ -98,7 +101,7 @@ public class ConfigCacheService { if (CONFIG_SERVICE == null) { CONFIG_SERVICE = ApplicationContextHolder.getBean(ConfigService.class); } - String[] params = groupKey.split("\\+"); + String[] params = groupKey.split(GROUP_KEY_DELIMITER_TRANSLATION); ConfigAllInfo config = CONFIG_SERVICE.findConfigRecentInfo(params); if (config == null || StringUtils.isEmpty(config.getTpId())) { String errorMessage = String.format("config is null. tpId :: %s, itemId :: %s, tenantId :: %s", params[0], params[1], params[2]); @@ -111,7 +114,7 @@ public class ConfigCacheService { CacheItem cache = makeSure(groupKey, identify); if (cache.md5 == null || !cache.md5.equals(md5)) { cache.md5 = md5; - String[] params = groupKey.split("\\+"); + String[] params = groupKey.split(GROUP_KEY_DELIMITER_TRANSLATION); ConfigAllInfo config = CONFIG_SERVICE.findConfigRecentInfo(params); cache.configAllInfo = config; cache.lastModifiedTs = System.currentTimeMillis(); @@ -145,6 +148,22 @@ public class ConfigCacheService { return total.get(); } + public static List getIdentifyList(String tenantId, String itemId, String threadPoolId) { + List identifyList = null; + String buildKey = Joiner.on(GROUP_KEY_DELIMITER).join(Lists.newArrayList(threadPoolId, itemId, tenantId)); + List keys = MapUtil.parseMapForFilter(CLIENT_CONFIG_CACHE, buildKey); + if (CollectionUtil.isNotEmpty(keys)) { + identifyList = new ArrayList(keys.size()); + for (String each : keys) { + String[] keyArray = each.split(GROUP_KEY_DELIMITER_TRANSLATION); + if (keyArray != null && keyArray.length > 2) { + identifyList.add(keyArray[3]); + } + } + } + return identifyList; + } + /** * Remove config cache. * diff --git a/hippo4j-config/src/main/java/cn/hippo4j/config/service/biz/ConfigService.java b/hippo4j-config/src/main/java/cn/hippo4j/config/service/biz/ConfigService.java index 1d41253c..267df744 100644 --- a/hippo4j-config/src/main/java/cn/hippo4j/config/service/biz/ConfigService.java +++ b/hippo4j-config/src/main/java/cn/hippo4j/config/service/biz/ConfigService.java @@ -49,8 +49,8 @@ public interface ConfigService { * Insert or update. * * @param identify + * @param isChangeNotice * @param configAllInfo */ - void insertOrUpdate(String identify, ConfigAllInfo configAllInfo); - + void insertOrUpdate(String identify, boolean isChangeNotice, ConfigAllInfo configAllInfo); } diff --git a/hippo4j-config/src/main/java/cn/hippo4j/config/service/biz/impl/ConfigServiceImpl.java b/hippo4j-config/src/main/java/cn/hippo4j/config/service/biz/impl/ConfigServiceImpl.java index e0247925..eaef6b99 100644 --- a/hippo4j-config/src/main/java/cn/hippo4j/config/service/biz/impl/ConfigServiceImpl.java +++ b/hippo4j-config/src/main/java/cn/hippo4j/config/service/biz/impl/ConfigServiceImpl.java @@ -27,6 +27,7 @@ import cn.hippo4j.config.mapper.ConfigInstanceMapper; import cn.hippo4j.config.model.ConfigAllInfo; import cn.hippo4j.config.model.ConfigInfoBase; import cn.hippo4j.config.model.ConfigInstanceInfo; +import cn.hippo4j.config.service.ConfigCacheService; import cn.hippo4j.config.service.ConfigChangePublisher; import cn.hippo4j.config.service.biz.ConfigService; import cn.hippo4j.config.toolkit.BeanUtil; @@ -71,7 +72,6 @@ public class ConfigServiceImpl implements ConfigService { .eq(StrUtil.isNotBlank(tpId), ConfigAllInfo::getTpId, tpId) .eq(StrUtil.isNotBlank(itemId), ConfigAllInfo::getItemId, itemId) .eq(StrUtil.isNotBlank(tenantId), ConfigAllInfo::getTenantId, tenantId); - ConfigAllInfo configAllInfo = configInfoMapper.selectOne(wrapper); return configAllInfo; } @@ -80,7 +80,6 @@ public class ConfigServiceImpl implements ConfigService { public ConfigAllInfo findConfigRecentInfo(String... params) { ConfigAllInfo resultConfig; ConfigAllInfo configInstance = null; - String instanceId = params[3]; if (StrUtil.isNotBlank(instanceId)) { LambdaQueryWrapper instanceQueryWrapper = Wrappers.lambdaQuery(ConfigInstanceInfo.class) @@ -90,7 +89,6 @@ public class ConfigServiceImpl implements ConfigService { .eq(ConfigInstanceInfo::getInstanceId, params[3]) .orderByDesc(ConfigInstanceInfo::getGmtCreate) .last("LIMIT 1"); - ConfigInstanceInfo instanceInfo = configInstanceMapper.selectOne(instanceQueryWrapper); if (instanceInfo != null) { String content = instanceInfo.getContent(); @@ -100,7 +98,6 @@ public class ConfigServiceImpl implements ConfigService { configInstance.setMd5(Md5Util.getTpContentMd5(configInstance)); } } - ConfigAllInfo configAllInfo = findConfigAllInfo(params[0], params[1], params[2]); if (configAllInfo == null && configInstance == null) { throw new ServiceException("Thread pool configuration is not defined."); @@ -115,29 +112,27 @@ public class ConfigServiceImpl implements ConfigService { resultConfig = configAllInfo; } } - return resultConfig; } @Override - public void insertOrUpdate(String identify, ConfigAllInfo configInfo) { + public void insertOrUpdate(String identify, boolean isChangeNotice, ConfigAllInfo configInfo) { verification(identify); LambdaQueryWrapper queryWrapper = Wrappers.lambdaQuery(ConfigAllInfo.class) .eq(ConfigAllInfo::getTenantId, configInfo.getTenantId()) .eq(ConfigInfoBase::getItemId, configInfo.getItemId()) .eq(ConfigInfoBase::getTpId, configInfo.getTpId()); ConfigAllInfo existConfig = configInfoMapper.selectOne(queryWrapper); - ConfigServiceImpl configService = ApplicationContextHolder.getBean(this.getClass()); configInfo.setCapacity(getQueueCapacityByType(configInfo)); - ConditionUtil .condition( existConfig == null, () -> configService.addConfigInfo(configInfo), - () -> configService.updateConfigInfo(identify, configInfo)); - - ConfigChangePublisher.notifyConfigChange(new LocalDataChangeEvent(identify, ContentUtil.getGroupKey(configInfo))); + () -> configService.updateConfigInfo(identify, isChangeNotice, configInfo)); + if (isChangeNotice) { + ConfigChangePublisher.notifyConfigChange(new LocalDataChangeEvent(identify, ContentUtil.getGroupKey(configInfo))); + } } private void verification(String identify) { @@ -150,7 +145,6 @@ public class ConfigServiceImpl implements ConfigService { public Long addConfigInfo(ConfigAllInfo config) { config.setContent(ContentUtil.getPoolContent(config)); config.setMd5(Md5Util.getTpContentMd5(config)); - try { // 当前为单体应用, 后续支持集群部署时切换分布式锁. synchronized (ConfigService.class) { @@ -159,7 +153,6 @@ public class ConfigServiceImpl implements ConfigService { .eq(ConfigAllInfo::getTpId, config.getTpId()) .eq(ConfigAllInfo::getDelFlag, DelEnum.NORMAL.getIntCode())); Assert.isNull(configAllInfo, "线程池配置已存在."); - if (SqlHelper.retBool(configInfoMapper.insert(config))) { return config.getId(); } @@ -168,30 +161,36 @@ public class ConfigServiceImpl implements ConfigService { log.error("[db-error] message :: {}", ex.getMessage(), ex); throw ex; } - return null; } @LogRecord(bizNo = "{{#config.itemId}}_{{#config.tpId}}", category = "THREAD_POOL_UPDATE", success = "核心线程: {{#config.coreSize}}, 最大线程: {{#config.maxSize}}, 队列类型: {{#config.queueType}}, 队列容量: {{#config.capacity}}, 拒绝策略: {{#config.rejectedType}}", detail = "{{#config.toString()}}") - public void updateConfigInfo(String identify, ConfigAllInfo config) { + public void updateConfigInfo(String identify, boolean isChangeNotice, ConfigAllInfo config) { LambdaUpdateWrapper wrapper = Wrappers.lambdaUpdate(ConfigAllInfo.class) .eq(ConfigAllInfo::getTpId, config.getTpId()) .eq(ConfigAllInfo::getItemId, config.getItemId()) .eq(ConfigAllInfo::getTenantId, config.getTenantId()); - config.setGmtCreate(null); config.setContent(ContentUtil.getPoolContent(config)); config.setMd5(Md5Util.getTpContentMd5(config)); - try { // 创建线程池配置实例临时配置, 也可以当作历史配置, 不过针对的是单节点 - if (StrUtil.isNotBlank(identify)) { + if (StringUtil.isNotBlank(identify)) { ConfigInstanceInfo instanceInfo = BeanUtil.convert(config, ConfigInstanceInfo.class); instanceInfo.setInstanceId(identify); configInstanceMapper.insert(instanceInfo); return; + } else if (StringUtil.isEmpty(identify) && isChangeNotice) { + List identifyList = ConfigCacheService.getIdentifyList(config.getTenantId(), config.getItemId(), config.getTpId()); + if (CollectionUtil.isNotEmpty(identifyList)) { + for (String each : identifyList) { + ConfigInstanceInfo instanceInfo = BeanUtil.convert(config, ConfigInstanceInfo.class); + instanceInfo.setInstanceId(each); + configInstanceMapper.insert(instanceInfo); + } + } + return; } - configInfoMapper.update(config, wrapper); } catch (Exception ex) { log.error("[db-error] message :: {}", ex.getMessage(), ex); @@ -217,14 +216,11 @@ public class ConfigServiceImpl implements ConfigService { queueCapacity = config.getCapacity(); break; } - List queueTypes = Stream.of(1, 2, 3, 6, 9).collect(Collectors.toList()); boolean setDefaultFlag = queueTypes.contains(config.getQueueType()) && (config.getCapacity() == null || Objects.equals(config.getCapacity(), 0)); if (setDefaultFlag) { queueCapacity = 1024; } - return queueCapacity; } - } diff --git a/hippo4j-config/src/main/java/cn/hippo4j/config/service/biz/impl/ThreadPoolServiceImpl.java b/hippo4j-config/src/main/java/cn/hippo4j/config/service/biz/impl/ThreadPoolServiceImpl.java index 456faf28..ebecdf8d 100644 --- a/hippo4j-config/src/main/java/cn/hippo4j/config/service/biz/impl/ThreadPoolServiceImpl.java +++ b/hippo4j-config/src/main/java/cn/hippo4j/config/service/biz/impl/ThreadPoolServiceImpl.java @@ -80,7 +80,7 @@ public class ThreadPoolServiceImpl implements ThreadPoolService { @Override public void saveOrUpdateThreadPoolConfig(String identify, ThreadPoolSaveOrUpdateReqDTO reqDTO) { - configService.insertOrUpdate(identify, BeanUtil.convert(reqDTO, ConfigAllInfo.class)); + configService.insertOrUpdate(identify, false, BeanUtil.convert(reqDTO, ConfigAllInfo.class)); } @LogRecord(bizNo = "{{#reqDTO.itemId}}_{{#reqDTO.tpId}}", category = "THREAD_POOL_DELETE", success = "删除线程池: {{#reqDTO.tpId}}", detail = "{{#reqDTO.toString()}}") @@ -98,7 +98,6 @@ public class ThreadPoolServiceImpl implements ThreadPoolService { ConfigAllInfo configAllInfo = configInfoMapper.selectById(id); configAllInfo.setIsAlarm(isAlarm); // TODO: 是否报警变更, 虽然通知了客户端, 但是并没有在客户端实时生效, 需要考虑一个好的场景思路 - configService.insertOrUpdate(null, configAllInfo); + configService.insertOrUpdate(null, false, configAllInfo); } - }