feat: 功能持续更新.

pull/161/head
chen.ma 3 years ago
parent 3791c6e7de
commit ded7622277

@ -0,0 +1,28 @@
package io.dynamict.hreadpool.common.executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
/**
* Executor Factory.
*
* @author chen.ma
* @date 2021/6/23 18:35
*/
public class ExecutorFactory {
public static final class Managed {
private static final String DEFAULT_NAMESPACE = "dynamic.thread-pool";
private static final ThreadPoolManager THREAD_POOL_MANAGER = ThreadPoolManager.getInstance();
public static ScheduledExecutorService newSingleScheduledExecutorService(String group, ThreadFactory threadFactory) {
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1, threadFactory);
THREAD_POOL_MANAGER.register(DEFAULT_NAMESPACE, group, executorService);
return executorService;
}
}
}

@ -0,0 +1,50 @@
package io.dynamict.hreadpool.common.executor;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
/**
* Thread Pool Manager.
*
* @author chen.ma
* @date 2021/6/23 18:36
*/
public class ThreadPoolManager {
private Map<String, Map<String, Set<ExecutorService>>> resourcesManager;
private Map<String, Object> lockers = new ConcurrentHashMap(8);
private static final ThreadPoolManager INSTANCE = new ThreadPoolManager();
public static ThreadPoolManager getInstance() {
return INSTANCE;
}
public void register(String namespace, String group, ExecutorService executor) {
if (!resourcesManager.containsKey(namespace)) {
synchronized (this) {
lockers.put(namespace, new Object());
}
}
final Object monitor = lockers.get(namespace);
synchronized (monitor) {
Map<String, Set<ExecutorService>> map = resourcesManager.get(namespace);
if (map == null) {
map = new HashMap(8);
map.put(group, new HashSet());
map.get(group).add(executor);
resourcesManager.put(namespace, map);
return;
}
if (!map.containsKey(group)) {
map.put(group, new HashSet());
}
map.get(group).add(executor);
}
}
}

@ -0,0 +1,85 @@
package io.dynamict.hreadpool.common.model;
import lombok.Getter;
import lombok.Setter;
import java.io.Serializable;
/**
*
*
* @author chen.ma
* @date 2021/6/23 21:08
*/
@Getter
@Setter
public class GlobalRemotePoolInfo implements Serializable {
private static final long serialVersionUID = 5447003335557127308L;
/**
*
*/
private String namespace;
/**
* ID
*/
private String itemId;
/**
* 线
*/
private String tpId;
/**
* 线
*/
private Integer coreSize;
/**
* 线
*/
private Integer maxSize;
/**
*
*/
private Integer queueType;
/**
*
*/
private Integer capacity;
/**
* 线
*/
private Integer keepAliveTime;
/**
*
*/
private Integer isAlarm;
/**
*
*/
private Integer capacityAlarm;
/**
*
*/
private Integer livenessAlarm;
/**
* MD5
*/
private String md5;
/**
*
*/
private String content;
}

@ -0,0 +1,35 @@
package io.dynamict.hreadpool.common.web.base;
import lombok.Data;
import lombok.experimental.Accessors;
import java.io.Serializable;
/**
*
*
* @author chen.ma
* @date 2021/3/19 16:12
*/
@Data
@Accessors(chain = true)
public class Result<T> implements Serializable {
private static final long serialVersionUID = -4408341719434417427L;
public static final String SUCCESS_CODE = "0";
private String code;
private String message;
private T data;
public boolean isSuccess() {
return SUCCESS_CODE.equals(code);
}
public boolean isFail() {
return !isSuccess();
}
}

@ -0,0 +1,36 @@
package io.dynamict.hreadpool.common.web.base;
import io.dynamict.hreadpool.common.web.exception.ErrorCode;
import io.dynamict.hreadpool.common.web.exception.ServiceException;
/**
* Result
*
* @author chen.ma
* @date 2021/3/19 16:12
*/
public final class Results {
public static Result<Void> success() {
return new Result<Void>()
.setCode(Result.SUCCESS_CODE);
}
public static <T> Result<T> success(T data) {
return new Result<T>()
.setCode(Result.SUCCESS_CODE)
.setData(data);
}
public static <T> Result<T> failure(ServiceException serviceException) {
return new Result<T>().setCode(ErrorCode.SERVICE_ERROR.getCode())
.setMessage(serviceException.getMessage());
}
public static <T> Result<T> failure(String code, String message) {
return new Result<T>()
.setCode(code)
.setMessage(message);
}
}

@ -0,0 +1,51 @@
package io.dynamict.hreadpool.common.web.exception;
/**
*
*
* @author chen.ma
* @date 2021/3/19 16:07
*/
public enum ErrorCode {
UNKNOWN_ERROR {
@Override
public String getCode() {
return "1";
}
@Override
public String getMessage() {
return "未知错误";
}
},
VALIDATION_ERROR {
@Override
public String getCode() {
return "2";
}
@Override
public String getMessage() {
return "参数错误";
}
},
SERVICE_ERROR {
@Override
public String getCode() {
return "3";
}
@Override
public String getMessage() {
return "服务异常";
}
};
public abstract String getCode();
public abstract String getMessage();
}

@ -0,0 +1,47 @@
package io.dynamict.hreadpool.common.web.exception;
import lombok.Data;
import lombok.EqualsAndHashCode;
/**
*
*
* @author chen.ma
* @date 2021/3/19 16:14
*/
@Data
@EqualsAndHashCode(callSuper = true)
public class ServiceException extends RuntimeException {
private static final long serialVersionUID = -8667394300356618037L;
private String detail;
public ServiceException(String message) {
super(message);
}
public ServiceException(String message, String detail) {
super(message);
this.detail = detail;
}
public ServiceException(String message, Throwable cause) {
super(message, cause);
this.detail = cause.getMessage();
}
public ServiceException(String message, String detail, Throwable cause) {
super(message, cause);
this.detail = detail;
}
@Override
public String toString() {
return "ServiceException{" +
"message='" + getMessage() + "'," +
"detail='" + getDetail() + "'" +
'}';
}
}

