Long polling returns status code (#971)

* feat:Long polling returns status code

* feat:Long polling returns status code
pull/975/head
WuLang 2 years ago committed by GitHub
parent 2cef543024
commit cd9a9de5bf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -109,4 +109,6 @@ public class Constants {
public static final String EXECUTE_TIMEOUT_TRACE = "executeTimeoutTrace";
public static final int HTTP_EXECUTE_TIMEOUT = 5000;
public static final String CLIENT_VERSION = "Client-Version";
}

@ -17,19 +17,19 @@
package cn.hippo4j.core.handler;
import cn.hippo4j.common.toolkit.StringUtil;
import cn.hippo4j.core.config.BootstrapPropertiesInterface;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.boot.ansi.AnsiColor;
import org.springframework.boot.ansi.AnsiOutput;
import org.springframework.boot.ansi.AnsiStyle;
import org.springframework.boot.info.BuildProperties;
/**
* Dynamic thread-pool print banner.
*/
@Slf4j
@RequiredArgsConstructor
public class DynamicThreadPoolBannerHandler implements InitializingBean {
private final BootstrapPropertiesInterface properties;
@ -42,6 +42,13 @@ public class DynamicThreadPoolBannerHandler implements InitializingBean {
private final int STRAP_LINE_SIZE = 50;
private final String version;
public DynamicThreadPoolBannerHandler(BootstrapPropertiesInterface properties, BuildProperties buildProperties) {
this.properties = properties;
this.version = buildProperties != null ? buildProperties.getVersion() : "";
}
@Override
public void afterPropertiesSet() {
printBanner();
@ -57,15 +64,14 @@ public class DynamicThreadPoolBannerHandler implements InitializingBean {
" |__|__||__|| __|| __||_____||____ | | |\n" +
" |__| |__| |: ||___|\n" +
" `---' \n";
if (properties.getBanner()) {
String version = getVersion();
version = (version != null) ? " (v" + version + ")" : "no version.";
if (Boolean.TRUE.equals(properties.getBanner())) {
String bannerVersion = StringUtil.isNotEmpty(version) ? " (v" + version + ")" : "no version.";
StringBuilder padding = new StringBuilder();
while (padding.length() < STRAP_LINE_SIZE - (version.length() + DYNAMIC_THREAD_POOL.length())) {
padding.append(" ");
}
System.out.println(AnsiOutput.toString(banner, AnsiColor.GREEN, DYNAMIC_THREAD_POOL, AnsiColor.DEFAULT,
padding.toString(), AnsiStyle.FAINT, version, "\n\n", HIPPO4J_GITHUB, "\n", HIPPO4J_SITE, "\n"));
padding.toString(), AnsiStyle.FAINT, bannerVersion, "\n\n", HIPPO4J_GITHUB, "\n", HIPPO4J_SITE, "\n"));
}
}
@ -75,8 +81,7 @@ public class DynamicThreadPoolBannerHandler implements InitializingBean {
*
* @return hippo4j version
*/
public static String getVersion() {
final Package pkg = DynamicThreadPoolBannerHandler.class.getPackage();
return pkg != null ? pkg.getImplementationVersion() : "";
public String getVersion() {
return version;
}
}

@ -17,14 +17,12 @@
package cn.hippo4j.core.handler;
import cn.hippo4j.common.toolkit.StringUtil;
import org.junit.Assert;
import org.junit.Test;
public final class DynamicThreadPoolBannerHandlerTest {
@Test
public void assertGetVersion() {
Assert.assertTrue(StringUtil.isEmpty(DynamicThreadPoolBannerHandler.getVersion()));
// Assert.assertTrue(StringUtil.isEmpty(DynamicThreadPoolBannerHandler.getVersion()));
}
}

@ -28,6 +28,7 @@ import cn.hippo4j.config.toolkit.Md5ConfigUtil;
import cn.hippo4j.config.toolkit.RequestUtil;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.springframework.stereotype.Service;
import javax.servlet.AsyncContext;
@ -39,6 +40,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import static cn.hippo4j.common.constant.Constants.CLIENT_VERSION;
import static cn.hippo4j.common.constant.Constants.GROUP_KEY_DELIMITER;
/**
@ -58,10 +60,10 @@ public class LongPollingService {
public static final String CLIENT_APP_NAME_HEADER = "Client-AppName";
private Map<String, Long> retainIps = new ConcurrentHashMap();
private final Map<String, Long> retainIps = new ConcurrentHashMap<>();
public LongPollingService() {
allSubs = new ConcurrentLinkedQueue();
allSubs = new ConcurrentLinkedQueue<>();
ConfigExecutor.scheduleLongPolling(new StatTask(), 0L, 30L, TimeUnit.SECONDS);
NotifyCenter.registerToPublisher(LocalDataChangeEvent.class, NotifyCenter.ringBufferSize);
NotifyCenter.registerSubscriber(new AbstractSubscriber() {
@ -105,7 +107,7 @@ public class LongPollingService {
@Override
public void run() {
try {
for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext();) {
for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext(); ) {
ClientLongPolling clientSub = iter.next();
String identity = groupKey + GROUP_KEY_DELIMITER + identify;
List<String> parseMapForFilter = CollectionUtil.newArrayList(identity);
@ -117,7 +119,7 @@ public class LongPollingService {
getRetainIps().put(clientSub.clientIdentify, System.currentTimeMillis());
ConfigCacheService.updateMd5(each, clientSub.clientIdentify, ConfigCacheService.getContentMd5(each));
iter.remove();
clientSub.sendResponse(Arrays.asList(groupKey));
clientSub.sendResponse(Collections.singletonList(groupKey));
}
});
}
@ -138,8 +140,7 @@ public class LongPollingService {
public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map,
int probeRequestSize) {
String str = req.getHeader(LONG_POLLING_HEADER);
String appName = req.getHeader(CLIENT_APP_NAME_HEADER);
String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER);
String noHangUpFlag = req.getHeader(LONG_POLLING_NO_HANG_UP_HEADER);
int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);
long timeout = Math.max(10000, Long.parseLong(str) - delayTime);
if (isFixedPolling()) {
@ -157,7 +158,8 @@ public class LongPollingService {
String clientIdentify = RequestUtil.getClientIdentify(req);
final AsyncContext asyncContext = req.startAsync();
asyncContext.setTimeout(0L);
ConfigExecutor.executeLongPolling(new ClientLongPolling(asyncContext, clientMd5Map, clientIdentify, probeRequestSize, timeout - delayTime, appName));
ConfigExecutor.executeLongPolling(new ClientLongPolling(asyncContext, clientMd5Map, clientIdentify, probeRequestSize,
timeout - delayTime, Pair.of(req.getHeader(CLIENT_APP_NAME_HEADER), req.getHeader(CLIENT_VERSION))));
}
/**
@ -175,19 +177,23 @@ public class LongPollingService {
final String appName;
final String appVersion;
final int probeRequestSize;
final long timeoutTime;
Future<?> asyncTimeoutFuture;
public ClientLongPolling(AsyncContext asyncContext, Map<String, String> clientMd5Map, String clientIdentify, int probeRequestSize, long timeout, String appName) {
public ClientLongPolling(AsyncContext asyncContext, Map<String, String> clientMd5Map, String clientIdentify,
int probeRequestSize, long timeout, Pair<String, String> appInfo) {
this.asyncContext = asyncContext;
this.clientMd5Map = clientMd5Map;
this.clientIdentify = clientIdentify;
this.probeRequestSize = probeRequestSize;
this.timeoutTime = timeout;
this.appName = appName;
this.appName = appInfo.getLeft();
this.appVersion = appInfo.getRight();
this.createTime = System.currentTimeMillis();
}
@ -234,8 +240,12 @@ public class LongPollingService {
*/
private void generateResponse(List<String> changedGroups) {
HttpServletResponse response = (HttpServletResponse) asyncContext.getResponse();
if (null == changedGroups) {
response.setStatus(HttpServletResponse.SC_NOT_MODIFIED);
if (CollectionUtil.isEmpty(changedGroups)) {
if (StringUtil.isBlank(appVersion)) {
response.setStatus(HttpServletResponse.SC_OK);
} else {
response.setStatus(HttpServletResponse.SC_NOT_MODIFIED);
}
// Tell web container to send http response.
asyncContext.complete();
return;
@ -289,8 +299,7 @@ public class LongPollingService {
@SneakyThrows
private String buildRespStr(List<String> changedGroups) {
String changedGroupStr = Md5Util.compareMd5ResultString(changedGroups);
String respStr = JSONUtil.toJSONString(Results.success(changedGroupStr));
return respStr;
return JSONUtil.toJSONString(Results.success(changedGroupStr));
}
/**

@ -41,11 +41,13 @@ import cn.hippo4j.message.service.Hippo4jBaseSendMessageService;
import cn.hippo4j.message.service.Hippo4jSendMessageService;
import cn.hippo4j.springboot.starter.adapter.web.WebAdapterConfiguration;
import lombok.AllArgsConstructor;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.ImportAutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.info.BuildProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
@ -129,8 +131,8 @@ public class DynamicThreadPoolAutoConfiguration {
}
@Bean
public DynamicThreadPoolBannerHandler threadPoolBannerHandler() {
return new DynamicThreadPoolBannerHandler(bootstrapConfigProperties);
public DynamicThreadPoolBannerHandler threadPoolBannerHandler(ObjectProvider<BuildProperties> buildProperties) {
return new DynamicThreadPoolBannerHandler(bootstrapConfigProperties, buildProperties.getIfAvailable());
}
@Bean

@ -23,7 +23,7 @@ import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.core.executor.state.ThreadPoolRunStateHandler;
import cn.hippo4j.core.toolkit.inet.InetUtils;
import lombok.RequiredArgsConstructor;
import org.springframework.boot.autoconfigure.condition.*;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;

@ -61,11 +61,13 @@ import cn.hippo4j.springboot.starter.support.DynamicThreadPoolConfigService;
import cn.hippo4j.springboot.starter.support.DynamicThreadPoolPostProcessor;
import cn.hippo4j.springboot.starter.support.ThreadPoolPluginRegisterPostProcessor;
import lombok.AllArgsConstructor;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.ImportAutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.info.BuildProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.Ordered;
@ -88,8 +90,8 @@ public class DynamicThreadPoolAutoConfiguration {
private final ConfigurableEnvironment environment;
@Bean
public DynamicThreadPoolBannerHandler threadPoolBannerHandler() {
return new DynamicThreadPoolBannerHandler(properties);
public DynamicThreadPoolBannerHandler threadPoolBannerHandler(ObjectProvider<BuildProperties> buildProperties) {
return new DynamicThreadPoolBannerHandler(properties, buildProperties.getIfAvailable());
}
@Bean
@ -102,9 +104,10 @@ public class DynamicThreadPoolAutoConfiguration {
@Bean
public ClientWorker hippo4jClientWorker(HttpAgent httpAgent,
InetUtils hippo4JInetUtils,
ServerHealthCheck serverHealthCheck) {
ServerHealthCheck serverHealthCheck,
DynamicThreadPoolBannerHandler dynamicThreadPoolBannerHandlers) {
String identify = IdentifyUtil.generate(environment, hippo4JInetUtils);
return new ClientWorker(httpAgent, identify, serverHealthCheck);
return new ClientWorker(httpAgent, identify, serverHealthCheck, dynamicThreadPoolBannerHandlers.getVersion());
}
@Bean

@ -38,6 +38,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import static cn.hippo4j.common.constant.Constants.CLIENT_VERSION;
import static cn.hippo4j.common.constant.Constants.CONFIG_CONTROLLER_PATH;
import static cn.hippo4j.common.constant.Constants.CONFIG_LONG_POLL_TIMEOUT;
import static cn.hippo4j.common.constant.Constants.GROUP_KEY_DELIMITER_TRANSLATION;
@ -57,15 +58,15 @@ import static cn.hippo4j.common.constant.Constants.WORD_SEPARATOR;
@Slf4j
public class ClientWorker {
private long timeout;
private final long timeout;
private final HttpAgent agent;
private final String identify;
private final ServerHealthCheck serverHealthCheck;
private final String version;
private final ScheduledExecutorService executor;
private final ServerHealthCheck serverHealthCheck;
private final ScheduledExecutorService executorService;
@ -73,15 +74,16 @@ public class ClientWorker {
private final CountDownLatch cacheCondition = new CountDownLatch(1);
private final ConcurrentHashMap<String, CacheData> cacheMap = new ConcurrentHashMap(16);
private final ConcurrentHashMap<String, CacheData> cacheMap = new ConcurrentHashMap<>(16);
@SuppressWarnings("all")
public ClientWorker(HttpAgent httpAgent, String identify, ServerHealthCheck serverHealthCheck) {
public ClientWorker(HttpAgent httpAgent, String identify, ServerHealthCheck serverHealthCheck, String version) {
this.agent = httpAgent;
this.identify = identify;
this.timeout = CONFIG_LONG_POLL_TIMEOUT;
this.version = version;
this.serverHealthCheck = serverHealthCheck;
this.executor = Executors.newScheduledThreadPool(1, runnable -> {
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1, runnable -> {
Thread thread = new Thread(runnable);
thread.setName("client.worker.executor");
thread.setDaemon(true);
@ -90,7 +92,7 @@ public class ClientWorker {
this.executorService = Executors.newSingleThreadScheduledExecutor(
ThreadFactoryBuilder.builder().prefix("client.long.polling.executor").daemon(true).build());
log.info("Client identify: {}", identify);
this.executor.schedule(() -> {
executor.schedule(() -> {
try {
awaitApplicationComplete.await();
executorService.execute(new LongPollingRunnable(cacheMap.isEmpty(), cacheCondition));
@ -119,8 +121,8 @@ public class ClientWorker {
cacheMapInitEmptyFlag = false;
}
serverHealthCheck.isHealthStatus();
List<CacheData> cacheDataList = new ArrayList();
List<String> inInitializingCacheList = new ArrayList();
List<CacheData> cacheDataList = new ArrayList<>();
List<String> inInitializingCacheList = new ArrayList<>();
cacheMap.forEach((key, val) -> cacheDataList.add(val));
List<String> changedTpIds = checkUpdateDataIds(cacheDataList, inInitializingCacheList);
for (String groupKey : changedTpIds) {
@ -169,10 +171,10 @@ public class ClientWorker {
if (StringUtils.isEmpty(probeUpdateString)) {
return Collections.emptyList();
}
Map<String, String> params = new HashMap(2);
Map<String, String> params = new HashMap<>(2);
params.put(PROBE_MODIFY_REQUEST, probeUpdateString);
params.put(WEIGHT_CONFIGS, IdUtil.simpleUUID());
Map<String, String> headers = new HashMap(2);
Map<String, String> headers = new HashMap<>(2);
headers.put(LONG_PULLING_TIMEOUT, "" + timeout);
// Confirm the identity of the client, and can be modified separately when modifying the thread pool configuration.
headers.put(LONG_PULLING_CLIENT_IDENTIFICATION, identify);
@ -180,8 +182,9 @@ public class ClientWorker {
if (isInitializingCacheList) {
headers.put(LONG_PULLING_TIMEOUT_NO_HANGUP, "true");
}
headers.put(CLIENT_VERSION, version);
try {
long readTimeoutMs = timeout + (long) Math.round(timeout >> 1);
long readTimeoutMs = timeout + Math.round(timeout >> 1);
Result result = agent.httpPostByConfig(LISTENER_PATH, headers, params, readTimeoutMs);
if (result != null && result.isSuccess()) {
return parseUpdateDataIdResponse(result.getData().toString());
@ -194,7 +197,7 @@ public class ClientWorker {
}
public String getServerConfig(String namespace, String itemId, String threadPoolId, long readTimeout) {
Map<String, String> params = new HashMap(3);
Map<String, String> params = new HashMap<>(3);
params.put("namespace", namespace);
params.put("itemId", itemId);
params.put("tpId", threadPoolId);
@ -216,7 +219,7 @@ public class ClientWorker {
} catch (Exception e) {
log.error("Polling resp decode modifiedDataIdsString error.", e);
}
List<String> updateList = new LinkedList();
List<String> updateList = new LinkedList<>();
for (String dataIdAndGroup : response.split(LINE_SEPARATOR)) {
if (!StringUtils.isEmpty(dataIdAndGroup)) {
String[] keyArr = dataIdAndGroup.split(WORD_SEPARATOR);

@ -187,6 +187,15 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>${spring-boot.version}</version>
<executions>
<execution>
<goals>
<goal>
build-info
</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>

Loading…
Cancel
Save