Modify the first registration exception of the client.

pull/161/head
chen.ma 3 years ago
parent baec208a7a
commit 21e9c83d10

@ -38,6 +38,8 @@ public class Constants {
public static final String LONG_PULLING_TIMEOUT = "Long-Pulling-Timeout";
public static final String LONG_PULLING_TIMEOUT_NO_HANGUP = "Long-Pulling-Timeout-No-Hangup";
public static final String LISTENING_CONFIGS = "Listening-Configs";
public static final String GROUP_KEY_DELIMITER = "+";

@ -35,8 +35,12 @@ public class LongPollingService {
private static final int FIXED_POLLING_INTERVAL_MS = 10000;
private static final String TRUE_STR = "true";
public static final String LONG_POLLING_HEADER = "Long-Pulling-Timeout";
public static final String LONG_POLLING_NO_HANG_UP_HEADER = "Long-Pulling-Timeout-No-Hangup";
public static final String CLIENT_APPNAME_HEADER = "Client-AppName";
private Map<String, Long> retainIps = new ConcurrentHashMap();
@ -122,16 +126,23 @@ public class LongPollingService {
int probeRequestSize) {
String str = req.getHeader(LONG_POLLING_HEADER);
String appName = req.getHeader(CLIENT_APPNAME_HEADER);
String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER);
int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);
long timeout = Math.max(10000, Long.parseLong(str) - delayTime);
if (isFixedPolling()) {
timeout = Math.max(10000, getFixedPollingInterval());
} else {
long start = System.currentTimeMillis();
List<String> changedGroups = Md5ConfigUtil.compareMd5(req, clientMd5Map);
if (changedGroups.size() > 0) {
generateResponse(rsp, changedGroups);
return;
} else if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) {
log.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "nohangup",
RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,
changedGroups.size());
return;
}
}

@ -29,6 +29,8 @@ public class CacheData {
private int taskId;
private volatile boolean isInitializing = true;
private volatile long localConfigLastModified;
private final CopyOnWriteArrayList<ManagerListenerWrap> listeners;
@ -92,4 +94,12 @@ public class CacheData {
this.taskId = taskId;
}
public boolean isInitializing() {
return isInitializing;
}
public void setInitializing(boolean isInitializing) {
this.isInitializing = isInitializing;
}
}

