feat:Long polling returns status code

pull/971/head
wulang 3 years ago
parent 71e96fb9bc
commit db8ac51f4b

@ -109,4 +109,6 @@ public class Constants {
public static final String EXECUTE_TIMEOUT_TRACE = "executeTimeoutTrace"; public static final String EXECUTE_TIMEOUT_TRACE = "executeTimeoutTrace";
public static final int HTTP_EXECUTE_TIMEOUT = 5000; public static final int HTTP_EXECUTE_TIMEOUT = 5000;
public static final String CLIENT_VERSION = "Client-Version";
} }

@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.common.toolkit;
/**
* number util
*/
public class NumberUtil {
public static boolean isNumber(CharSequence str) {
if (StringUtil.isBlank(str)) {
return false;
}
char[] chars = str.toString().toCharArray();
int sz = chars.length;
boolean hasExp = false;
boolean hasDecPoint = false;
boolean allowSigns = false;
boolean foundDigit = false;
// deal with any possible sign up front
int start = (chars[0] == '-' || chars[0] == '+') ? 1 : 0;
if (sz > start + 1) {
if (chars[start] == '0' && (chars[start + 1] == 'x' || chars[start + 1] == 'X')) {
int i = start + 2;
if (i == sz) {
return false;
}
for (; i < chars.length; i++) {
if ((chars[i] < '0' || chars[i] > '9') && (chars[i] < 'a' || chars[i] > 'f') && (chars[i] < 'A' || chars[i] > 'F')) {
return false;
}
}
return true;
}
}
sz--;
int i = start;
while (i < sz || (i < sz + 1 && allowSigns && !foundDigit)) {
if (chars[i] >= '0' && chars[i] <= '9') {
foundDigit = true;
allowSigns = false;
} else if (chars[i] == '.') {
if (hasDecPoint || hasExp) {
return false;
}
hasDecPoint = true;
} else if (chars[i] == 'e' || chars[i] == 'E') {
if (hasExp) {
return false;
}
if (!foundDigit) {
return false;
}
hasExp = true;
allowSigns = true;
} else if (chars[i] == '+' || chars[i] == '-') {
if (!allowSigns) {
return false;
}
allowSigns = false;
foundDigit = false;
} else {
return false;
}
i++;
}
if (i < chars.length) {
if (chars[i] >= '0' && chars[i] <= '9') {
return true;
}
if (chars[i] == 'e' || chars[i] == 'E') {
return false;
}
if (chars[i] == '.') {
if (hasDecPoint || hasExp) {
return false;
}
return foundDigit;
}
if (!allowSigns && (chars[i] == 'd' || chars[i] == 'D' || chars[i] == 'f' || chars[i] == 'F')) {
return foundDigit;
}
if (chars[i] == 'l' || chars[i] == 'L') {
return foundDigit && !hasExp;
}
return false;
}
return !allowSigns && foundDigit;
}
}

