Feature: 增强线程池配置修改. 客户端集群部署时, 可修改全部线程池配置也可选择单一实例.

pull/10/head
chen.ma 3 years ago
parent 04e1d3c7b3
commit fb729921fc

@ -1,5 +1,9 @@
package com.github.dynamic.threadpool.common.constant;
import org.springframework.expression.spel.ast.Identifier;
import java.util.UUID;
/**
* Constants.
*
@ -42,6 +46,10 @@ public class Constants {
public static final String LONG_PULLING_TIMEOUT_NO_HANGUP = "Long-Pulling-Timeout-No-Hangup";
public static final String LONG_PULLING_CLIENT_IDENTIFICATION = "Long-Pulling-Client-Identification";
public static final String CLIENT_IDENTIFICATION_VALUE = UUID.randomUUID().toString();
public static final String LISTENING_CONFIGS = "Listening-Configs";
public static final String GROUP_KEY_DELIMITER = "+";

@ -34,6 +34,8 @@ public class InstanceInfo {
private String callBackUrl;
private String identify;
private volatile String vipAddress;
private volatile String secureVipAddress;

@ -40,12 +40,14 @@ public class ConfigController {
@RequestParam("itemId") String itemId,
@RequestParam(value = "namespace") String namespace) {
return Results.success(configService.findConfigAllInfo(tpId, itemId, namespace));
ConfigAllInfo configAllInfo = configService.findConfigAllInfo(tpId, itemId, namespace);
return Results.success(configAllInfo);
}
@PostMapping
public Result<Boolean> publishConfig(@RequestBody ConfigAllInfo config) {
configService.insertOrUpdate(config);
public Result<Boolean> publishConfig(@RequestParam(value = "identify", required = false) String identify,
@RequestBody ConfigAllInfo config) {
configService.insertOrUpdate(identify, config);
return Results.success(true);
}

@ -8,9 +8,18 @@ package com.github.dynamic.threadpool.config.event;
*/
public class LocalDataChangeEvent extends Event {
/**
* ++线
*/
public final String groupKey;
public LocalDataChangeEvent(String groupKey) {
/**
*
*/
public final String identify;
public LocalDataChangeEvent(String identify, String groupKey) {
this.identify = identify;
this.groupKey = groupKey;
}

@ -1,5 +1,6 @@
package com.github.dynamic.threadpool.config.model;
import com.github.dynamic.threadpool.common.toolkit.Md5Util;
import com.github.dynamic.threadpool.config.toolkit.SimpleReadWriteLock;
import com.github.dynamic.threadpool.config.toolkit.SingletonRepository;
import com.github.dynamic.threadpool.common.constant.Constants;
@ -22,6 +23,8 @@ public class CacheItem {
public volatile long lastModifiedTs;
public volatile ConfigAllInfo configAllInfo;
public SimpleReadWriteLock rwLock = new SimpleReadWriteLock();
public CacheItem(String groupKey) {
@ -33,4 +36,10 @@ public class CacheItem {
this.groupKey = SingletonRepository.DataIdGroupIdCache.getSingleton(groupKey);
}
public CacheItem(String groupKey, ConfigAllInfo configAllInfo) {
this.configAllInfo = configAllInfo;
this.md5 = Md5Util.getTpContentMd5(configAllInfo);
this.groupKey = SingletonRepository.DataIdGroupIdCache.getSingleton(groupKey);
}
}

@ -1,5 +1,6 @@
package com.github.dynamic.threadpool.config.service;
import cn.hutool.core.collection.CollUtil;
import com.github.dynamic.threadpool.config.service.biz.ConfigService;
import com.github.dynamic.threadpool.common.config.ApplicationContextHolder;
import com.github.dynamic.threadpool.common.constant.Constants;
@ -8,9 +9,12 @@ import com.github.dynamic.threadpool.config.event.LocalDataChangeEvent;
import com.github.dynamic.threadpool.config.model.CacheItem;
import com.github.dynamic.threadpool.config.model.ConfigAllInfo;
import com.github.dynamic.threadpool.config.notify.NotifyCenter;
import com.google.common.collect.Maps;
import org.springframework.util.StringUtils;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
/**
@ -23,7 +27,7 @@ public class ConfigCacheService {
static ConfigService configService = null;
private static final ConcurrentHashMap<String, CacheItem> CACHE = new ConcurrentHashMap();
private static final ConcurrentHashMap<String, Map<String, CacheItem>> CACHE = new ConcurrentHashMap();
public static boolean isUpdateData(String groupKey, String md5, String ip) {
String contentMd5 = ConfigCacheService.getContentMd5IsNullPut(groupKey, ip);
@ -32,16 +36,16 @@ public class ConfigCacheService {
/**
* Get Md5.
* TODOAdd IP, different IP thread pool rewrite
* TODOgroupKey && Md5 Cache
*
* @param groupKey
* @param ip
* @return
*/
private static String getContentMd5IsNullPut(String groupKey, String ip) {
CacheItem cacheItem = CACHE.get(groupKey);
if (cacheItem != null) {
Map<String, CacheItem> cacheItemMap = Optional.ofNullable(CACHE.get(groupKey)).orElse(Maps.newHashMap());
CacheItem cacheItem = null;
if (CollUtil.isNotEmpty(cacheItemMap) && (cacheItem = cacheItemMap.get(ip)) != null) {
return cacheItem.md5;
}
@ -52,9 +56,9 @@ public class ConfigCacheService {
ConfigAllInfo config = configService.findConfigAllInfo(split[0], split[1], split[2]);
if (config != null && !StringUtils.isEmpty(config.getTpId())) {
String md5 = Md5Util.getTpContentMd5(config);
cacheItem = new CacheItem(groupKey, md5);
CACHE.put(groupKey, cacheItem);
cacheItem = new CacheItem(groupKey, config);
cacheItemMap.put(ip, cacheItem);
CACHE.put(groupKey, cacheItemMap);
}
return (cacheItem != null) ? cacheItem.md5 : Constants.NULL;
}
@ -74,23 +78,28 @@ public class ConfigCacheService {
return Md5Util.getTpContentMd5(config);
}
public static void updateMd5(String groupKey, String md5, long lastModifiedTs) {
CacheItem cache = makeSure(groupKey);
public static void updateMd5(String groupKey, String ip, String md5) {
CacheItem cache = makeSure(groupKey, ip);
if (cache.md5 == null || !cache.md5.equals(md5)) {
cache.md5 = md5;
cache.lastModifiedTs = lastModifiedTs;
NotifyCenter.publishEvent(new LocalDataChangeEvent(groupKey));
cache.lastModifiedTs = System.currentTimeMillis();
NotifyCenter.publishEvent(new LocalDataChangeEvent(ip, groupKey));
}
}
static CacheItem makeSure(final String groupKey) {
CacheItem item = CACHE.get(groupKey);
public static CacheItem makeSure(String groupKey, String ip) {
Map<String, CacheItem> ipCacheItemMap = CACHE.get(groupKey);
CacheItem item = ipCacheItemMap.get(ip);
if (null != item) {
return item;
}
CacheItem tmp = new CacheItem(groupKey);
item = CACHE.putIfAbsent(groupKey, tmp);
return (null == item) ? tmp : item;
Map<String, CacheItem> cacheItemMap = Maps.newHashMap();
cacheItemMap.put(ip, tmp);
CACHE.putIfAbsent(groupKey, cacheItemMap);
return tmp;
}
}

@ -1,8 +1,10 @@
package com.github.dynamic.threadpool.config.service;
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSON;
import com.github.dynamic.threadpool.config.notify.listener.Subscriber;
import com.github.dynamic.threadpool.config.toolkit.ConfigExecutor;
import com.github.dynamic.threadpool.config.toolkit.MapUtil;
import com.github.dynamic.threadpool.config.toolkit.Md5ConfigUtil;
import com.github.dynamic.threadpool.config.toolkit.RequestUtil;
import com.github.dynamic.threadpool.common.toolkit.Md5Util;
@ -10,6 +12,7 @@ import com.github.dynamic.threadpool.common.web.base.Results;
import com.github.dynamic.threadpool.config.event.Event;
import com.github.dynamic.threadpool.config.event.LocalDataChangeEvent;
import com.github.dynamic.threadpool.config.notify.NotifyCenter;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
@ -23,6 +26,8 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import static com.github.dynamic.threadpool.common.constant.Constants.GROUP_KEY_DELIMITER;
/**
* Long polling service.
*
@ -61,7 +66,7 @@ public class LongPollingService {
} else {
if (event instanceof LocalDataChangeEvent) {
LocalDataChangeEvent evt = (LocalDataChangeEvent) event;
ConfigExecutor.executeLongPolling(new DataChangeTask(evt.groupKey));
ConfigExecutor.executeLongPolling(new DataChangeTask(evt.identify, evt.groupKey));
}
}
}
@ -85,9 +90,12 @@ public class LongPollingService {
class DataChangeTask implements Runnable {
final String identify;
final String groupKey;
DataChangeTask(String groupKey) {
DataChangeTask(String identify, String groupKey) {
this.identify = identify;
this.groupKey = groupKey;
}
@ -97,12 +105,20 @@ public class LongPollingService {
for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext(); ) {
ClientLongPolling clientSub = iter.next();
if (clientSub.clientMd5Map.containsKey(groupKey)) {
getRetainIps().put(clientSub.ip, System.currentTimeMillis());
ConfigCacheService.updateMd5(groupKey, ConfigCacheService.getContentMd5(groupKey), System.currentTimeMillis());
iter.remove();
clientSub.sendResponse(Arrays.asList(groupKey));
String identity = groupKey + GROUP_KEY_DELIMITER + identify;
List<String> parseMapForFilter = Lists.newArrayList(identity);
if (StrUtil.isBlank(identify)) {
parseMapForFilter = MapUtil.parseMapForFilter(clientSub.clientMd5Map, groupKey);
}
parseMapForFilter.forEach(each -> {
if (clientSub.clientMd5Map.containsKey(each)) {
getRetainIps().put(clientSub.ip, System.currentTimeMillis());
ConfigCacheService.updateMd5(each, clientSub.ip, ConfigCacheService.getContentMd5(groupKey));
iter.remove();
clientSub.sendResponse(Arrays.asList(groupKey));
}
});
}
} catch (Exception ex) {
log.error("Data change error :: {}", ex.getMessage(), ex);

@ -23,8 +23,9 @@ public interface ConfigService {
/**
* Insert or update.
*
* @param identify
* @param configAllInfo
*/
void insertOrUpdate(ConfigAllInfo configAllInfo);
void insertOrUpdate(String identify, ConfigAllInfo configAllInfo);
}

@ -42,8 +42,9 @@ public interface ThreadPoolService {
/**
* Save or update thread pool config.
*
* @param identify
* @param reqDTO
*/
void saveOrUpdateThreadPoolConfig(ThreadPoolSaveOrUpdateReqDTO reqDTO);
void saveOrUpdateThreadPoolConfig(String identify, ThreadPoolSaveOrUpdateReqDTO reqDTO);
}

@ -40,14 +40,14 @@ public class ConfigServiceImpl implements ConfigService {
}
@Override
public void insertOrUpdate(ConfigAllInfo configAllInfo) {
public void insertOrUpdate(String identify, ConfigAllInfo configAllInfo) {
try {
addConfigInfo(configAllInfo);
} catch (Exception ex) {
updateConfigInfo(configAllInfo);
}
ConfigChangePublisher.notifyConfigChange(new LocalDataChangeEvent(ContentUtil.getGroupKey(configAllInfo)));
ConfigChangePublisher.notifyConfigChange(new LocalDataChangeEvent(identify, ContentUtil.getGroupKey(configAllInfo)));
}
private Integer addConfigInfo(ConfigAllInfo config) {

@ -56,8 +56,8 @@ public class ThreadPoolServiceImpl implements ThreadPoolService {
}
@Override
public void saveOrUpdateThreadPoolConfig(ThreadPoolSaveOrUpdateReqDTO reqDTO) {
configService.insertOrUpdate(BeanUtil.convert(reqDTO, ConfigAllInfo.class));
public void saveOrUpdateThreadPoolConfig(String identify, ThreadPoolSaveOrUpdateReqDTO reqDTO) {
configService.insertOrUpdate(identify, BeanUtil.convert(reqDTO, ConfigAllInfo.class));
}
}

@ -1,8 +1,15 @@
package com.github.dynamic.threadpool.config.toolkit;
import cn.hutool.core.collection.CollUtil;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
/**
* Map util.
@ -28,4 +35,42 @@ public class MapUtil {
return val;
}
/**
* Key .
*
* @param sourceMap
* @param filters
* @return
*/
public static List<String> parseMapForFilter(Map<String, ?> sourceMap, String filters) {
List<String> resultList = Lists.newArrayList();
if (CollUtil.isEmpty(sourceMap)) {
return resultList;
}
sourceMap.forEach((key, val) -> {
if (checkKey(key, filters)) {
resultList.add(key);
}
});
return resultList;
}
/**
* .
*
* @param key
* @param filters
* @return
*/
private static boolean checkKey(String key, String filters) {
if (key.indexOf(filters) > -1) {
return true;
} else {
return false;
}
}
}

@ -73,7 +73,7 @@ public class Md5ConfigUtil {
if (c == WORD_SEPARATOR_CHAR) {
tmpList.add(configKeysString.substring(start, i));
start = i + 1;
if (tmpList.size() > 3) {
if (tmpList.size() > 4) {
// Malformed message and return parameter error.
throw new IllegalArgumentException("invalid protocol,too much key");
}
@ -84,16 +84,9 @@ public class Md5ConfigUtil {
}
start = i + 1;
// If it is the old message, the last digit is MD5. The post-multi-tenant message is tenant
if (tmpList.size() == 2) {
String groupKey = getKey(tmpList.get(0), tmpList.get(1));
groupKey = SingletonRepository.DataIdGroupIdCache.getSingleton(groupKey);
md5Map.put(groupKey, endValue);
} else {
String groupKey = getKey(tmpList.get(0), tmpList.get(1), endValue);
groupKey = SingletonRepository.DataIdGroupIdCache.getSingleton(groupKey);
md5Map.put(groupKey, tmpList.get(2));
}
String groupKey = getKey(tmpList.get(0), tmpList.get(1), tmpList.get(2), tmpList.get(3));
groupKey = SingletonRepository.DataIdGroupIdCache.getSingleton(groupKey);
md5Map.put(groupKey, endValue);
tmpList.clear();
// Protect malformed messages
@ -113,7 +106,7 @@ public class Md5ConfigUtil {
return sb.toString();
}
public static String getKey(String dataId, String group, String tenant) {
public static String getKey(String dataId, String group, String tenant, String identify) {
StringBuilder sb = new StringBuilder();
GroupKey.urlEncode(dataId, sb);
sb.append('+');
@ -121,7 +114,9 @@ public class Md5ConfigUtil {
if (!StringUtils.isEmpty(tenant)) {
sb.append('+');
GroupKey.urlEncode(tenant, sb);
sb.append("+").append(identify);
}
return sb.toString();
}

@ -1,9 +1,12 @@
package com.github.dynamic.threadpool.config.toolkit;
import cn.hutool.core.text.StrBuilder;
import org.springframework.util.StringUtils;
import javax.servlet.http.HttpServletRequest;
import static com.github.dynamic.threadpool.common.constant.Constants.LONG_PULLING_CLIENT_IDENTIFICATION;
/**
* Request util.
*
@ -24,7 +27,8 @@ public class RequestUtil {
return xForwardedFor.split(X_FORWARDED_FOR_SPLIT_SYMBOL)[0].trim();
}
String nginxHeader = request.getHeader(X_REAL_IP);
return StringUtils.isEmpty(nginxHeader) ? request.getRemoteAddr() : nginxHeader;
String ipPort = request.getHeader(LONG_PULLING_CLIENT_IDENTIFICATION);
return StringUtils.isEmpty(nginxHeader) ? ipPort : nginxHeader;
}
}

@ -41,9 +41,10 @@ public class ThreadPoolController {
return Results.success(threadPoolService.getThreadPool(reqDTO));
}
@PostMapping("/pool/save_or_update")
public Result saveOrUpdateThreadPoolConfig(@RequestBody ThreadPoolSaveOrUpdateReqDTO reqDTO) {
threadPoolService.saveOrUpdateThreadPoolConfig(reqDTO);
@PostMapping("/pool/save_or_update}")
public Result saveOrUpdateThreadPoolConfig(@RequestParam(value = "identify", required = false) String identify,
@RequestBody ThreadPoolSaveOrUpdateReqDTO reqDTO) {
threadPoolService.saveOrUpdateThreadPoolConfig(identify, reqDTO);
return Results.success();
}

@ -10,6 +10,7 @@ import org.springframework.core.env.ConfigurableEnvironment;
import java.net.InetAddress;
import static com.github.dynamic.threadpool.common.constant.Constants.CLIENT_IDENTIFICATION_VALUE;
import static com.github.dynamic.threadpool.starter.toolkit.CloudCommonIdUtil.getDefaultInstanceId;
import static com.github.dynamic.threadpool.starter.toolkit.CloudCommonIdUtil.getIpApplicationName;
@ -31,11 +32,14 @@ public class DiscoveryConfig {
instanceInfo.setInstanceId(getDefaultInstanceId(environment))
.setIpApplicationName(getIpApplicationName(environment))
.setHostName(InetAddress.getLocalHost().getHostAddress())
.setIdentify(CLIENT_IDENTIFICATION_VALUE)
.setAppName(environment.getProperty("spring.application.name"))
.setClientBasePath(environment.getProperty("server.servlet.context-path"));
String callBackUrl = new StringBuilder().append(instanceInfo.getHostName()).append(":")
.append(environment.getProperty("server.port")).append(instanceInfo.getClientBasePath())
.toString();
instanceInfo.setCallBackUrl(callBackUrl);
return instanceInfo;

@ -64,6 +64,8 @@ public class ClientWorker {
return t;
});
log.info("Client identity :: {}", CLIENT_IDENTIFICATION_VALUE);
this.executor.scheduleWithFixedDelay(() -> {
try {
checkConfigInfo();
@ -141,8 +143,9 @@ public class ClientWorker {
for (CacheData cacheData : cacheDataList) {
sb.append(cacheData.tpId).append(WORD_SEPARATOR);
sb.append(cacheData.itemId).append(WORD_SEPARATOR);
sb.append(cacheData.getMd5()).append(WORD_SEPARATOR);
sb.append(cacheData.tenantId).append(LINE_SEPARATOR);
sb.append(cacheData.tenantId).append(WORD_SEPARATOR);
sb.append(CLIENT_IDENTIFICATION_VALUE).append(WORD_SEPARATOR);
sb.append(cacheData.getMd5()).append(LINE_SEPARATOR);
if (cacheData.isInitializing()) {
inInitializingCacheList.add(GroupKey.getKeyTenant(cacheData.tpId, cacheData.itemId, cacheData.tenantId));
@ -159,6 +162,9 @@ public class ClientWorker {
Map<String, String> headers = new HashMap(2);
headers.put(LONG_PULLING_TIMEOUT, "" + timeout);
// 确认客户端身份, 修改线程池配置时可单独修改
headers.put(LONG_PULLING_CLIENT_IDENTIFICATION, CLIENT_IDENTIFICATION_VALUE);
// told server do not hang me up if new initializing cacheData added in
if (isInitializingCacheList) {
headers.put(LONG_PULLING_TIMEOUT_NO_HANGUP, "true");

Loading…
Cancel
Save