feat: 功能持续更新.

pull/161/head
chen.ma 4 years ago
parent ded7622277
commit 0a315a0537

@ -19,4 +19,12 @@
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
</dependencies>
</project>

@ -1,4 +1,4 @@
package io.dynamic.threadpool.starter.config;
package io.dynamic.threadpool.common.config;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;

@ -1,4 +1,4 @@
package io.dynamic.threadpool.starter.config;
package io.dynamic.threadpool.common.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@ -1,4 +1,4 @@
package io.dynamict.hreadpool.common.executor;
package io.dynamic.threadpool.common.executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

@ -1,4 +1,4 @@
package io.dynamict.hreadpool.common.executor;
package io.dynamic.threadpool.common.executor;
import java.util.HashMap;
import java.util.HashSet;
@ -6,6 +6,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Thread Pool Manager.
@ -21,10 +22,20 @@ public class ThreadPoolManager {
private static final ThreadPoolManager INSTANCE = new ThreadPoolManager();
private static final AtomicBoolean CLOSED = new AtomicBoolean(false);
public static ThreadPoolManager getInstance() {
return INSTANCE;
}
static {
INSTANCE.init();
}
private void init() {
resourcesManager = new ConcurrentHashMap<String, Map<String, Set<ExecutorService>>>(8);
}
public void register(String namespace, String group, ExecutorService executor) {
if (!resourcesManager.containsKey(namespace)) {
synchronized (this) {

@ -1,4 +1,4 @@
package io.dynamict.hreadpool.common.model;
package io.dynamic.threadpool.common.model;
import lombok.Getter;
import lombok.Setter;

@ -0,0 +1,33 @@
package io.dynamic.threadpool.common.model;
/**
* Pool Parameter.
*
* @author chen.ma
* @date 2021/6/24 16:04
*/
public interface PoolParameter {
String getNamespace();
String getItemId();
String getTpId();
Integer getCoreSize();
Integer getMaxSize();
Integer getQueueType();
Integer getCapacity();
Integer getKeepAliveTime();
Integer getIsAlarm();
Integer getCapacityAlarm();
Integer getLivenessAlarm();
}

@ -1,4 +1,4 @@
package io.dynamic.threadpool.starter.model;
package io.dynamic.threadpool.common.model;
import lombok.Data;
@ -11,7 +11,7 @@ import java.io.Serializable;
* @date 2021/6/16 23:18
*/
@Data
public class PoolParameterInfo implements Serializable {
public class PoolParameterInfo implements PoolParameter, Serializable {
private static final long serialVersionUID = -7123935122108553864L;
@ -59,4 +59,20 @@ public class PoolParameterInfo implements Serializable {
* 线
*/
private Integer keepAliveTime;
/**
*
*/
private Integer isAlarm;
/**
*
*/
private Integer capacityAlarm;
/**
*
*/
private Integer livenessAlarm;
}

@ -0,0 +1,26 @@
package io.dynamic.threadpool.common.toolkit;
import io.dynamic.threadpool.common.model.PoolParameter;
/**
* Content Util.
*
* @author chen.ma
* @date 2021/6/24 16:13
*/
public class ContentUtil {
public static String getPoolContent(PoolParameter parameter) {
StringBuilder stringBuilder = new StringBuilder();
String targetStr = stringBuilder.append(parameter.getCoreSize())
.append(parameter.getMaxSize())
.append(parameter.getQueueType())
.append(parameter.getCapacity())
.append(parameter.getKeepAliveTime())
.append(parameter.getIsAlarm())
.append(parameter.getCapacityAlarm())
.append(parameter.getLivenessAlarm())
.toString();
return targetStr;
}
}

@ -1,4 +1,6 @@
package io.dynamict.hreadpool.common.toolkit;
package io.dynamic.threadpool.common.toolkit;
import io.dynamic.threadpool.common.model.PoolParameter;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
@ -53,4 +55,14 @@ public class Md5Util {
return new String(out);
}
/**
* ThreadPool Md5
*
* @param config
* @return
*/
public static String getTpContentMd5(PoolParameter config) {
return Md5Util.md5Hex(ContentUtil.getPoolContent(config), "UTF-8");
}
}

@ -1,4 +1,4 @@
package io.dynamict.hreadpool.common.web.base;
package io.dynamic.threadpool.common.web.base;
import lombok.Data;
import lombok.experimental.Accessors;

@ -1,7 +1,7 @@
package io.dynamict.hreadpool.common.web.base;
package io.dynamic.threadpool.common.web.base;
import io.dynamict.hreadpool.common.web.exception.ErrorCode;
import io.dynamict.hreadpool.common.web.exception.ServiceException;
import io.dynamic.threadpool.common.web.exception.ErrorCode;
import io.dynamic.threadpool.common.web.exception.ServiceException;
/**
* Result

@ -1,4 +1,4 @@
package io.dynamict.hreadpool.common.web.exception;
package io.dynamic.threadpool.common.web.exception;
/**
*

@ -1,4 +1,4 @@
package io.dynamict.hreadpool.common.web.exception;
package io.dynamic.threadpool.common.web.exception;
import lombok.Data;
import lombok.EqualsAndHashCode;

@ -1,10 +1,11 @@
package io.dynamic.threadpool.starter.adapter;
import cn.hutool.core.thread.ThreadFactoryBuilder;
import io.dynamic.threadpool.starter.config.ApplicationContextHolder;
import io.dynamic.threadpool.common.config.ApplicationContextHolder;
import io.dynamic.threadpool.starter.operation.ThreadPoolOperation;
import io.dynamic.threadpool.starter.wrap.DynamicThreadPoolWrap;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.annotation.Order;
import javax.annotation.PostConstruct;
import java.util.ArrayList;
@ -35,6 +36,7 @@ public class ThreadPoolConfigAdapter extends ConfigAdapter {
new ThreadFactoryBuilder().setNamePrefix("threadPool-config").build(),
new ThreadPoolExecutor.DiscardOldestPolicy());
@Order(1025)
@PostConstruct
public void subscribeConfig() {
Map<String, DynamicThreadPoolWrap> executorMap =

@ -20,11 +20,17 @@ public class Constants {
public static final int CONFIG_LONG_POLL_TIMEOUT = 30000;
public static final String LINE_SEPARATOR = Character.toString((char) 1);
public static final String WORD_SEPARATOR = Character.toString((char) 2);
public static final String LONGPOLLING_LINE_SEPARATOR = "\r\n";
public static final String BASE_PATH = "/v1/cs";
public static final String CONFIG_CONTROLLER_PATH = BASE_PATH + "/configs";
public static final String LISTENER_PATH = BASE_PATH + CONFIG_CONTROLLER_PATH + "/listener";
public static final String LISTENER_PATH = CONFIG_CONTROLLER_PATH + "/listener";
public static final String PROBE_MODIFY_REQUEST = "Listening-Configs";

@ -1,9 +1,10 @@
package io.dynamic.threadpool.starter.core;
import io.dynamic.threadpool.common.toolkit.ContentUtil;
import io.dynamic.threadpool.common.toolkit.Md5Util;
import io.dynamic.threadpool.starter.common.Constants;
import io.dynamic.threadpool.starter.listener.Listener;
import io.dynamic.threadpool.starter.wrap.ManagerListenerWrap;
import io.dynamict.hreadpool.common.toolkit.Md5Util;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CopyOnWriteArrayList;
@ -21,6 +22,10 @@ public class CacheData {
public volatile String content;
public final String namespace;
public final String itemId;
public final String tpId;
private int taskId;
@ -29,10 +34,11 @@ public class CacheData {
private final CopyOnWriteArrayList<ManagerListenerWrap> listeners;
public CacheData(String tpId) {
public CacheData(String namespace, String itemId, String tpId) {
this.namespace = namespace;
this.itemId = itemId;
this.tpId = tpId;
// TODOnacos 走的本地文件获取, 这里思考下如何优雅获取
this.content = null;
this.content = ContentUtil.getPoolContent(GlobalThreadPoolManage.getPoolParameter(tpId));
this.md5 = getMd5String(content);
this.listeners = new CopyOnWriteArrayList();

@ -1,6 +1,7 @@
package io.dynamic.threadpool.starter.core;
import io.dynamic.threadpool.starter.wrap.DynamicThreadPoolWrap;
import io.dynamic.threadpool.common.model.PoolParameter;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@ -13,17 +14,28 @@ import java.util.concurrent.ConcurrentHashMap;
*/
public class GlobalThreadPoolManage {
private static final Map<String, PoolParameter> POOL_PARAMETER = new ConcurrentHashMap();
private static final Map<String, DynamicThreadPoolWrap> EXECUTOR_MAP = new ConcurrentHashMap();
public static DynamicThreadPoolWrap getExecutorService(String tpId) {
return EXECUTOR_MAP.get(tpId);
}
public static void register(String tpId, DynamicThreadPoolWrap executor) {
public static PoolParameter getPoolParameter(String tpId) {
return POOL_PARAMETER.get(tpId);
}
public static void register(String tpId, PoolParameter poolParameter, DynamicThreadPoolWrap executor) {
registerPool(tpId, executor);
registerPoolParameter(tpId, poolParameter);
}
public static void registerPool(String tpId, DynamicThreadPoolWrap executor) {
EXECUTOR_MAP.put(tpId, executor);
}
public static void remove(String tpId) {
EXECUTOR_MAP.remove(tpId);
public static void registerPoolParameter(String tpId, PoolParameter poolParameter) {
POOL_PARAMETER.put(tpId, poolParameter);
}
}

@ -1,7 +1,7 @@
package io.dynamic.threadpool.starter.core;
import com.alibaba.fastjson.JSON;
import io.dynamic.threadpool.starter.model.PoolParameterInfo;
import io.dynamic.threadpool.common.model.PoolParameterInfo;
import io.dynamic.threadpool.starter.wrap.DynamicThreadPoolWrap;
import lombok.extern.slf4j.Slf4j;

@ -1,11 +1,12 @@
package io.dynamic.threadpool.starter.listener;
import com.alibaba.fastjson.JSON;
import io.dynamic.threadpool.common.model.PoolParameterInfo;
import io.dynamic.threadpool.common.toolkit.ContentUtil;
import io.dynamic.threadpool.common.web.base.Result;
import io.dynamic.threadpool.starter.common.Constants;
import io.dynamic.threadpool.starter.core.CacheData;
import io.dynamic.threadpool.starter.remote.HttpAgent;
import io.dynamict.hreadpool.common.model.GlobalRemotePoolInfo;
import io.dynamict.hreadpool.common.web.base.Result;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
@ -16,6 +17,9 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import static io.dynamic.threadpool.starter.common.Constants.LINE_SEPARATOR;
import static io.dynamic.threadpool.starter.common.Constants.WORD_SEPARATOR;
/**
*
*
@ -98,61 +102,82 @@ public class ClientWorker {
@Override
public void run() {
List<CacheData> cacheDataList = new ArrayList();
cacheMap.forEach((key, val) -> cacheDataList.add(val));
List<String> changedTpIds = checkUpdateTpIds(cacheDataList);
List<String> changedTpIds = checkUpdateDataIds(cacheDataList);
if (!CollectionUtils.isEmpty(cacheDataList)) {
log.info("[dynamic threadPool] tpIds changed :: {}", changedTpIds);
}
} else {
for (String each : changedTpIds) {
String[] keys = each.split(",");
String namespace = keys[0];
String itemId = keys[1];
String tpId = keys[2];
try {
String content = getServerConfig(namespace, itemId, tpId, 3000L);
CacheData cacheData = cacheMap.get(tpId);
cacheData.setContent(content);
cacheDataList.add(cacheData);
log.info("[data-received] namespace :: {}, itemId :: {}, tpId :: {}, md5 :: {}, content :: {}",
namespace, itemId, tpId, cacheData.getMd5(), content);
} catch (Exception ex) {
// ignore
}
}
for (String each : changedTpIds) {
String[] keys = each.split(",");
String namespace = keys[0];
String itemId = keys[1];
String tpId = keys[2];
try {
String content = getServerConfig(namespace, itemId, tpId, 3000L);
CacheData cacheData = cacheMap.get(tpId);
cacheData.setContent(content);
cacheDataList.add(cacheData);
log.info("[data-received] namespace :: {}, itemId :: {}, tpId :: {}, md5 :: {}, content :: {}",
namespace, itemId, tpId, cacheData.getMd5(), content);
} catch (Exception ex) {
// ignore
for (CacheData each : cacheDataList) {
each.checkListenerMd5();
}
}
for (CacheData each : cacheDataList) {
each.checkListenerMd5();
}
executorService.execute(this);
}
}
/**
* 线 ID
*
*
* @param cacheDataList
* @return
*/
public List<String> checkUpdateTpIds(List<CacheData> cacheDataList) {
private List<String> checkUpdateDataIds(List<CacheData> cacheDataList) {
StringBuilder sb = new StringBuilder();
for (CacheData cacheData : cacheDataList) {
sb.append(cacheData.tpId).append(WORD_SEPARATOR);
sb.append(cacheData.itemId).append(WORD_SEPARATOR);
sb.append(cacheData.getMd5()).append(WORD_SEPARATOR);
sb.append(cacheData.namespace).append(LINE_SEPARATOR);
}
return checkUpdateTpIds(sb.toString());
}
/**
* 线 ID
*
* @param probeUpdateString
* @return
*/
public List<String> checkUpdateTpIds(String probeUpdateString) {
Map<String, String> params = new HashMap(2);
params.put(Constants.PROBE_MODIFY_REQUEST, JSON.toJSONString(cacheDataList));
params.put(Constants.PROBE_MODIFY_REQUEST, probeUpdateString);
Map<String, String> headers = new HashMap(2);
headers.put(Constants.LONG_PULLING_TIMEOUT, "" + timeout);
if (StringUtils.isEmpty(cacheDataList)) {
if (StringUtils.isEmpty(probeUpdateString)) {
return Collections.emptyList();
}
try {
long readTimeoutMs = timeout + (long) Math.round(timeout >> 1);
Result result = agent.httpPost(Constants.LISTENER_PATH, headers, params, readTimeoutMs);
if (result.isSuccess()) {
if (result == null || result.isFail()) {
setHealthServer(false);
log.warn("[check-update] get changed dataId error, code: {}", result == null ? "error" : result.getCode());
} else {
setHealthServer(true);
return parseUpdateDataIdResponse(result.getData().toString());
} else {
setHealthServer(false);
log.error("[check-update] get changed dataId error, code: {}", result.getCode());
}
} catch (Exception ex) {
setHealthServer(false);
@ -226,12 +251,12 @@ public class ClientWorker {
return cacheData;
}
cacheData = new CacheData(tpId);
cacheData = new CacheData(namespace, itemId, tpId);
CacheData lastCacheData = cacheMap.putIfAbsent(tpId, cacheData);
if (lastCacheData == null) {
String serverConfig = getServerConfig(namespace, itemId, tpId, 3000L);
GlobalRemotePoolInfo poolInfo = JSON.parseObject(serverConfig, GlobalRemotePoolInfo.class);
cacheData.setContent(poolInfo.getContent());
PoolParameterInfo poolInfo = JSON.parseObject(serverConfig, PoolParameterInfo.class);
cacheData.setContent(ContentUtil.getPoolContent(poolInfo));
int taskId = cacheMap.size() / Constants.CONFIG_LONG_POLL_TIMEOUT;
cacheData.setTaskId(taskId);

@ -3,18 +3,18 @@ package io.dynamic.threadpool.starter.listener;
import com.alibaba.fastjson.JSON;
import io.dynamic.threadpool.starter.common.CommonThreadPool;
import io.dynamic.threadpool.starter.common.Constants;
import io.dynamic.threadpool.starter.config.ApplicationContextHolder;
import io.dynamic.threadpool.common.config.ApplicationContextHolder;
import io.dynamic.threadpool.starter.config.DynamicThreadPoolProperties;
import io.dynamic.threadpool.starter.core.GlobalThreadPoolManage;
import io.dynamic.threadpool.starter.model.PoolParameterInfo;
import io.dynamic.threadpool.starter.remote.HttpAgent;
import io.dynamic.threadpool.starter.remote.ServerHttpAgent;
import io.dynamic.threadpool.starter.toolkit.BlockingQueueUtil;
import io.dynamic.threadpool.starter.wrap.DynamicThreadPoolWrap;
import io.dynamict.hreadpool.common.web.base.Result;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import io.dynamic.threadpool.common.model.PoolParameterInfo;
import io.dynamic.threadpool.common.web.base.Result;
import org.springframework.core.annotation.Order;
import javax.annotation.PostConstruct;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
@ -27,7 +27,7 @@ import java.util.concurrent.TimeUnit;
* @author chen.ma
* @date 2021/6/20 16:34
*/
public class ThreadPoolRunListener implements ApplicationRunner {
public class ThreadPoolRunListener {
private final DynamicThreadPoolProperties dynamicThreadPoolProperties;
@ -35,8 +35,9 @@ public class ThreadPoolRunListener implements ApplicationRunner {
this.dynamicThreadPoolProperties = properties;
}
@Override
public void run(ApplicationArguments args) throws Exception {
@Order(1024)
@PostConstruct
public void run() {
Map<String, DynamicThreadPoolWrap> executorMap =
ApplicationContextHolder.getBeansOfType(DynamicThreadPoolWrap.class);
@ -59,7 +60,7 @@ public class ThreadPoolRunListener implements ApplicationRunner {
val.setPool(CommonThreadPool.getInstance(val.getTpId()));
}
GlobalThreadPoolManage.register(val.getTpId(), val);
GlobalThreadPoolManage.register(val.getTpId(), ppi, val);
});
}

@ -1,6 +1,6 @@
package io.dynamic.threadpool.starter.remote;
import io.dynamict.hreadpool.common.web.base.Result;
import io.dynamic.threadpool.common.web.base.Result;
import java.util.Map;

@ -1,9 +1,9 @@
package io.dynamic.threadpool.starter.remote;
import io.dynamic.threadpool.starter.config.ApplicationContextHolder;
import io.dynamic.threadpool.common.config.ApplicationContextHolder;
import io.dynamic.threadpool.starter.config.DynamicThreadPoolProperties;
import io.dynamic.threadpool.starter.toolkit.HttpClientUtil;
import io.dynamict.hreadpool.common.web.base.Result;
import io.dynamic.threadpool.common.web.base.Result;
import java.util.Map;

@ -1,3 +1,3 @@
org.springframework.boot.autoconfigure.EnableAutoConfiguration=io.dynamic.threadpool.starter.config.CommonConfiguration, \
org.springframework.boot.autoconfigure.EnableAutoConfiguration=io.dynamic.threadpool.common.config.CommonConfiguration, \
io.dynamic.threadpool.starter.config.OkHttpClientConfig,\
io.dynamic.threadpool.starter.config.DynamicThreadPoolAutoConfiguration

@ -46,6 +46,11 @@
<groupId>io.dynamic-threadpool</groupId>
<artifactId>common</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
</dependency>
</dependencies>
<build>

@ -3,7 +3,10 @@ package io.dynamic.threadpool.server;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
@SpringBootApplication(
scanBasePackages = {
"io.dynamic.threadpool.common.config", "io.dynamic.threadpool.server"
})
public class ServerApplication {
public static void main(String[] args) {

@ -15,4 +15,6 @@ public class Constants {
public static final String LISTENING_CONFIGS = "Listening-Configs";
public static final String ENCODE = "UTF-8";
public static final String NULL = "";
}

@ -1,11 +1,15 @@
package io.dynamic.threadpool.server.controller;
import io.dynamic.threadpool.server.constant.Constants;
import io.dynamic.threadpool.server.event.ConfigDataChangeEvent;
import io.dynamic.threadpool.server.model.ConfigAllInfo;
import io.dynamic.threadpool.server.model.ConfigInfoBase;
import io.dynamic.threadpool.server.service.ConfigChangePublisher;
import io.dynamic.threadpool.server.service.ConfigService;
import io.dynamic.threadpool.server.service.ConfigServletInner;
import io.dynamict.hreadpool.common.web.base.Result;
import io.dynamict.hreadpool.common.web.base.Results;
import io.dynamic.threadpool.server.toolkit.Md5ConfigUtil;
import io.dynamic.threadpool.common.web.base.Result;
import io.dynamic.threadpool.common.web.base.Results;
import lombok.SneakyThrows;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.StringUtils;
@ -14,6 +18,8 @@ import org.springframework.web.bind.annotation.*;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.net.URLDecoder;
import java.sql.Timestamp;
import java.util.Map;
/**
*
@ -40,6 +46,17 @@ public class ConfigController {
return Results.success(configService.findConfigAllInfo(tpId, itemId, namespace));
}
@PostMapping
public Result<Boolean> publishConfig(HttpServletRequest request, @RequestBody ConfigAllInfo config) {
configService.insertOrUpdate(config);
long gmtModified = new Timestamp(System.currentTimeMillis()).getTime();
ConfigChangePublisher
.notifyConfigChange(new ConfigDataChangeEvent(config.getNamespace(), config.getItemId(), config.getTpId(), gmtModified));
return Results.success(true);
}
@SneakyThrows
@PostMapping("/listener")
public void listener(HttpServletRequest request, HttpServletResponse response) {
@ -52,7 +69,14 @@ public class ConfigController {
probeModify = URLDecoder.decode(probeModify, Constants.ENCODE);
configServletInner.doPollingConfig(request, response, null, probeModify.length());
Map<String, String> clientMd5Map;
try {
clientMd5Map = Md5ConfigUtil.getClientMd5Map(probeModify);
} catch (Throwable e) {
throw new IllegalArgumentException("invalid probeModify");
}
configServletInner.doPollingConfig(request, response, clientMd5Map, probeModify.length());
}
}

@ -0,0 +1,31 @@
package io.dynamic.threadpool.server.event;
import org.springframework.util.StringUtils;
/**
* Config Data Change Event.
*
* @author chen.ma
* @date 2021/6/24 23:35
*/
public class ConfigDataChangeEvent extends Event {
public final String namespace;
public final String itemId;
public final String tpId;
public final long lastModifiedTs;
public ConfigDataChangeEvent(String namespace, String itemId, String tpId, Long gmtModified) {
if (StringUtils.isEmpty(namespace) || StringUtils.isEmpty(itemId) || StringUtils.isEmpty(tpId)) {
throw new IllegalArgumentException("dataId is null or group is null");
}
this.namespace = namespace;
this.itemId = itemId;
this.tpId = tpId;
this.lastModifiedTs = gmtModified;
}
}

@ -1,4 +1,4 @@
package io.dynamic.threadpool.server.notify;
package io.dynamic.threadpool.server.event;
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicLong;

@ -1,7 +1,5 @@
package io.dynamic.threadpool.server.event;
import io.dynamic.threadpool.server.notify.Event;
/**
* Local Data Change Event.
*
@ -9,4 +7,10 @@ import io.dynamic.threadpool.server.notify.Event;
* @date 2021/6/23 19:13
*/
public class LocalDataChangeEvent extends Event {
public final String groupKey;
public LocalDataChangeEvent(String groupKey) {
this.groupKey = groupKey;
}
}

@ -1,4 +1,4 @@
package io.dynamic.threadpool.server.notify;
package io.dynamic.threadpool.server.event;
/**
* Slow Event.

@ -22,7 +22,7 @@ public final class RowMapperManager {
ConfigAllInfo configAllInfo = new ConfigAllInfo();
configAllInfo.setTpId(rs.getString("tp_id"));
configAllInfo.setItemId(rs.getString("item_id"));
configAllInfo.setNamespace(rs.getString("namespace"));
configAllInfo.setNamespace(rs.getString("tenant_id"));
configAllInfo.setContent(rs.getString("content"));
configAllInfo.setCoreSize(rs.getInt("core_size"));
configAllInfo.setMaxSize(rs.getInt("max_size"));

@ -0,0 +1,35 @@
package io.dynamic.threadpool.server.model;
import io.dynamic.threadpool.server.constant.Constants;
import io.dynamic.threadpool.server.toolkit.SimpleReadWriteLock;
import io.dynamic.threadpool.server.toolkit.SingletonRepository;
import lombok.Getter;
import lombok.Setter;
/**
* Cache Item.
*
* @author chen.ma
* @date 2021/6/24 21:23
*/
@Getter
@Setter
public class CacheItem {
final String groupKey;
public volatile String md5 = Constants.NULL;
public volatile long lastModifiedTs;
public SimpleReadWriteLock rwLock = new SimpleReadWriteLock();
public CacheItem(String groupKey) {
this.groupKey = SingletonRepository.DataIdGroupIdCache.getSingleton(groupKey);
}
public CacheItem(String groupKey, String md5) {
this.md5 = md5;
this.groupKey = SingletonRepository.DataIdGroupIdCache.getSingleton(groupKey);
}
}

@ -1,5 +1,6 @@
package io.dynamic.threadpool.server.model;
import io.dynamic.threadpool.common.model.PoolParameter;
import lombok.Data;
/**
@ -9,7 +10,7 @@ import lombok.Data;
* @date 2021/6/20 15:14
*/
@Data
public class ConfigAllInfo extends ConfigInfo {
public class ConfigAllInfo extends ConfigInfo implements PoolParameter {
private static final long serialVersionUID = -2417394244017463665L;

@ -15,6 +15,11 @@ public class ConfigInfoBase implements Serializable {
private static final long serialVersionUID = -1892597426099265730L;
/**
* namespace
*/
private String namespace;
/**
* TpId
*/

@ -1,7 +1,15 @@
package io.dynamic.threadpool.server.notify;
import cn.hutool.core.collection.ConcurrentHashSet;
import io.dynamic.threadpool.server.event.Event;
import io.dynamic.threadpool.server.notify.listener.Subscriber;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.CollectionUtils;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
/**
* The default event publisher implementation.
@ -9,12 +17,126 @@ import io.dynamic.threadpool.server.notify.listener.Subscriber;
* @author chen.ma
* @date 2021/6/23 19:06
*/
@Slf4j
public class DefaultPublisher extends Thread implements EventPublisher {
protected final ConcurrentHashSet<Subscriber> subscribers = new ConcurrentHashSet();
private BlockingQueue<Event> queue;
private volatile boolean initialized = false;
private volatile boolean shutdown = false;
private int queueMaxSize = -1;
protected volatile Long lastEventSequence = -1L;
private static final AtomicReferenceFieldUpdater<DefaultPublisher, Long> UPDATER = AtomicReferenceFieldUpdater
.newUpdater(DefaultPublisher.class, Long.class, "lastEventSequence");
@Override
public void init(Class<? extends Event> type, int bufferSize) {
setDaemon(true);
setName("dynamic.thread-pool.publisher-" + type.getName());
this.queueMaxSize = bufferSize;
this.queue = new ArrayBlockingQueue(bufferSize);
start();
}
@Override
public synchronized void start() {
if (!initialized) {
// start just called once
super.start();
if (queueMaxSize == -1) {
queueMaxSize = NotifyCenter.ringBufferSize;
}
initialized = true;
}
}
@Override
public void run() {
openEventHandler();
}
private void openEventHandler() {
try {
int waitTimes = 60;
for (; ; ) {
if (shutdown || hasSubscriber() || waitTimes <= 0) {
break;
}
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
waitTimes--;
}
for (; ; ) {
if (shutdown) {
break;
}
final Event event = queue.take();
receiveEvent(event);
UPDATER.compareAndSet(this, lastEventSequence, Math.max(lastEventSequence, event.sequence()));
}
} catch (Throwable ex) {
log.error("Event listener exception : {}", ex);
}
}
@Override
public void addSubscriber(Subscriber subscriber) {
subscribers.add(subscriber);
}
@Override
public boolean publish(Event event) {
boolean success = this.queue.offer(event);
if (!success) {
log.warn("Unable to plug in due to interruption, synchronize sending time, event : {}", event);
receiveEvent(event);
return true;
}
return true;
}
@Override
public void notifySubscriber(Subscriber subscriber, Event event) {
final Runnable job = () -> subscriber.onEvent(event);
final Executor executor = subscriber.executor();
if (executor != null) {
executor.execute(job);
} else {
try {
job.run();
} catch (Throwable e) {
log.error("Event callback exception : {}", e);
}
}
}
private boolean hasSubscriber() {
return !CollectionUtils.isEmpty(subscribers);
}
void receiveEvent(Event event) {
final long currentEventSequence = event.sequence();
for (Subscriber subscriber : subscribers) {
/*if (subscriber.ignoreExpireEvent() && lastEventSequence > currentEventSequence) {
LOGGER.debug("[NotifyCenter] the {} is unacceptable to this subscriber, because had expire",
event.getClass());
continue;
}*/
notifySubscriber(subscriber, event);
}
}
}

@ -1,6 +1,8 @@
package io.dynamic.threadpool.server.notify;
import cn.hutool.core.collection.ConcurrentHashSet;
import io.dynamic.threadpool.server.event.Event;
import io.dynamic.threadpool.server.event.SlowEvent;
import io.dynamic.threadpool.server.notify.listener.Subscriber;
import java.util.Map;
@ -15,7 +17,7 @@ import java.util.concurrent.locks.ReentrantLock;
* @author chen.ma
* @date 2021/6/23 19:05
*/
public class DefaultSharePublisher {
public class DefaultSharePublisher extends DefaultPublisher {
private final Map<Class<? extends SlowEvent>, Set<Subscriber>> subMappings = new ConcurrentHashMap();

@ -1,5 +1,6 @@
package io.dynamic.threadpool.server.notify;
import io.dynamic.threadpool.server.event.Event;
import io.dynamic.threadpool.server.notify.listener.Subscriber;
/**
@ -10,6 +11,12 @@ import io.dynamic.threadpool.server.notify.listener.Subscriber;
*/
public interface EventPublisher {
void init(Class<? extends Event> type, int bufferSize);
void addSubscriber(Subscriber subscriber);
boolean publish(Event event);
void notifySubscriber(Subscriber subscriber, Event event);
}

@ -1,11 +1,16 @@
package io.dynamic.threadpool.server.notify;
import io.dynamic.threadpool.server.event.Event;
import io.dynamic.threadpool.server.event.SlowEvent;
import io.dynamic.threadpool.server.notify.listener.SmartSubscriber;
import io.dynamic.threadpool.server.notify.listener.Subscriber;
import io.dynamic.threadpool.server.toolkit.ClassUtil;
import io.dynamic.threadpool.server.toolkit.MapUtil;
import lombok.extern.slf4j.Slf4j;
import java.util.Iterator;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;
@ -15,18 +20,48 @@ import java.util.function.BiFunction;
* @author chen.ma
* @date 2021/6/23 18:58
*/
@Slf4j
public class NotifyCenter {
private static final NotifyCenter INSTANCE = new NotifyCenter();
public static int ringBufferSize = 16384;
public static int shareBufferSize = 1024;
private DefaultSharePublisher sharePublisher;
private static Class<? extends EventPublisher> clazz = null;
private static BiFunction<Class<? extends Event>, Integer, EventPublisher> publisherFactory = null;
private final Map<String, EventPublisher> publisherMap = new ConcurrentHashMap(16);
static {
final ServiceLoader<EventPublisher> loader = ServiceLoader.load(EventPublisher.class);
Iterator<EventPublisher> iterator = loader.iterator();
if (iterator.hasNext()) {
clazz = iterator.next().getClass();
} else {
clazz = DefaultPublisher.class;
}
publisherFactory = (cls, buffer) -> {
try {
EventPublisher publisher = clazz.newInstance();
publisher.init(cls, buffer);
return publisher;
} catch (Throwable ex) {
log.error("Service class newInstance has error : {}", ex);
throw new RuntimeException(ex);
}
};
INSTANCE.sharePublisher = new DefaultSharePublisher();
INSTANCE.sharePublisher.init(SlowEvent.class, shareBufferSize);
}
public static <T> void registerSubscriber(final Subscriber consumer) {
if (consumer instanceof SmartSubscriber) {
for (Class<? extends Event> subscribeType : ((SmartSubscriber) consumer).subscribeTypes()) {
@ -52,11 +87,33 @@ public class NotifyCenter {
final String topic = ClassUtil.getCanonicalName(subscribeType);
synchronized (NotifyCenter.class) {
// MapUtils.computeIfAbsent is a unsafe method.
MapUtil.computeIfAbsent(INSTANCE.publisherMap, topic, publisherFactory, subscribeType, ringBufferSize);
}
EventPublisher publisher = INSTANCE.publisherMap.get(topic);
publisher.addSubscriber(consumer);
}
public static boolean publishEvent(final Event event) {
try {
return publishEvent(event.getClass(), event);
} catch (Throwable ex) {
log.error("There was an exception to the message publishing : {}", ex);
return false;
}
}
private static boolean publishEvent(final Class<? extends Event> eventType, final Event event) {
if (ClassUtil.isAssignableFrom(SlowEvent.class, eventType)) {
return INSTANCE.sharePublisher.publish(event);
}
final String topic = ClassUtil.getCanonicalName(eventType);
EventPublisher publisher = INSTANCE.publisherMap.get(topic);
if (publisher != null) {
return publisher.publish(event);
}
log.warn("There are no [{}] publishers for this event, please register", topic);
return false;
}
}

@ -1,6 +1,6 @@
package io.dynamic.threadpool.server.notify.listener;
import io.dynamic.threadpool.server.notify.Event;
import io.dynamic.threadpool.server.event.Event;
import java.util.List;

@ -1,6 +1,8 @@
package io.dynamic.threadpool.server.notify.listener;
import io.dynamic.threadpool.server.notify.Event;
import io.dynamic.threadpool.server.event.Event;
import java.util.concurrent.Executor;
/**
* An abstract subscriber class for subscriber interface.
@ -24,4 +26,7 @@ public abstract class Subscriber<T extends Event> {
*/
public abstract Class<? extends Event> subscribeType();
public Executor executor() {
return null;
}
}

@ -0,0 +1,59 @@
package io.dynamic.threadpool.server.service;
import io.dynamic.threadpool.common.config.ApplicationContextHolder;
import io.dynamic.threadpool.common.toolkit.Md5Util;
import io.dynamic.threadpool.server.constant.Constants;
import io.dynamic.threadpool.server.model.CacheItem;
import io.dynamic.threadpool.server.model.ConfigAllInfo;
import org.springframework.util.StringUtils;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
/**
* Config Cache Service.
*
* @author chen.ma
* @date 2021/6/24 21:19
*/
public class ConfigCacheService {
static ConfigService configService = null;
private static final ConcurrentHashMap<String, CacheItem> CACHE = new ConcurrentHashMap();
public static boolean isUpdateData(String groupKey, String md5, String ip) {
String contentMd5 = ConfigCacheService.getContentMd5(groupKey, ip);
return Objects.equals(contentMd5, md5);
}
/**
* Md5
* TODO IP, IP 线
* TODOgroupKey && Md5 Cache
*
* @param groupKey
* @param ip
* @return
*/
private static String getContentMd5(String groupKey, String ip) {
CacheItem cacheItem = CACHE.get(groupKey);
if (cacheItem != null) {
return cacheItem.md5;
}
if (configService == null) {
configService = ApplicationContextHolder.getBean(ConfigService.class);
}
String[] split = groupKey.split("\\+");
ConfigAllInfo config = configService.findConfigAllInfo(split[0], split[1], split[2]);
if (config != null && !StringUtils.isEmpty(config.getTpId())) {
String md5 = Md5Util.getTpContentMd5(config);
cacheItem = new CacheItem(groupKey, md5);
CACHE.put(groupKey, cacheItem);
}
return (cacheItem != null) ? cacheItem.md5 : Constants.NULL;
}
}

@ -0,0 +1,22 @@
package io.dynamic.threadpool.server.service;
import io.dynamic.threadpool.server.event.ConfigDataChangeEvent;
import io.dynamic.threadpool.server.notify.NotifyCenter;
/**
* Config Change Publisher.
*
* @author chen.ma
* @date 2021/6/24 23:34
*/
public class ConfigChangePublisher {
/**
* Notify ConfigChange.
*
* @param event ConfigDataChangeEvent instance.
*/
public static void notifyConfigChange(ConfigDataChangeEvent event) {
NotifyCenter.publishEvent(event);
}
}

@ -13,10 +13,17 @@ public interface ConfigService {
/**
*
*
* @param tpId tpId
* @param itemId itemId
* @param namespace namespace
* @param tpId tpId
* @param itemId itemId
* @param namespace namespace
* @return
*/
ConfigAllInfo findConfigAllInfo(String tpId, String itemId, String namespace);
/**
*
*
* @param configAllInfo
*/
void insertOrUpdate(ConfigAllInfo configAllInfo);
}

@ -1,20 +1,22 @@
package io.dynamic.threadpool.server.service;
import com.alibaba.fastjson.JSON;
import io.dynamic.threadpool.server.event.LocalDataChangeEvent;
import io.dynamic.threadpool.server.notify.Event;
import io.dynamic.threadpool.server.event.Event;
import io.dynamic.threadpool.server.notify.NotifyCenter;
import io.dynamic.threadpool.server.notify.listener.Subscriber;
import io.dynamic.threadpool.server.toolkit.ConfigExecutor;
import io.dynamic.threadpool.server.toolkit.Md5ConfigUtil;
import io.dynamic.threadpool.server.toolkit.RequestUtil;
import io.dynamic.threadpool.common.web.base.Results;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import javax.servlet.AsyncContext;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Future;
@ -34,8 +36,6 @@ public class LongPollingService {
public static final String LONG_POLLING_HEADER = "Long-Pulling-Timeout";
public static final String LONG_POLLING_NO_HANG_UP_HEADER = "Long-Pulling-Timeout-No-Hangup";
public static final String CLIENT_APPNAME_HEADER = "Client-AppName";
private Map<String, Long> retainIps = new ConcurrentHashMap();
@ -54,7 +54,7 @@ public class LongPollingService {
} else {
if (event instanceof LocalDataChangeEvent) {
LocalDataChangeEvent evt = (LocalDataChangeEvent) event;
// ConfigExecutor.executeLongPolling(new DataChangeTask(evt.groupKey, evt.isBeta, evt.betaIps));
ConfigExecutor.executeLongPolling(new DataChangeTask(evt.groupKey));
}
}
}
@ -78,25 +78,29 @@ public class LongPollingService {
final String groupKey;
final long changeTime = System.currentTimeMillis();
final boolean isBeta;
final List<String> betaIps;
DataChangeTask(String groupKey, boolean isBeta, List<String> betaIps) {
DataChangeTask(String groupKey) {
this.groupKey = groupKey;
this.isBeta = isBeta;
this.betaIps = betaIps;
}
@Override
public void run() {
try {
for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext(); ) {
ClientLongPolling clientSub = iter.next();
if (clientSub.clientMd5Map.containsKey(groupKey)) {
getRetainIps().put(clientSub.ip, System.currentTimeMillis());
iter.remove();
clientSub.sendResponse(Arrays.asList(groupKey));
}
}
} catch (Exception ex) {
log.error("Data change error :: {}", ex.getMessage(), ex);
}
}
}
public static boolean isSupportLongPolling(HttpServletRequest req) {
return null != req.getHeader(LONG_POLLING_HEADER);
}
@ -112,7 +116,6 @@ public class LongPollingService {
public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map,
int probeRequestSize) {
String str = req.getHeader(LONG_POLLING_HEADER);
String noHangUpFlag = req.getHeader(LONG_POLLING_NO_HANG_UP_HEADER);
String appName = req.getHeader(CLIENT_APPNAME_HEADER);
int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);
@ -120,7 +123,11 @@ public class LongPollingService {
if (isFixedPolling()) {
timeout = Math.max(10000, getFixedPollingInterval());
} else {
List<String> changedGroups = Md5ConfigUtil.compareMd5(req, clientMd5Map);
if (changedGroups.size() > 0) {
generateResponse(rsp, changedGroups);
return;
}
}
String ip = RequestUtil.getRemoteIp(req);
@ -164,11 +171,22 @@ public class LongPollingService {
@Override
public void run() {
asyncTimeoutFuture = ConfigExecutor.scheduleLongPolling(() -> {
getRetainIps().put(ClientLongPolling.this.ip, System.currentTimeMillis());
allSubs.remove(ClientLongPolling.this);
if (isFixedPolling()) {
try {
getRetainIps().put(ClientLongPolling.this.ip, System.currentTimeMillis());
allSubs.remove(ClientLongPolling.this);
if (isFixedPolling()) {
List<String> changedGroups = Md5ConfigUtil.compareMd5((HttpServletRequest) asyncContext.getRequest(), clientMd5Map);
if (changedGroups.size() > 0) {
sendResponse(changedGroups);
} else {
sendResponse(null);
}
} else {
sendResponse(null);
}
} catch (Exception ex) {
log.error("Long polling error :: {}", ex.getMessage(), ex);
}
}, timeoutTime, TimeUnit.MILLISECONDS);
@ -176,9 +194,64 @@ public class LongPollingService {
allSubs.add(this);
}
private void sendResponse(List<String> changedGroups) {
// Cancel time out task.
if (null != asyncTimeoutFuture) {
asyncTimeoutFuture.cancel(false);
}
generateResponse(changedGroups);
}
private void generateResponse(List<String> changedGroups) {
if (null == changedGroups) {
// Tell web container to send http response.
asyncContext.complete();
return;
}
HttpServletResponse response = (HttpServletResponse) asyncContext.getResponse();
try {
String respString = JSON.toJSONString(Results.success(changedGroups));
// Disable cache.
response.setHeader("Pragma", "no-cache");
response.setDateHeader("Expires", 0);
response.setHeader("Cache-Control", "no-cache,no-store");
response.setStatus(HttpServletResponse.SC_OK);
response.getWriter().println(respString);
asyncContext.complete();
} catch (Exception ex) {
log.error(ex.toString(), ex);
asyncContext.complete();
}
}
}
public Map<String, Long> getRetainIps() {
return retainIps;
}
/**
*
*
* @param response
* @param changedGroups
*/
private void generateResponse(HttpServletResponse response, List<String> changedGroups) {
if (!CollectionUtils.isEmpty(changedGroups)) {
try {
final String respString = JSON.toJSONString(Results.success(changedGroups));
response.setHeader("Pragma", "no-cache");
response.setDateHeader("Expires", 0);
response.setHeader("Cache-Control", "no-cache,no-store");
response.setStatus(HttpServletResponse.SC_OK);
response.getWriter().println(respString);
} catch (Exception ex) {
log.error(ex.toString(), ex);
}
}
}
}

@ -3,16 +3,24 @@ package io.dynamic.threadpool.server.service.impl;
import io.dynamic.threadpool.server.mapper.RowMapperManager;
import io.dynamic.threadpool.server.model.ConfigAllInfo;
import io.dynamic.threadpool.server.service.ConfigService;
import io.dynamic.threadpool.server.toolkit.Md5ConfigUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.support.GeneratedKeyHolder;
import org.springframework.jdbc.support.KeyHolder;
import org.springframework.stereotype.Service;
import java.sql.PreparedStatement;
import java.sql.Statement;
/**
*
*
* @author chen.ma
* @date 2021/6/20 15:21
*/
@Slf4j
@Service
public class ConfigServiceImpl implements ConfigService {
@ -28,4 +36,61 @@ public class ConfigServiceImpl implements ConfigService {
return configAllInfo;
}
@Override
public void insertOrUpdate(ConfigAllInfo configAllInfo) {
try {
addConfigInfo(configAllInfo);
} catch (Exception ex) {
updateConfigInfo(configAllInfo);
}
}
private Long addConfigInfo(ConfigAllInfo config) {
final String sql = "INSERT INTO `config_info` (`tenant_id`, `item_id`, `tp_id`, `core_size`, `max_size`, `queue_type`, `capacity`, `keep_alive_time`, `content`, `md5`, `is_alarm`, `capacity_alarm`, `liveness_alarm`) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?);";
KeyHolder keyHolder = new GeneratedKeyHolder();
try {
jdbcTemplate.update(con -> {
PreparedStatement ps = con.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS);
ps.setString(1, config.getNamespace());
ps.setString(2, config.getItemId());
ps.setString(3, config.getTpId());
ps.setInt(4, config.getCoreSize());
ps.setInt(5, config.getMaxSize());
ps.setInt(6, config.getQueueType());
ps.setInt(7, config.getCapacity());
ps.setInt(8, config.getKeepAliveTime());
ps.setString(9, config.getContent());
ps.setString(10, Md5ConfigUtil.getTpContentMd5(config));
ps.setInt(11, config.getIsAlarm());
ps.setInt(12, config.getCapacityAlarm());
ps.setInt(13, config.getLivenessAlarm());
return ps;
}, keyHolder);
Number number = keyHolder.getKey();
if (number == null) {
throw new IllegalArgumentException("insert config_info fail");
}
return number.longValue();
} catch (Exception ex) {
log.error("[db-error] message :: {}", ex.getMessage(), ex);
throw ex;
}
}
private void updateConfigInfo(ConfigAllInfo config) {
try {
jdbcTemplate.update("update config_info set core_size = ?, max_size = ?, queue_type = ?, capacity = ?, keep_alive_time = ?, content = ?, md5 = ?, is_alarm = ?, capacity_alarm = ?, liveness_alarm = ? " +
"where tenant_id = ?, item_id = ?, tp_id = ?",
config.getCoreSize(), config.getMaxSize(), config.getQueueType(), config.getCapacity(), config.getKeepAliveTime(),
config.getContent(), Md5ConfigUtil.getTpContentMd5(config), config.getIsAlarm(), config.getCapacityAlarm(),
config.getLivenessAlarm(), config.getNamespace(), config.getItemId(), config.getTpId());
} catch (Exception ex) {
log.error("[db-error] message :: {}", ex.getMessage(), ex);
throw ex;
}
}
}

@ -1,7 +1,7 @@
package io.dynamic.threadpool.server.toolkit;
import cn.hutool.core.thread.ThreadFactoryBuilder;
import io.dynamict.hreadpool.common.executor.ExecutorFactory;
import io.dynamic.threadpool.common.executor.ExecutorFactory;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;

@ -1,7 +1,15 @@
package io.dynamic.threadpool.server.toolkit;
import cn.hutool.crypto.digest.DigestUtil;
import io.dynamic.threadpool.server.model.ConfigAllInfo;
import io.dynamic.threadpool.server.service.ConfigCacheService;
import io.dynamic.threadpool.common.toolkit.Md5Util;
import org.springframework.util.StringUtils;
import javax.servlet.http.HttpServletRequest;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
@ -12,23 +20,115 @@ import io.dynamic.threadpool.server.model.ConfigAllInfo;
*/
public class Md5ConfigUtil {
static final char WORD_SEPARATOR_CHAR = (char) 2;
static final char LINE_SEPARATOR_CHAR = (char) 1;
/**
* ThreadPool Md5
*
* @param config
* @return
*/
public String getTpContentMd5(ConfigAllInfo config) {
StringBuilder stringBuilder = new StringBuilder();
String targetStr = stringBuilder.append(config.getCoreSize())
.append(config.getMaxSize())
.append(config.getQueueType())
.append(config.getCapacity())
.append(config.getKeepAliveTime())
.append(config.getIsAlarm())
.append(config.getCapacityAlarm())
.append(config.getLivenessAlarm())
.toString();
return DigestUtil.md5Hex(targetStr);
public static String getTpContentMd5(ConfigAllInfo config) {
return Md5Util.getTpContentMd5(config);
}
/**
* Md5
*
* @param request
* @param clientMd5Map
* @return
*/
public static List<String> compareMd5(HttpServletRequest request, Map<String, String> clientMd5Map) {
List<String> changedGroupKeys = new ArrayList();
clientMd5Map.forEach((key, val) -> {
String remoteIp = RequestUtil.getRemoteIp(request);
boolean isUpdateData = ConfigCacheService.isUpdateData(key, val, remoteIp);
if (!isUpdateData) {
changedGroupKeys.add(key);
}
});
return changedGroupKeys;
}
public static Map<String, String> getClientMd5Map(String configKeysString) {
Map<String, String> md5Map = new HashMap(5);
if (null == configKeysString || "".equals(configKeysString)) {
return md5Map;
}
int start = 0;
List<String> tmpList = new ArrayList(3);
for (int i = start; i < configKeysString.length(); i++) {
char c = configKeysString.charAt(i);
if (c == WORD_SEPARATOR_CHAR) {
tmpList.add(configKeysString.substring(start, i));
start = i + 1;
if (tmpList.size() > 3) {
// Malformed message and return parameter error.
throw new IllegalArgumentException("invalid protocol,too much key");
}
} else if (c == LINE_SEPARATOR_CHAR) {
String endValue = "";
if (start + 1 <= i) {
endValue = configKeysString.substring(start, i);
}
start = i + 1;
// If it is the old message, the last digit is MD5. The post-multi-tenant message is tenant
if (tmpList.size() == 2) {
String groupKey = getKey(tmpList.get(0), tmpList.get(1));
groupKey = SingletonRepository.DataIdGroupIdCache.getSingleton(groupKey);
md5Map.put(groupKey, endValue);
} else {
String groupKey = getKey(tmpList.get(0), tmpList.get(1), endValue);
groupKey = SingletonRepository.DataIdGroupIdCache.getSingleton(groupKey);
md5Map.put(groupKey, tmpList.get(2));
}
tmpList.clear();
// Protect malformed messages
if (md5Map.size() > 10000) {
throw new IllegalArgumentException("invalid protocol, too much listener");
}
}
}
return md5Map;
}
public static String getKey(String dataId, String group) {
StringBuilder sb = new StringBuilder();
urlEncode(dataId, sb);
sb.append('+');
urlEncode(group, sb);
return sb.toString();
}
public static String getKey(String dataId, String group, String tenant) {
StringBuilder sb = new StringBuilder();
urlEncode(dataId, sb);
sb.append('+');
urlEncode(group, sb);
if (!StringUtils.isEmpty(tenant)) {
sb.append('+');
urlEncode(tenant, sb);
}
return sb.toString();
}
static void urlEncode(String str, StringBuilder sb) {
for (int idx = 0; idx < str.length(); ++idx) {
char c = str.charAt(idx);
if ('+' == c) {
sb.append("%2B");
} else if ('%' == c) {
sb.append("%25");
} else {
sb.append(c);
}
}
}
}

@ -0,0 +1,59 @@
package io.dynamic.threadpool.server.toolkit;
/**
* .
*
* @author chen.ma
* @date 2021/6/24 21:26
*/
public class SimpleReadWriteLock {
/**
* Try read lock.
*/
public synchronized boolean tryReadLock() {
if (isWriteLocked()) {
return false;
} else {
status++;
return true;
}
}
/**
* Release the read lock.
*/
public synchronized void releaseReadLock() {
status--;
}
/**
* Try write lock.
*/
public synchronized boolean tryWriteLock() {
if (!isFree()) {
return false;
} else {
status = -1;
return true;
}
}
public synchronized void releaseWriteLock() {
status = 0;
}
private boolean isWriteLocked() {
return status < 0;
}
private boolean isFree() {
return status == 0;
}
/**
* Zero means no lock; Negative Numbers mean write locks; Positive Numbers mean read locks, and the numeric value
* represents the number of read locks.
*/
private int status = 0;
}

@ -0,0 +1,48 @@
package io.dynamic.threadpool.server.toolkit;
import java.util.concurrent.ConcurrentHashMap;
/**
* Singleton Repository.
*
* @author chen.ma
* @date 2021/6/24 21:28
*/
public class SingletonRepository<T> {
public SingletonRepository() {
// Initializing size 2^16, the container itself use about 50K of memory, avoiding constant expansion
shared = new ConcurrentHashMap(1 << 16);
}
public T getSingleton(T obj) {
T previous = shared.putIfAbsent(obj, obj);
return (null == previous) ? obj : previous;
}
public int size() {
return shared.size();
}
/**
* Be careful use.
*/
public void remove(Object obj) {
shared.remove(obj);
}
private final ConcurrentHashMap<T, T> shared;
/**
* Cache of DataId and Group.
*/
public static class DataIdGroupIdCache {
public static String getSingleton(String str) {
return cache.getSingleton(str);
}
static SingletonRepository<String> cache = new SingletonRepository<String>();
}
}
Loading…
Cancel
Save