feat: 功能持续更新.

pull/161/head
chen.ma 3 years ago
parent 68f7653558
commit 2bb07c11c3

@ -25,6 +25,11 @@
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
</dependency>
</dependencies>
</project>

@ -1,4 +1,4 @@
package io.dynamic.threadpool.starter.common;
package io.dynamic.threadpool.common.constant;
/**
* Constants
@ -35,4 +35,9 @@ public class Constants {
public static final String PROBE_MODIFY_REQUEST = "Listening-Configs";
public static final String LONG_PULLING_TIMEOUT = "Long-Pulling-Timeout";
public static final String LISTENING_CONFIGS = "Listening-Configs";
public static final String GROUP_KEY_DELIMITER = "+";
}

@ -1,6 +1,9 @@
package io.dynamic.threadpool.common.toolkit;
import com.alibaba.fastjson.JSON;
import io.dynamic.threadpool.common.constant.Constants;
import io.dynamic.threadpool.common.model.PoolParameter;
import io.dynamic.threadpool.common.model.PoolParameterInfo;
/**
* Content Util.
@ -10,9 +13,9 @@ import io.dynamic.threadpool.common.model.PoolParameter;
*/
public class ContentUtil {
public static String getPoolContent(PoolParameter parameter) {
/*public static String getPoolContent(PoolParameter parameter) {
StringBuilder stringBuilder = new StringBuilder();
String targetStr = stringBuilder.append(parameter.getCoreSize())
String resultStr = stringBuilder.append(parameter.getCoreSize())
.append(parameter.getMaxSize())
.append(parameter.getQueueType())
.append(parameter.getCapacity())
@ -21,6 +24,33 @@ public class ContentUtil {
.append(parameter.getCapacityAlarm())
.append(parameter.getLivenessAlarm())
.toString();
return targetStr;
return resultStr;
}*/
public static String getPoolContent(PoolParameter parameter) {
PoolParameterInfo poolInfo = new PoolParameterInfo();
poolInfo.setNamespace(parameter.getNamespace());
poolInfo.setItemId(parameter.getItemId());
poolInfo.setTpId(parameter.getTpId());
poolInfo.setCoreSize(parameter.getCoreSize());
poolInfo.setMaxSize(parameter.getMaxSize());
poolInfo.setQueueType(parameter.getQueueType());
poolInfo.setCapacity(parameter.getCapacity());
poolInfo.setKeepAliveTime(parameter.getKeepAliveTime());
poolInfo.setIsAlarm(parameter.getIsAlarm());
poolInfo.setCapacityAlarm(parameter.getCapacityAlarm());
poolInfo.setLivenessAlarm(parameter.getLivenessAlarm());
return JSON.toJSONString(poolInfo);
}
public static String getGroupKey(PoolParameter parameter) {
StringBuilder stringBuilder = new StringBuilder();
String resultStr = stringBuilder.append(parameter.getTpId())
.append(Constants.GROUP_KEY_DELIMITER)
.append(parameter.getItemId())
.append(Constants.GROUP_KEY_DELIMITER)
.append(parameter.getNamespace())
.toString();
return resultStr;
}
}

@ -0,0 +1,106 @@
package io.dynamic.threadpool.common.toolkit;
import org.springframework.util.StringUtils;
/**
* Group Key
*
* @author chen.ma
* @date 2021/6/24 21:12
*/
public class GroupKey {
public static String getKey(String dataId, String group) {
return getKey(dataId, group, "");
}
public static String getKey(String dataId, String group, String datumStr) {
return doGetKey(dataId, group, datumStr);
}
public static String getKeyTenant(String dataId, String group, String tenant) {
return doGetKey(dataId, group, tenant);
}
private static String doGetKey(String dataId, String group, String datumStr) {
StringBuilder sb = new StringBuilder();
urlEncode(dataId, sb);
sb.append('+');
urlEncode(group, sb);
if (!StringUtils.isEmpty(datumStr)) {
sb.append('+');
urlEncode(datumStr, sb);
}
return sb.toString();
}
/**
* Parse key.
*
* @param groupKey group key
* @return parsed key
*/
public static String[] parseKey(String groupKey) {
StringBuilder sb = new StringBuilder();
String dataId = null;
String group = null;
String tenant = null;
for (int i = 0; i < groupKey.length(); ++i) {
char c = groupKey.charAt(i);
if ('+' == c) {
if (null == dataId) {
dataId = sb.toString();
sb.setLength(0);
} else if (null == group) {
group = sb.toString();
sb.setLength(0);
} else {
throw new IllegalArgumentException("invalid groupkey:" + groupKey);
}
} else if ('%' == c) {
char next = groupKey.charAt(++i);
char nextnext = groupKey.charAt(++i);
if ('2' == next && 'B' == nextnext) {
sb.append('+');
} else if ('2' == next && '5' == nextnext) {
sb.append('%');
} else {
throw new IllegalArgumentException("invalid groupkey:" + groupKey);
}
} else {
sb.append(c);
}
}
if (StringUtils.isEmpty(group)) {
group = sb.toString();
if (group.length() == 0) {
throw new IllegalArgumentException("invalid groupkey:" + groupKey);
}
} else {
tenant = sb.toString();
if (group.length() == 0) {
throw new IllegalArgumentException("invalid groupkey:" + groupKey);
}
}
return new String[]{dataId, group, tenant};
}
/**
* + -> %2B % -> %25.
*/
public static void urlEncode(String str, StringBuilder sb) {
for (int idx = 0; idx < str.length(); ++idx) {
char c = str.charAt(idx);
if ('+' == c) {
sb.append("%2B");
} else if ('%' == c) {
sb.append("%25");
} else {
sb.append(c);
}
}
}
}

@ -1,9 +1,16 @@
package io.dynamic.threadpool.common.toolkit;
import io.dynamic.threadpool.common.constant.Constants;
import io.dynamic.threadpool.common.model.PoolParameter;
import org.springframework.util.StringUtils;
import java.io.IOException;
import java.net.URLEncoder;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import static io.dynamic.threadpool.common.constant.Constants.WORD_SEPARATOR;
/**
* MD5 Util.
@ -58,4 +65,31 @@ public class Md5Util {
public static String getTpContentMd5(PoolParameter config) {
return Md5Util.md5Hex(ContentUtil.getPoolContent(config), "UTF-8");
}
public static String compareMd5ResultString(List<String> changedGroupKeys) throws IOException {
if (null == changedGroupKeys) {
return "";
}
StringBuilder sb = new StringBuilder();
for (String groupKey : changedGroupKeys) {
String[] dataIdGroupId = GroupKey.parseKey(groupKey);
sb.append(dataIdGroupId[0]);
sb.append(WORD_SEPARATOR);
sb.append(dataIdGroupId[1]);
// if have tenant, then set it
if (dataIdGroupId.length == 3) {
if (!StringUtils.isEmpty(dataIdGroupId[2])) {
sb.append(WORD_SEPARATOR);
sb.append(dataIdGroupId[2]);
}
}
sb.append(Constants.LINE_SEPARATOR);
}
// To encode WORD_SEPARATOR and LINE_SEPARATOR invisible characters, encoded value is %02 and %01
return URLEncoder.encode(sb.toString(), "UTF-8");
}
}

@ -2,7 +2,7 @@ package io.dynamic.threadpool.starter.core;
import io.dynamic.threadpool.common.toolkit.ContentUtil;
import io.dynamic.threadpool.common.toolkit.Md5Util;
import io.dynamic.threadpool.starter.common.Constants;
import io.dynamic.threadpool.common.constant.Constants;
import io.dynamic.threadpool.starter.listener.Listener;
import io.dynamic.threadpool.starter.wrap.ManagerListenerWrap;
import lombok.extern.slf4j.Slf4j;

@ -1,24 +1,28 @@
package io.dynamic.threadpool.starter.listener;
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSON;
import io.dynamic.threadpool.common.constant.Constants;
import io.dynamic.threadpool.common.model.PoolParameterInfo;
import io.dynamic.threadpool.common.toolkit.ContentUtil;
import io.dynamic.threadpool.common.toolkit.GroupKey;
import io.dynamic.threadpool.common.web.base.Result;
import io.dynamic.threadpool.starter.common.Constants;
import io.dynamic.threadpool.starter.core.CacheData;
import io.dynamic.threadpool.starter.remote.HttpAgent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import java.net.URLDecoder;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static io.dynamic.threadpool.starter.common.Constants.LINE_SEPARATOR;
import static io.dynamic.threadpool.starter.common.Constants.WORD_SEPARATOR;
import static io.dynamic.threadpool.common.constant.Constants.LINE_SEPARATOR;
import static io.dynamic.threadpool.common.constant.Constants.WORD_SEPARATOR;
/**
*
@ -102,25 +106,27 @@ public class ClientWorker {
@Override
public void run() {
List<CacheData> cacheDataList = new ArrayList();
cacheMap.forEach((key, val) -> cacheDataList.add(val));
List<CacheData> queryCacheDataList = cacheMap.entrySet()
.stream().map(each -> each.getValue()).collect(Collectors.toList());
List<String> changedTpIds = checkUpdateDataIds(cacheDataList);
if (!CollectionUtils.isEmpty(cacheDataList)) {
List<String> changedTpIds = checkUpdateDataIds(queryCacheDataList);
if (CollectionUtils.isEmpty(changedTpIds)) {
log.info("[dynamic threadPool] tpIds changed :: {}", changedTpIds);
} else {
for (String each : changedTpIds) {
String[] keys = each.split(",");
String namespace = keys[0];
String[] keys = StrUtil.split(each, Constants.GROUP_KEY_DELIMITER);
String tpId = keys[0];
String itemId = keys[1];
String tpId = keys[2];
String namespace = keys[2];
try {
String content = getServerConfig(namespace, itemId, tpId, 3000L);
CacheData cacheData = cacheMap.get(tpId);
cacheData.setContent(content);
String poolContent = ContentUtil.getPoolContent(JSON.parseObject(content, PoolParameterInfo.class));
cacheData.setContent(poolContent);
cacheDataList.add(cacheData);
log.info("[data-received] namespace :: {}, itemId :: {}, tpId :: {}, md5 :: {}, content :: {}",
namespace, itemId, tpId, cacheData.getMd5(), content);
log.info("[data-received] namespace :: {}, itemId :: {}, tpId :: {}, md5 :: {}",
namespace, itemId, tpId, cacheData.getMd5());
} catch (Exception ex) {
// ignore
}
@ -219,7 +225,35 @@ public class ClientWorker {
* @return
*/
public List<String> parseUpdateDataIdResponse(String response) {
return null;
if (StringUtils.isEmpty(response)) {
return Collections.emptyList();
}
try {
response = URLDecoder.decode(response, "UTF-8");
} catch (Exception e) {
log.error("[polling-resp] decode modifiedDataIdsString error", e);
}
List<String> updateList = new LinkedList();
for (String dataIdAndGroup : response.split(LINE_SEPARATOR)) {
if (!StringUtils.isEmpty(dataIdAndGroup)) {
String[] keyArr = dataIdAndGroup.split(WORD_SEPARATOR);
String dataId = keyArr[0];
String group = keyArr[1];
if (keyArr.length == 2) {
updateList.add(GroupKey.getKey(dataId, group));
log.info("[{}] [polling-resp] config changed. dataId={}, group={}", dataId, group);
} else if (keyArr.length == 3) {
String tenant = keyArr[2];
updateList.add(GroupKey.getKeyTenant(dataId, group, tenant));
log.info("[polling-resp] config changed. dataId={}, group={}, tenant={}", dataId, group, tenant);
} else {
log.error("[{}] [polling-resp] invalid dataIdAndGroup error {}", dataIdAndGroup);
}
}
}
return updateList;
}
/**

@ -2,7 +2,7 @@ package io.dynamic.threadpool.starter.listener;
import com.alibaba.fastjson.JSON;
import io.dynamic.threadpool.starter.common.CommonThreadPool;
import io.dynamic.threadpool.starter.common.Constants;
import io.dynamic.threadpool.common.constant.Constants;
import io.dynamic.threadpool.common.config.ApplicationContextHolder;
import io.dynamic.threadpool.starter.config.DynamicThreadPoolProperties;
import io.dynamic.threadpool.starter.core.GlobalThreadPoolManage;

@ -18,4 +18,9 @@ public class ThreadPoolConfig {
return new DynamicThreadPoolWrap("message-consume");
}
@Bean
public DynamicThreadPoolWrap messageCenterProduceThreadPool() {
return new DynamicThreadPoolWrap("message-produce");
}
}

@ -1,20 +0,0 @@
package io.dynamic.threadpool.server.constant;
/**
*
*
* @author chen.ma
* @date 2021/6/20 13:54
*/
public class Constants {
public static final String BASE_PATH = "/v1/cs";
public static final String CONFIG_CONTROLLER_PATH = BASE_PATH + "/configs";
public static final String LISTENING_CONFIGS = "Listening-Configs";
public static final String ENCODE = "UTF-8";
public static final String NULL = "";
}

@ -1,8 +1,9 @@
package io.dynamic.threadpool.server.controller;
import io.dynamic.threadpool.common.constant.Constants;
import io.dynamic.threadpool.common.toolkit.ContentUtil;
import io.dynamic.threadpool.common.web.base.Result;
import io.dynamic.threadpool.common.web.base.Results;
import io.dynamic.threadpool.server.constant.Constants;
import io.dynamic.threadpool.server.event.LocalDataChangeEvent;
import io.dynamic.threadpool.server.model.ConfigAllInfo;
import io.dynamic.threadpool.server.model.ConfigInfoBase;
@ -18,7 +19,6 @@ import org.springframework.web.bind.annotation.*;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.net.URLDecoder;
import java.sql.Timestamp;
import java.util.Map;
/**
@ -49,13 +49,8 @@ public class ConfigController {
@PostMapping
public Result<Boolean> publishConfig(HttpServletRequest request, @RequestBody ConfigAllInfo config) {
configService.insertOrUpdate(config);
long gmtModified = new Timestamp(System.currentTimeMillis()).getTime();
/*ConfigChangePublisher
.notifyConfigChange(new ConfigDataChangeEvent(config.getNamespace(), config.getItemId(), config.getTpId(), gmtModified));*/
ConfigChangePublisher
.notifyConfigChange(new LocalDataChangeEvent(""));
.notifyConfigChange(new LocalDataChangeEvent(ContentUtil.getGroupKey(config)));
return Results.success(true);
}