@ -2,7 +2,6 @@ package com.github.dynamic.threadpool.starter.core;
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSON;
import com.github.dynamic.threadpool.common.constant.Constants;
import com.github.dynamic.threadpool.common.model.PoolParameterInfo;
import com.github.dynamic.threadpool.common.toolkit.ContentUtil;
import com.github.dynamic.threadpool.common.toolkit.GroupKey;
@ -10,7 +9,6 @@ import com.github.dynamic.threadpool.common.web.base.Result;
import com.github.dynamic.threadpool.starter.remote.HttpAgent;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import java.net.URLDecoder;
@ -19,10 +17,8 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static com.github.dynamic.threadpool.common.constant.Constants.LINE_SEPARATOR;
import static com.github.dynamic.threadpool.common.constant.Constants.WORD_SEPARATOR;
import static com.github.dynamic.threadpool.common.constant.Constants.*;
/**
* Client Worker.
@ -50,7 +46,7 @@ public class ClientWorker {
@SuppressWarnings("all")
public ClientWorker(HttpAgent httpAgent) {
this.agent = httpAgent;
this.timeout = Constants.CONFIG_LONG_POLL_TIMEOUT;
this.timeout = CONFIG_LONG_POLL_TIMEOUT;
this.executor = Executors.newScheduledThreadPool(1, r -> {
Thread t = new Thread(r);
@ -106,62 +102,70 @@ public class ClientWorker {
checkStatus();
List<CacheData> cacheDataList = new ArrayList();
List<CacheData> queryCacheDataList = cacheMap.entrySet()
.stream().map(each -> each.getValue()).collect(Collectors.toList());
List<String> changedTpIds = checkUpdateDataIds(queryCacheDataList);
if (!CollectionUtils.isEmpty(changedTpIds)) {
/**
*
* log.info("[dynamic threadPool] tpIds changed :: {}", changedTpIds);
*/
for (String each : changedTpIds) {
String[] keys = StrUtil.split(each, Constants.GROUP_KEY_DELIMITER);
String tpId = keys[0];
String itemId = keys[1];
String namespace = keys[2];
try {
String content = getServerConfig(namespace, itemId, tpId, 3000L);
CacheData cacheData = cacheMap.get(tpId);
String poolContent = ContentUtil.getPoolContent(JSON.parseObject(content, PoolParameterInfo.class));
cacheData.setContent(poolContent);
cacheDataList.add(cacheData);
/**
*
* log.info("[data-received] namespace :: {}, itemId :: {}, tpId :: {}, md5 :: {}", namespace, itemId, tpId, cacheData.getMd5());
*/
} catch (Exception ex) {
// ignore
}
List<String> inInitializingCacheList = new ArrayList();
cacheMap.forEach((key, val) -> cacheDataList.add(val));
List<String> changedTpIds = checkUpdateDataIds(cacheDataList, inInitializingCacheList);
for (String each : changedTpIds) {
String[] keys = StrUtil.split(each, GROUP_KEY_DELIMITER);
String tpId = keys[0];
String itemId = keys[1];
String namespace = keys[2];
try {
String content = getServerConfig(namespace, itemId, tpId, 3000L);
CacheData cacheData = cacheMap.get(tpId);
String poolContent = ContentUtil.getPoolContent(JSON.parseObject(content, PoolParameterInfo.class));
cacheData.setContent(poolContent);
/**
*
* log.info("[data-received] namespace :: {}, itemId :: {}, tpId :: {}, md5 :: {}", namespace, itemId, tpId, cacheData.getMd5());
*/
} catch (Exception ex) {
// ignore
}
}
for (CacheData each : cacheDataList) {
each.checkListenerMd5();
for (CacheData cacheData : cacheDataList) {
if (!cacheData.isInitializing() || inInitializingCacheList
.contains(GroupKey.getKeyTenant(cacheData.tpId, cacheData.itemId, cacheData.tenantId))) {
cacheData.checkListenerMd5();
cacheData.setInitializing(false);
}
}
inInitializingCacheList.clear();
executorService.execute(this);
}
}
private List<String> checkUpdateDataIds(List<CacheData> cacheDataList) {
private List<String> checkUpdateDataIds(List<CacheData> cacheDataList, List<String> inInitializingCacheList) {
StringBuilder sb = new StringBuilder();
for (CacheData cacheData : cacheDataList) {
sb.append(cacheData.tpId).append(WORD_SEPARATOR);
sb.append(cacheData.itemId).append(WORD_SEPARATOR);
sb.append(cacheData.getMd5()).append(WORD_SEPARATOR);
sb.append(cacheData.tenantId).append(LINE_SEPARATOR);
if (cacheData.isInitializing()) {
inInitializingCacheList.add(GroupKey.getKeyTenant(cacheData.tpId, cacheData.itemId, cacheData.tenantId));
}
}
return checkUpdateTpIds(sb.toString());
boolean isInitializingCacheList = !inInitializingCacheList.isEmpty();
return checkUpdateTpIds(sb.toString(), isInitializingCacheList);
}
public List<String> checkUpdateTpIds(String probeUpdateString) {
public List<String> checkUpdateTpIds(String probeUpdateString, boolean isInitializingCacheList) {
Map<String, String> params = new HashMap(2);
params.put(Constants.PROBE_MODIFY_REQUEST, probeUpdateString);
params.put(PROBE_MODIFY_REQUEST, probeUpdateString);
Map<String, String> headers = new HashMap(2);
headers.put(Constants.LONG_PULLING_TIMEOUT, "" + timeout);
headers.put(LONG_PULLING_TIMEOUT, "" + timeout);
// told server do not hang me up if new initializing cacheData added in
if (isInitializingCacheList) {
headers.put(LONG_PULLING_TIMEOUT_NO_HANGUP, "true");
}
if (StringUtils.isEmpty(probeUpdateString)) {
return Collections.emptyList();
@ -169,7 +173,7 @@ public class ClientWorker {
try {
long readTimeoutMs = timeout + (long) Math.round(timeout >> 1);
Result result = agent.httpPostByConfig(Constants.LISTENER_PATH, headers, params, readTimeoutMs);
Result result = agent.httpPostByConfig(LISTENER_PATH, headers, params, readTimeoutMs);
/**
*
* log.warn("[check-update] get changed dataId error, code: {}", result == null ? "error" : result.getCode());
@ -192,14 +196,14 @@ public class ClientWorker {
params.put("itemId", itemId);
params.put("tpId", tpId);
Result result = agent.httpGetByConfig(Constants.CONFIG_CONTROLLER_PATH, null, params, readTimeout);
Result result = agent.httpGetByConfig(CONFIG_CONTROLLER_PATH, null, params, readTimeout);
if (result.isSuccess()) {
return result.getData().toString();
}
log.error("[sub-server-error] namespace :: {}, itemId :: {}, tpId :: {}, result code :: {}",
namespace, itemId, tpId, result.getCode());
return Constants.NULL;
return NULL;
}
public List<String> parseUpdateDataIdResponse(String response) {
@ -260,7 +264,7 @@ public class ClientWorker {
log.error("[Cache Data] Error. Service Unavailable :: {}", ex.getMessage());
}
int taskId = cacheMap.size() / Constants.CONFIG_LONG_POLL_TIMEOUT;
int taskId = cacheMap.size() / CONFIG_LONG_POLL_TIMEOUT;
cacheData.setTaskId(taskId);
lastCacheData = cacheData;

Loading…
Cancel
Save