From fb729921fc0905cd64c94f106f0c0035feadaca2 Mon Sep 17 00:00:00 2001 From: "chen.ma" Date: Fri, 5 Nov 2021 22:53:36 +0800 Subject: [PATCH] =?UTF-8?q?Feature:=20=E5=A2=9E=E5=BC=BA=E7=BA=BF=E7=A8=8B?= =?UTF-8?q?=E6=B1=A0=E9=85=8D=E7=BD=AE=E4=BF=AE=E6=94=B9.=20=E5=AE=A2?= =?UTF-8?q?=E6=88=B7=E7=AB=AF=E9=9B=86=E7=BE=A4=E9=83=A8=E7=BD=B2=E6=97=B6?= =?UTF-8?q?,=20=E5=8F=AF=E4=BF=AE=E6=94=B9=E5=85=A8=E9=83=A8=E7=BA=BF?= =?UTF-8?q?=E7=A8=8B=E6=B1=A0=E9=85=8D=E7=BD=AE=E4=B9=9F=E5=8F=AF=E9=80=89?= =?UTF-8?q?=E6=8B=A9=E5=8D=95=E4=B8=80=E5=AE=9E=E4=BE=8B.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../threadpool/common/constant/Constants.java | 8 ++++ .../threadpool/common/model/InstanceInfo.java | 2 + .../config/controller/ConfigController.java | 8 ++-- .../config/event/LocalDataChangeEvent.java | 11 ++++- .../threadpool/config/model/CacheItem.java | 9 ++++ .../config/service/ConfigCacheService.java | 41 ++++++++++------- .../config/service/LongPollingService.java | 30 ++++++++++--- .../config/service/biz/ConfigService.java | 3 +- .../config/service/biz/ThreadPoolService.java | 3 +- .../service/biz/impl/ConfigServiceImpl.java | 4 +- .../biz/impl/ThreadPoolServiceImpl.java | 4 +- .../threadpool/config/toolkit/MapUtil.java | 45 +++++++++++++++++++ .../config/toolkit/Md5ConfigUtil.java | 19 +++----- .../config/toolkit/RequestUtil.java | 6 ++- .../controller/ThreadPoolController.java | 7 +-- .../starter/config/DiscoveryConfig.java | 4 ++ .../threadpool/starter/core/ClientWorker.java | 10 ++++- 17 files changed, 163 insertions(+), 51 deletions(-) diff --git a/common/src/main/java/com/github/dynamic/threadpool/common/constant/Constants.java b/common/src/main/java/com/github/dynamic/threadpool/common/constant/Constants.java index cc40c294..bdc9dc2e 100644 --- a/common/src/main/java/com/github/dynamic/threadpool/common/constant/Constants.java +++ b/common/src/main/java/com/github/dynamic/threadpool/common/constant/Constants.java @@ -1,5 +1,9 @@ package com.github.dynamic.threadpool.common.constant; +import org.springframework.expression.spel.ast.Identifier; + +import java.util.UUID; + /** * Constants. * @@ -42,6 +46,10 @@ public class Constants { public static final String LONG_PULLING_TIMEOUT_NO_HANGUP = "Long-Pulling-Timeout-No-Hangup"; + public static final String LONG_PULLING_CLIENT_IDENTIFICATION = "Long-Pulling-Client-Identification"; + + public static final String CLIENT_IDENTIFICATION_VALUE = UUID.randomUUID().toString(); + public static final String LISTENING_CONFIGS = "Listening-Configs"; public static final String GROUP_KEY_DELIMITER = "+"; diff --git a/common/src/main/java/com/github/dynamic/threadpool/common/model/InstanceInfo.java b/common/src/main/java/com/github/dynamic/threadpool/common/model/InstanceInfo.java index 88cb054d..814caa45 100644 --- a/common/src/main/java/com/github/dynamic/threadpool/common/model/InstanceInfo.java +++ b/common/src/main/java/com/github/dynamic/threadpool/common/model/InstanceInfo.java @@ -34,6 +34,8 @@ public class InstanceInfo { private String callBackUrl; + private String identify; + private volatile String vipAddress; private volatile String secureVipAddress; diff --git a/config/src/main/java/com/github/dynamic/threadpool/config/controller/ConfigController.java b/config/src/main/java/com/github/dynamic/threadpool/config/controller/ConfigController.java index 60f99ffd..518821e6 100644 --- a/config/src/main/java/com/github/dynamic/threadpool/config/controller/ConfigController.java +++ b/config/src/main/java/com/github/dynamic/threadpool/config/controller/ConfigController.java @@ -40,12 +40,14 @@ public class ConfigController { @RequestParam("itemId") String itemId, @RequestParam(value = "namespace") String namespace) { - return Results.success(configService.findConfigAllInfo(tpId, itemId, namespace)); + ConfigAllInfo configAllInfo = configService.findConfigAllInfo(tpId, itemId, namespace); + return Results.success(configAllInfo); } @PostMapping - public Result publishConfig(@RequestBody ConfigAllInfo config) { - configService.insertOrUpdate(config); + public Result publishConfig(@RequestParam(value = "identify", required = false) String identify, + @RequestBody ConfigAllInfo config) { + configService.insertOrUpdate(identify, config); return Results.success(true); } diff --git a/config/src/main/java/com/github/dynamic/threadpool/config/event/LocalDataChangeEvent.java b/config/src/main/java/com/github/dynamic/threadpool/config/event/LocalDataChangeEvent.java index de778a02..7887ffbd 100644 --- a/config/src/main/java/com/github/dynamic/threadpool/config/event/LocalDataChangeEvent.java +++ b/config/src/main/java/com/github/dynamic/threadpool/config/event/LocalDataChangeEvent.java @@ -8,9 +8,18 @@ package com.github.dynamic.threadpool.config.event; */ public class LocalDataChangeEvent extends Event { + /** + * 租户+项目+线程池 + */ public final String groupKey; - public LocalDataChangeEvent(String groupKey) { + /** + * 客户端实例唯一标识 + */ + public final String identify; + + public LocalDataChangeEvent(String identify, String groupKey) { + this.identify = identify; this.groupKey = groupKey; } diff --git a/config/src/main/java/com/github/dynamic/threadpool/config/model/CacheItem.java b/config/src/main/java/com/github/dynamic/threadpool/config/model/CacheItem.java index 228ce739..1757e989 100644 --- a/config/src/main/java/com/github/dynamic/threadpool/config/model/CacheItem.java +++ b/config/src/main/java/com/github/dynamic/threadpool/config/model/CacheItem.java @@ -1,5 +1,6 @@ package com.github.dynamic.threadpool.config.model; +import com.github.dynamic.threadpool.common.toolkit.Md5Util; import com.github.dynamic.threadpool.config.toolkit.SimpleReadWriteLock; import com.github.dynamic.threadpool.config.toolkit.SingletonRepository; import com.github.dynamic.threadpool.common.constant.Constants; @@ -22,6 +23,8 @@ public class CacheItem { public volatile long lastModifiedTs; + public volatile ConfigAllInfo configAllInfo; + public SimpleReadWriteLock rwLock = new SimpleReadWriteLock(); public CacheItem(String groupKey) { @@ -33,4 +36,10 @@ public class CacheItem { this.groupKey = SingletonRepository.DataIdGroupIdCache.getSingleton(groupKey); } + public CacheItem(String groupKey, ConfigAllInfo configAllInfo) { + this.configAllInfo = configAllInfo; + this.md5 = Md5Util.getTpContentMd5(configAllInfo); + this.groupKey = SingletonRepository.DataIdGroupIdCache.getSingleton(groupKey); + } + } diff --git a/config/src/main/java/com/github/dynamic/threadpool/config/service/ConfigCacheService.java b/config/src/main/java/com/github/dynamic/threadpool/config/service/ConfigCacheService.java index ca17b838..13a10616 100644 --- a/config/src/main/java/com/github/dynamic/threadpool/config/service/ConfigCacheService.java +++ b/config/src/main/java/com/github/dynamic/threadpool/config/service/ConfigCacheService.java @@ -1,5 +1,6 @@ package com.github.dynamic.threadpool.config.service; +import cn.hutool.core.collection.CollUtil; import com.github.dynamic.threadpool.config.service.biz.ConfigService; import com.github.dynamic.threadpool.common.config.ApplicationContextHolder; import com.github.dynamic.threadpool.common.constant.Constants; @@ -8,9 +9,12 @@ import com.github.dynamic.threadpool.config.event.LocalDataChangeEvent; import com.github.dynamic.threadpool.config.model.CacheItem; import com.github.dynamic.threadpool.config.model.ConfigAllInfo; import com.github.dynamic.threadpool.config.notify.NotifyCenter; +import com.google.common.collect.Maps; import org.springframework.util.StringUtils; +import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; /** @@ -23,7 +27,7 @@ public class ConfigCacheService { static ConfigService configService = null; - private static final ConcurrentHashMap CACHE = new ConcurrentHashMap(); + private static final ConcurrentHashMap> CACHE = new ConcurrentHashMap(); public static boolean isUpdateData(String groupKey, String md5, String ip) { String contentMd5 = ConfigCacheService.getContentMd5IsNullPut(groupKey, ip); @@ -32,16 +36,16 @@ public class ConfigCacheService { /** * Get Md5. - * TODO:Add IP, different IP thread pool rewrite - * TODO:groupKey && Md5 Cache * * @param groupKey * @param ip * @return */ private static String getContentMd5IsNullPut(String groupKey, String ip) { - CacheItem cacheItem = CACHE.get(groupKey); - if (cacheItem != null) { + Map cacheItemMap = Optional.ofNullable(CACHE.get(groupKey)).orElse(Maps.newHashMap()); + + CacheItem cacheItem = null; + if (CollUtil.isNotEmpty(cacheItemMap) && (cacheItem = cacheItemMap.get(ip)) != null) { return cacheItem.md5; } @@ -52,9 +56,9 @@ public class ConfigCacheService { ConfigAllInfo config = configService.findConfigAllInfo(split[0], split[1], split[2]); if (config != null && !StringUtils.isEmpty(config.getTpId())) { - String md5 = Md5Util.getTpContentMd5(config); - cacheItem = new CacheItem(groupKey, md5); - CACHE.put(groupKey, cacheItem); + cacheItem = new CacheItem(groupKey, config); + cacheItemMap.put(ip, cacheItem); + CACHE.put(groupKey, cacheItemMap); } return (cacheItem != null) ? cacheItem.md5 : Constants.NULL; } @@ -74,23 +78,28 @@ public class ConfigCacheService { return Md5Util.getTpContentMd5(config); } - public static void updateMd5(String groupKey, String md5, long lastModifiedTs) { - CacheItem cache = makeSure(groupKey); + public static void updateMd5(String groupKey, String ip, String md5) { + CacheItem cache = makeSure(groupKey, ip); if (cache.md5 == null || !cache.md5.equals(md5)) { cache.md5 = md5; - cache.lastModifiedTs = lastModifiedTs; - NotifyCenter.publishEvent(new LocalDataChangeEvent(groupKey)); + cache.lastModifiedTs = System.currentTimeMillis(); + NotifyCenter.publishEvent(new LocalDataChangeEvent(ip, groupKey)); } } - static CacheItem makeSure(final String groupKey) { - CacheItem item = CACHE.get(groupKey); + public static CacheItem makeSure(String groupKey, String ip) { + Map ipCacheItemMap = CACHE.get(groupKey); + CacheItem item = ipCacheItemMap.get(ip); if (null != item) { return item; } + CacheItem tmp = new CacheItem(groupKey); - item = CACHE.putIfAbsent(groupKey, tmp); - return (null == item) ? tmp : item; + Map cacheItemMap = Maps.newHashMap(); + cacheItemMap.put(ip, tmp); + CACHE.putIfAbsent(groupKey, cacheItemMap); + + return tmp; } } diff --git a/config/src/main/java/com/github/dynamic/threadpool/config/service/LongPollingService.java b/config/src/main/java/com/github/dynamic/threadpool/config/service/LongPollingService.java index 01eb5d58..7eb578db 100644 --- a/config/src/main/java/com/github/dynamic/threadpool/config/service/LongPollingService.java +++ b/config/src/main/java/com/github/dynamic/threadpool/config/service/LongPollingService.java @@ -1,8 +1,10 @@ package com.github.dynamic.threadpool.config.service; +import cn.hutool.core.util.StrUtil; import com.alibaba.fastjson.JSON; import com.github.dynamic.threadpool.config.notify.listener.Subscriber; import com.github.dynamic.threadpool.config.toolkit.ConfigExecutor; +import com.github.dynamic.threadpool.config.toolkit.MapUtil; import com.github.dynamic.threadpool.config.toolkit.Md5ConfigUtil; import com.github.dynamic.threadpool.config.toolkit.RequestUtil; import com.github.dynamic.threadpool.common.toolkit.Md5Util; @@ -10,6 +12,7 @@ import com.github.dynamic.threadpool.common.web.base.Results; import com.github.dynamic.threadpool.config.event.Event; import com.github.dynamic.threadpool.config.event.LocalDataChangeEvent; import com.github.dynamic.threadpool.config.notify.NotifyCenter; +import com.google.common.collect.Lists; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import org.springframework.util.CollectionUtils; @@ -23,6 +26,8 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import static com.github.dynamic.threadpool.common.constant.Constants.GROUP_KEY_DELIMITER; + /** * Long polling service. * @@ -61,7 +66,7 @@ public class LongPollingService { } else { if (event instanceof LocalDataChangeEvent) { LocalDataChangeEvent evt = (LocalDataChangeEvent) event; - ConfigExecutor.executeLongPolling(new DataChangeTask(evt.groupKey)); + ConfigExecutor.executeLongPolling(new DataChangeTask(evt.identify, evt.groupKey)); } } } @@ -85,9 +90,12 @@ public class LongPollingService { class DataChangeTask implements Runnable { + final String identify; + final String groupKey; - DataChangeTask(String groupKey) { + DataChangeTask(String identify, String groupKey) { + this.identify = identify; this.groupKey = groupKey; } @@ -97,12 +105,20 @@ public class LongPollingService { for (Iterator iter = allSubs.iterator(); iter.hasNext(); ) { ClientLongPolling clientSub = iter.next(); - if (clientSub.clientMd5Map.containsKey(groupKey)) { - getRetainIps().put(clientSub.ip, System.currentTimeMillis()); - ConfigCacheService.updateMd5(groupKey, ConfigCacheService.getContentMd5(groupKey), System.currentTimeMillis()); - iter.remove(); - clientSub.sendResponse(Arrays.asList(groupKey)); + String identity = groupKey + GROUP_KEY_DELIMITER + identify; + List parseMapForFilter = Lists.newArrayList(identity); + if (StrUtil.isBlank(identify)) { + parseMapForFilter = MapUtil.parseMapForFilter(clientSub.clientMd5Map, groupKey); } + + parseMapForFilter.forEach(each -> { + if (clientSub.clientMd5Map.containsKey(each)) { + getRetainIps().put(clientSub.ip, System.currentTimeMillis()); + ConfigCacheService.updateMd5(each, clientSub.ip, ConfigCacheService.getContentMd5(groupKey)); + iter.remove(); + clientSub.sendResponse(Arrays.asList(groupKey)); + } + }); } } catch (Exception ex) { log.error("Data change error :: {}", ex.getMessage(), ex); diff --git a/config/src/main/java/com/github/dynamic/threadpool/config/service/biz/ConfigService.java b/config/src/main/java/com/github/dynamic/threadpool/config/service/biz/ConfigService.java index b0bf6028..eed7eafe 100644 --- a/config/src/main/java/com/github/dynamic/threadpool/config/service/biz/ConfigService.java +++ b/config/src/main/java/com/github/dynamic/threadpool/config/service/biz/ConfigService.java @@ -23,8 +23,9 @@ public interface ConfigService { /** * Insert or update. * + * @param identify * @param configAllInfo */ - void insertOrUpdate(ConfigAllInfo configAllInfo); + void insertOrUpdate(String identify, ConfigAllInfo configAllInfo); } diff --git a/config/src/main/java/com/github/dynamic/threadpool/config/service/biz/ThreadPoolService.java b/config/src/main/java/com/github/dynamic/threadpool/config/service/biz/ThreadPoolService.java index 4301bc70..2e6864e9 100644 --- a/config/src/main/java/com/github/dynamic/threadpool/config/service/biz/ThreadPoolService.java +++ b/config/src/main/java/com/github/dynamic/threadpool/config/service/biz/ThreadPoolService.java @@ -42,8 +42,9 @@ public interface ThreadPoolService { /** * Save or update thread pool config. * + * @param identify * @param reqDTO */ - void saveOrUpdateThreadPoolConfig(ThreadPoolSaveOrUpdateReqDTO reqDTO); + void saveOrUpdateThreadPoolConfig(String identify, ThreadPoolSaveOrUpdateReqDTO reqDTO); } diff --git a/config/src/main/java/com/github/dynamic/threadpool/config/service/biz/impl/ConfigServiceImpl.java b/config/src/main/java/com/github/dynamic/threadpool/config/service/biz/impl/ConfigServiceImpl.java index 3812493e..55e0aa79 100644 --- a/config/src/main/java/com/github/dynamic/threadpool/config/service/biz/impl/ConfigServiceImpl.java +++ b/config/src/main/java/com/github/dynamic/threadpool/config/service/biz/impl/ConfigServiceImpl.java @@ -40,14 +40,14 @@ public class ConfigServiceImpl implements ConfigService { } @Override - public void insertOrUpdate(ConfigAllInfo configAllInfo) { + public void insertOrUpdate(String identify, ConfigAllInfo configAllInfo) { try { addConfigInfo(configAllInfo); } catch (Exception ex) { updateConfigInfo(configAllInfo); } - ConfigChangePublisher.notifyConfigChange(new LocalDataChangeEvent(ContentUtil.getGroupKey(configAllInfo))); + ConfigChangePublisher.notifyConfigChange(new LocalDataChangeEvent(identify, ContentUtil.getGroupKey(configAllInfo))); } private Integer addConfigInfo(ConfigAllInfo config) { diff --git a/config/src/main/java/com/github/dynamic/threadpool/config/service/biz/impl/ThreadPoolServiceImpl.java b/config/src/main/java/com/github/dynamic/threadpool/config/service/biz/impl/ThreadPoolServiceImpl.java index 77fc5548..c3ec1a72 100644 --- a/config/src/main/java/com/github/dynamic/threadpool/config/service/biz/impl/ThreadPoolServiceImpl.java +++ b/config/src/main/java/com/github/dynamic/threadpool/config/service/biz/impl/ThreadPoolServiceImpl.java @@ -56,8 +56,8 @@ public class ThreadPoolServiceImpl implements ThreadPoolService { } @Override - public void saveOrUpdateThreadPoolConfig(ThreadPoolSaveOrUpdateReqDTO reqDTO) { - configService.insertOrUpdate(BeanUtil.convert(reqDTO, ConfigAllInfo.class)); + public void saveOrUpdateThreadPoolConfig(String identify, ThreadPoolSaveOrUpdateReqDTO reqDTO) { + configService.insertOrUpdate(identify, BeanUtil.convert(reqDTO, ConfigAllInfo.class)); } } diff --git a/config/src/main/java/com/github/dynamic/threadpool/config/toolkit/MapUtil.java b/config/src/main/java/com/github/dynamic/threadpool/config/toolkit/MapUtil.java index 0a2ee3b0..74750985 100644 --- a/config/src/main/java/com/github/dynamic/threadpool/config/toolkit/MapUtil.java +++ b/config/src/main/java/com/github/dynamic/threadpool/config/toolkit/MapUtil.java @@ -1,8 +1,15 @@ package com.github.dynamic.threadpool.config.toolkit; +import cn.hutool.core.collection.CollUtil; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.function.BiFunction; +import java.util.stream.Collectors; /** * Map util. @@ -28,4 +35,42 @@ public class MapUtil { return val; } + + /** + * 根据 Key 进行模糊匹配. + * + * @param sourceMap + * @param filters + * @return + */ + public static List parseMapForFilter(Map sourceMap, String filters) { + List resultList = Lists.newArrayList(); + if (CollUtil.isEmpty(sourceMap)) { + return resultList; + } + + sourceMap.forEach((key, val) -> { + if (checkKey(key, filters)) { + resultList.add(key); + } + }); + + return resultList; + } + + /** + * 匹配想要查询的字符. + * + * @param key + * @param filters + * @return + */ + private static boolean checkKey(String key, String filters) { + if (key.indexOf(filters) > -1) { + return true; + } else { + return false; + } + } + } diff --git a/config/src/main/java/com/github/dynamic/threadpool/config/toolkit/Md5ConfigUtil.java b/config/src/main/java/com/github/dynamic/threadpool/config/toolkit/Md5ConfigUtil.java index 5550bdd5..cb7ea4de 100644 --- a/config/src/main/java/com/github/dynamic/threadpool/config/toolkit/Md5ConfigUtil.java +++ b/config/src/main/java/com/github/dynamic/threadpool/config/toolkit/Md5ConfigUtil.java @@ -73,7 +73,7 @@ public class Md5ConfigUtil { if (c == WORD_SEPARATOR_CHAR) { tmpList.add(configKeysString.substring(start, i)); start = i + 1; - if (tmpList.size() > 3) { + if (tmpList.size() > 4) { // Malformed message and return parameter error. throw new IllegalArgumentException("invalid protocol,too much key"); } @@ -84,16 +84,9 @@ public class Md5ConfigUtil { } start = i + 1; - // If it is the old message, the last digit is MD5. The post-multi-tenant message is tenant - if (tmpList.size() == 2) { - String groupKey = getKey(tmpList.get(0), tmpList.get(1)); - groupKey = SingletonRepository.DataIdGroupIdCache.getSingleton(groupKey); - md5Map.put(groupKey, endValue); - } else { - String groupKey = getKey(tmpList.get(0), tmpList.get(1), endValue); - groupKey = SingletonRepository.DataIdGroupIdCache.getSingleton(groupKey); - md5Map.put(groupKey, tmpList.get(2)); - } + String groupKey = getKey(tmpList.get(0), tmpList.get(1), tmpList.get(2), tmpList.get(3)); + groupKey = SingletonRepository.DataIdGroupIdCache.getSingleton(groupKey); + md5Map.put(groupKey, endValue); tmpList.clear(); // Protect malformed messages @@ -113,7 +106,7 @@ public class Md5ConfigUtil { return sb.toString(); } - public static String getKey(String dataId, String group, String tenant) { + public static String getKey(String dataId, String group, String tenant, String identify) { StringBuilder sb = new StringBuilder(); GroupKey.urlEncode(dataId, sb); sb.append('+'); @@ -121,7 +114,9 @@ public class Md5ConfigUtil { if (!StringUtils.isEmpty(tenant)) { sb.append('+'); GroupKey.urlEncode(tenant, sb); + sb.append("+").append(identify); } + return sb.toString(); } diff --git a/config/src/main/java/com/github/dynamic/threadpool/config/toolkit/RequestUtil.java b/config/src/main/java/com/github/dynamic/threadpool/config/toolkit/RequestUtil.java index 1c7cd05e..24561a37 100644 --- a/config/src/main/java/com/github/dynamic/threadpool/config/toolkit/RequestUtil.java +++ b/config/src/main/java/com/github/dynamic/threadpool/config/toolkit/RequestUtil.java @@ -1,9 +1,12 @@ package com.github.dynamic.threadpool.config.toolkit; +import cn.hutool.core.text.StrBuilder; import org.springframework.util.StringUtils; import javax.servlet.http.HttpServletRequest; +import static com.github.dynamic.threadpool.common.constant.Constants.LONG_PULLING_CLIENT_IDENTIFICATION; + /** * Request util. * @@ -24,7 +27,8 @@ public class RequestUtil { return xForwardedFor.split(X_FORWARDED_FOR_SPLIT_SYMBOL)[0].trim(); } String nginxHeader = request.getHeader(X_REAL_IP); - return StringUtils.isEmpty(nginxHeader) ? request.getRemoteAddr() : nginxHeader; + String ipPort = request.getHeader(LONG_PULLING_CLIENT_IDENTIFICATION); + return StringUtils.isEmpty(nginxHeader) ? ipPort : nginxHeader; } } diff --git a/console/src/main/java/com/github/dynamic/threadpool/console/controller/ThreadPoolController.java b/console/src/main/java/com/github/dynamic/threadpool/console/controller/ThreadPoolController.java index d25dabd9..d6a7ad2e 100644 --- a/console/src/main/java/com/github/dynamic/threadpool/console/controller/ThreadPoolController.java +++ b/console/src/main/java/com/github/dynamic/threadpool/console/controller/ThreadPoolController.java @@ -41,9 +41,10 @@ public class ThreadPoolController { return Results.success(threadPoolService.getThreadPool(reqDTO)); } - @PostMapping("/pool/save_or_update") - public Result saveOrUpdateThreadPoolConfig(@RequestBody ThreadPoolSaveOrUpdateReqDTO reqDTO) { - threadPoolService.saveOrUpdateThreadPoolConfig(reqDTO); + @PostMapping("/pool/save_or_update}") + public Result saveOrUpdateThreadPoolConfig(@RequestParam(value = "identify", required = false) String identify, + @RequestBody ThreadPoolSaveOrUpdateReqDTO reqDTO) { + threadPoolService.saveOrUpdateThreadPoolConfig(identify, reqDTO); return Results.success(); } diff --git a/hippo4j-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/config/DiscoveryConfig.java b/hippo4j-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/config/DiscoveryConfig.java index 4e2f6950..6edf4764 100644 --- a/hippo4j-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/config/DiscoveryConfig.java +++ b/hippo4j-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/config/DiscoveryConfig.java @@ -10,6 +10,7 @@ import org.springframework.core.env.ConfigurableEnvironment; import java.net.InetAddress; +import static com.github.dynamic.threadpool.common.constant.Constants.CLIENT_IDENTIFICATION_VALUE; import static com.github.dynamic.threadpool.starter.toolkit.CloudCommonIdUtil.getDefaultInstanceId; import static com.github.dynamic.threadpool.starter.toolkit.CloudCommonIdUtil.getIpApplicationName; @@ -31,11 +32,14 @@ public class DiscoveryConfig { instanceInfo.setInstanceId(getDefaultInstanceId(environment)) .setIpApplicationName(getIpApplicationName(environment)) .setHostName(InetAddress.getLocalHost().getHostAddress()) + .setIdentify(CLIENT_IDENTIFICATION_VALUE) .setAppName(environment.getProperty("spring.application.name")) .setClientBasePath(environment.getProperty("server.servlet.context-path")); + String callBackUrl = new StringBuilder().append(instanceInfo.getHostName()).append(":") .append(environment.getProperty("server.port")).append(instanceInfo.getClientBasePath()) .toString(); + instanceInfo.setCallBackUrl(callBackUrl); return instanceInfo; diff --git a/hippo4j-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/core/ClientWorker.java b/hippo4j-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/core/ClientWorker.java index bf80561f..db8f19cf 100644 --- a/hippo4j-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/core/ClientWorker.java +++ b/hippo4j-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/core/ClientWorker.java @@ -64,6 +64,8 @@ public class ClientWorker { return t; }); + log.info("Client identity :: {}", CLIENT_IDENTIFICATION_VALUE); + this.executor.scheduleWithFixedDelay(() -> { try { checkConfigInfo(); @@ -141,8 +143,9 @@ public class ClientWorker { for (CacheData cacheData : cacheDataList) { sb.append(cacheData.tpId).append(WORD_SEPARATOR); sb.append(cacheData.itemId).append(WORD_SEPARATOR); - sb.append(cacheData.getMd5()).append(WORD_SEPARATOR); - sb.append(cacheData.tenantId).append(LINE_SEPARATOR); + sb.append(cacheData.tenantId).append(WORD_SEPARATOR); + sb.append(CLIENT_IDENTIFICATION_VALUE).append(WORD_SEPARATOR); + sb.append(cacheData.getMd5()).append(LINE_SEPARATOR); if (cacheData.isInitializing()) { inInitializingCacheList.add(GroupKey.getKeyTenant(cacheData.tpId, cacheData.itemId, cacheData.tenantId)); @@ -159,6 +162,9 @@ public class ClientWorker { Map headers = new HashMap(2); headers.put(LONG_PULLING_TIMEOUT, "" + timeout); + // 确认客户端身份, 修改线程池配置时可单独修改 + headers.put(LONG_PULLING_CLIENT_IDENTIFICATION, CLIENT_IDENTIFICATION_VALUE); + // told server do not hang me up if new initializing cacheData added in if (isInitializingCacheList) { headers.put(LONG_PULLING_TIMEOUT_NO_HANGUP, "true");