@ -1,6 +1,6 @@
package io.dynamic.threadpool.server.model;
import io.dynamic.threadpool.server.constant.Constants;
import io.dynamic.threadpool.common.constant.Constants;
import io.dynamic.threadpool.server.toolkit.SimpleReadWriteLock;
import io.dynamic.threadpool.server.toolkit.SingletonRepository;
import lombok.Getter;

@ -1,5 +1,6 @@
package io.dynamic.threadpool.server.model;
import com.alibaba.fastjson.annotation.JSONField;
import io.dynamic.threadpool.common.model.PoolParameter;
import lombok.Data;
@ -14,11 +15,15 @@ public class ConfigAllInfo extends ConfigInfo implements PoolParameter {
private static final long serialVersionUID = -2417394244017463665L;
@JSONField(serialize = false)
private String createUser;
@JSONField(serialize = false)
private String desc;
@JSONField(serialize = false)
private Long createTime;
@JSONField(serialize = false)
private Long modifyTime;
}

@ -1,5 +1,6 @@
package io.dynamic.threadpool.server.model;
import com.alibaba.fastjson.annotation.JSONField;
import lombok.Data;
import java.io.Serializable;
@ -73,10 +74,12 @@ public class ConfigInfoBase implements Serializable {
/**
* MD5
*/
@JSONField(serialize = false)
private String md5;
/**
*
*/
@JSONField(serialize = false)
private String content;
}