@ -17,19 +17,19 @@
package cn.hippo4j.core.handler; package cn.hippo4j.core.handler;
import cn.hippo4j.common.toolkit.StringUtil;
import cn.hippo4j.core.config.BootstrapPropertiesInterface; import cn.hippo4j.core.config.BootstrapPropertiesInterface;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.InitializingBean;
import org.springframework.boot.ansi.AnsiColor; import org.springframework.boot.ansi.AnsiColor;
import org.springframework.boot.ansi.AnsiOutput; import org.springframework.boot.ansi.AnsiOutput;
import org.springframework.boot.ansi.AnsiStyle; import org.springframework.boot.ansi.AnsiStyle;
import org.springframework.boot.info.BuildProperties;
/** /**
* Dynamic thread-pool print banner. * Dynamic thread-pool print banner.
*/ */
@Slf4j @Slf4j
@RequiredArgsConstructor
public class DynamicThreadPoolBannerHandler implements InitializingBean { public class DynamicThreadPoolBannerHandler implements InitializingBean {
private final BootstrapPropertiesInterface properties; private final BootstrapPropertiesInterface properties;
@ -42,6 +42,13 @@ public class DynamicThreadPoolBannerHandler implements InitializingBean {
private final int STRAP_LINE_SIZE = 50; private final int STRAP_LINE_SIZE = 50;
private final String version;
public DynamicThreadPoolBannerHandler(BootstrapPropertiesInterface properties, BuildProperties buildProperties) {
this.properties = properties;
this.version = buildProperties != null ? buildProperties.getVersion() : "";
}
@Override @Override
public void afterPropertiesSet() { public void afterPropertiesSet() {
printBanner(); printBanner();
@ -57,15 +64,14 @@ public class DynamicThreadPoolBannerHandler implements InitializingBean {
" |__|__||__|| __|| __||_____||____ | | |\n" + " |__|__||__|| __|| __||_____||____ | | |\n" +
" |__| |__| |: ||___|\n" + " |__| |__| |: ||___|\n" +
" `---' \n"; " `---' \n";
if (properties.getBanner()) { if (Boolean.TRUE.equals(properties.getBanner())) {
String version = getVersion(); String bannerVersion = StringUtil.isNotEmpty(version) ? " (v" + version + ")" : "no version.";
version = (version != null) ? " (v" + version + ")" : "no version.";
StringBuilder padding = new StringBuilder(); StringBuilder padding = new StringBuilder();
while (padding.length() < STRAP_LINE_SIZE - (version.length() + DYNAMIC_THREAD_POOL.length())) { while (padding.length() < STRAP_LINE_SIZE - (version.length() + DYNAMIC_THREAD_POOL.length())) {
padding.append(" "); padding.append(" ");
} }
System.out.println(AnsiOutput.toString(banner, AnsiColor.GREEN, DYNAMIC_THREAD_POOL, AnsiColor.DEFAULT, System.out.println(AnsiOutput.toString(banner, AnsiColor.GREEN, DYNAMIC_THREAD_POOL, AnsiColor.DEFAULT,
padding.toString(), AnsiStyle.FAINT, version, "\n\n", HIPPO4J_GITHUB, "\n", HIPPO4J_SITE, "\n")); padding.toString(), AnsiStyle.FAINT, bannerVersion, "\n\n", HIPPO4J_GITHUB, "\n", HIPPO4J_SITE, "\n"));
} }
} }
@ -75,8 +81,7 @@ public class DynamicThreadPoolBannerHandler implements InitializingBean {
* *
* @return hippo4j version * @return hippo4j version
*/ */
public static String getVersion() { public String getVersion() {
final Package pkg = DynamicThreadPoolBannerHandler.class.getPackage(); return version;
return pkg != null ? pkg.getImplementationVersion() : "";
} }
} }

@ -17,14 +17,12 @@
package cn.hippo4j.core.handler; package cn.hippo4j.core.handler;
import cn.hippo4j.common.toolkit.StringUtil;
import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
public final class DynamicThreadPoolBannerHandlerTest { public final class DynamicThreadPoolBannerHandlerTest {
@Test @Test
public void assertGetVersion() { public void assertGetVersion() {
Assert.assertTrue(StringUtil.isEmpty(DynamicThreadPoolBannerHandler.getVersion())); // Assert.assertTrue(StringUtil.isEmpty(DynamicThreadPoolBannerHandler.getVersion()));
} }
} }

@ -28,6 +28,7 @@ import cn.hippo4j.config.toolkit.Md5ConfigUtil;
import cn.hippo4j.config.toolkit.RequestUtil; import cn.hippo4j.config.toolkit.RequestUtil;
import lombok.SneakyThrows; import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import javax.servlet.AsyncContext; import javax.servlet.AsyncContext;
@ -39,6 +40,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static cn.hippo4j.common.constant.Constants.CLIENT_VERSION;
import static cn.hippo4j.common.constant.Constants.GROUP_KEY_DELIMITER; import static cn.hippo4j.common.constant.Constants.GROUP_KEY_DELIMITER;
/** /**
@ -58,10 +60,12 @@ public class LongPollingService {
public static final String CLIENT_APP_NAME_HEADER = "Client-AppName"; public static final String CLIENT_APP_NAME_HEADER = "Client-AppName";
private Map<String, Long> retainIps = new ConcurrentHashMap(); private final Map<String, Long> retainIps = new ConcurrentHashMap<>();
private static final int LOW_VERSION = 143;
public LongPollingService() { public LongPollingService() {
allSubs = new ConcurrentLinkedQueue(); allSubs = new ConcurrentLinkedQueue<>();
ConfigExecutor.scheduleLongPolling(new StatTask(), 0L, 30L, TimeUnit.SECONDS); ConfigExecutor.scheduleLongPolling(new StatTask(), 0L, 30L, TimeUnit.SECONDS);
NotifyCenter.registerToPublisher(LocalDataChangeEvent.class, NotifyCenter.ringBufferSize); NotifyCenter.registerToPublisher(LocalDataChangeEvent.class, NotifyCenter.ringBufferSize);
NotifyCenter.registerSubscriber(new AbstractSubscriber() { NotifyCenter.registerSubscriber(new AbstractSubscriber() {
@ -105,7 +109,7 @@ public class LongPollingService {
@Override @Override
public void run() { public void run() {
try { try {
for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext();) { for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext(); ) {
ClientLongPolling clientSub = iter.next(); ClientLongPolling clientSub = iter.next();
String identity = groupKey + GROUP_KEY_DELIMITER + identify; String identity = groupKey + GROUP_KEY_DELIMITER + identify;
List<String> parseMapForFilter = CollectionUtil.newArrayList(identity); List<String> parseMapForFilter = CollectionUtil.newArrayList(identity);
@ -117,7 +121,7 @@ public class LongPollingService {
getRetainIps().put(clientSub.clientIdentify, System.currentTimeMillis()); getRetainIps().put(clientSub.clientIdentify, System.currentTimeMillis());
ConfigCacheService.updateMd5(each, clientSub.clientIdentify, ConfigCacheService.getContentMd5(each)); ConfigCacheService.updateMd5(each, clientSub.clientIdentify, ConfigCacheService.getContentMd5(each));
iter.remove(); iter.remove();
clientSub.sendResponse(Arrays.asList(groupKey)); clientSub.sendResponse(Collections.singletonList(groupKey));
} }
}); });
} }
@ -138,8 +142,7 @@ public class LongPollingService {
public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map, public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map,
int probeRequestSize) { int probeRequestSize) {
String str = req.getHeader(LONG_POLLING_HEADER); String str = req.getHeader(LONG_POLLING_HEADER);
String appName = req.getHeader(CLIENT_APP_NAME_HEADER); String noHangUpFlag = req.getHeader(LONG_POLLING_NO_HANG_UP_HEADER);
String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER);
int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500); int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);
long timeout = Math.max(10000, Long.parseLong(str) - delayTime); long timeout = Math.max(10000, Long.parseLong(str) - delayTime);
if (isFixedPolling()) { if (isFixedPolling()) {
@ -157,7 +160,8 @@ public class LongPollingService {
String clientIdentify = RequestUtil.getClientIdentify(req); String clientIdentify = RequestUtil.getClientIdentify(req);
final AsyncContext asyncContext = req.startAsync(); final AsyncContext asyncContext = req.startAsync();
asyncContext.setTimeout(0L); asyncContext.setTimeout(0L);
ConfigExecutor.executeLongPolling(new ClientLongPolling(asyncContext, clientMd5Map, clientIdentify, probeRequestSize, timeout - delayTime, appName)); ConfigExecutor.executeLongPolling(new ClientLongPolling(asyncContext, clientMd5Map, clientIdentify, probeRequestSize,
timeout - delayTime, Pair.of(req.getHeader(CLIENT_APP_NAME_HEADER), req.getHeader(CLIENT_VERSION))));
} }
/** /**
@ -175,19 +179,23 @@ public class LongPollingService {
final String appName; final String appName;
final String appVersion;
final int probeRequestSize; final int probeRequestSize;
final long timeoutTime; final long timeoutTime;
Future<?> asyncTimeoutFuture; Future<?> asyncTimeoutFuture;
public ClientLongPolling(AsyncContext asyncContext, Map<String, String> clientMd5Map, String clientIdentify, int probeRequestSize, long timeout, String appName) { public ClientLongPolling(AsyncContext asyncContext, Map<String, String> clientMd5Map, String clientIdentify,
int probeRequestSize, long timeout, Pair<String, String> appInfo) {
this.asyncContext = asyncContext; this.asyncContext = asyncContext;
this.clientMd5Map = clientMd5Map; this.clientMd5Map = clientMd5Map;
this.clientIdentify = clientIdentify; this.clientIdentify = clientIdentify;
this.probeRequestSize = probeRequestSize; this.probeRequestSize = probeRequestSize;
this.timeoutTime = timeout; this.timeoutTime = timeout;
this.appName = appName; this.appName = appInfo.getLeft();
this.appVersion = StringUtil.isNotBlank(appInfo.getRight()) ? appInfo.getRight().replace(".", "") : "";
this.createTime = System.currentTimeMillis(); this.createTime = System.currentTimeMillis();
} }
@ -234,8 +242,16 @@ public class LongPollingService {
*/ */
private void generateResponse(List<String> changedGroups) { private void generateResponse(List<String> changedGroups) {
HttpServletResponse response = (HttpServletResponse) asyncContext.getResponse(); HttpServletResponse response = (HttpServletResponse) asyncContext.getResponse();
if (null == changedGroups) { if (CollectionUtil.isEmpty(changedGroups)) {
response.setStatus(HttpServletResponse.SC_NOT_MODIFIED); if (StringUtil.isBlank(appVersion) || !NumberUtil.isNumber(appVersion)) {
response.setStatus(HttpServletResponse.SC_OK);
} else {
if (Integer.parseInt(appVersion) < LOW_VERSION) {
response.setStatus(HttpServletResponse.SC_OK);
} else {
response.setStatus(HttpServletResponse.SC_NOT_MODIFIED);
}
}
// Tell web container to send http response. // Tell web container to send http response.
asyncContext.complete(); asyncContext.complete();
return; return;
@ -289,8 +305,7 @@ public class LongPollingService {
@SneakyThrows @SneakyThrows
private String buildRespStr(List<String> changedGroups) { private String buildRespStr(List<String> changedGroups) {
String changedGroupStr = Md5Util.compareMd5ResultString(changedGroups); String changedGroupStr = Md5Util.compareMd5ResultString(changedGroups);
String respStr = JSONUtil.toJSONString(Results.success(changedGroupStr)); return JSONUtil.toJSONString(Results.success(changedGroupStr));
return respStr;
} }
/** /**

@ -41,11 +41,13 @@ import cn.hippo4j.message.service.Hippo4jBaseSendMessageService;
import cn.hippo4j.message.service.Hippo4jSendMessageService; import cn.hippo4j.message.service.Hippo4jSendMessageService;
import cn.hippo4j.springboot.starter.adapter.web.WebAdapterConfiguration; import cn.hippo4j.springboot.starter.adapter.web.WebAdapterConfiguration;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.ImportAutoConfiguration; import org.springframework.boot.autoconfigure.ImportAutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.info.BuildProperties;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import; import org.springframework.context.annotation.Import;
@ -129,8 +131,8 @@ public class DynamicThreadPoolAutoConfiguration {
} }
@Bean @Bean
public DynamicThreadPoolBannerHandler threadPoolBannerHandler() { public DynamicThreadPoolBannerHandler threadPoolBannerHandler(ObjectProvider<BuildProperties> buildProperties) {
return new DynamicThreadPoolBannerHandler(bootstrapConfigProperties); return new DynamicThreadPoolBannerHandler(bootstrapConfigProperties, buildProperties.getIfAvailable());
} }
@Bean @Bean

@ -23,7 +23,7 @@ import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.core.executor.state.ThreadPoolRunStateHandler; import cn.hippo4j.core.executor.state.ThreadPoolRunStateHandler;
import cn.hippo4j.core.toolkit.inet.InetUtils; import cn.hippo4j.core.toolkit.inet.InetUtils;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import org.springframework.boot.autoconfigure.condition.*; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import; import org.springframework.context.annotation.Import;

@ -61,11 +61,13 @@ import cn.hippo4j.springboot.starter.support.DynamicThreadPoolConfigService;
import cn.hippo4j.springboot.starter.support.DynamicThreadPoolPostProcessor; import cn.hippo4j.springboot.starter.support.DynamicThreadPoolPostProcessor;
import cn.hippo4j.springboot.starter.support.ThreadPoolPluginRegisterPostProcessor; import cn.hippo4j.springboot.starter.support.ThreadPoolPluginRegisterPostProcessor;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.ImportAutoConfiguration; import org.springframework.boot.autoconfigure.ImportAutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.info.BuildProperties;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.core.Ordered; import org.springframework.core.Ordered;
@ -88,8 +90,8 @@ public class DynamicThreadPoolAutoConfiguration {
private final ConfigurableEnvironment environment; private final ConfigurableEnvironment environment;
@Bean @Bean
public DynamicThreadPoolBannerHandler threadPoolBannerHandler() { public DynamicThreadPoolBannerHandler threadPoolBannerHandler(ObjectProvider<BuildProperties> buildProperties) {
return new DynamicThreadPoolBannerHandler(properties); return new DynamicThreadPoolBannerHandler(properties, buildProperties.getIfAvailable());
} }
@Bean @Bean
@ -102,9 +104,10 @@ public class DynamicThreadPoolAutoConfiguration {
@Bean @Bean
public ClientWorker hippo4jClientWorker(HttpAgent httpAgent, public ClientWorker hippo4jClientWorker(HttpAgent httpAgent,
InetUtils hippo4JInetUtils, InetUtils hippo4JInetUtils,
ServerHealthCheck serverHealthCheck) { ServerHealthCheck serverHealthCheck,
DynamicThreadPoolBannerHandler dynamicThreadPoolBannerHandlers) {
String identify = IdentifyUtil.generate(environment, hippo4JInetUtils); String identify = IdentifyUtil.generate(environment, hippo4JInetUtils);
return new ClientWorker(httpAgent, identify, serverHealthCheck); return new ClientWorker(httpAgent, identify, serverHealthCheck, dynamicThreadPoolBannerHandlers.getVersion());
} }
@Bean @Bean

@ -22,6 +22,7 @@ import cn.hippo4j.common.toolkit.ContentUtil;
import cn.hippo4j.common.toolkit.GroupKey; import cn.hippo4j.common.toolkit.GroupKey;
import cn.hippo4j.common.toolkit.IdUtil; import cn.hippo4j.common.toolkit.IdUtil;
import cn.hippo4j.common.toolkit.JSONUtil; import cn.hippo4j.common.toolkit.JSONUtil;
import cn.hippo4j.common.toolkit.StringUtil;
import cn.hippo4j.common.web.base.Result; import cn.hippo4j.common.web.base.Result;
import cn.hippo4j.common.design.builder.ThreadFactoryBuilder; import cn.hippo4j.common.design.builder.ThreadFactoryBuilder;
import cn.hippo4j.springboot.starter.remote.HttpAgent; import cn.hippo4j.springboot.starter.remote.HttpAgent;
@ -38,18 +39,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static cn.hippo4j.common.constant.Constants.CONFIG_CONTROLLER_PATH; import static cn.hippo4j.common.constant.Constants.*;
import static cn.hippo4j.common.constant.Constants.CONFIG_LONG_POLL_TIMEOUT;
import static cn.hippo4j.common.constant.Constants.GROUP_KEY_DELIMITER_TRANSLATION;
import static cn.hippo4j.common.constant.Constants.LINE_SEPARATOR;
import static cn.hippo4j.common.constant.Constants.LISTENER_PATH;
import static cn.hippo4j.common.constant.Constants.LONG_PULLING_CLIENT_IDENTIFICATION;
import static cn.hippo4j.common.constant.Constants.LONG_PULLING_TIMEOUT;
import static cn.hippo4j.common.constant.Constants.LONG_PULLING_TIMEOUT_NO_HANGUP;
import static cn.hippo4j.common.constant.Constants.NULL;
import static cn.hippo4j.common.constant.Constants.PROBE_MODIFY_REQUEST;
import static cn.hippo4j.common.constant.Constants.WEIGHT_CONFIGS;
import static cn.hippo4j.common.constant.Constants.WORD_SEPARATOR;
/** /**
* Client worker. * Client worker.
@ -57,15 +47,15 @@ import static cn.hippo4j.common.constant.Constants.WORD_SEPARATOR;
@Slf4j @Slf4j
public class ClientWorker { public class ClientWorker {
private long timeout; private final long timeout;
private final HttpAgent agent; private final HttpAgent agent;
private final String identify; private final String identify;
private final ServerHealthCheck serverHealthCheck; private final String version;
private final ScheduledExecutorService executor; private final ServerHealthCheck serverHealthCheck;
private final ScheduledExecutorService executorService; private final ScheduledExecutorService executorService;
@ -73,15 +63,16 @@ public class ClientWorker {
private final CountDownLatch cacheCondition = new CountDownLatch(1); private final CountDownLatch cacheCondition = new CountDownLatch(1);
private final ConcurrentHashMap<String, CacheData> cacheMap = new ConcurrentHashMap(16); private final ConcurrentHashMap<String, CacheData> cacheMap = new ConcurrentHashMap<>(16);
@SuppressWarnings("all") @SuppressWarnings("all")
public ClientWorker(HttpAgent httpAgent, String identify, ServerHealthCheck serverHealthCheck) { public ClientWorker(HttpAgent httpAgent, String identify, ServerHealthCheck serverHealthCheck, String version) {
this.agent = httpAgent; this.agent = httpAgent;
this.identify = identify; this.identify = identify;
this.timeout = CONFIG_LONG_POLL_TIMEOUT; this.timeout = CONFIG_LONG_POLL_TIMEOUT;
this.version = StringUtil.isNotEmpty(version) ? version.replace("-SNAPSHOT", "") : "";
this.serverHealthCheck = serverHealthCheck; this.serverHealthCheck = serverHealthCheck;
this.executor = Executors.newScheduledThreadPool(1, runnable -> { ScheduledExecutorService executor = Executors.newScheduledThreadPool(1, runnable -> {
Thread thread = new Thread(runnable); Thread thread = new Thread(runnable);
thread.setName("client.worker.executor"); thread.setName("client.worker.executor");
thread.setDaemon(true); thread.setDaemon(true);
@ -90,7 +81,7 @@ public class ClientWorker {
this.executorService = Executors.newSingleThreadScheduledExecutor( this.executorService = Executors.newSingleThreadScheduledExecutor(
ThreadFactoryBuilder.builder().prefix("client.long.polling.executor").daemon(true).build()); ThreadFactoryBuilder.builder().prefix("client.long.polling.executor").daemon(true).build());
log.info("Client identify: {}", identify); log.info("Client identify: {}", identify);
this.executor.schedule(() -> { executor.schedule(() -> {
try { try {
awaitApplicationComplete.await(); awaitApplicationComplete.await();
executorService.execute(new LongPollingRunnable(cacheMap.isEmpty(), cacheCondition)); executorService.execute(new LongPollingRunnable(cacheMap.isEmpty(), cacheCondition));
@ -119,8 +110,8 @@ public class ClientWorker {
cacheMapInitEmptyFlag = false; cacheMapInitEmptyFlag = false;
} }
serverHealthCheck.isHealthStatus(); serverHealthCheck.isHealthStatus();
List<CacheData> cacheDataList = new ArrayList(); List<CacheData> cacheDataList = new ArrayList<>();
List<String> inInitializingCacheList = new ArrayList(); List<String> inInitializingCacheList = new ArrayList<>();
cacheMap.forEach((key, val) -> cacheDataList.add(val)); cacheMap.forEach((key, val) -> cacheDataList.add(val));
List<String> changedTpIds = checkUpdateDataIds(cacheDataList, inInitializingCacheList); List<String> changedTpIds = checkUpdateDataIds(cacheDataList, inInitializingCacheList);
for (String groupKey : changedTpIds) { for (String groupKey : changedTpIds) {
@ -169,10 +160,10 @@ public class ClientWorker {
if (StringUtils.isEmpty(probeUpdateString)) { if (StringUtils.isEmpty(probeUpdateString)) {
return Collections.emptyList(); return Collections.emptyList();
} }
Map<String, String> params = new HashMap(2); Map<String, String> params = new HashMap<>(2);
params.put(PROBE_MODIFY_REQUEST, probeUpdateString); params.put(PROBE_MODIFY_REQUEST, probeUpdateString);
params.put(WEIGHT_CONFIGS, IdUtil.simpleUUID()); params.put(WEIGHT_CONFIGS, IdUtil.simpleUUID());
Map<String, String> headers = new HashMap(2); Map<String, String> headers = new HashMap<>(2);
headers.put(LONG_PULLING_TIMEOUT, "" + timeout); headers.put(LONG_PULLING_TIMEOUT, "" + timeout);
// Confirm the identity of the client, and can be modified separately when modifying the thread pool configuration. // Confirm the identity of the client, and can be modified separately when modifying the thread pool configuration.
headers.put(LONG_PULLING_CLIENT_IDENTIFICATION, identify); headers.put(LONG_PULLING_CLIENT_IDENTIFICATION, identify);
@ -180,8 +171,9 @@ public class ClientWorker {
if (isInitializingCacheList) { if (isInitializingCacheList) {
headers.put(LONG_PULLING_TIMEOUT_NO_HANGUP, "true"); headers.put(LONG_PULLING_TIMEOUT_NO_HANGUP, "true");
} }
headers.put(CLIENT_VERSION, version);
try { try {
long readTimeoutMs = timeout + (long) Math.round(timeout >> 1); long readTimeoutMs = timeout + Math.round(timeout >> 1);
Result result = agent.httpPostByConfig(LISTENER_PATH, headers, params, readTimeoutMs); Result result = agent.httpPostByConfig(LISTENER_PATH, headers, params, readTimeoutMs);
if (result != null && result.isSuccess()) { if (result != null && result.isSuccess()) {
return parseUpdateDataIdResponse(result.getData().toString()); return parseUpdateDataIdResponse(result.getData().toString());
@ -194,7 +186,7 @@ public class ClientWorker {
} }
public String getServerConfig(String namespace, String itemId, String threadPoolId, long readTimeout) { public String getServerConfig(String namespace, String itemId, String threadPoolId, long readTimeout) {
Map<String, String> params = new HashMap(3); Map<String, String> params = new HashMap<>(3);
params.put("namespace", namespace); params.put("namespace", namespace);
params.put("itemId", itemId); params.put("itemId", itemId);
params.put("tpId", threadPoolId); params.put("tpId", threadPoolId);
@ -216,7 +208,7 @@ public class ClientWorker {
} catch (Exception e) { } catch (Exception e) {
log.error("Polling resp decode modifiedDataIdsString error.", e); log.error("Polling resp decode modifiedDataIdsString error.", e);
} }
List<String> updateList = new LinkedList(); List<String> updateList = new LinkedList<>();
for (String dataIdAndGroup : response.split(LINE_SEPARATOR)) { for (String dataIdAndGroup : response.split(LINE_SEPARATOR)) {
if (!StringUtils.isEmpty(dataIdAndGroup)) { if (!StringUtils.isEmpty(dataIdAndGroup)) {
String[] keyArr = dataIdAndGroup.split(WORD_SEPARATOR); String[] keyArr = dataIdAndGroup.split(WORD_SEPARATOR);

@ -187,6 +187,15 @@
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId> <artifactId>spring-boot-maven-plugin</artifactId>
<version>${spring-boot.version}</version> <version>${spring-boot.version}</version>
<executions>
<execution>
<goals>
<goal>
build-info
</goal>
</goals>
</execution>
</executions>
</plugin> </plugin>
<plugin> <plugin>
<groupId>org.apache.maven.plugins</groupId> <groupId>org.apache.maven.plugins</groupId>

Loading…
Cancel
Save