@ -1,10 +1,15 @@
package io.dynamic.threadpool.starter.adapter;
import cn.hutool.core.thread.ThreadFactoryBuilder;
import io.dynamic.threadpool.starter.config.ApplicationContextHolder;
import io.dynamic.threadpool.starter.operation.ThreadPoolOperation;
import io.dynamic.threadpool.starter.wrap.DynamicThreadPoolWrap;
import org.springframework.beans.factory.annotation.Autowired;
import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
@ -30,8 +35,15 @@ public class ThreadPoolConfigAdapter extends ConfigAdapter {
new ThreadFactoryBuilder().setNamePrefix("threadPool-config").build(),
new ThreadPoolExecutor.DiscardOldestPolicy());
public void subscribeConfig(List<String> tpIds) {
tpIds.forEach(each -> threadPoolOperation.subscribeConfig(each, executorService, config -> callbackConfig(config)));
@PostConstruct
public void subscribeConfig() {
Map<String, DynamicThreadPoolWrap> executorMap =
ApplicationContextHolder.getBeansOfType(DynamicThreadPoolWrap.class);
List<String> tpIdList = new ArrayList();
executorMap.forEach((key, val) -> tpIdList.add(val.getTpId()));
tpIdList.forEach(each -> threadPoolOperation.subscribeConfig(each, executorService, config -> callbackConfig(config)));
}
}

@ -17,4 +17,16 @@ public class Constants {
public static final String NULL = "";
public static final String ENCODE = "UTF-8";
public static final int CONFIG_LONG_POLL_TIMEOUT = 30000;
public static final String BASE_PATH = "/v1/cs";
public static final String CONFIG_CONTROLLER_PATH = BASE_PATH + "/configs";
public static final String LISTENER_PATH = BASE_PATH + CONFIG_CONTROLLER_PATH + "/listener";
public static final String PROBE_MODIFY_REQUEST = "Listening-Configs";
public static final String LONG_PULLING_TIMEOUT = "Long-Pulling-Timeout";
}

@ -3,7 +3,7 @@ package io.dynamic.threadpool.starter.config;
import io.dynamic.threadpool.starter.adapter.ThreadPoolConfigAdapter;
import io.dynamic.threadpool.starter.core.ConfigService;
import io.dynamic.threadpool.starter.core.ThreadPoolConfigService;
import io.dynamic.threadpool.starter.core.ThreadPoolRunListener;
import io.dynamic.threadpool.starter.listener.ThreadPoolRunListener;
import io.dynamic.threadpool.starter.operation.ThreadPoolOperation;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@ -44,6 +44,6 @@ public class DynamicThreadPoolAutoConfiguration {
@Bean
public ThreadPoolOperation threadPoolOperation() {
return new ThreadPoolOperation();
return new ThreadPoolOperation(properties);
}
}

@ -34,4 +34,8 @@ public class DynamicThreadPoolProperties {
*/
private String itemId;
/**
* 线
*/
private String enabled;
}