@ -1,8 +1,8 @@
package io.dynamic.threadpool.server.service;
import io.dynamic.threadpool.common.config.ApplicationContextHolder;
import io.dynamic.threadpool.common.constant.Constants;
import io.dynamic.threadpool.common.toolkit.Md5Util;
import io.dynamic.threadpool.server.constant.Constants;
import io.dynamic.threadpool.server.event.LocalDataChangeEvent;
import io.dynamic.threadpool.server.model.CacheItem;
import io.dynamic.threadpool.server.model.ConfigAllInfo;
@ -25,7 +25,7 @@ public class ConfigCacheService {
private static final ConcurrentHashMap<String, CacheItem> CACHE = new ConcurrentHashMap();
public static boolean isUpdateData(String groupKey, String md5, String ip) {
String contentMd5 = ConfigCacheService.getContentMd5(groupKey, ip);
String contentMd5 = ConfigCacheService.getContentMd5IsNullPut(groupKey, ip);
return Objects.equals(contentMd5, md5);
}
@ -38,7 +38,7 @@ public class ConfigCacheService {
* @param ip
* @return
*/
private static String getContentMd5(String groupKey, String ip) {
private static String getContentMd5IsNullPut(String groupKey, String ip) {
CacheItem cacheItem = CACHE.get(groupKey);
if (cacheItem != null) {
return cacheItem.md5;
@ -58,6 +58,21 @@ public class ConfigCacheService {
return (cacheItem != null) ? cacheItem.md5 : Constants.NULL;
}
public static String getContentMd5(String groupKey) {
if (configService == null) {
configService = ApplicationContextHolder.getBean(ConfigService.class);
}
String[] split = groupKey.split("\\+");
ConfigAllInfo config = configService.findConfigAllInfo(split[0], split[1], split[2]);
if (config == null || StringUtils.isEmpty(config.getTpId())) {
String errorMessage = String.format("config is null. tpId :: %s, itemId :: %s, namespace :: %s", split[0], split[1], split[2]);
throw new RuntimeException(errorMessage);
}
return Md5Util.getTpContentMd5(config);
}
public static void updateMd5(String groupKey, String md5, long lastModifiedTs) {
CacheItem cache = makeSure(groupKey);
if (cache.md5 == null || !cache.md5.equals(md5)) {

@ -1,6 +1,7 @@
package io.dynamic.threadpool.server.service;
import com.alibaba.fastjson.JSON;
import io.dynamic.threadpool.common.toolkit.Md5Util;
import io.dynamic.threadpool.common.web.base.Results;
import io.dynamic.threadpool.server.event.Event;
import io.dynamic.threadpool.server.event.LocalDataChangeEvent;
@ -94,6 +95,7 @@ public class LongPollingService {
if (clientSub.clientMd5Map.containsKey(groupKey)) {
getRetainIps().put(clientSub.ip, System.currentTimeMillis());
ConfigCacheService.updateMd5(groupKey, ConfigCacheService.getContentMd5(groupKey), System.currentTimeMillis());
iter.remove();
clientSub.sendResponse(Arrays.asList(groupKey));
}
@ -214,14 +216,14 @@ public class LongPollingService {
HttpServletResponse response = (HttpServletResponse) asyncContext.getResponse();
try {
String respString = JSON.toJSONString(Results.success(changedGroups));
String respStr = Md5Util.compareMd5ResultString(changedGroups);
String resultStr = JSON.toJSONString(Results.success(respStr));
// Disable cache.
response.setHeader("Pragma", "no-cache");
response.setDateHeader("Expires", 0);
response.setHeader("Cache-Control", "no-cache,no-store");
response.setStatus(HttpServletResponse.SC_OK);
response.getWriter().println(respString);
response.getWriter().println(resultStr);
asyncContext.complete();
} catch (Exception ex) {
log.error(ex.toString(), ex);

@ -1,5 +1,6 @@
package io.dynamic.threadpool.server.toolkit;
import io.dynamic.threadpool.common.toolkit.GroupKey;
import io.dynamic.threadpool.server.model.ConfigAllInfo;
import io.dynamic.threadpool.server.service.ConfigCacheService;
import io.dynamic.threadpool.common.toolkit.Md5Util;
@ -101,34 +102,22 @@ public class Md5ConfigUtil {
public static String getKey(String dataId, String group) {
StringBuilder sb = new StringBuilder();
urlEncode(dataId, sb);
GroupKey.urlEncode(dataId, sb);
sb.append('+');
urlEncode(group, sb);
GroupKey.urlEncode(group, sb);
return sb.toString();
}
public static String getKey(String dataId, String group, String tenant) {
StringBuilder sb = new StringBuilder();
urlEncode(dataId, sb);
GroupKey.urlEncode(dataId, sb);
sb.append('+');
urlEncode(group, sb);
GroupKey.urlEncode(group, sb);
if (!StringUtils.isEmpty(tenant)) {
sb.append('+');
urlEncode(tenant, sb);
GroupKey.urlEncode(tenant, sb);
}
return sb.toString();
}
static void urlEncode(String str, StringBuilder sb) {
for (int idx = 0; idx < str.length(); ++idx) {
char c = str.charAt(idx);
if ('+' == c) {
sb.append("%2B");
} else if ('%' == c) {
sb.append("%25");
} else {
sb.append(c);
}
}
}
}

Loading…
Cancel
Save