feature: 修复 Server 端重启后 Client 端无法订阅问题, 日志打印修改.

pull/161/head
chen.ma 3 years ago
parent 056c403da3
commit f94c1bfc8f

@ -17,6 +17,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import static com.github.dynamic.threadpool.common.constant.Constants.*;
@ -33,14 +34,14 @@ public class ClientWorker {
private long timeout;
private boolean isHealthServer = true;
private final HttpAgent agent;
private final ScheduledExecutorService executor;
private final ScheduledExecutorService executorService;
private AtomicBoolean isHealthServer = new AtomicBoolean(true);
private final ConcurrentHashMap<String, CacheData> cacheMap = new ConcurrentHashMap(16);
@SuppressWarnings("all")
@ -67,7 +68,7 @@ public class ClientWorker {
try {
checkConfigInfo();
} catch (Throwable e) {
log.error("[sub-check] rotate check error", e);
log.error("[Sub check] rotate check error", e);
}
}, 1L, 10L, TimeUnit.MILLISECONDS);
}
@ -90,7 +91,7 @@ public class ClientWorker {
@SneakyThrows
private void checkStatus() {
// 服务端状态不正常睡眠 30s
if (!isHealthServer) {
if (!isHealthServer.get()) {
log.error("[Check config] Error. exception message, Thread sleep 30 s.");
Thread.sleep(30000);
}
@ -117,10 +118,6 @@ public class ClientWorker {
CacheData cacheData = cacheMap.get(tpId);
String poolContent = ContentUtil.getPoolContent(JSON.parseObject(content, PoolParameterInfo.class));
cacheData.setContent(poolContent);
/**
*
* log.info("[data-received] namespace :: {}, itemId :: {}, tpId :: {}, md5 :: {}", namespace, itemId, tpId, cacheData.getMd5());
*/
} catch (Exception ex) {
// ignore
}
@ -174,17 +171,16 @@ public class ClientWorker {
try {
long readTimeoutMs = timeout + (long) Math.round(timeout >> 1);
Result result = agent.httpPostByConfig(LISTENER_PATH, headers, params, readTimeoutMs);
/**
*
* log.warn("[check-update] get changed dataId error, code: {}", result == null ? "error" : result.getCode());
*/
// Server 端重启后会进入非健康状态, 不进入 catch 则为健康调用
isHealthServer.set(true);
if (result != null && result.isSuccess()) {
setHealthServer(true);
return parseUpdateDataIdResponse(result.getData().toString());
}
} catch (Exception ex) {
setHealthServer(false);
log.error("[check-update] get changed dataId exception. error message :: {}", ex.getMessage());
log.error("[Check update] get changed dataId exception. error message :: {}", ex.getMessage());
}
return Collections.emptyList();
@ -201,7 +197,7 @@ public class ClientWorker {
return result.getData().toString();
}
log.error("[sub-server-error] namespace :: {}, itemId :: {}, tpId :: {}, result code :: {}",
log.error("[Sub server] namespace :: {}, itemId :: {}, tpId :: {}, result code :: {}",
namespace, itemId, tpId, result.getCode());
return NULL;
}
@ -214,7 +210,7 @@ public class ClientWorker {
try {
response = URLDecoder.decode(response, "UTF-8");
} catch (Exception e) {
log.error("[polling-resp] decode modifiedDataIdsString error", e);
log.error("[Polling resp] decode modifiedDataIdsString error", e);
}
@ -226,13 +222,13 @@ public class ClientWorker {
String group = keyArr[1];
if (keyArr.length == 2) {
updateList.add(GroupKey.getKey(dataId, group));
log.info("[{}] [polling-resp] config changed. dataId={}, group={}", dataId, group);
log.info("[{}] [Polling resp] config changed. dataId={}, group={}", dataId, group);
} else if (keyArr.length == 3) {
String tenant = keyArr[2];
updateList.add(GroupKey.getKeyTenant(dataId, group, tenant));
log.info("[polling-resp] config changed. dataId={}, group={}, tenant={}", dataId, group, tenant);
log.info("[Polling resp] config changed. dataId={}, group={}, tenant={}", dataId, group, tenant);
} else {
log.error("[{}] [polling-resp] invalid dataIdAndGroup error {}", dataIdAndGroup);
log.error("[{}] [Polling resp] invalid dataIdAndGroup error {}", dataIdAndGroup);
}
}
}
@ -274,11 +270,11 @@ public class ClientWorker {
}
public boolean isHealthServer() {
return this.isHealthServer;
return this.isHealthServer.get();
}
private void setHealthServer(boolean isHealthServer) {
this.isHealthServer = isHealthServer;
this.isHealthServer.set(isHealthServer);
}
}

Loading…
Cancel
Save