增强客户端集群部署, 线程池配置差异化.

pull/28/head
chen.ma 3 years ago
parent 435d424dd3
commit b027775329

@ -39,9 +39,9 @@ public class ConfigController {
public Result<ConfigInfoBase> detailConfigInfo( public Result<ConfigInfoBase> detailConfigInfo(
@RequestParam("tpId") String tpId, @RequestParam("tpId") String tpId,
@RequestParam("itemId") String itemId, @RequestParam("itemId") String itemId,
@RequestParam(value = "namespace") String namespace) { @RequestParam("namespace") String namespace,
@RequestParam(value = "instanceId", required = false) String instanceId) {
ConfigAllInfo configAllInfo = configService.findConfigAllInfo(tpId, itemId, namespace); ConfigAllInfo configAllInfo = configService.findConfigRecentInfo(tpId, itemId, namespace, instanceId);
return Results.success(configAllInfo); return Results.success(configAllInfo);
} }

@ -0,0 +1,15 @@
package cn.hippo4j.config.mapper;
import cn.hippo4j.config.model.ConfigInstanceInfo;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Mapper;
/**
* Config instance mapper.
*
* @author chen.ma
* @date 2021/12/5 19:18
*/
@Mapper
public interface ConfigInstanceMapper extends BaseMapper<ConfigInstanceInfo> {
}

@ -0,0 +1,60 @@
package cn.hippo4j.config.model;
import com.baomidou.mybatisplus.annotation.*;
import lombok.Data;
import java.util.Date;
/**
* Config instance info.
*
* @author chen.ma
* @date 2021/12/5 19:19
*/
@Data
@TableName("config_instance")
public class ConfigInstanceInfo {
/**
* ID
*/
@TableId(type = IdType.AUTO)
private Long id;
/**
* tenantId
*/
private String tenantId;
/**
* itemId
*/
private String itemId;
/**
* tpId
*/
private String tpId;
/**
* instanceId
*/
private String instanceId;
/**
* MD5
*/
private String md5;
/**
* content
*/
private String content;
/**
* gmtCreate
*/
@TableField(fill = FieldFill.INSERT)
private Date gmtCreate;
}

@ -65,14 +65,14 @@ public class ConfigCacheService {
if (configService == null) { if (configService == null) {
configService = ApplicationContextHolder.getBean(ConfigService.class); configService = ApplicationContextHolder.getBean(ConfigService.class);
} }
String[] split = groupKey.split("\\+"); String[] params = groupKey.split("\\+");
ConfigAllInfo config = configService.findConfigRecentInfo(params);
ConfigAllInfo config = configService.findConfigAllInfo(split[0], split[1], split[2]);
if (config != null && !StringUtils.isEmpty(config.getTpId())) { if (config != null && !StringUtils.isEmpty(config.getTpId())) {
cacheItem = new CacheItem(groupKey, config); cacheItem = new CacheItem(groupKey, config);
cacheItemMap.put(ip, cacheItem); cacheItemMap.put(ip, cacheItem);
CACHE.put(groupKey, cacheItemMap); CACHE.put(groupKey, cacheItemMap);
} }
return (cacheItem != null) ? cacheItem.md5 : Constants.NULL; return (cacheItem != null) ? cacheItem.md5 : Constants.NULL;
} }
@ -81,10 +81,10 @@ public class ConfigCacheService {
configService = ApplicationContextHolder.getBean(ConfigService.class); configService = ApplicationContextHolder.getBean(ConfigService.class);
} }
String[] split = groupKey.split("\\+"); String[] params = groupKey.split("\\+");
ConfigAllInfo config = configService.findConfigAllInfo(split[0], split[1], split[2]); ConfigAllInfo config = configService.findConfigRecentInfo(params);
if (config == null || StringUtils.isEmpty(config.getTpId())) { if (config == null || StringUtils.isEmpty(config.getTpId())) {
String errorMessage = String.format("config is null. tpId :: %s, itemId :: %s, tenantId :: %s", split[0], split[1], split[2]); String errorMessage = String.format("config is null. tpId :: %s, itemId :: %s, tenantId :: %s", params[0], params[1], params[2]);
throw new RuntimeException(errorMessage); throw new RuntimeException(errorMessage);
} }
@ -95,8 +95,8 @@ public class ConfigCacheService {
CacheItem cache = makeSure(groupKey, ip); CacheItem cache = makeSure(groupKey, ip);
if (cache.md5 == null || !cache.md5.equals(md5)) { if (cache.md5 == null || !cache.md5.equals(md5)) {
cache.md5 = md5; cache.md5 = md5;
String[] split = groupKey.split("\\+"); String[] params = groupKey.split("\\+");
ConfigAllInfo config = configService.findConfigAllInfo(split[0], split[1], split[2]); ConfigAllInfo config = configService.findConfigRecentInfo(params);
cache.configAllInfo = config; cache.configAllInfo = config;
cache.lastModifiedTs = System.currentTimeMillis(); cache.lastModifiedTs = System.currentTimeMillis();
NotifyCenter.publishEvent(new LocalDataChangeEvent(ip, groupKey)); NotifyCenter.publishEvent(new LocalDataChangeEvent(ip, groupKey));

@ -114,7 +114,7 @@ public class LongPollingService {
parseMapForFilter.forEach(each -> { parseMapForFilter.forEach(each -> {
if (clientSub.clientMd5Map.containsKey(each)) { if (clientSub.clientMd5Map.containsKey(each)) {
getRetainIps().put(clientSub.ip, System.currentTimeMillis()); getRetainIps().put(clientSub.ip, System.currentTimeMillis());
ConfigCacheService.updateMd5(each, clientSub.ip, ConfigCacheService.getContentMd5(groupKey)); ConfigCacheService.updateMd5(each, clientSub.ip, ConfigCacheService.getContentMd5(identity));
iter.remove(); iter.remove();
clientSub.sendResponse(Arrays.asList(groupKey)); clientSub.sendResponse(Arrays.asList(groupKey));
} }

@ -20,6 +20,14 @@ public interface ConfigService {
*/ */
ConfigAllInfo findConfigAllInfo(String tpId, String itemId, String tenantId); ConfigAllInfo findConfigAllInfo(String tpId, String itemId, String tenantId);
/**
* Find config recent info.
*
* @param params
* @return
*/
ConfigAllInfo findConfigRecentInfo(String... params);
/** /**
* Insert or update. * Insert or update.
* *

@ -6,11 +6,16 @@ import cn.hippo4j.common.toolkit.ContentUtil;
import cn.hippo4j.common.toolkit.Md5Util; import cn.hippo4j.common.toolkit.Md5Util;
import cn.hippo4j.config.event.LocalDataChangeEvent; import cn.hippo4j.config.event.LocalDataChangeEvent;
import cn.hippo4j.config.mapper.ConfigInfoMapper; import cn.hippo4j.config.mapper.ConfigInfoMapper;
import cn.hippo4j.config.mapper.ConfigInstanceMapper;
import cn.hippo4j.config.model.ConfigAllInfo; import cn.hippo4j.config.model.ConfigAllInfo;
import cn.hippo4j.config.model.ConfigInfoBase; import cn.hippo4j.config.model.ConfigInfoBase;
import cn.hippo4j.config.model.ConfigInstanceInfo;
import cn.hippo4j.config.service.ConfigChangePublisher; import cn.hippo4j.config.service.ConfigChangePublisher;
import cn.hippo4j.config.service.biz.ConfigService; import cn.hippo4j.config.service.biz.ConfigService;
import cn.hippo4j.config.toolkit.BeanUtil;
import cn.hippo4j.tools.logrecord.annotation.LogRecord; import cn.hippo4j.tools.logrecord.annotation.LogRecord;
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.core.toolkit.StringUtils; import com.baomidou.mybatisplus.core.toolkit.StringUtils;
@ -40,6 +45,8 @@ public class ConfigServiceImpl implements ConfigService {
private final ConfigInfoMapper configInfoMapper; private final ConfigInfoMapper configInfoMapper;
private final ConfigInstanceMapper configInstanceMapper;
@Override @Override
public ConfigAllInfo findConfigAllInfo(String tpId, String itemId, String tenantId) { public ConfigAllInfo findConfigAllInfo(String tpId, String itemId, String tenantId) {
LambdaQueryWrapper<ConfigAllInfo> wrapper = Wrappers.lambdaQuery(ConfigAllInfo.class) LambdaQueryWrapper<ConfigAllInfo> wrapper = Wrappers.lambdaQuery(ConfigAllInfo.class)
@ -51,6 +58,40 @@ public class ConfigServiceImpl implements ConfigService {
return configAllInfo; return configAllInfo;
} }
@Override
public ConfigAllInfo findConfigRecentInfo(String... params) {
ConfigAllInfo resultConfig;
ConfigAllInfo configInstance = null;
LambdaQueryWrapper<ConfigInstanceInfo> instanceQueryWrapper = Wrappers.lambdaQuery(ConfigInstanceInfo.class)
.eq(ConfigInstanceInfo::getInstanceId, params[3])
.orderByDesc(ConfigInstanceInfo::getGmtCreate)
.last("LIMIT 1");
ConfigInstanceInfo instanceInfo = configInstanceMapper.selectOne(instanceQueryWrapper);
if (instanceInfo != null) {
String content = instanceInfo.getContent();
configInstance = JSON.parseObject(content, ConfigAllInfo.class);
configInstance.setContent(content);
configInstance.setGmtCreate(instanceInfo.getGmtCreate());
configInstance.setMd5(Md5Util.getTpContentMd5(configInstance));
}
ConfigAllInfo configAllInfo = findConfigAllInfo(params[0], params[1], params[2]);
if (configAllInfo != null && configInstance == null) {
resultConfig = configAllInfo;
} else if (configAllInfo == null && configInstance != null) {
resultConfig = configInstance;
} else {
if (configAllInfo.getGmtModified().before(configInstance.getGmtCreate())) {
resultConfig = configInstance;
} else {
resultConfig = configAllInfo;
}
}
return resultConfig;
}
@Override @Override
public void insertOrUpdate(String identify, ConfigAllInfo configInfo) { public void insertOrUpdate(String identify, ConfigAllInfo configInfo) {
LambdaQueryWrapper<ConfigAllInfo> queryWrapper = Wrappers.lambdaQuery(ConfigAllInfo.class) LambdaQueryWrapper<ConfigAllInfo> queryWrapper = Wrappers.lambdaQuery(ConfigAllInfo.class)
@ -67,10 +108,10 @@ public class ConfigServiceImpl implements ConfigService {
.condition( .condition(
existConfig == null, existConfig == null,
() -> configService.addConfigInfo(configInfo), () -> configService.addConfigInfo(configInfo),
() -> configService.updateConfigInfo(configInfo) () -> configService.updateConfigInfo(identify, configInfo)
); );
} catch (Exception ex) { } catch (Exception ex) {
updateConfigInfo(configInfo); updateConfigInfo(identify, configInfo);
} }
ConfigChangePublisher.notifyConfigChange(new LocalDataChangeEvent(identify, ContentUtil.getGroupKey(configInfo))); ConfigChangePublisher.notifyConfigChange(new LocalDataChangeEvent(identify, ContentUtil.getGroupKey(configInfo)));
@ -98,7 +139,7 @@ public class ConfigServiceImpl implements ConfigService {
success = "核心线程: {{#config.coreSize}}, 最大线程: {{#config.maxSize}}, 队列类型: {{#config.queueType}}, 队列容量: {{#config.capacity}}, 拒绝策略: {{#config.rejectedType}}", success = "核心线程: {{#config.coreSize}}, 最大线程: {{#config.maxSize}}, 队列类型: {{#config.queueType}}, 队列容量: {{#config.capacity}}, 拒绝策略: {{#config.rejectedType}}",
detail = "{{#config.toString()}}" detail = "{{#config.toString()}}"
) )
public void updateConfigInfo(ConfigAllInfo config) { public void updateConfigInfo(String identify, ConfigAllInfo config) {
LambdaUpdateWrapper<ConfigAllInfo> wrapper = Wrappers.lambdaUpdate(ConfigAllInfo.class) LambdaUpdateWrapper<ConfigAllInfo> wrapper = Wrappers.lambdaUpdate(ConfigAllInfo.class)
.eq(ConfigAllInfo::getTpId, config.getTpId()) .eq(ConfigAllInfo::getTpId, config.getTpId())
.eq(ConfigAllInfo::getItemId, config.getItemId()) .eq(ConfigAllInfo::getItemId, config.getItemId())
@ -109,6 +150,14 @@ public class ConfigServiceImpl implements ConfigService {
config.setMd5(Md5Util.getTpContentMd5(config)); config.setMd5(Md5Util.getTpContentMd5(config));
try { try {
// 创建线程池配置实例临时配置, 也可以当作历史配置, 不过针对的是单节点
if (StrUtil.isNotBlank(identify)) {
ConfigInstanceInfo instanceInfo = BeanUtil.convert(config, ConfigInstanceInfo.class);
instanceInfo.setInstanceId(identify);
configInstanceMapper.insert(instanceInfo);
return;
}
configInfoMapper.update(config, wrapper); configInfoMapper.update(config, wrapper);
} catch (Exception ex) { } catch (Exception ex) {
log.error("[db-error] message :: {}", ex.getMessage(), ex); log.error("[db-error] message :: {}", ex.getMessage(), ex);

Loading…
Cancel
Save