@ -17,9 +17,9 @@ import java.util.concurrent.CopyOnWriteArrayList;
@Slf4j
public class CacheData {
private volatile String md5;
public volatile String md5;
private volatile String content;
public volatile String content;
public final String tpId;
@ -82,4 +82,8 @@ public class CacheData {
public String getMd5() {
return this.md5;
}
public void setTaskId(Integer taskId) {
this.taskId = taskId;
}
}

@ -13,9 +13,17 @@ public interface ConfigService {
/**
* , , 使
*
* @param namespace
* @param itemId
* @param tpId
* @param listener
*/
void addListener(String tpId, Listener listener);
void addListener(String namespace, String itemId, String tpId, Listener listener);
/**
*
*
* @return
*/
String getServerStatus();
}

@ -1,8 +1,8 @@
package io.dynamic.threadpool.starter.core;
import io.dynamic.threadpool.starter.config.DynamicThreadPoolProperties;
import io.dynamic.threadpool.starter.http.HttpAgent;
import io.dynamic.threadpool.starter.http.ServerHttpAgent;
import io.dynamic.threadpool.starter.remote.HttpAgent;
import io.dynamic.threadpool.starter.remote.ServerHttpAgent;
import io.dynamic.threadpool.starter.listener.ClientWorker;
import io.dynamic.threadpool.starter.listener.Listener;
@ -26,7 +26,16 @@ public class ThreadPoolConfigService implements ConfigService {
}
@Override
public void addListener(String tpId, Listener listener) {
clientWorker.addTenantListeners(tpId, Arrays.asList(listener));
public void addListener(String namespace, String itemId, String tpId, Listener listener) {
clientWorker.addTenantListeners(namespace, itemId, tpId, Arrays.asList(listener));
}
@Override
public String getServerStatus() {
if (clientWorker.isHealthServer()) {
return "UP";
} else {
return "DOWN";
}
}
}

@ -3,6 +3,7 @@ package io.dynamic.threadpool.starter.core;
import com.alibaba.fastjson.JSON;
import io.dynamic.threadpool.starter.model.PoolParameterInfo;
import io.dynamic.threadpool.starter.wrap.DynamicThreadPoolWrap;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@ -13,9 +14,11 @@ import java.util.concurrent.TimeUnit;
* @author chen.ma
* @date 2021/6/20 15:51
*/
@Slf4j
public class ThreadPoolDynamicRefresh {
public static void refreshDynamicPool(String content) {
log.info("[🔥] Start refreshing configuration. content :: {}", content);
PoolParameterInfo parameter = JSON.parseObject(content, PoolParameterInfo.class);
refreshDynamicPool(parameter.getTpId(), parameter.getCoreSize(), parameter.getMaxSize(), parameter.getCapacity(), parameter.getKeepAliveTime());
}

@ -1,50 +0,0 @@
package io.dynamic.threadpool.starter.http;
import io.dynamic.threadpool.starter.config.DynamicThreadPoolProperties;
import java.util.Map;
/**
* Server Http Agent.
*
* @author chen.ma
* @date 2021/6/23 20:50
*/
public class ServerHttpAgent implements HttpAgent {
private final DynamicThreadPoolProperties dynamicThreadPoolProperties;
public ServerHttpAgent(DynamicThreadPoolProperties properties) {
this.dynamicThreadPoolProperties = properties;
}
@Override
public void start() {
}
@Override
public String httpGet(String path, Map<String, String> headers, Map<String, String> paramValues, String encoding, long readTimeoutMs) {
return null;
}
@Override
public String httpPost(String path, Map<String, String> headers, Map<String, String> paramValues, String encoding, long readTimeoutMs) {
return null;
}
@Override
public String httpDelete(String path, Map<String, String> headers, Map<String, String> paramValues, String encoding, long readTimeoutMs) {
return null;
}
@Override
public String getNameSpace() {
return null;
}
@Override
public String getEncode() {
return null;
}
}

@ -1,13 +0,0 @@
package io.dynamic.threadpool.starter.http;
import lombok.extern.slf4j.Slf4j;
/**
* Server List Manager.
*
* @author chen.ma
* @date 2021/6/23 20:42
*/
@Slf4j
public class ServerListManager {
}

@ -1,12 +1,16 @@
package io.dynamic.threadpool.starter.listener;
import com.alibaba.fastjson.JSON;
import io.dynamic.threadpool.starter.common.Constants;
import io.dynamic.threadpool.starter.core.CacheData;
import io.dynamic.threadpool.starter.http.HttpAgent;
import io.dynamic.threadpool.starter.remote.HttpAgent;
import io.dynamict.hreadpool.common.model.GlobalRemotePoolInfo;
import io.dynamict.hreadpool.common.web.base.Result;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@ -23,6 +27,10 @@ public class ClientWorker {
private double currentLongingTaskCount = 0;
private long timeout;
private boolean isHealthServer = true;
private final HttpAgent agent;
private final ScheduledExecutorService executor;
@ -34,6 +42,7 @@ public class ClientWorker {
@SuppressWarnings("all")
public ClientWorker(HttpAgent httpAgent) {
this.agent = httpAgent;
this.timeout = Constants.CONFIG_LONG_POLL_TIMEOUT;
this.executor = Executors.newScheduledThreadPool(1, r -> {
Thread t = new Thread(r);
@ -126,21 +135,78 @@ public class ClientWorker {
* @return
*/
public List<String> checkUpdateTpIds(List<CacheData> cacheDataList) {
return null;
Map<String, String> params = new HashMap(2);
params.put(Constants.PROBE_MODIFY_REQUEST, JSON.toJSONString(cacheDataList));
Map<String, String> headers = new HashMap(2);
headers.put(Constants.LONG_PULLING_TIMEOUT, "" + timeout);
if (StringUtils.isEmpty(cacheDataList)) {
return Collections.emptyList();
}
try {
long readTimeoutMs = timeout + (long) Math.round(timeout >> 1);
Result result = agent.httpPost(Constants.LISTENER_PATH, headers, params, readTimeoutMs);
if (result.isSuccess()) {
setHealthServer(true);
return parseUpdateDataIdResponse(result.getData().toString());
} else {
setHealthServer(false);
log.error("[check-update] get changed dataId error, code: {}", result.getCode());
}
} catch (Exception ex) {
setHealthServer(false);
log.error("[check-update] get changed dataId exception.", ex);
}
return Collections.emptyList();
}
/**
*
*
* @param namespace
* @param itemId
* @param tpId
* @param readTimeout
* @return
*/
public String getServerConfig(String namespace, String itemId, String tpId, long readTimeout) {
Map<String, String> params = new HashMap(3);
params.put("namespace", namespace);
params.put("itemId", itemId);
params.put("tpId", tpId);
Result result = agent.httpGet(Constants.CONFIG_CONTROLLER_PATH, null, params, readTimeout);
if (result.isSuccess()) {
return result.getData().toString();
}
log.error("[sub-server-error] namespace :: {}, itemId :: {}, tpId :: {}, result code :: {}",
namespace, itemId, tpId, result.getCode());
return Constants.NULL;
}
/**
* Http
*
* @param response
* @return
*/
public List<String> parseUpdateDataIdResponse(String response) {
return null;
}
/**
* CacheData Listener
*
* @param namespace
* @param itemId
* @param tpId
* @param listeners
*/
public void addTenantListeners(String tpId, List<? extends Listener> listeners) {
CacheData cacheData = addCacheDataIfAbsent(tpId);
public void addTenantListeners(String namespace, String itemId, String tpId, List<? extends Listener> listeners) {
CacheData cacheData = addCacheDataIfAbsent(namespace, itemId, tpId);
for (Listener listener : listeners) {
cacheData.addListener(listener);
}
@ -149,10 +215,12 @@ public class ClientWorker {
/**
* CacheData
*
* @param namespace
* @param itemId
* @param tpId
* @return
*/
public CacheData addCacheDataIfAbsent(String tpId) {
public CacheData addCacheDataIfAbsent(String namespace, String itemId, String tpId) {
CacheData cacheData = cacheMap.get(tpId);
if (cacheData != null) {
return cacheData;
@ -160,7 +228,25 @@ public class ClientWorker {
cacheData = new CacheData(tpId);
CacheData lastCacheData = cacheMap.putIfAbsent(tpId, cacheData);
if (lastCacheData == null) {
String serverConfig = getServerConfig(namespace, itemId, tpId, 3000L);
GlobalRemotePoolInfo poolInfo = JSON.parseObject(serverConfig, GlobalRemotePoolInfo.class);
cacheData.setContent(poolInfo.getContent());
int taskId = cacheMap.size() / Constants.CONFIG_LONG_POLL_TIMEOUT;
cacheData.setTaskId(taskId);
lastCacheData = cacheData;
}
return lastCacheData;
}
public boolean isHealthServer() {
return this.isHealthServer;
}
private void setHealthServer(boolean isHealthServer) {
this.isHealthServer = isHealthServer;
}
}

@ -1,21 +0,0 @@
package io.dynamic.threadpool.starter.listener;
/**
*
*
* @author chen.ma
* @date 2021/6/20 18:37
*/
public class LongPollingRunnable implements Runnable {
private final int taskId;
public LongPollingRunnable(Integer taskId) {
this.taskId = taskId;
}
@Override
public void run() {
}
}

@ -1,13 +1,17 @@
package io.dynamic.threadpool.starter.core;
package io.dynamic.threadpool.starter.listener;
import com.alibaba.fastjson.JSON;
import io.dynamic.threadpool.starter.common.CommonThreadPool;
import io.dynamic.threadpool.starter.common.Constants;
import io.dynamic.threadpool.starter.config.ApplicationContextHolder;
import io.dynamic.threadpool.starter.config.DynamicThreadPoolProperties;
import io.dynamic.threadpool.starter.core.GlobalThreadPoolManage;
import io.dynamic.threadpool.starter.model.PoolParameterInfo;
import io.dynamic.threadpool.starter.remote.HttpAgent;
import io.dynamic.threadpool.starter.remote.ServerHttpAgent;
import io.dynamic.threadpool.starter.toolkit.BlockingQueueUtil;
import io.dynamic.threadpool.starter.toolkit.HttpClientUtil;
import io.dynamic.threadpool.starter.wrap.DynamicThreadPoolWrap;
import org.springframework.beans.factory.annotation.Autowired;
import io.dynamict.hreadpool.common.web.base.Result;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
@ -25,9 +29,6 @@ import java.util.concurrent.TimeUnit;
*/
public class ThreadPoolRunListener implements ApplicationRunner {
@Autowired
private HttpClientUtil httpClientUtil;
private final DynamicThreadPoolProperties dynamicThreadPoolProperties;
public ThreadPoolRunListener(DynamicThreadPoolProperties properties) {
@ -40,13 +41,15 @@ public class ThreadPoolRunListener implements ApplicationRunner {
ApplicationContextHolder.getBeansOfType(DynamicThreadPoolWrap.class);
executorMap.forEach((key, val) -> {
Map<String, Object> queryStrMap = new HashMap(16);
queryStrMap.put("tdId", val.getTpId());
Map<String, String> queryStrMap = new HashMap(3);
queryStrMap.put("tpId", val.getTpId());
queryStrMap.put("itemId", dynamicThreadPoolProperties.getItemId());
queryStrMap.put("namespace", dynamicThreadPoolProperties.getNamespace());
PoolParameterInfo ppi = httpClientUtil.restApiGet(buildUrl(), queryStrMap, PoolParameterInfo.class);
if (ppi != null) {
PoolParameterInfo ppi = null;
HttpAgent httpAgent = new ServerHttpAgent(dynamicThreadPoolProperties);
Result result = httpAgent.httpGet(Constants.CONFIG_CONTROLLER_PATH, null, queryStrMap, 3000L);
if (result.isSuccess() && (ppi = JSON.toJavaObject((JSON) result.getData(), PoolParameterInfo.class)) != null) {
// 使用相关参数创建线程池
TimeUnit unit = TimeUnit.SECONDS;
BlockingQueue workQueue = BlockingQueueUtil.createBlockingQueue(ppi.getQueueType(), ppi.getCapacity());
@ -60,8 +63,4 @@ public class ThreadPoolRunListener implements ApplicationRunner {
});
}
private String buildUrl() {
return "http://127.0.0.1:6691/v1/cs/configs";
}
}

@ -1,7 +1,5 @@
package io.dynamic.threadpool.starter.model;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.Data;
import java.io.Serializable;
@ -18,9 +16,9 @@ public class PoolParameterInfo implements Serializable {
private static final long serialVersionUID = -7123935122108553864L;
/**
* Or
*
*/
private String tenant;
private String namespace;
/**
* Id

@ -1,5 +1,6 @@
package io.dynamic.threadpool.starter.operation;
import io.dynamic.threadpool.starter.config.DynamicThreadPoolProperties;
import io.dynamic.threadpool.starter.core.ConfigService;
import io.dynamic.threadpool.starter.listener.Listener;
import org.springframework.beans.factory.annotation.Autowired;
@ -17,6 +18,12 @@ public class ThreadPoolOperation {
@Autowired
private ConfigService configService;
private final DynamicThreadPoolProperties properties;
public ThreadPoolOperation(DynamicThreadPoolProperties properties) {
this.properties = properties;
}
public Listener subscribeConfig(String tpId, Executor executor, ThreadPoolSubscribeCallback threadPoolSubscribeCallback) {
Listener configListener = new Listener() {
@Override
@ -30,7 +37,7 @@ public class ThreadPoolOperation {
}
};
configService.addListener(tpId, configListener);
configService.addListener(properties.getNamespace(), properties.getItemId(), tpId, configListener);
return configListener;
}

@ -1,4 +1,6 @@
package io.dynamic.threadpool.starter.http;
package io.dynamic.threadpool.starter.remote;
import io.dynamict.hreadpool.common.web.base.Result;
import java.util.Map;
@ -35,12 +37,11 @@ public interface HttpAgent {
* @param path
* @param headers
* @param paramValues
* @param encoding
* @param readTimeoutMs
* @return
*/
String httpGet(String path, Map<String, String> headers, Map<String, String> paramValues,
String encoding, long readTimeoutMs);
Result httpGet(String path, Map<String, String> headers, Map<String, String> paramValues,
long readTimeoutMs);
/**
* Http Post
@ -48,12 +49,11 @@ public interface HttpAgent {
* @param path
* @param headers
* @param paramValues
* @param encoding
* @param readTimeoutMs
* @return
*/
String httpPost(String path, Map<String, String> headers, Map<String, String> paramValues,
String encoding, long readTimeoutMs);
Result httpPost(String path, Map<String, String> headers, Map<String, String> paramValues,
long readTimeoutMs);
/**
* Http Delete
@ -61,10 +61,9 @@ public interface HttpAgent {
* @param path
* @param headers
* @param paramValues
* @param encoding
* @param readTimeoutMs
* @return
*/
String httpDelete(String path, Map<String, String> headers, Map<String, String> paramValues,
String encoding, long readTimeoutMs);
Result httpDelete(String path, Map<String, String> headers, Map<String, String> paramValues,
long readTimeoutMs);
}

@ -0,0 +1,62 @@
package io.dynamic.threadpool.starter.remote;
import io.dynamic.threadpool.starter.config.ApplicationContextHolder;
import io.dynamic.threadpool.starter.config.DynamicThreadPoolProperties;
import io.dynamic.threadpool.starter.toolkit.HttpClientUtil;
import io.dynamict.hreadpool.common.web.base.Result;
import java.util.Map;
/**
* Server Http Agent.
*
* @author chen.ma
* @date 2021/6/23 20:50
*/
public class ServerHttpAgent implements HttpAgent {
private final DynamicThreadPoolProperties dynamicThreadPoolProperties;
private final ServerListManager serverListManager;
private HttpClientUtil httpClientUtil = ApplicationContextHolder.getBean(HttpClientUtil.class);
public ServerHttpAgent(DynamicThreadPoolProperties properties) {
this.dynamicThreadPoolProperties = properties;
this.serverListManager = new ServerListManager(dynamicThreadPoolProperties);
}
@Override
public void start() {
}
@Override
public Result httpGet(String path, Map<String, String> headers, Map<String, String> paramValues, long readTimeoutMs) {
return httpClientUtil.restApiGetByThreadPool(buildUrl(path), headers, paramValues, readTimeoutMs, Result.class);
}
@Override
public Result httpPost(String path, Map<String, String> headers, Map<String, String> paramValues, long readTimeoutMs) {
return httpClientUtil.restApiPostByThreadPool(buildUrl(path), headers, paramValues, readTimeoutMs, Result.class);
}
@Override
public Result httpDelete(String path, Map<String, String> headers, Map<String, String> paramValues, long readTimeoutMs) {
return null;
}
@Override
public String getNameSpace() {
return dynamicThreadPoolProperties.getNamespace();
}
@Override
public String getEncode() {
return null;
}
private String buildUrl(String path) {
return serverListManager.getCurrentServerAddr() + path;
}
}

@ -0,0 +1,126 @@
package io.dynamic.threadpool.starter.remote;
import io.dynamic.threadpool.starter.config.DynamicThreadPoolProperties;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StringUtils;
import java.util.*;
/**
* Server List Manager.
*
* @author chen.ma
* @date 2021/6/23 20:42
*/
@Slf4j
public class ServerListManager {
private static final String HTTPS = "https://";
private static final String HTTP = "http://";
private String serverAddrsStr;
volatile List<String> serverUrls = new ArrayList();
private volatile String currentServerAddr;
private Iterator<String> iterator;
private final DynamicThreadPoolProperties properties;
public ServerListManager(DynamicThreadPoolProperties dynamicThreadPoolProperties) {
this.properties = dynamicThreadPoolProperties;
serverAddrsStr = properties.getServerAddr();
if (!StringUtils.isEmpty(serverAddrsStr)) {
List<String> serverAddrs = new ArrayList();
String[] serverAddrsArr = this.serverAddrsStr.split(",");
for (String serverAddr : serverAddrsArr) {
if (serverAddr.startsWith(HTTPS) || serverAddr.startsWith(HTTP)) {
// TODO 固定写,后面优化
currentServerAddr = serverAddr;
serverAddrs.add(serverAddr);
}
}
this.serverUrls = serverAddrs;
}
}
public String getCurrentServerAddr() {
if (StringUtils.isEmpty(currentServerAddr)) {
iterator = iterator();
currentServerAddr = iterator.next();
}
return currentServerAddr;
}
Iterator<String> iterator() {
if (serverUrls.isEmpty()) {
log.error("[iterator-serverlist] No server address defined!");
}
return new ServerAddressIterator(serverUrls);
}
private static class ServerAddressIterator implements Iterator<String> {
final List<RandomizedServerAddress> sorted;
final Iterator<RandomizedServerAddress> iter;
public ServerAddressIterator(List<String> source) {
sorted = new ArrayList<RandomizedServerAddress>();
for (String address : source) {
sorted.add(new RandomizedServerAddress(address));
}
Collections.sort(sorted);
iter = sorted.iterator();
}
@Override
public boolean hasNext() {
return false;
}
@Override
public String next() {
return null;
}
static class RandomizedServerAddress implements Comparable<RandomizedServerAddress> {
static Random random = new Random();
String serverIp;
int priority = 0;
int seed;
public RandomizedServerAddress(String ip) {
try {
this.serverIp = ip;
/*
change random scope from 32 to Integer.MAX_VALUE to fix load balance issue
*/
this.seed = random.nextInt(Integer.MAX_VALUE);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
public int compareTo(RandomizedServerAddress other) {
if (this.priority != other.priority) {
return other.priority - this.priority;
} else {
return other.seed - this.seed;
}
}
}
}
}

@ -5,9 +5,10 @@ import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import okhttp3.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* Http
@ -48,7 +49,7 @@ public class HttpClientUtil {
* @param queryString
* @return
*/
public String get(String url, Map<String, Object> queryString) {
public String get(String url, Map<String, String> queryString) {
String fullUrl = buildUrl(url, queryString);
return get(fullUrl);
}
@ -74,7 +75,7 @@ public class HttpClientUtil {
* @param <T>
* @return
*/
public <T> T restApiGet(String url, Map<String, Object> queryString, Class<T> clazz) {
public <T> T restApiGet(String url, Map<String, String> queryString, Class<T> clazz) {
String fullUrl = buildUrl(url, queryString);
String resp = get(fullUrl);
return JSON.parseObject(resp, clazz);
@ -116,7 +117,7 @@ public class HttpClientUtil {
* @param queryString
* @return
*/
public String buildUrl(String url, Map<String, Object> queryString) {
public String buildUrl(String url, Map<String, String> queryString) {
if (null == queryString) {
return url;
}
@ -124,7 +125,7 @@ public class HttpClientUtil {
StringBuilder builder = new StringBuilder(url);
boolean isFirst = true;
for (Map.Entry<String, Object> entry : queryString.entrySet()) {
for (Map.Entry<String, String> entry : queryString.entrySet()) {
String key = entry.getKey();
if (key != null && entry.getValue() != null) {
if (isFirst) {
@ -169,4 +170,50 @@ public class HttpClientUtil {
return resp.body().bytes();
}
@SneakyThrows
public <T> T restApiGetByThreadPool(String url, Map<String, String> headers, Map<String, String> paramValues, Long readTimeoutMs, Class<T> clazz) {
String buildUrl = buildUrl(url, paramValues);
Request.Builder builder = new Request.Builder().get();
if (!CollectionUtils.isEmpty(headers)) {
builder.headers(Headers.of(headers));
}
Request request = builder.url(buildUrl).build();
Call call = okHttpClient.newCall(request);
call.timeout().timeout(readTimeoutMs, TimeUnit.MILLISECONDS);
Response resp = call.execute();
if (resp.code() != HTTP_OK_CODE) {
String msg = String.format("HttpGet 响应 code 异常. [code] %s [url] %s", resp.code(), url);
log.error(msg);
throw new RuntimeException(msg);
}
return JSON.parseObject(resp.body().string(), clazz);
}
@SneakyThrows
public <T> T restApiPostByThreadPool(String url, Map<String, String> headers, Map<String, String> paramValues, Long readTimeoutMs, Class<T> clazz) {
String buildUrl = buildUrl(url, paramValues);
Request request = new Request.Builder()
.url(buildUrl)
.headers(Headers.of(headers))
.post(RequestBody.create(jsonMediaType, ""))
.build();
Call call = okHttpClient.newCall(request);
call.timeout().timeout(readTimeoutMs, TimeUnit.MILLISECONDS);
Response resp = call.execute();
if (resp.code() != HTTP_OK_CODE) {
String msg = String.format("HttpPost 响应 code 异常. [code] %s [url] %s.", resp.code(), url);
log.error(msg);
throw new RuntimeException(msg);
}
return JSON.parseObject(resp.body().string(), clazz);
}
}

@ -8,8 +8,8 @@
},
{
"name": "spring.dynamic.thread-pool.enabled",
"type": "java.lang.Boolean",
"defaultValue": false,
"type": "java.lang.String",
"defaultValue": "false",
"description": "dynamic thread-pool enabled."
},
{

@ -1,2 +1,3 @@
org.springframework.boot.autoconfigure.EnableAutoConfiguration=io.dynamic.threadpool.starter.config.CommonConfiguration, \
io.dynamic.threadpool.starter.config.OkHttpClientConfig
io.dynamic.threadpool.starter.config.OkHttpClientConfig,\
io.dynamic.threadpool.starter.config.DynamicThreadPoolAutoConfiguration

@ -2,6 +2,6 @@ spring:
dynamic:
thread-pool:
enabled: true
server-addr: localhost:6691
namespace: public
item-id: message-center
server-addr: http://localhost:6691
namespace: common
item-id: message

@ -41,6 +41,11 @@
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
</dependency>
<dependency>
<groupId>io.dynamic-threadpool</groupId>
<artifactId>common</artifactId>
</dependency>
</dependencies>
<build>

@ -3,6 +3,9 @@ package io.dynamic.threadpool.server.controller;
import io.dynamic.threadpool.server.constant.Constants;
import io.dynamic.threadpool.server.model.ConfigInfoBase;
import io.dynamic.threadpool.server.service.ConfigService;
import io.dynamic.threadpool.server.service.ConfigServletInner;
import io.dynamict.hreadpool.common.web.base.Result;
import io.dynamict.hreadpool.common.web.base.Results;
import lombok.SneakyThrows;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.StringUtils;
@ -25,13 +28,16 @@ public class ConfigController {
@Autowired
private ConfigService configService;
@Autowired
private ConfigServletInner configServletInner;
@GetMapping
public ConfigInfoBase detailConfigInfo(
@RequestParam("tdId") String tdId,
public Result<ConfigInfoBase> detailConfigInfo(
@RequestParam("tpId") String tpId,
@RequestParam("itemId") String itemId,
@RequestParam(value = "namespace", required = false, defaultValue = "") String namespace) {
@RequestParam(value = "namespace") String namespace) {
return configService.findConfigAllInfo(tdId, itemId, namespace);
return Results.success(configService.findConfigAllInfo(tpId, itemId, namespace));
}
@SneakyThrows
@ -45,6 +51,8 @@ public class ConfigController {
}
probeModify = URLDecoder.decode(probeModify, Constants.ENCODE);
configServletInner.doPollingConfig(request, response, null, probeModify.length());
}
}

@ -0,0 +1,12 @@
package io.dynamic.threadpool.server.event;
import io.dynamic.threadpool.server.notify.Event;
/**
* Local Data Change Event.
*
* @author chen.ma
* @date 2021/6/23 19:13
*/
public class LocalDataChangeEvent extends Event {
}

@ -0,0 +1,20 @@
package io.dynamic.threadpool.server.notify;
import cn.hutool.core.collection.ConcurrentHashSet;
import io.dynamic.threadpool.server.notify.listener.Subscriber;
/**
* The default event publisher implementation.
*
* @author chen.ma
* @date 2021/6/23 19:06
*/
public class DefaultPublisher extends Thread implements EventPublisher {
protected final ConcurrentHashSet<Subscriber> subscribers = new ConcurrentHashSet();
@Override
public void addSubscriber(Subscriber subscriber) {
subscribers.add(subscriber);
}
}

@ -0,0 +1,44 @@
package io.dynamic.threadpool.server.notify;
import cn.hutool.core.collection.ConcurrentHashSet;
import io.dynamic.threadpool.server.notify.listener.Subscriber;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* Default Share Publisher.
*
* @author chen.ma
* @date 2021/6/23 19:05
*/
public class DefaultSharePublisher {
private final Map<Class<? extends SlowEvent>, Set<Subscriber>> subMappings = new ConcurrentHashMap();
protected final ConcurrentHashSet<Subscriber> subscribers = new ConcurrentHashSet();
private final Lock lock = new ReentrantLock();
public void addSubscriber(Subscriber subscriber, Class<? extends Event> subscribeType) {
Class<? extends SlowEvent> subSlowEventType = (Class<? extends SlowEvent>) subscribeType;
subscribers.add(subscriber);
lock.lock();
try {
Set<Subscriber> sets = subMappings.get(subSlowEventType);
if (sets == null) {
Set<Subscriber> newSet = new ConcurrentHashSet<Subscriber>();
newSet.add(subscriber);
subMappings.put(subSlowEventType, newSet);
return;
}
sets.add(subscriber);
} finally {
lock.unlock();
}
}
}

@ -0,0 +1,26 @@
package io.dynamic.threadpool.server.notify;
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicLong;
/**
* An abstract class for event.
*
* @author chen.ma
* @date 2021/6/23 18:59
*/
public abstract class Event implements Serializable {
private static final AtomicLong SEQUENCE = new AtomicLong(0);
private final long sequence = SEQUENCE.getAndIncrement();
/**
* Event sequence number, which can be used to handle the sequence of events.
*
* @return sequence num, It's best to make sure it's monotone.
*/
public long sequence() {
return sequence;
}
}

@ -0,0 +1,15 @@
package io.dynamic.threadpool.server.notify;
import io.dynamic.threadpool.server.notify.listener.Subscriber;
/**
* Event publisher.
*
* @author chen.ma
* @date 2021/6/23 18:58
*/
public interface EventPublisher {
void addSubscriber(Subscriber subscriber);
}

@ -0,0 +1,62 @@
package io.dynamic.threadpool.server.notify;
import io.dynamic.threadpool.server.notify.listener.SmartSubscriber;
import io.dynamic.threadpool.server.notify.listener.Subscriber;
import io.dynamic.threadpool.server.toolkit.ClassUtil;
import io.dynamic.threadpool.server.toolkit.MapUtil;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;
/**
* Unified Event Notify Center.
*
* @author chen.ma
* @date 2021/6/23 18:58
*/
public class NotifyCenter {
private static final NotifyCenter INSTANCE = new NotifyCenter();
public static int ringBufferSize = 16384;
private DefaultSharePublisher sharePublisher;
private static BiFunction<Class<? extends Event>, Integer, EventPublisher> publisherFactory = null;
private final Map<String, EventPublisher> publisherMap = new ConcurrentHashMap(16);
public static <T> void registerSubscriber(final Subscriber consumer) {
if (consumer instanceof SmartSubscriber) {
for (Class<? extends Event> subscribeType : ((SmartSubscriber) consumer).subscribeTypes()) {
if (ClassUtil.isAssignableFrom(SlowEvent.class, subscribeType)) {
INSTANCE.sharePublisher.addSubscriber(consumer, subscribeType);
} else {
addSubscriber(consumer, subscribeType);
}
}
return;
}
final Class<? extends Event> subscribeType = consumer.subscribeType();
if (ClassUtil.isAssignableFrom(SlowEvent.class, subscribeType)) {
INSTANCE.sharePublisher.addSubscriber(consumer, subscribeType);
return;
}
addSubscriber(consumer, subscribeType);
}
private static void addSubscriber(final Subscriber consumer, Class<? extends Event> subscribeType) {
final String topic = ClassUtil.getCanonicalName(subscribeType);
synchronized (NotifyCenter.class) {
// MapUtils.computeIfAbsent is a unsafe method.
MapUtil.computeIfAbsent(INSTANCE.publisherMap, topic, publisherFactory, subscribeType, ringBufferSize);
}
EventPublisher publisher = INSTANCE.publisherMap.get(topic);
publisher.addSubscriber(consumer);
}
}

@ -0,0 +1,14 @@
package io.dynamic.threadpool.server.notify;
/**
* Slow Event.
*
* @author chen.ma
* @date 2021/6/23 19:05
*/
public abstract class SlowEvent extends Event {
@Override
public long sequence() {
return 0;
}
}

@ -0,0 +1,16 @@
package io.dynamic.threadpool.server.notify.listener;
import io.dynamic.threadpool.server.notify.Event;
import java.util.List;
/**
* Subscribers to multiple events can be listened to.
*
* @author chen.ma
* @date 2021/6/23 19:02
*/
public abstract class SmartSubscriber extends Subscriber {
public abstract List<Class<? extends Event>> subscribeTypes();
}

@ -0,0 +1,27 @@
package io.dynamic.threadpool.server.notify.listener;
import io.dynamic.threadpool.server.notify.Event;
/**
* An abstract subscriber class for subscriber interface.
*
* @author chen.ma
* @date 2021/6/23 19:02
*/
public abstract class Subscriber<T extends Event> {
/**
* Event callback.
*
* @param event
*/
public abstract void onEvent(T event);
/**
* Type of this subscriber's subscription.
*
* @return
*/
public abstract Class<? extends Event> subscribeType();
}

@ -0,0 +1,29 @@
package io.dynamic.threadpool.server.service;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.util.Map;
/**
* Config Servlet Inner.
*
* @author chen.ma
* @date 2021/6/22 23:13
*/
@Service
public class ConfigServletInner {
@Autowired
private LongPollingService longPollingService;
public String doPollingConfig(HttpServletRequest request, HttpServletResponse response, Map<String, String> clientMd5Map, int probeRequestSize) {
if (LongPollingService.isSupportLongPolling(request)) {
longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize);
return HttpServletResponse.SC_OK + "";
}
return HttpServletResponse.SC_OK + "";
}
}

@ -0,0 +1,184 @@
package io.dynamic.threadpool.server.service;
import io.dynamic.threadpool.server.event.LocalDataChangeEvent;
import io.dynamic.threadpool.server.notify.Event;
import io.dynamic.threadpool.server.notify.NotifyCenter;
import io.dynamic.threadpool.server.notify.listener.Subscriber;
import io.dynamic.threadpool.server.toolkit.ConfigExecutor;
import io.dynamic.threadpool.server.toolkit.RequestUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import javax.servlet.AsyncContext;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
/**
*
*
* @author chen.ma
* @date 2021/6/22 23:14
*/
@Slf4j
@Service
public class LongPollingService {
private static final int FIXED_POLLING_INTERVAL_MS = 10000;
public static final String LONG_POLLING_HEADER = "Long-Pulling-Timeout";
public static final String LONG_POLLING_NO_HANG_UP_HEADER = "Long-Pulling-Timeout-No-Hangup";
public static final String CLIENT_APPNAME_HEADER = "Client-AppName";
private Map<String, Long> retainIps = new ConcurrentHashMap();
public LongPollingService() {
allSubs = new ConcurrentLinkedQueue();
ConfigExecutor.scheduleLongPolling(new StatTask(), 0L, 10L, TimeUnit.SECONDS);
NotifyCenter.registerSubscriber(new Subscriber() {
@Override
public void onEvent(Event event) {
if (isFixedPolling()) {
// Ignore.
} else {
if (event instanceof LocalDataChangeEvent) {
LocalDataChangeEvent evt = (LocalDataChangeEvent) event;
// ConfigExecutor.executeLongPolling(new DataChangeTask(evt.groupKey, evt.isBeta, evt.betaIps));
}
}
}
@Override
public Class<? extends Event> subscribeType() {
return LocalDataChangeEvent.class;
}
});
}
class StatTask implements Runnable {
@Override
public void run() {
log.info("[long-pulling] client count " + allSubs.size());
}
}
class DataChangeTask implements Runnable {
final String groupKey;
final long changeTime = System.currentTimeMillis();
final boolean isBeta;
final List<String> betaIps;
DataChangeTask(String groupKey, boolean isBeta, List<String> betaIps) {
this.groupKey = groupKey;
this.isBeta = isBeta;
this.betaIps = betaIps;
}
@Override
public void run() {
}
}
public static boolean isSupportLongPolling(HttpServletRequest req) {
return null != req.getHeader(LONG_POLLING_HEADER);
}
private static boolean isFixedPolling() {
return SwitchService.getSwitchBoolean(SwitchService.FIXED_POLLING, false);
}
private static int getFixedPollingInterval() {
return SwitchService.getSwitchInteger(SwitchService.FIXED_POLLING_INTERVAL, FIXED_POLLING_INTERVAL_MS);
}
public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map,
int probeRequestSize) {
String str = req.getHeader(LONG_POLLING_HEADER);
String noHangUpFlag = req.getHeader(LONG_POLLING_NO_HANG_UP_HEADER);
String appName = req.getHeader(CLIENT_APPNAME_HEADER);
int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);
long timeout = Math.max(10000, Long.parseLong(str) - delayTime);
if (isFixedPolling()) {
timeout = Math.max(10000, getFixedPollingInterval());
} else {
}
String ip = RequestUtil.getRemoteIp(req);
final AsyncContext asyncContext = req.startAsync();
asyncContext.setTimeout(0L);
ConfigExecutor.executeLongPolling(new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName));
}
final Queue<ClientLongPolling> allSubs;
class ClientLongPolling implements Runnable {
final AsyncContext asyncContext;
final Map<String, String> clientMd5Map;
final long createTime;
final String ip;
final String appName;
final int probeRequestSize;
final long timeoutTime;
Future<?> asyncTimeoutFuture;
public ClientLongPolling(AsyncContext asyncContext, Map<String, String> clientMd5Map, String ip, int probeRequestSize, long timeout, String appName) {
this.asyncContext = asyncContext;
this.clientMd5Map = clientMd5Map;
this.ip = ip;
this.probeRequestSize = probeRequestSize;
this.timeoutTime = timeout;
this.appName = appName;
this.createTime = System.currentTimeMillis();
}
@Override
public void run() {
asyncTimeoutFuture = ConfigExecutor.scheduleLongPolling(() -> {
getRetainIps().put(ClientLongPolling.this.ip, System.currentTimeMillis());
allSubs.remove(ClientLongPolling.this);
if (isFixedPolling()) {
}
}, timeoutTime, TimeUnit.MILLISECONDS);
allSubs.add(this);
}
}
public Map<String, Long> getRetainIps() {
return retainIps;
}
}

@ -0,0 +1,50 @@
package io.dynamic.threadpool.server.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.Map;
/**
* SwitchService.
*
* @author chen.ma
* @date 2021/6/23 18:23
*/
@Slf4j
@Service
public class SwitchService {
public static final String FIXED_DELAY_TIME = "fixedDelayTime";
public static final String FIXED_POLLING = "isFixedPolling";
public static final String FIXED_POLLING_INTERVAL = "fixedPollingInertval";
private static volatile Map<String, String> switches = new HashMap(16);
public static int getSwitchInteger(String key, int defaultValue) {
int rtn = defaultValue;
try {
String status = switches.get(key);
rtn = status != null ? Integer.parseInt(status) : defaultValue;
} catch (Exception e) {
rtn = defaultValue;
log.error("corrupt switch value {}, {}", key, switches.get(key));
}
return rtn;
}
public static boolean getSwitchBoolean(String key, boolean defaultValue) {
boolean rtn = defaultValue;
try {
String value = switches.get(key);
rtn = value != null ? Boolean.parseBoolean(value) : defaultValue;
} catch (Exception e) {
rtn = defaultValue;
log.error("corrupt switch value {}, {}", key, switches.get(key));
}
return rtn;
}
}

@ -0,0 +1,22 @@
package io.dynamic.threadpool.server.toolkit;
import java.util.Objects;
/**
* Class Util.
*
* @author chen.ma
* @date 2021/6/23 19:03
*/
public class ClassUtil {
public static boolean isAssignableFrom(Class clazz, Class cls) {
Objects.requireNonNull(cls, "cls");
return clazz.isAssignableFrom(cls);
}
public static String getCanonicalName(Class cls) {
Objects.requireNonNull(cls, "cls");
return cls.getCanonicalName();
}
}

@ -0,0 +1,35 @@
package io.dynamic.threadpool.server.toolkit;
import cn.hutool.core.thread.ThreadFactoryBuilder;
import io.dynamict.hreadpool.common.executor.ExecutorFactory;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
/**
* Config Executor.
*
* @author chen.ma
* @date 2021/6/23 18:33
*/
public class ConfigExecutor {
private static final ScheduledExecutorService LONG_POLLING_EXECUTOR = ExecutorFactory.Managed
.newSingleScheduledExecutorService("default group",
ThreadFactoryBuilder.create()
.setNamePrefix("io.dynamic.threadPool.config.LongPolling")
.build());
public static void executeLongPolling(Runnable runnable) {
LONG_POLLING_EXECUTOR.execute(runnable);
}
public static ScheduledFuture<?> scheduleLongPolling(Runnable runnable, long period, TimeUnit unit) {
return LONG_POLLING_EXECUTOR.schedule(runnable, period, unit);
}
public static void scheduleLongPolling(Runnable runnable, long initialDelay, long period, TimeUnit unit) {
LONG_POLLING_EXECUTOR.scheduleWithFixedDelay(runnable, initialDelay, period, unit);
}
}

@ -0,0 +1,30 @@
package io.dynamic.threadpool.server.toolkit;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiFunction;
/**
* Map Util.
*
* @author chen.ma
* @date 2021/6/23 19:09
*/
public class MapUtil {
public static Object computeIfAbsent(Map target, Object key, BiFunction mappingFunction, Object param1, Object param2) {
Objects.requireNonNull(target, "target");
Objects.requireNonNull(key, "key");
Objects.requireNonNull(mappingFunction, "mappingFunction");
Objects.requireNonNull(param1, "param1");
Objects.requireNonNull(param2, "param2");
Object val = target.get(key);
if (val == null) {
Object ret = mappingFunction.apply(param1, param2);
target.put(key, ret);
return ret;
}
return val;
}
}

@ -0,0 +1,31 @@
package io.dynamic.threadpool.server.toolkit;
import org.springframework.util.StringUtils;
import javax.servlet.http.HttpServletRequest;
/**
* Request Util.
*
* @author chen.ma
* @date 2021/6/23 18:28
*/
public class RequestUtil {
private static final String X_REAL_IP = "X-Real-IP";
private static final String X_FORWARDED_FOR = "X-Forwarded-For";
private static final String X_FORWARDED_FOR_SPLIT_SYMBOL = ",";
public static final String CLIENT_APPNAME_HEADER = "Client-AppName";
public static String getRemoteIp(HttpServletRequest request) {
String xForwardedFor = request.getHeader(X_FORWARDED_FOR);
if (!StringUtils.isEmpty(xForwardedFor)) {
return xForwardedFor.split(X_FORWARDED_FOR_SPLIT_SYMBOL)[0].trim();
}
String nginxHeader = request.getHeader(X_REAL_IP);
return StringUtils.isEmpty(nginxHeader) ? request.getRemoteAddr() : nginxHeader;
}
}
Loading…
Cancel
Save