diff --git a/common/pom.xml b/common/pom.xml
index 45bc2b38..c3740911 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -25,6 +25,11 @@
spring-boot-starter
+
+ com.alibaba
+ fastjson
+
+
diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/common/Constants.java b/common/src/main/java/io/dynamic/threadpool/common/constant/Constants.java
similarity index 85%
rename from dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/common/Constants.java
rename to common/src/main/java/io/dynamic/threadpool/common/constant/Constants.java
index cd571b40..be1c592b 100644
--- a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/common/Constants.java
+++ b/common/src/main/java/io/dynamic/threadpool/common/constant/Constants.java
@@ -1,4 +1,4 @@
-package io.dynamic.threadpool.starter.common;
+package io.dynamic.threadpool.common.constant;
/**
* Constants
@@ -35,4 +35,9 @@ public class Constants {
public static final String PROBE_MODIFY_REQUEST = "Listening-Configs";
public static final String LONG_PULLING_TIMEOUT = "Long-Pulling-Timeout";
+
+ public static final String LISTENING_CONFIGS = "Listening-Configs";
+
+ public static final String GROUP_KEY_DELIMITER = "+";
+
}
diff --git a/common/src/main/java/io/dynamic/threadpool/common/toolkit/ContentUtil.java b/common/src/main/java/io/dynamic/threadpool/common/toolkit/ContentUtil.java
index 1c861b49..83bbc9ed 100644
--- a/common/src/main/java/io/dynamic/threadpool/common/toolkit/ContentUtil.java
+++ b/common/src/main/java/io/dynamic/threadpool/common/toolkit/ContentUtil.java
@@ -1,6 +1,9 @@
package io.dynamic.threadpool.common.toolkit;
+import com.alibaba.fastjson.JSON;
+import io.dynamic.threadpool.common.constant.Constants;
import io.dynamic.threadpool.common.model.PoolParameter;
+import io.dynamic.threadpool.common.model.PoolParameterInfo;
/**
* Content Util.
@@ -10,9 +13,9 @@ import io.dynamic.threadpool.common.model.PoolParameter;
*/
public class ContentUtil {
- public static String getPoolContent(PoolParameter parameter) {
+ /*public static String getPoolContent(PoolParameter parameter) {
StringBuilder stringBuilder = new StringBuilder();
- String targetStr = stringBuilder.append(parameter.getCoreSize())
+ String resultStr = stringBuilder.append(parameter.getCoreSize())
.append(parameter.getMaxSize())
.append(parameter.getQueueType())
.append(parameter.getCapacity())
@@ -21,6 +24,33 @@ public class ContentUtil {
.append(parameter.getCapacityAlarm())
.append(parameter.getLivenessAlarm())
.toString();
- return targetStr;
+ return resultStr;
+ }*/
+
+ public static String getPoolContent(PoolParameter parameter) {
+ PoolParameterInfo poolInfo = new PoolParameterInfo();
+ poolInfo.setNamespace(parameter.getNamespace());
+ poolInfo.setItemId(parameter.getItemId());
+ poolInfo.setTpId(parameter.getTpId());
+ poolInfo.setCoreSize(parameter.getCoreSize());
+ poolInfo.setMaxSize(parameter.getMaxSize());
+ poolInfo.setQueueType(parameter.getQueueType());
+ poolInfo.setCapacity(parameter.getCapacity());
+ poolInfo.setKeepAliveTime(parameter.getKeepAliveTime());
+ poolInfo.setIsAlarm(parameter.getIsAlarm());
+ poolInfo.setCapacityAlarm(parameter.getCapacityAlarm());
+ poolInfo.setLivenessAlarm(parameter.getLivenessAlarm());
+ return JSON.toJSONString(poolInfo);
+ }
+
+ public static String getGroupKey(PoolParameter parameter) {
+ StringBuilder stringBuilder = new StringBuilder();
+ String resultStr = stringBuilder.append(parameter.getTpId())
+ .append(Constants.GROUP_KEY_DELIMITER)
+ .append(parameter.getItemId())
+ .append(Constants.GROUP_KEY_DELIMITER)
+ .append(parameter.getNamespace())
+ .toString();
+ return resultStr;
}
}
diff --git a/common/src/main/java/io/dynamic/threadpool/common/toolkit/GroupKey.java b/common/src/main/java/io/dynamic/threadpool/common/toolkit/GroupKey.java
new file mode 100644
index 00000000..f189541a
--- /dev/null
+++ b/common/src/main/java/io/dynamic/threadpool/common/toolkit/GroupKey.java
@@ -0,0 +1,106 @@
+package io.dynamic.threadpool.common.toolkit;
+
+import org.springframework.util.StringUtils;
+
+/**
+ * Group Key
+ *
+ * @author chen.ma
+ * @date 2021/6/24 21:12
+ */
+public class GroupKey {
+ public static String getKey(String dataId, String group) {
+ return getKey(dataId, group, "");
+ }
+
+ public static String getKey(String dataId, String group, String datumStr) {
+ return doGetKey(dataId, group, datumStr);
+ }
+
+ public static String getKeyTenant(String dataId, String group, String tenant) {
+ return doGetKey(dataId, group, tenant);
+ }
+
+ private static String doGetKey(String dataId, String group, String datumStr) {
+ StringBuilder sb = new StringBuilder();
+ urlEncode(dataId, sb);
+ sb.append('+');
+ urlEncode(group, sb);
+ if (!StringUtils.isEmpty(datumStr)) {
+ sb.append('+');
+ urlEncode(datumStr, sb);
+ }
+
+ return sb.toString();
+ }
+
+ /**
+ * Parse key.
+ *
+ * @param groupKey group key
+ * @return parsed key
+ */
+ public static String[] parseKey(String groupKey) {
+ StringBuilder sb = new StringBuilder();
+ String dataId = null;
+ String group = null;
+ String tenant = null;
+
+ for (int i = 0; i < groupKey.length(); ++i) {
+ char c = groupKey.charAt(i);
+ if ('+' == c) {
+ if (null == dataId) {
+ dataId = sb.toString();
+ sb.setLength(0);
+ } else if (null == group) {
+ group = sb.toString();
+ sb.setLength(0);
+ } else {
+ throw new IllegalArgumentException("invalid groupkey:" + groupKey);
+ }
+ } else if ('%' == c) {
+ char next = groupKey.charAt(++i);
+ char nextnext = groupKey.charAt(++i);
+ if ('2' == next && 'B' == nextnext) {
+ sb.append('+');
+ } else if ('2' == next && '5' == nextnext) {
+ sb.append('%');
+ } else {
+ throw new IllegalArgumentException("invalid groupkey:" + groupKey);
+ }
+ } else {
+ sb.append(c);
+ }
+ }
+
+ if (StringUtils.isEmpty(group)) {
+ group = sb.toString();
+ if (group.length() == 0) {
+ throw new IllegalArgumentException("invalid groupkey:" + groupKey);
+ }
+ } else {
+ tenant = sb.toString();
+ if (group.length() == 0) {
+ throw new IllegalArgumentException("invalid groupkey:" + groupKey);
+ }
+ }
+
+ return new String[]{dataId, group, tenant};
+ }
+
+ /**
+ * + -> %2B % -> %25.
+ */
+ public static void urlEncode(String str, StringBuilder sb) {
+ for (int idx = 0; idx < str.length(); ++idx) {
+ char c = str.charAt(idx);
+ if ('+' == c) {
+ sb.append("%2B");
+ } else if ('%' == c) {
+ sb.append("%25");
+ } else {
+ sb.append(c);
+ }
+ }
+ }
+}
diff --git a/common/src/main/java/io/dynamic/threadpool/common/toolkit/Md5Util.java b/common/src/main/java/io/dynamic/threadpool/common/toolkit/Md5Util.java
index e1d1964f..a0a762a2 100644
--- a/common/src/main/java/io/dynamic/threadpool/common/toolkit/Md5Util.java
+++ b/common/src/main/java/io/dynamic/threadpool/common/toolkit/Md5Util.java
@@ -1,9 +1,16 @@
package io.dynamic.threadpool.common.toolkit;
+import io.dynamic.threadpool.common.constant.Constants;
import io.dynamic.threadpool.common.model.PoolParameter;
+import org.springframework.util.StringUtils;
+import java.io.IOException;
+import java.net.URLEncoder;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
+import java.util.List;
+
+import static io.dynamic.threadpool.common.constant.Constants.WORD_SEPARATOR;
/**
* MD5 Util.
@@ -58,4 +65,31 @@ public class Md5Util {
public static String getTpContentMd5(PoolParameter config) {
return Md5Util.md5Hex(ContentUtil.getPoolContent(config), "UTF-8");
}
+
+ public static String compareMd5ResultString(List changedGroupKeys) throws IOException {
+ if (null == changedGroupKeys) {
+ return "";
+ }
+
+ StringBuilder sb = new StringBuilder();
+
+ for (String groupKey : changedGroupKeys) {
+ String[] dataIdGroupId = GroupKey.parseKey(groupKey);
+ sb.append(dataIdGroupId[0]);
+ sb.append(WORD_SEPARATOR);
+ sb.append(dataIdGroupId[1]);
+ // if have tenant, then set it
+ if (dataIdGroupId.length == 3) {
+ if (!StringUtils.isEmpty(dataIdGroupId[2])) {
+ sb.append(WORD_SEPARATOR);
+ sb.append(dataIdGroupId[2]);
+ }
+ }
+ sb.append(Constants.LINE_SEPARATOR);
+ }
+
+ // To encode WORD_SEPARATOR and LINE_SEPARATOR invisible characters, encoded value is %02 and %01
+ return URLEncoder.encode(sb.toString(), "UTF-8");
+ }
+
}
diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/core/CacheData.java b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/core/CacheData.java
index 8e46ce89..78fe4d99 100644
--- a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/core/CacheData.java
+++ b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/core/CacheData.java
@@ -2,7 +2,7 @@ package io.dynamic.threadpool.starter.core;
import io.dynamic.threadpool.common.toolkit.ContentUtil;
import io.dynamic.threadpool.common.toolkit.Md5Util;
-import io.dynamic.threadpool.starter.common.Constants;
+import io.dynamic.threadpool.common.constant.Constants;
import io.dynamic.threadpool.starter.listener.Listener;
import io.dynamic.threadpool.starter.wrap.ManagerListenerWrap;
import lombok.extern.slf4j.Slf4j;
diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/listener/ClientWorker.java b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/listener/ClientWorker.java
index 84fd7c18..07dca73d 100644
--- a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/listener/ClientWorker.java
+++ b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/listener/ClientWorker.java
@@ -1,24 +1,28 @@
package io.dynamic.threadpool.starter.listener;
+import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSON;
+import io.dynamic.threadpool.common.constant.Constants;
import io.dynamic.threadpool.common.model.PoolParameterInfo;
import io.dynamic.threadpool.common.toolkit.ContentUtil;
+import io.dynamic.threadpool.common.toolkit.GroupKey;
import io.dynamic.threadpool.common.web.base.Result;
-import io.dynamic.threadpool.starter.common.Constants;
import io.dynamic.threadpool.starter.core.CacheData;
import io.dynamic.threadpool.starter.remote.HttpAgent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
+import java.net.URLDecoder;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
-import static io.dynamic.threadpool.starter.common.Constants.LINE_SEPARATOR;
-import static io.dynamic.threadpool.starter.common.Constants.WORD_SEPARATOR;
+import static io.dynamic.threadpool.common.constant.Constants.LINE_SEPARATOR;
+import static io.dynamic.threadpool.common.constant.Constants.WORD_SEPARATOR;
/**
* 客户端监听
@@ -102,25 +106,27 @@ public class ClientWorker {
@Override
public void run() {
List cacheDataList = new ArrayList();
- cacheMap.forEach((key, val) -> cacheDataList.add(val));
+ List queryCacheDataList = cacheMap.entrySet()
+ .stream().map(each -> each.getValue()).collect(Collectors.toList());
- List changedTpIds = checkUpdateDataIds(cacheDataList);
- if (!CollectionUtils.isEmpty(cacheDataList)) {
+ List changedTpIds = checkUpdateDataIds(queryCacheDataList);
+ if (CollectionUtils.isEmpty(changedTpIds)) {
log.info("[dynamic threadPool] tpIds changed :: {}", changedTpIds);
} else {
for (String each : changedTpIds) {
- String[] keys = each.split(",");
- String namespace = keys[0];
+ String[] keys = StrUtil.split(each, Constants.GROUP_KEY_DELIMITER);
+ String tpId = keys[0];
String itemId = keys[1];
- String tpId = keys[2];
+ String namespace = keys[2];
try {
String content = getServerConfig(namespace, itemId, tpId, 3000L);
CacheData cacheData = cacheMap.get(tpId);
- cacheData.setContent(content);
+ String poolContent = ContentUtil.getPoolContent(JSON.parseObject(content, PoolParameterInfo.class));
+ cacheData.setContent(poolContent);
cacheDataList.add(cacheData);
- log.info("[data-received] namespace :: {}, itemId :: {}, tpId :: {}, md5 :: {}, content :: {}",
- namespace, itemId, tpId, cacheData.getMd5(), content);
+ log.info("[data-received] namespace :: {}, itemId :: {}, tpId :: {}, md5 :: {}",
+ namespace, itemId, tpId, cacheData.getMd5());
} catch (Exception ex) {
// ignore
}
@@ -219,7 +225,35 @@ public class ClientWorker {
* @return
*/
public List parseUpdateDataIdResponse(String response) {
- return null;
+ if (StringUtils.isEmpty(response)) {
+ return Collections.emptyList();
+ }
+
+ try {
+ response = URLDecoder.decode(response, "UTF-8");
+ } catch (Exception e) {
+ log.error("[polling-resp] decode modifiedDataIdsString error", e);
+ }
+
+ List updateList = new LinkedList();
+ for (String dataIdAndGroup : response.split(LINE_SEPARATOR)) {
+ if (!StringUtils.isEmpty(dataIdAndGroup)) {
+ String[] keyArr = dataIdAndGroup.split(WORD_SEPARATOR);
+ String dataId = keyArr[0];
+ String group = keyArr[1];
+ if (keyArr.length == 2) {
+ updateList.add(GroupKey.getKey(dataId, group));
+ log.info("[{}] [polling-resp] config changed. dataId={}, group={}", dataId, group);
+ } else if (keyArr.length == 3) {
+ String tenant = keyArr[2];
+ updateList.add(GroupKey.getKeyTenant(dataId, group, tenant));
+ log.info("[polling-resp] config changed. dataId={}, group={}, tenant={}", dataId, group, tenant);
+ } else {
+ log.error("[{}] [polling-resp] invalid dataIdAndGroup error {}", dataIdAndGroup);
+ }
+ }
+ }
+ return updateList;
}
/**
diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/listener/ThreadPoolRunListener.java b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/listener/ThreadPoolRunListener.java
index 0b61d782..5fb8a2b4 100644
--- a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/listener/ThreadPoolRunListener.java
+++ b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/listener/ThreadPoolRunListener.java
@@ -2,7 +2,7 @@ package io.dynamic.threadpool.starter.listener;
import com.alibaba.fastjson.JSON;
import io.dynamic.threadpool.starter.common.CommonThreadPool;
-import io.dynamic.threadpool.starter.common.Constants;
+import io.dynamic.threadpool.common.constant.Constants;
import io.dynamic.threadpool.common.config.ApplicationContextHolder;
import io.dynamic.threadpool.starter.config.DynamicThreadPoolProperties;
import io.dynamic.threadpool.starter.core.GlobalThreadPoolManage;
diff --git a/example/src/main/java/io/dynamic/threadpool/example/config/ThreadPoolConfig.java b/example/src/main/java/io/dynamic/threadpool/example/config/ThreadPoolConfig.java
index b7843445..bbd530b2 100644
--- a/example/src/main/java/io/dynamic/threadpool/example/config/ThreadPoolConfig.java
+++ b/example/src/main/java/io/dynamic/threadpool/example/config/ThreadPoolConfig.java
@@ -18,4 +18,9 @@ public class ThreadPoolConfig {
return new DynamicThreadPoolWrap("message-consume");
}
+ @Bean
+ public DynamicThreadPoolWrap messageCenterProduceThreadPool() {
+ return new DynamicThreadPoolWrap("message-produce");
+ }
+
}
diff --git a/server/src/main/java/io/dynamic/threadpool/server/constant/Constants.java b/server/src/main/java/io/dynamic/threadpool/server/constant/Constants.java
deleted file mode 100644
index 866fb8f8..00000000
--- a/server/src/main/java/io/dynamic/threadpool/server/constant/Constants.java
+++ /dev/null
@@ -1,20 +0,0 @@
-package io.dynamic.threadpool.server.constant;
-
-/**
- * 服务端常量类
- *
- * @author chen.ma
- * @date 2021/6/20 13:54
- */
-public class Constants {
-
- public static final String BASE_PATH = "/v1/cs";
-
- public static final String CONFIG_CONTROLLER_PATH = BASE_PATH + "/configs";
-
- public static final String LISTENING_CONFIGS = "Listening-Configs";
-
- public static final String ENCODE = "UTF-8";
-
- public static final String NULL = "";
-}
diff --git a/server/src/main/java/io/dynamic/threadpool/server/controller/ConfigController.java b/server/src/main/java/io/dynamic/threadpool/server/controller/ConfigController.java
index 2ff76df4..4b4b62d3 100644
--- a/server/src/main/java/io/dynamic/threadpool/server/controller/ConfigController.java
+++ b/server/src/main/java/io/dynamic/threadpool/server/controller/ConfigController.java
@@ -1,8 +1,9 @@
package io.dynamic.threadpool.server.controller;
+import io.dynamic.threadpool.common.constant.Constants;
+import io.dynamic.threadpool.common.toolkit.ContentUtil;
import io.dynamic.threadpool.common.web.base.Result;
import io.dynamic.threadpool.common.web.base.Results;
-import io.dynamic.threadpool.server.constant.Constants;
import io.dynamic.threadpool.server.event.LocalDataChangeEvent;
import io.dynamic.threadpool.server.model.ConfigAllInfo;
import io.dynamic.threadpool.server.model.ConfigInfoBase;
@@ -18,7 +19,6 @@ import org.springframework.web.bind.annotation.*;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.net.URLDecoder;
-import java.sql.Timestamp;
import java.util.Map;
/**
@@ -49,13 +49,8 @@ public class ConfigController {
@PostMapping
public Result publishConfig(HttpServletRequest request, @RequestBody ConfigAllInfo config) {
configService.insertOrUpdate(config);
-
- long gmtModified = new Timestamp(System.currentTimeMillis()).getTime();
- /*ConfigChangePublisher
- .notifyConfigChange(new ConfigDataChangeEvent(config.getNamespace(), config.getItemId(), config.getTpId(), gmtModified));*/
-
ConfigChangePublisher
- .notifyConfigChange(new LocalDataChangeEvent(""));
+ .notifyConfigChange(new LocalDataChangeEvent(ContentUtil.getGroupKey(config)));
return Results.success(true);
}
diff --git a/server/src/main/java/io/dynamic/threadpool/server/model/CacheItem.java b/server/src/main/java/io/dynamic/threadpool/server/model/CacheItem.java
index e85b938d..1c5f9cfc 100644
--- a/server/src/main/java/io/dynamic/threadpool/server/model/CacheItem.java
+++ b/server/src/main/java/io/dynamic/threadpool/server/model/CacheItem.java
@@ -1,6 +1,6 @@
package io.dynamic.threadpool.server.model;
-import io.dynamic.threadpool.server.constant.Constants;
+import io.dynamic.threadpool.common.constant.Constants;
import io.dynamic.threadpool.server.toolkit.SimpleReadWriteLock;
import io.dynamic.threadpool.server.toolkit.SingletonRepository;
import lombok.Getter;
diff --git a/server/src/main/java/io/dynamic/threadpool/server/model/ConfigAllInfo.java b/server/src/main/java/io/dynamic/threadpool/server/model/ConfigAllInfo.java
index 518d4c0a..ac6b98d2 100644
--- a/server/src/main/java/io/dynamic/threadpool/server/model/ConfigAllInfo.java
+++ b/server/src/main/java/io/dynamic/threadpool/server/model/ConfigAllInfo.java
@@ -1,5 +1,6 @@
package io.dynamic.threadpool.server.model;
+import com.alibaba.fastjson.annotation.JSONField;
import io.dynamic.threadpool.common.model.PoolParameter;
import lombok.Data;
@@ -14,11 +15,15 @@ public class ConfigAllInfo extends ConfigInfo implements PoolParameter {
private static final long serialVersionUID = -2417394244017463665L;
+ @JSONField(serialize = false)
private String createUser;
+ @JSONField(serialize = false)
private String desc;
+ @JSONField(serialize = false)
private Long createTime;
+ @JSONField(serialize = false)
private Long modifyTime;
}
diff --git a/server/src/main/java/io/dynamic/threadpool/server/model/ConfigInfoBase.java b/server/src/main/java/io/dynamic/threadpool/server/model/ConfigInfoBase.java
index 1322e1a4..be2f269b 100644
--- a/server/src/main/java/io/dynamic/threadpool/server/model/ConfigInfoBase.java
+++ b/server/src/main/java/io/dynamic/threadpool/server/model/ConfigInfoBase.java
@@ -1,5 +1,6 @@
package io.dynamic.threadpool.server.model;
+import com.alibaba.fastjson.annotation.JSONField;
import lombok.Data;
import java.io.Serializable;
@@ -73,10 +74,12 @@ public class ConfigInfoBase implements Serializable {
/**
* MD5
*/
+ @JSONField(serialize = false)
private String md5;
/**
* 内容
*/
+ @JSONField(serialize = false)
private String content;
}
diff --git a/server/src/main/java/io/dynamic/threadpool/server/service/ConfigCacheService.java b/server/src/main/java/io/dynamic/threadpool/server/service/ConfigCacheService.java
index d55a09a0..f93fa8d8 100644
--- a/server/src/main/java/io/dynamic/threadpool/server/service/ConfigCacheService.java
+++ b/server/src/main/java/io/dynamic/threadpool/server/service/ConfigCacheService.java
@@ -1,8 +1,8 @@
package io.dynamic.threadpool.server.service;
import io.dynamic.threadpool.common.config.ApplicationContextHolder;
+import io.dynamic.threadpool.common.constant.Constants;
import io.dynamic.threadpool.common.toolkit.Md5Util;
-import io.dynamic.threadpool.server.constant.Constants;
import io.dynamic.threadpool.server.event.LocalDataChangeEvent;
import io.dynamic.threadpool.server.model.CacheItem;
import io.dynamic.threadpool.server.model.ConfigAllInfo;
@@ -25,7 +25,7 @@ public class ConfigCacheService {
private static final ConcurrentHashMap CACHE = new ConcurrentHashMap();
public static boolean isUpdateData(String groupKey, String md5, String ip) {
- String contentMd5 = ConfigCacheService.getContentMd5(groupKey, ip);
+ String contentMd5 = ConfigCacheService.getContentMd5IsNullPut(groupKey, ip);
return Objects.equals(contentMd5, md5);
}
@@ -38,7 +38,7 @@ public class ConfigCacheService {
* @param ip
* @return
*/
- private static String getContentMd5(String groupKey, String ip) {
+ private static String getContentMd5IsNullPut(String groupKey, String ip) {
CacheItem cacheItem = CACHE.get(groupKey);
if (cacheItem != null) {
return cacheItem.md5;
@@ -58,6 +58,21 @@ public class ConfigCacheService {
return (cacheItem != null) ? cacheItem.md5 : Constants.NULL;
}
+ public static String getContentMd5(String groupKey) {
+ if (configService == null) {
+ configService = ApplicationContextHolder.getBean(ConfigService.class);
+ }
+
+ String[] split = groupKey.split("\\+");
+ ConfigAllInfo config = configService.findConfigAllInfo(split[0], split[1], split[2]);
+ if (config == null || StringUtils.isEmpty(config.getTpId())) {
+ String errorMessage = String.format("config is null. tpId :: %s, itemId :: %s, namespace :: %s", split[0], split[1], split[2]);
+ throw new RuntimeException(errorMessage);
+ }
+
+ return Md5Util.getTpContentMd5(config);
+ }
+
public static void updateMd5(String groupKey, String md5, long lastModifiedTs) {
CacheItem cache = makeSure(groupKey);
if (cache.md5 == null || !cache.md5.equals(md5)) {
diff --git a/server/src/main/java/io/dynamic/threadpool/server/service/LongPollingService.java b/server/src/main/java/io/dynamic/threadpool/server/service/LongPollingService.java
index 93945464..822df297 100644
--- a/server/src/main/java/io/dynamic/threadpool/server/service/LongPollingService.java
+++ b/server/src/main/java/io/dynamic/threadpool/server/service/LongPollingService.java
@@ -1,6 +1,7 @@
package io.dynamic.threadpool.server.service;
import com.alibaba.fastjson.JSON;
+import io.dynamic.threadpool.common.toolkit.Md5Util;
import io.dynamic.threadpool.common.web.base.Results;
import io.dynamic.threadpool.server.event.Event;
import io.dynamic.threadpool.server.event.LocalDataChangeEvent;
@@ -94,6 +95,7 @@ public class LongPollingService {
if (clientSub.clientMd5Map.containsKey(groupKey)) {
getRetainIps().put(clientSub.ip, System.currentTimeMillis());
+ ConfigCacheService.updateMd5(groupKey, ConfigCacheService.getContentMd5(groupKey), System.currentTimeMillis());
iter.remove();
clientSub.sendResponse(Arrays.asList(groupKey));
}
@@ -214,14 +216,14 @@ public class LongPollingService {
HttpServletResponse response = (HttpServletResponse) asyncContext.getResponse();
try {
- String respString = JSON.toJSONString(Results.success(changedGroups));
+ String respStr = Md5Util.compareMd5ResultString(changedGroups);
+ String resultStr = JSON.toJSONString(Results.success(respStr));
- // Disable cache.
response.setHeader("Pragma", "no-cache");
response.setDateHeader("Expires", 0);
response.setHeader("Cache-Control", "no-cache,no-store");
response.setStatus(HttpServletResponse.SC_OK);
- response.getWriter().println(respString);
+ response.getWriter().println(resultStr);
asyncContext.complete();
} catch (Exception ex) {
log.error(ex.toString(), ex);
diff --git a/server/src/main/java/io/dynamic/threadpool/server/toolkit/Md5ConfigUtil.java b/server/src/main/java/io/dynamic/threadpool/server/toolkit/Md5ConfigUtil.java
index 5b690b5f..cebcee81 100644
--- a/server/src/main/java/io/dynamic/threadpool/server/toolkit/Md5ConfigUtil.java
+++ b/server/src/main/java/io/dynamic/threadpool/server/toolkit/Md5ConfigUtil.java
@@ -1,5 +1,6 @@
package io.dynamic.threadpool.server.toolkit;
+import io.dynamic.threadpool.common.toolkit.GroupKey;
import io.dynamic.threadpool.server.model.ConfigAllInfo;
import io.dynamic.threadpool.server.service.ConfigCacheService;
import io.dynamic.threadpool.common.toolkit.Md5Util;
@@ -101,34 +102,22 @@ public class Md5ConfigUtil {
public static String getKey(String dataId, String group) {
StringBuilder sb = new StringBuilder();
- urlEncode(dataId, sb);
+ GroupKey.urlEncode(dataId, sb);
sb.append('+');
- urlEncode(group, sb);
+ GroupKey.urlEncode(group, sb);
return sb.toString();
}
public static String getKey(String dataId, String group, String tenant) {
StringBuilder sb = new StringBuilder();
- urlEncode(dataId, sb);
+ GroupKey.urlEncode(dataId, sb);
sb.append('+');
- urlEncode(group, sb);
+ GroupKey.urlEncode(group, sb);
if (!StringUtils.isEmpty(tenant)) {
sb.append('+');
- urlEncode(tenant, sb);
+ GroupKey.urlEncode(tenant, sb);
}
return sb.toString();
}
- static void urlEncode(String str, StringBuilder sb) {
- for (int idx = 0; idx < str.length(); ++idx) {
- char c = str.charAt(idx);
- if ('+' == c) {
- sb.append("%2B");
- } else if ('%' == c) {
- sb.append("%25");
- } else {
- sb.append(c);
- }
- }
- }
}