From f94c1bfc8f32657c3acc814ad67f38d50f5c9d74 Mon Sep 17 00:00:00 2001 From: "chen.ma" Date: Sat, 23 Oct 2021 00:51:21 +0800 Subject: [PATCH] =?UTF-8?q?feature:=20=E4=BF=AE=E5=A4=8D=20Server=20?= =?UTF-8?q?=E7=AB=AF=E9=87=8D=E5=90=AF=E5=90=8E=20Client=20=E7=AB=AF?= =?UTF-8?q?=E6=97=A0=E6=B3=95=E8=AE=A2=E9=98=85=E9=97=AE=E9=A2=98,=20?= =?UTF-8?q?=E6=97=A5=E5=BF=97=E6=89=93=E5=8D=B0=E4=BF=AE=E6=94=B9.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../threadpool/starter/core/ClientWorker.java | 36 +++++++++---------- 1 file changed, 16 insertions(+), 20 deletions(-) diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/core/ClientWorker.java b/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/core/ClientWorker.java index a895b819..bf80561f 100644 --- a/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/core/ClientWorker.java +++ b/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/core/ClientWorker.java @@ -17,6 +17,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import static com.github.dynamic.threadpool.common.constant.Constants.*; @@ -33,14 +34,14 @@ public class ClientWorker { private long timeout; - private boolean isHealthServer = true; - private final HttpAgent agent; private final ScheduledExecutorService executor; private final ScheduledExecutorService executorService; + private AtomicBoolean isHealthServer = new AtomicBoolean(true); + private final ConcurrentHashMap cacheMap = new ConcurrentHashMap(16); @SuppressWarnings("all") @@ -67,7 +68,7 @@ public class ClientWorker { try { checkConfigInfo(); } catch (Throwable e) { - log.error("[sub-check] rotate check error", e); + log.error("[Sub check] rotate check error", e); } }, 1L, 10L, TimeUnit.MILLISECONDS); } @@ -90,7 +91,7 @@ public class ClientWorker { @SneakyThrows private void checkStatus() { // 服务端状态不正常睡眠 30s - if (!isHealthServer) { + if (!isHealthServer.get()) { log.error("[Check config] Error. exception message, Thread sleep 30 s."); Thread.sleep(30000); } @@ -117,10 +118,6 @@ public class ClientWorker { CacheData cacheData = cacheMap.get(tpId); String poolContent = ContentUtil.getPoolContent(JSON.parseObject(content, PoolParameterInfo.class)); cacheData.setContent(poolContent); - /** - * 考虑是否加入日志 - * log.info("[data-received] namespace :: {}, itemId :: {}, tpId :: {}, md5 :: {}", namespace, itemId, tpId, cacheData.getMd5()); - */ } catch (Exception ex) { // ignore } @@ -174,17 +171,16 @@ public class ClientWorker { try { long readTimeoutMs = timeout + (long) Math.round(timeout >> 1); Result result = agent.httpPostByConfig(LISTENER_PATH, headers, params, readTimeoutMs); - /** - * 考虑是否加入错误日志 - * log.warn("[check-update] get changed dataId error, code: {}", result == null ? "error" : result.getCode()); - */ + + // Server 端重启后会进入非健康状态, 不进入 catch 则为健康调用 + isHealthServer.set(true); if (result != null && result.isSuccess()) { setHealthServer(true); return parseUpdateDataIdResponse(result.getData().toString()); } } catch (Exception ex) { setHealthServer(false); - log.error("[check-update] get changed dataId exception. error message :: {}", ex.getMessage()); + log.error("[Check update] get changed dataId exception. error message :: {}", ex.getMessage()); } return Collections.emptyList(); @@ -201,7 +197,7 @@ public class ClientWorker { return result.getData().toString(); } - log.error("[sub-server-error] namespace :: {}, itemId :: {}, tpId :: {}, result code :: {}", + log.error("[Sub server] namespace :: {}, itemId :: {}, tpId :: {}, result code :: {}", namespace, itemId, tpId, result.getCode()); return NULL; } @@ -214,7 +210,7 @@ public class ClientWorker { try { response = URLDecoder.decode(response, "UTF-8"); } catch (Exception e) { - log.error("[polling-resp] decode modifiedDataIdsString error", e); + log.error("[Polling resp] decode modifiedDataIdsString error", e); } @@ -226,13 +222,13 @@ public class ClientWorker { String group = keyArr[1]; if (keyArr.length == 2) { updateList.add(GroupKey.getKey(dataId, group)); - log.info("[{}] [polling-resp] config changed. dataId={}, group={}", dataId, group); + log.info("[{}] [Polling resp] config changed. dataId={}, group={}", dataId, group); } else if (keyArr.length == 3) { String tenant = keyArr[2]; updateList.add(GroupKey.getKeyTenant(dataId, group, tenant)); - log.info("[polling-resp] config changed. dataId={}, group={}, tenant={}", dataId, group, tenant); + log.info("[Polling resp] config changed. dataId={}, group={}, tenant={}", dataId, group, tenant); } else { - log.error("[{}] [polling-resp] invalid dataIdAndGroup error {}", dataIdAndGroup); + log.error("[{}] [Polling resp] invalid dataIdAndGroup error {}", dataIdAndGroup); } } } @@ -274,11 +270,11 @@ public class ClientWorker { } public boolean isHealthServer() { - return this.isHealthServer; + return this.isHealthServer.get(); } private void setHealthServer(boolean isHealthServer) { - this.isHealthServer = isHealthServer; + this.isHealthServer.set(isHealthServer); } }