From cd9a9de5bf9d7e5b8d6de49531e3a5307fda8b0f Mon Sep 17 00:00:00 2001 From: WuLang <48200100+wulangcode@users.noreply.github.com> Date: Sat, 12 Nov 2022 13:33:32 +0800 Subject: [PATCH] Long polling returns status code (#971) * feat:Long polling returns status code * feat:Long polling returns status code --- .../cn/hippo4j/common/constant/Constants.java | 2 ++ .../DynamicThreadPoolBannerHandler.java | 23 +++++++----- .../DynamicThreadPoolBannerHandlerTest.java | 4 +-- .../config/service/LongPollingService.java | 35 ++++++++++++------- .../DynamicThreadPoolAutoConfiguration.java | 6 ++-- .../adapter/web/WebAdapterConfiguration.java | 2 +- .../DynamicThreadPoolAutoConfiguration.java | 11 +++--- .../springboot/starter/core/ClientWorker.java | 31 ++++++++-------- pom.xml | 9 +++++ 9 files changed, 77 insertions(+), 46 deletions(-) diff --git a/hippo4j-common/src/main/java/cn/hippo4j/common/constant/Constants.java b/hippo4j-common/src/main/java/cn/hippo4j/common/constant/Constants.java index 8c225e33..161dad31 100644 --- a/hippo4j-common/src/main/java/cn/hippo4j/common/constant/Constants.java +++ b/hippo4j-common/src/main/java/cn/hippo4j/common/constant/Constants.java @@ -109,4 +109,6 @@ public class Constants { public static final String EXECUTE_TIMEOUT_TRACE = "executeTimeoutTrace"; public static final int HTTP_EXECUTE_TIMEOUT = 5000; + + public static final String CLIENT_VERSION = "Client-Version"; } diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/handler/DynamicThreadPoolBannerHandler.java b/hippo4j-core/src/main/java/cn/hippo4j/core/handler/DynamicThreadPoolBannerHandler.java index dcda065d..0bab7e70 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/handler/DynamicThreadPoolBannerHandler.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/handler/DynamicThreadPoolBannerHandler.java @@ -17,19 +17,19 @@ package cn.hippo4j.core.handler; +import cn.hippo4j.common.toolkit.StringUtil; import cn.hippo4j.core.config.BootstrapPropertiesInterface; -import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.InitializingBean; import org.springframework.boot.ansi.AnsiColor; import org.springframework.boot.ansi.AnsiOutput; import org.springframework.boot.ansi.AnsiStyle; +import org.springframework.boot.info.BuildProperties; /** * Dynamic thread-pool print banner. */ @Slf4j -@RequiredArgsConstructor public class DynamicThreadPoolBannerHandler implements InitializingBean { private final BootstrapPropertiesInterface properties; @@ -42,6 +42,13 @@ public class DynamicThreadPoolBannerHandler implements InitializingBean { private final int STRAP_LINE_SIZE = 50; + private final String version; + + public DynamicThreadPoolBannerHandler(BootstrapPropertiesInterface properties, BuildProperties buildProperties) { + this.properties = properties; + this.version = buildProperties != null ? buildProperties.getVersion() : ""; + } + @Override public void afterPropertiesSet() { printBanner(); @@ -57,15 +64,14 @@ public class DynamicThreadPoolBannerHandler implements InitializingBean { " |__|__||__|| __|| __||_____||____ | | |\n" + " |__| |__| |: ||___|\n" + " `---' \n"; - if (properties.getBanner()) { - String version = getVersion(); - version = (version != null) ? " (v" + version + ")" : "no version."; + if (Boolean.TRUE.equals(properties.getBanner())) { + String bannerVersion = StringUtil.isNotEmpty(version) ? " (v" + version + ")" : "no version."; StringBuilder padding = new StringBuilder(); while (padding.length() < STRAP_LINE_SIZE - (version.length() + DYNAMIC_THREAD_POOL.length())) { padding.append(" "); } System.out.println(AnsiOutput.toString(banner, AnsiColor.GREEN, DYNAMIC_THREAD_POOL, AnsiColor.DEFAULT, - padding.toString(), AnsiStyle.FAINT, version, "\n\n", HIPPO4J_GITHUB, "\n", HIPPO4J_SITE, "\n")); + padding.toString(), AnsiStyle.FAINT, bannerVersion, "\n\n", HIPPO4J_GITHUB, "\n", HIPPO4J_SITE, "\n")); } } @@ -75,8 +81,7 @@ public class DynamicThreadPoolBannerHandler implements InitializingBean { * * @return hippo4j version */ - public static String getVersion() { - final Package pkg = DynamicThreadPoolBannerHandler.class.getPackage(); - return pkg != null ? pkg.getImplementationVersion() : ""; + public String getVersion() { + return version; } } diff --git a/hippo4j-core/src/test/java/cn/hippo4j/core/handler/DynamicThreadPoolBannerHandlerTest.java b/hippo4j-core/src/test/java/cn/hippo4j/core/handler/DynamicThreadPoolBannerHandlerTest.java index 78357d1d..824faf4c 100644 --- a/hippo4j-core/src/test/java/cn/hippo4j/core/handler/DynamicThreadPoolBannerHandlerTest.java +++ b/hippo4j-core/src/test/java/cn/hippo4j/core/handler/DynamicThreadPoolBannerHandlerTest.java @@ -17,14 +17,12 @@ package cn.hippo4j.core.handler; -import cn.hippo4j.common.toolkit.StringUtil; -import org.junit.Assert; import org.junit.Test; public final class DynamicThreadPoolBannerHandlerTest { @Test public void assertGetVersion() { - Assert.assertTrue(StringUtil.isEmpty(DynamicThreadPoolBannerHandler.getVersion())); + // Assert.assertTrue(StringUtil.isEmpty(DynamicThreadPoolBannerHandler.getVersion())); } } diff --git a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/service/LongPollingService.java b/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/service/LongPollingService.java index a485048c..9a5fe574 100644 --- a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/service/LongPollingService.java +++ b/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/service/LongPollingService.java @@ -28,6 +28,7 @@ import cn.hippo4j.config.toolkit.Md5ConfigUtil; import cn.hippo4j.config.toolkit.RequestUtil; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.tuple.Pair; import org.springframework.stereotype.Service; import javax.servlet.AsyncContext; @@ -39,6 +40,7 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import static cn.hippo4j.common.constant.Constants.CLIENT_VERSION; import static cn.hippo4j.common.constant.Constants.GROUP_KEY_DELIMITER; /** @@ -58,10 +60,10 @@ public class LongPollingService { public static final String CLIENT_APP_NAME_HEADER = "Client-AppName"; - private Map retainIps = new ConcurrentHashMap(); + private final Map retainIps = new ConcurrentHashMap<>(); public LongPollingService() { - allSubs = new ConcurrentLinkedQueue(); + allSubs = new ConcurrentLinkedQueue<>(); ConfigExecutor.scheduleLongPolling(new StatTask(), 0L, 30L, TimeUnit.SECONDS); NotifyCenter.registerToPublisher(LocalDataChangeEvent.class, NotifyCenter.ringBufferSize); NotifyCenter.registerSubscriber(new AbstractSubscriber() { @@ -105,7 +107,7 @@ public class LongPollingService { @Override public void run() { try { - for (Iterator iter = allSubs.iterator(); iter.hasNext();) { + for (Iterator iter = allSubs.iterator(); iter.hasNext(); ) { ClientLongPolling clientSub = iter.next(); String identity = groupKey + GROUP_KEY_DELIMITER + identify; List parseMapForFilter = CollectionUtil.newArrayList(identity); @@ -117,7 +119,7 @@ public class LongPollingService { getRetainIps().put(clientSub.clientIdentify, System.currentTimeMillis()); ConfigCacheService.updateMd5(each, clientSub.clientIdentify, ConfigCacheService.getContentMd5(each)); iter.remove(); - clientSub.sendResponse(Arrays.asList(groupKey)); + clientSub.sendResponse(Collections.singletonList(groupKey)); } }); } @@ -138,8 +140,7 @@ public class LongPollingService { public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map clientMd5Map, int probeRequestSize) { String str = req.getHeader(LONG_POLLING_HEADER); - String appName = req.getHeader(CLIENT_APP_NAME_HEADER); - String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER); + String noHangUpFlag = req.getHeader(LONG_POLLING_NO_HANG_UP_HEADER); int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500); long timeout = Math.max(10000, Long.parseLong(str) - delayTime); if (isFixedPolling()) { @@ -157,7 +158,8 @@ public class LongPollingService { String clientIdentify = RequestUtil.getClientIdentify(req); final AsyncContext asyncContext = req.startAsync(); asyncContext.setTimeout(0L); - ConfigExecutor.executeLongPolling(new ClientLongPolling(asyncContext, clientMd5Map, clientIdentify, probeRequestSize, timeout - delayTime, appName)); + ConfigExecutor.executeLongPolling(new ClientLongPolling(asyncContext, clientMd5Map, clientIdentify, probeRequestSize, + timeout - delayTime, Pair.of(req.getHeader(CLIENT_APP_NAME_HEADER), req.getHeader(CLIENT_VERSION)))); } /** @@ -175,19 +177,23 @@ public class LongPollingService { final String appName; + final String appVersion; + final int probeRequestSize; final long timeoutTime; Future asyncTimeoutFuture; - public ClientLongPolling(AsyncContext asyncContext, Map clientMd5Map, String clientIdentify, int probeRequestSize, long timeout, String appName) { + public ClientLongPolling(AsyncContext asyncContext, Map clientMd5Map, String clientIdentify, + int probeRequestSize, long timeout, Pair appInfo) { this.asyncContext = asyncContext; this.clientMd5Map = clientMd5Map; this.clientIdentify = clientIdentify; this.probeRequestSize = probeRequestSize; this.timeoutTime = timeout; - this.appName = appName; + this.appName = appInfo.getLeft(); + this.appVersion = appInfo.getRight(); this.createTime = System.currentTimeMillis(); } @@ -234,8 +240,12 @@ public class LongPollingService { */ private void generateResponse(List changedGroups) { HttpServletResponse response = (HttpServletResponse) asyncContext.getResponse(); - if (null == changedGroups) { - response.setStatus(HttpServletResponse.SC_NOT_MODIFIED); + if (CollectionUtil.isEmpty(changedGroups)) { + if (StringUtil.isBlank(appVersion)) { + response.setStatus(HttpServletResponse.SC_OK); + } else { + response.setStatus(HttpServletResponse.SC_NOT_MODIFIED); + } // Tell web container to send http response. asyncContext.complete(); return; @@ -289,8 +299,7 @@ public class LongPollingService { @SneakyThrows private String buildRespStr(List changedGroups) { String changedGroupStr = Md5Util.compareMd5ResultString(changedGroups); - String respStr = JSONUtil.toJSONString(Results.success(changedGroupStr)); - return respStr; + return JSONUtil.toJSONString(Results.success(changedGroupStr)); } /** diff --git a/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/config/springboot/starter/config/DynamicThreadPoolAutoConfiguration.java b/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/config/springboot/starter/config/DynamicThreadPoolAutoConfiguration.java index 6dcf8475..51edf16c 100644 --- a/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/config/springboot/starter/config/DynamicThreadPoolAutoConfiguration.java +++ b/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/config/springboot/starter/config/DynamicThreadPoolAutoConfiguration.java @@ -41,11 +41,13 @@ import cn.hippo4j.message.service.Hippo4jBaseSendMessageService; import cn.hippo4j.message.service.Hippo4jSendMessageService; import cn.hippo4j.springboot.starter.adapter.web.WebAdapterConfiguration; import lombok.AllArgsConstructor; +import org.springframework.beans.factory.ObjectProvider; import org.springframework.boot.autoconfigure.ImportAutoConfiguration; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.boot.info.BuildProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; @@ -129,8 +131,8 @@ public class DynamicThreadPoolAutoConfiguration { } @Bean - public DynamicThreadPoolBannerHandler threadPoolBannerHandler() { - return new DynamicThreadPoolBannerHandler(bootstrapConfigProperties); + public DynamicThreadPoolBannerHandler threadPoolBannerHandler(ObjectProvider buildProperties) { + return new DynamicThreadPoolBannerHandler(bootstrapConfigProperties, buildProperties.getIfAvailable()); } @Bean diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter-adapter/hippo4j-spring-boot-starter-adapter-web/src/main/java/cn/hippo4j/springboot/starter/adapter/web/WebAdapterConfiguration.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter-adapter/hippo4j-spring-boot-starter-adapter-web/src/main/java/cn/hippo4j/springboot/starter/adapter/web/WebAdapterConfiguration.java index a638564f..a0c9d396 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter-adapter/hippo4j-spring-boot-starter-adapter-web/src/main/java/cn/hippo4j/springboot/starter/adapter/web/WebAdapterConfiguration.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter-adapter/hippo4j-spring-boot-starter-adapter-web/src/main/java/cn/hippo4j/springboot/starter/adapter/web/WebAdapterConfiguration.java @@ -23,7 +23,7 @@ import cn.hippo4j.common.config.ApplicationContextHolder; import cn.hippo4j.core.executor.state.ThreadPoolRunStateHandler; import cn.hippo4j.core.toolkit.inet.InetUtils; import lombok.RequiredArgsConstructor; -import org.springframework.boot.autoconfigure.condition.*; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/config/DynamicThreadPoolAutoConfiguration.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/config/DynamicThreadPoolAutoConfiguration.java index 5c1dee41..8614c4fc 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/config/DynamicThreadPoolAutoConfiguration.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/config/DynamicThreadPoolAutoConfiguration.java @@ -61,11 +61,13 @@ import cn.hippo4j.springboot.starter.support.DynamicThreadPoolConfigService; import cn.hippo4j.springboot.starter.support.DynamicThreadPoolPostProcessor; import cn.hippo4j.springboot.starter.support.ThreadPoolPluginRegisterPostProcessor; import lombok.AllArgsConstructor; +import org.springframework.beans.factory.ObjectProvider; import org.springframework.boot.autoconfigure.ImportAutoConfiguration; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.boot.info.BuildProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.Ordered; @@ -88,8 +90,8 @@ public class DynamicThreadPoolAutoConfiguration { private final ConfigurableEnvironment environment; @Bean - public DynamicThreadPoolBannerHandler threadPoolBannerHandler() { - return new DynamicThreadPoolBannerHandler(properties); + public DynamicThreadPoolBannerHandler threadPoolBannerHandler(ObjectProvider buildProperties) { + return new DynamicThreadPoolBannerHandler(properties, buildProperties.getIfAvailable()); } @Bean @@ -102,9 +104,10 @@ public class DynamicThreadPoolAutoConfiguration { @Bean public ClientWorker hippo4jClientWorker(HttpAgent httpAgent, InetUtils hippo4JInetUtils, - ServerHealthCheck serverHealthCheck) { + ServerHealthCheck serverHealthCheck, + DynamicThreadPoolBannerHandler dynamicThreadPoolBannerHandlers) { String identify = IdentifyUtil.generate(environment, hippo4JInetUtils); - return new ClientWorker(httpAgent, identify, serverHealthCheck); + return new ClientWorker(httpAgent, identify, serverHealthCheck, dynamicThreadPoolBannerHandlers.getVersion()); } @Bean diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/ClientWorker.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/ClientWorker.java index 60506c44..570077f7 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/ClientWorker.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/ClientWorker.java @@ -38,6 +38,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import static cn.hippo4j.common.constant.Constants.CLIENT_VERSION; import static cn.hippo4j.common.constant.Constants.CONFIG_CONTROLLER_PATH; import static cn.hippo4j.common.constant.Constants.CONFIG_LONG_POLL_TIMEOUT; import static cn.hippo4j.common.constant.Constants.GROUP_KEY_DELIMITER_TRANSLATION; @@ -57,15 +58,15 @@ import static cn.hippo4j.common.constant.Constants.WORD_SEPARATOR; @Slf4j public class ClientWorker { - private long timeout; + private final long timeout; private final HttpAgent agent; private final String identify; - private final ServerHealthCheck serverHealthCheck; + private final String version; - private final ScheduledExecutorService executor; + private final ServerHealthCheck serverHealthCheck; private final ScheduledExecutorService executorService; @@ -73,15 +74,16 @@ public class ClientWorker { private final CountDownLatch cacheCondition = new CountDownLatch(1); - private final ConcurrentHashMap cacheMap = new ConcurrentHashMap(16); + private final ConcurrentHashMap cacheMap = new ConcurrentHashMap<>(16); @SuppressWarnings("all") - public ClientWorker(HttpAgent httpAgent, String identify, ServerHealthCheck serverHealthCheck) { + public ClientWorker(HttpAgent httpAgent, String identify, ServerHealthCheck serverHealthCheck, String version) { this.agent = httpAgent; this.identify = identify; this.timeout = CONFIG_LONG_POLL_TIMEOUT; + this.version = version; this.serverHealthCheck = serverHealthCheck; - this.executor = Executors.newScheduledThreadPool(1, runnable -> { + ScheduledExecutorService executor = Executors.newScheduledThreadPool(1, runnable -> { Thread thread = new Thread(runnable); thread.setName("client.worker.executor"); thread.setDaemon(true); @@ -90,7 +92,7 @@ public class ClientWorker { this.executorService = Executors.newSingleThreadScheduledExecutor( ThreadFactoryBuilder.builder().prefix("client.long.polling.executor").daemon(true).build()); log.info("Client identify: {}", identify); - this.executor.schedule(() -> { + executor.schedule(() -> { try { awaitApplicationComplete.await(); executorService.execute(new LongPollingRunnable(cacheMap.isEmpty(), cacheCondition)); @@ -119,8 +121,8 @@ public class ClientWorker { cacheMapInitEmptyFlag = false; } serverHealthCheck.isHealthStatus(); - List cacheDataList = new ArrayList(); - List inInitializingCacheList = new ArrayList(); + List cacheDataList = new ArrayList<>(); + List inInitializingCacheList = new ArrayList<>(); cacheMap.forEach((key, val) -> cacheDataList.add(val)); List changedTpIds = checkUpdateDataIds(cacheDataList, inInitializingCacheList); for (String groupKey : changedTpIds) { @@ -169,10 +171,10 @@ public class ClientWorker { if (StringUtils.isEmpty(probeUpdateString)) { return Collections.emptyList(); } - Map params = new HashMap(2); + Map params = new HashMap<>(2); params.put(PROBE_MODIFY_REQUEST, probeUpdateString); params.put(WEIGHT_CONFIGS, IdUtil.simpleUUID()); - Map headers = new HashMap(2); + Map headers = new HashMap<>(2); headers.put(LONG_PULLING_TIMEOUT, "" + timeout); // Confirm the identity of the client, and can be modified separately when modifying the thread pool configuration. headers.put(LONG_PULLING_CLIENT_IDENTIFICATION, identify); @@ -180,8 +182,9 @@ public class ClientWorker { if (isInitializingCacheList) { headers.put(LONG_PULLING_TIMEOUT_NO_HANGUP, "true"); } + headers.put(CLIENT_VERSION, version); try { - long readTimeoutMs = timeout + (long) Math.round(timeout >> 1); + long readTimeoutMs = timeout + Math.round(timeout >> 1); Result result = agent.httpPostByConfig(LISTENER_PATH, headers, params, readTimeoutMs); if (result != null && result.isSuccess()) { return parseUpdateDataIdResponse(result.getData().toString()); @@ -194,7 +197,7 @@ public class ClientWorker { } public String getServerConfig(String namespace, String itemId, String threadPoolId, long readTimeout) { - Map params = new HashMap(3); + Map params = new HashMap<>(3); params.put("namespace", namespace); params.put("itemId", itemId); params.put("tpId", threadPoolId); @@ -216,7 +219,7 @@ public class ClientWorker { } catch (Exception e) { log.error("Polling resp decode modifiedDataIdsString error.", e); } - List updateList = new LinkedList(); + List updateList = new LinkedList<>(); for (String dataIdAndGroup : response.split(LINE_SEPARATOR)) { if (!StringUtils.isEmpty(dataIdAndGroup)) { String[] keyArr = dataIdAndGroup.split(WORD_SEPARATOR); diff --git a/pom.xml b/pom.xml index da53d543..064bcf36 100644 --- a/pom.xml +++ b/pom.xml @@ -187,6 +187,15 @@ org.springframework.boot spring-boot-maven-plugin ${spring-boot.version} + + + + + build-info + + + + org.apache.maven.plugins