diff --git a/infra/common/src/main/java/cn/hippo4j/common/model/InstanceInfo.java b/infra/common/src/main/java/cn/hippo4j/common/model/InstanceInfo.java index 204e856a..f41208e3 100644 --- a/infra/common/src/main/java/cn/hippo4j/common/model/InstanceInfo.java +++ b/infra/common/src/main/java/cn/hippo4j/common/model/InstanceInfo.java @@ -48,12 +48,16 @@ public class InstanceInfo { private String clientBasePath; + private Boolean enableRpc; + private String callBackUrl; private String identify; private String active; + private String clientVersion; + private volatile String vipAddress; private volatile String secureVipAddress; diff --git a/infra/common/src/main/java/cn/hippo4j/common/model/Result.java b/infra/common/src/main/java/cn/hippo4j/common/model/Result.java index 878b1926..a4ae82a0 100644 --- a/infra/common/src/main/java/cn/hippo4j/common/model/Result.java +++ b/infra/common/src/main/java/cn/hippo4j/common/model/Result.java @@ -20,6 +20,9 @@ package cn.hippo4j.common.model; import lombok.Data; import lombok.experimental.Accessors; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; import java.io.Serializable; /** @@ -49,7 +52,7 @@ public class Result implements Serializable { /** * Response data. */ - private T data; + private transient T data; /** * Is success. @@ -59,4 +62,37 @@ public class Result implements Serializable { public boolean isSuccess() { return SUCCESS_CODE.equals(code); } + + /** + * Redefine the behavior of serialization, that is, re-acquire the initially serialized + * data from the stream and re-serialize it. Simple serialization will result in the + * loss of the field identified by transient. + */ + private void writeObject(ObjectOutputStream s) throws IOException { + s.defaultWriteObject(); + if (data == null) { + return; + } + // Serialization obj + s.writeObject(this.data); + } + + /** + * Redefine the deserialization behavior, and sequentially deserialize the data specified during + * serialization, because there is data that is not deserialized during initial deserialization, + * such as fields defined by transient + */ + @SuppressWarnings("unchecked") + private void readObject(ObjectInputStream s) throws IOException, ClassNotFoundException { + s.defaultReadObject(); + try { + // Deserialization obj + if (isSuccess()) { + this.data = (T) s.readObject(); + } + } catch (IOException e) { + // data may also be null when successful + } + + } } diff --git a/infra/common/src/main/java/cn/hippo4j/common/model/ThreadDetailStateInfo.java b/infra/common/src/main/java/cn/hippo4j/common/model/ThreadDetailStateInfo.java index b8777540..c0033c0b 100644 --- a/infra/common/src/main/java/cn/hippo4j/common/model/ThreadDetailStateInfo.java +++ b/infra/common/src/main/java/cn/hippo4j/common/model/ThreadDetailStateInfo.java @@ -23,6 +23,7 @@ import lombok.Data; import lombok.NoArgsConstructor; import lombok.experimental.Accessors; +import java.io.Serializable; import java.util.List; /** @@ -33,7 +34,7 @@ import java.util.List; @NoArgsConstructor @AllArgsConstructor @Accessors(chain = true) -public class ThreadDetailStateInfo { +public class ThreadDetailStateInfo implements Serializable { /** * threadId diff --git a/infra/common/src/main/java/cn/hippo4j/common/model/ThreadPoolBaseInfo.java b/infra/common/src/main/java/cn/hippo4j/common/model/ThreadPoolBaseInfo.java index 8fba9dd0..000c5050 100644 --- a/infra/common/src/main/java/cn/hippo4j/common/model/ThreadPoolBaseInfo.java +++ b/infra/common/src/main/java/cn/hippo4j/common/model/ThreadPoolBaseInfo.java @@ -20,12 +20,14 @@ package cn.hippo4j.common.model; import lombok.Data; import lombok.experimental.Accessors; +import java.io.Serializable; + /** * Thread-pool base info. */ @Data @Accessors(chain = true) -public class ThreadPoolBaseInfo { +public class ThreadPoolBaseInfo implements Serializable { /** * coreSize diff --git a/starters/threadpool/server/pom.xml b/starters/threadpool/server/pom.xml index 7092cd9f..15fd62d0 100644 --- a/starters/threadpool/server/pom.xml +++ b/starters/threadpool/server/pom.xml @@ -33,6 +33,11 @@ hippo4j-threadpool-message ${project.version} + + cn.hippo4j + hippo4j-threadpool-rpc + ${revision} + org.springframework.boot spring-boot-configuration-processor diff --git a/starters/threadpool/server/src/main/java/cn/hippo4j/springboot/starter/config/BootstrapProperties.java b/starters/threadpool/server/src/main/java/cn/hippo4j/springboot/starter/config/BootstrapProperties.java index ba348c75..d4f34de8 100644 --- a/starters/threadpool/server/src/main/java/cn/hippo4j/springboot/starter/config/BootstrapProperties.java +++ b/starters/threadpool/server/src/main/java/cn/hippo4j/springboot/starter/config/BootstrapProperties.java @@ -53,6 +53,11 @@ public class BootstrapProperties implements BootstrapPropertiesInterface { */ private String nettyServerPort; + /** + * is the rpc switch turned on + */ + private Boolean enableRpc = false; + /** * Report type */ diff --git a/starters/threadpool/server/src/main/java/cn/hippo4j/springboot/starter/config/DynamicThreadPoolAutoConfiguration.java b/starters/threadpool/server/src/main/java/cn/hippo4j/springboot/starter/config/DynamicThreadPoolAutoConfiguration.java index 7e8ccd9a..43aa8445 100644 --- a/starters/threadpool/server/src/main/java/cn/hippo4j/springboot/starter/config/DynamicThreadPoolAutoConfiguration.java +++ b/starters/threadpool/server/src/main/java/cn/hippo4j/springboot/starter/config/DynamicThreadPoolAutoConfiguration.java @@ -92,7 +92,8 @@ import org.springframework.core.env.ConfigurableEnvironment; @ConditionalOnBean(MarkerConfiguration.Marker.class) @EnableConfigurationProperties(BootstrapProperties.class) @ConditionalOnProperty(prefix = Constants.CONFIGURATION_PROPERTIES_PREFIX, value = "enable", matchIfMissing = true, havingValue = "true") -@ImportAutoConfiguration({WebAdapterConfiguration.class, NettyClientConfiguration.class, DiscoveryConfiguration.class, MessageConfiguration.class, UtilAutoConfiguration.class}) +@ImportAutoConfiguration({WebAdapterConfiguration.class, DiscoveryConfiguration.class, MessageConfiguration.class, UtilAutoConfiguration.class, + NettyServerConfiguration.class}) public class DynamicThreadPoolAutoConfiguration { private final BootstrapProperties properties; diff --git a/starters/threadpool/server/src/main/java/cn/hippo4j/springboot/starter/config/NettyServerConfiguration.java b/starters/threadpool/server/src/main/java/cn/hippo4j/springboot/starter/config/NettyServerConfiguration.java new file mode 100644 index 00000000..7d051680 --- /dev/null +++ b/starters/threadpool/server/src/main/java/cn/hippo4j/springboot/starter/config/NettyServerConfiguration.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.springboot.starter.config; + +import cn.hippo4j.adapter.base.ThreadPoolAdapterParameter; +import cn.hippo4j.adapter.base.ThreadPoolAdapterState; +import cn.hippo4j.common.model.*; +import cn.hippo4j.rpc.discovery.ServerPort; +import cn.hippo4j.rpc.handler.ServerBareTakeHandler; +import cn.hippo4j.rpc.handler.ServerTakeHandler; +import cn.hippo4j.rpc.connection.SimpleServerConnection; +import cn.hippo4j.rpc.server.Server; +import cn.hippo4j.rpc.server.ServerSupport; +import cn.hippo4j.springboot.starter.controller.ThreadPoolAdapterController; +import cn.hippo4j.springboot.starter.controller.WebThreadPoolController; +import cn.hippo4j.springboot.starter.controller.WebThreadPoolRunStateController; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import javax.annotation.PostConstruct; +import java.io.IOException; +import java.net.ServerSocket; +import java.util.List; + +@Slf4j +@RequiredArgsConstructor +public class NettyServerConfiguration { + + ServerPort serverPort; + + final BootstrapProperties properties; + final ThreadPoolAdapterController threadPoolAdapterController; + final WebThreadPoolController webThreadPoolController; + final WebThreadPoolRunStateController webThreadPoolRunStateController; + + private static final String GET_POOL_BASE_STATE = "getPoolBaseState"; + private static final String GET_POOL_RUN_STATE = "getPoolRunState"; + private static final String UPDATE_WEB_THREAD_POOL = "updateWebThreadPool"; + private static final String GET_ADAPTER_THREAD_POOL = "getAdapterThreadPool"; + private static final String UPDATE_ADAPTER_THREAD_POOL = "updateAdapterThreadPool"; + private static final String GET_WEB_POOL_RUN_STATE = "getWebPoolRunState"; + private static final String GET_THREAD_STATE_DETAIL = "getThreadStateDetail"; + + @PostConstruct + public void nettyServerPort() throws IOException { + if (Boolean.FALSE.equals(properties.getEnableRpc())) { + return; + } + this.serverPort = new ServerRandomPort(); + // getPoolBaseState + ServerTakeHandler> getPoolBaseState = + new ServerTakeHandler<>(GET_POOL_BASE_STATE, webThreadPoolController::getPoolBaseState); + // getPoolRunState + ServerBareTakeHandler> getPoolRunState = + new ServerBareTakeHandler<>(GET_POOL_RUN_STATE, webThreadPoolController::getPoolRunState); + // updateWebThreadPool + ServerTakeHandler> updateWebThreadPool = + new ServerTakeHandler<>(UPDATE_WEB_THREAD_POOL, webThreadPoolController::updateWebThreadPool); + // getAdapterThreadPool + ServerTakeHandler> getAdapterThreadPool = + new ServerTakeHandler<>(GET_ADAPTER_THREAD_POOL, threadPoolAdapterController::getAdapterThreadPool); + // updateAdapterThreadPool + ServerTakeHandler> updateAdapterThreadPool = + new ServerTakeHandler<>(UPDATE_ADAPTER_THREAD_POOL, threadPoolAdapterController::updateAdapterThreadPool); + // getWebPoolRunState + ServerTakeHandler> getWebPoolRunState = + new ServerTakeHandler<>(GET_WEB_POOL_RUN_STATE, webThreadPoolRunStateController::getPoolRunState); + // getThreadStateDetail + ServerTakeHandler>> getThreadStateDetail = + new ServerTakeHandler<>(GET_THREAD_STATE_DETAIL, webThreadPoolRunStateController::getThreadStateDetail); + + try ( + SimpleServerConnection connection = new SimpleServerConnection( + getPoolBaseState, + getPoolRunState, + updateWebThreadPool, + getAdapterThreadPool, + updateAdapterThreadPool, + getWebPoolRunState, + getThreadStateDetail); + Server server = new ServerSupport(serverPort, connection)) { + if (log.isInfoEnabled()) { + log.info("Netty server started, binding to port {}", serverPort.getPort()); + } + server.bind(); + } + } + + public int getServerPort() { + return serverPort == null ? 0 : serverPort.getPort(); + } + + /** + * Safely create random ports + */ + static class ServerRandomPort implements ServerPort { + + final int port = getRandomPort(); + + @Override + public int getPort() { + return port; + } + + private int getRandomPort() { + try (ServerSocket socket = new ServerSocket(0)) { + return socket.getLocalPort(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + } + +} diff --git a/starters/threadpool/server/src/main/java/cn/hippo4j/springboot/starter/core/ClientWorker.java b/starters/threadpool/server/src/main/java/cn/hippo4j/springboot/starter/core/ClientWorker.java index a75a7f6a..a118f67c 100644 --- a/starters/threadpool/server/src/main/java/cn/hippo4j/springboot/starter/core/ClientWorker.java +++ b/starters/threadpool/server/src/main/java/cn/hippo4j/springboot/starter/core/ClientWorker.java @@ -205,7 +205,9 @@ public class ClientWorker implements DisposableBean { if (isInitializingCacheList) { headers.put(LONG_PULLING_TIMEOUT_NO_HANGUP, "true"); } - headers.put(CLIENT_VERSION, version); + if (version != null) { + headers.put(CLIENT_VERSION, version); + } try { long readTimeoutMs = timeout + Math.round(timeout >> 1); Result result = agent.httpPostByConfig(LISTENER_PATH, headers, params, readTimeoutMs); diff --git a/starters/threadpool/server/src/main/java/cn/hippo4j/springboot/starter/core/ThreadPoolAdapterRegister.java b/starters/threadpool/server/src/main/java/cn/hippo4j/springboot/starter/core/ThreadPoolAdapterRegister.java index 82f67457..bc070272 100644 --- a/starters/threadpool/server/src/main/java/cn/hippo4j/springboot/starter/core/ThreadPoolAdapterRegister.java +++ b/starters/threadpool/server/src/main/java/cn/hippo4j/springboot/starter/core/ThreadPoolAdapterRegister.java @@ -27,6 +27,7 @@ import cn.hippo4j.common.toolkit.CollectionUtil; import cn.hippo4j.core.toolkit.IdentifyUtil; import cn.hippo4j.core.toolkit.inet.InetUtils; import cn.hippo4j.springboot.starter.config.BootstrapProperties; +import cn.hippo4j.springboot.starter.config.NettyServerConfiguration; import cn.hippo4j.springboot.starter.remote.HttpAgent; import cn.hippo4j.springboot.starter.toolkit.CloudCommonIdUtil; import lombok.AllArgsConstructor; @@ -68,7 +69,7 @@ public class ThreadPoolAdapterRegister implements ApplicationRunner, ThreadPoolA for (Map.Entry threadPoolAdapterEntry : threadPoolAdapterMap.entrySet()) { ThreadPoolAdapter threadPoolAdapter = threadPoolAdapterEntry.getValue(); List threadPoolStates = threadPoolAdapter.getThreadPoolStates(); - if (CollectionUtil.isEmpty(threadPoolStates) || threadPoolStates.size() == 0) { + if (CollectionUtil.isEmpty(threadPoolStates)) { continue; } ThreadPoolAdapterCacheConfig cacheConfig = new ThreadPoolAdapterCacheConfig(); @@ -79,6 +80,14 @@ public class ThreadPoolAdapterRegister implements ApplicationRunner, ThreadPoolA String clientAddress = CloudCommonIdUtil.getClientIpPort(environment, hippo4jInetUtils); cacheConfig.setClientAddress(clientAddress); cacheConfig.setThreadPoolAdapterStates(threadPoolStates); + // tell the server whether this client supports rpc + boolean enableRpc = properties.getEnableRpc(); + cacheConfig.setEnableRpc(enableRpc); + if (enableRpc) { + // the client registers its own IP address with the server + NettyServerConfiguration nettyServer = ApplicationContextHolder.getBean(NettyServerConfiguration.class); + cacheConfig.setNettyServerAddress(CloudCommonIdUtil.getNettyServerIpPort(nettyServer.getServerPort(), hippo4jInetUtils)); + } adapterCacheConfigList.add(cacheConfig); } return adapterCacheConfigList; diff --git a/starters/threadpool/server/src/main/java/cn/hippo4j/springboot/starter/provider/InstanceInfoProviderFactory.java b/starters/threadpool/server/src/main/java/cn/hippo4j/springboot/starter/provider/InstanceInfoProviderFactory.java index 697557d1..d4211d76 100644 --- a/starters/threadpool/server/src/main/java/cn/hippo4j/springboot/starter/provider/InstanceInfoProviderFactory.java +++ b/starters/threadpool/server/src/main/java/cn/hippo4j/springboot/starter/provider/InstanceInfoProviderFactory.java @@ -21,9 +21,13 @@ import cn.hippo4j.core.api.ClientNetworkService; import cn.hippo4j.common.model.InstanceInfo; import cn.hippo4j.common.extension.spi.ServiceLoaderRegistry; import cn.hippo4j.common.toolkit.ContentUtil; +import cn.hippo4j.common.toolkit.StringUtil; +import cn.hippo4j.core.config.ApplicationContextHolder; +import cn.hippo4j.core.executor.handler.DynamicThreadPoolBannerHandler; import cn.hippo4j.core.toolkit.IdentifyUtil; import cn.hippo4j.core.toolkit.inet.InetUtils; import cn.hippo4j.springboot.starter.config.BootstrapProperties; +import cn.hippo4j.springboot.starter.config.NettyServerConfiguration; import cn.hippo4j.springboot.starter.toolkit.CloudCommonIdUtil; import lombok.SneakyThrows; import org.springframework.core.env.ConfigurableEnvironment; @@ -62,8 +66,7 @@ public final class InstanceInfoProviderFactory { String active = environment.getProperty("spring.profiles.active", "UNKNOWN"); InstanceInfo instanceInfo = new InstanceInfo(); String instanceId = CloudCommonIdUtil.getDefaultInstanceId(environment, inetUtils); - instanceId = new StringBuilder() - .append(instanceId).append(IDENTIFY_SLICER_SYMBOL).append(CLIENT_IDENTIFICATION_VALUE).toString(); + instanceId = instanceId + IDENTIFY_SLICER_SYMBOL + CLIENT_IDENTIFICATION_VALUE; String contextPath = environment.getProperty("server.servlet.context-path", ""); instanceInfo.setInstanceId(instanceId) .setIpApplicationName(CloudCommonIdUtil.getIpApplicationName(environment, inetUtils)) @@ -71,13 +74,27 @@ public final class InstanceInfoProviderFactory { .setPort(port).setClientBasePath(contextPath).setGroupKey(ContentUtil.getGroupKey(itemId, namespace)); String[] customerNetwork = ServiceLoaderRegistry.getSingletonServiceInstances(ClientNetworkService.class) .stream().findFirst().map(each -> each.getNetworkIpPort(environment)).orElse(null); - String callBackUrl = new StringBuilder().append(Optional.ofNullable(customerNetwork).map(each -> each[0]).orElse(instanceInfo.getHostName())).append(":") - .append(Optional.ofNullable(customerNetwork).map(each -> each[1]).orElse(port)).append(instanceInfo.getClientBasePath()) - .toString(); + String serverPort; + if (Boolean.FALSE.equals(bootstrapProperties.getEnableRpc())) { + serverPort = Optional.ofNullable(customerNetwork).map(each -> each[1]).orElse(port); + } else { + NettyServerConfiguration nettyServer = ApplicationContextHolder.getBean(NettyServerConfiguration.class); + serverPort = String.valueOf(nettyServer.getServerPort()); + } + + String callBackUrl = StringUtil.newBuilder( + Optional.ofNullable(customerNetwork).map(each -> each[0]).orElse(instanceInfo.getHostName()), + ":", + serverPort, + instanceInfo.getClientBasePath()); + // notify server side clients of version information + DynamicThreadPoolBannerHandler bannerHandler = ApplicationContextHolder.getBean(DynamicThreadPoolBannerHandler.class); + instanceInfo.setClientVersion(bannerHandler.getVersion()); instanceInfo.setCallBackUrl(callBackUrl); String identify = IdentifyUtil.generate(environment, inetUtils); instanceInfo.setIdentify(identify); instanceInfo.setActive(active.toUpperCase()); + instanceInfo.setEnableRpc(bootstrapProperties.getEnableRpc()); return instanceInfo; } } diff --git a/starters/threadpool/server/src/main/java/cn/hippo4j/springboot/starter/toolkit/CloudCommonIdUtil.java b/starters/threadpool/server/src/main/java/cn/hippo4j/springboot/starter/toolkit/CloudCommonIdUtil.java index 53c79ff4..138720cb 100644 --- a/starters/threadpool/server/src/main/java/cn/hippo4j/springboot/starter/toolkit/CloudCommonIdUtil.java +++ b/starters/threadpool/server/src/main/java/cn/hippo4j/springboot/starter/toolkit/CloudCommonIdUtil.java @@ -44,6 +44,19 @@ public class CloudCommonIdUtil { return combineParts(hostname, SEPARATOR, port); } + /** + * Get netty server ip port. + * + * @param serverPort serverPort + * @param inetUtils inet utils + * @return ip and port + */ + public static String getNettyServerIpPort(int serverPort, InetUtils inetUtils) { + String hostname = inetUtils.findFirstNonLoopBackHostInfo().getIpAddress(); + String port = String.valueOf(serverPort); + return combineParts(hostname, SEPARATOR, port); + } + /** * Get default instance id. * diff --git a/starters/threadpool/server/src/main/resources/META-INF/additional-spring-configuration-metadata.json b/starters/threadpool/server/src/main/resources/META-INF/additional-spring-configuration-metadata.json index fe916f2f..2dd76b5d 100644 --- a/starters/threadpool/server/src/main/resources/META-INF/additional-spring-configuration-metadata.json +++ b/starters/threadpool/server/src/main/resources/META-INF/additional-spring-configuration-metadata.json @@ -12,6 +12,12 @@ "defaultValue": "8899", "description": "dynamic thread-pool server netty port." }, + { + "name": "spring.dynamic.thread-pool.enable-rpc", + "type": "java.lang.Boolean", + "defaultValue": false, + "description": "dynamic thread-pool rpc switch." + }, { "name": "spring.dynamic.thread-pool.report-type", "type": "java.lang.String", diff --git a/threadpool/adapter/base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapterCacheConfig.java b/threadpool/adapter/base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapterCacheConfig.java index 07b2c73c..285a3076 100644 --- a/threadpool/adapter/base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapterCacheConfig.java +++ b/threadpool/adapter/base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapterCacheConfig.java @@ -52,6 +52,16 @@ public class ThreadPoolAdapterCacheConfig { */ private String clientAddress; + /** + * Open server address + */ + private String nettyServerAddress; + + /** + * rpc switch + */ + private Boolean enableRpc = false; + /** * Thread-pool adapter states */ diff --git a/threadpool/adapter/base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapterState.java b/threadpool/adapter/base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapterState.java index b46c6c26..c829ec9f 100644 --- a/threadpool/adapter/base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapterState.java +++ b/threadpool/adapter/base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapterState.java @@ -45,6 +45,16 @@ public class ThreadPoolAdapterState { */ private String clientAddress; + /** + * Open server address + */ + private String nettyServerAddress; + + /** + * rpc switch + */ + private Boolean enableRpc = false; + /** * Core size */ diff --git a/threadpool/server/config/pom.xml b/threadpool/server/config/pom.xml index 75740382..d5f02768 100644 --- a/threadpool/server/config/pom.xml +++ b/threadpool/server/config/pom.xml @@ -19,6 +19,11 @@ hippo4j-threadpool-infra-common ${project.version} + + cn.hippo4j + hippo4j-threadpool-rpc + ${revision} + org.springframework.boot spring-boot-starter diff --git a/threadpool/server/config/src/main/java/cn/hippo4j/config/service/ThreadPoolAdapterService.java b/threadpool/server/config/src/main/java/cn/hippo4j/config/service/ThreadPoolAdapterService.java index 3d69cd56..71280c51 100644 --- a/threadpool/server/config/src/main/java/cn/hippo4j/config/service/ThreadPoolAdapterService.java +++ b/threadpool/server/config/src/main/java/cn/hippo4j/config/service/ThreadPoolAdapterService.java @@ -18,18 +18,27 @@ package cn.hippo4j.config.service; import cn.hippo4j.adapter.base.ThreadPoolAdapterCacheConfig; +import cn.hippo4j.adapter.base.ThreadPoolAdapterParameter; import cn.hippo4j.adapter.base.ThreadPoolAdapterState; +import cn.hippo4j.common.constant.ConfigModifyTypeConstants; import cn.hippo4j.common.extension.design.AbstractSubjectCenter; import cn.hippo4j.common.extension.design.Observer; import cn.hippo4j.common.extension.design.ObserverMessage; import cn.hippo4j.common.toolkit.CollectionUtil; import cn.hippo4j.common.toolkit.JSONUtil; +import cn.hippo4j.common.toolkit.UserContext; +import cn.hippo4j.common.toolkit.BeanUtil; import cn.hippo4j.common.toolkit.StringUtil; import cn.hippo4j.common.toolkit.http.HttpUtil; import cn.hippo4j.common.model.Result; import cn.hippo4j.config.model.biz.adapter.ThreadPoolAdapterReqDTO; import cn.hippo4j.config.model.biz.adapter.ThreadPoolAdapterRespDTO; +import cn.hippo4j.config.model.biz.threadpool.ConfigModifySaveReqDTO; +import cn.hippo4j.config.verify.ConfigModificationVerifyServiceChoose; +import cn.hippo4j.rpc.support.AddressUtil; +import cn.hippo4j.rpc.client.ClientSupport; import com.fasterxml.jackson.core.type.TypeReference; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; @@ -51,6 +60,7 @@ import static cn.hippo4j.common.constant.Constants.IDENTIFY_SLICER_SYMBOL; */ @Slf4j @Service +@RequiredArgsConstructor public class ThreadPoolAdapterService { /** @@ -58,6 +68,8 @@ public class ThreadPoolAdapterService { */ private static final Map>>> THREAD_POOL_ADAPTER_MAP = new ConcurrentHashMap<>(); + private final ConfigModificationVerifyServiceChoose configModificationVerifyServiceChoose; + static { AbstractSubjectCenter.register(AbstractSubjectCenter.SubjectType.CLEAR_CONFIG_CACHE, new ClearThreadPoolAdapterCache()); } @@ -83,11 +95,15 @@ public class ThreadPoolAdapterService { adapterStateList = new ArrayList<>(); tenantItemMap.put(adapterState.getThreadPoolKey(), adapterStateList); } - Optional first = adapterStateList.stream().filter(state -> Objects.equals(state.getClientAddress(), each.getClientAddress())).findFirst(); + String clientAddress = each.getClientAddress(); + String nettyServerAddress = each.getNettyServerAddress(); + Optional first = adapterStateList.stream().filter(state -> Objects.equals(state.getClientAddress(), clientAddress)).findFirst(); if (!first.isPresent()) { ThreadPoolAdapterState state = new ThreadPoolAdapterState(); - state.setClientAddress(each.getClientAddress()); + state.setClientAddress(clientAddress); state.setIdentify(each.getClientIdentify()); + state.setNettyServerAddress(nettyServerAddress); + state.setEnableRpc(each.getEnableRpc()); adapterStateList.add(state); } } @@ -100,25 +116,34 @@ public class ThreadPoolAdapterService { .map(each -> each.get(requestParameter.getTenant() + IDENTIFY_SLICER_SYMBOL + requestParameter.getItem())) .map(each -> each.get(requestParameter.getThreadPoolKey())) .orElse(new ArrayList<>()); - List addressList = actual.stream().map(ThreadPoolAdapterState::getClientAddress).collect(Collectors.toList()); - List result = new ArrayList<>(addressList.size()); - addressList.forEach(each -> { - String url = StringUtil.newBuilder("http://", each, "/adapter/thread-pool/info"); - Map param = new HashMap<>(); - param.put("mark", requestParameter.getMark()); - param.put("threadPoolKey", requestParameter.getThreadPoolKey()); - try { - String resultStr = HttpUtil.get(url, param); - if (StringUtil.isNotBlank(resultStr)) { - Result restResult = JSONUtil.parseObject(resultStr, new TypeReference>() { - }); - result.add(restResult.getData()); - } - } catch (Throwable ex) { - log.error("Failed to get third-party thread pool data.", ex); - } - }); - return result; + return actual.stream() + .map(t -> { + try { + if (Boolean.TRUE.equals(t.getEnableRpc())) { + ThreadPoolAdapterParameter parameter = new ThreadPoolAdapterParameter(); + parameter.setThreadPoolKey(requestParameter.getThreadPoolKey()); + parameter.setMark(requestParameter.getMark()); + String nettyServerAddress = t.getNettyServerAddress(); + Result result = ClientSupport.clientSend( + nettyServerAddress, "getAdapterThreadPool", parameter); + return BeanUtil.convert(result.getData(), ThreadPoolAdapterRespDTO.class); + } + String clientAddress = t.getClientAddress(); + String url = StringUtil.newBuilder("http://", clientAddress, "/adapter/thread-pool/info"); + Map param = new HashMap<>(); + param.put("mark", requestParameter.getMark()); + param.put("threadPoolKey", requestParameter.getThreadPoolKey()); + String resultStr = HttpUtil.get(url, param); + if (StringUtil.isNotBlank(resultStr)) { + Result restResult = JSONUtil.parseObject(resultStr, new TypeReference>() { + }); + return restResult.getData(); + } + } catch (Throwable ex) { + log.error("Failed to get third-party thread pool data.", ex); + } + return null; + }).filter(Objects::nonNull).collect(Collectors.toList()); } public Set queryThreadPoolKey(ThreadPoolAdapterReqDTO requestParameter) { @@ -133,10 +158,52 @@ public class ThreadPoolAdapterService { return new HashSet<>(); } + /** + * If the user's authority is the administrator authority, modify it directly, if not, add it to the database and wait for review + * + * @param requestParameter ThreadPoolAdapterReqDTO + */ + public void updateThreadPool(ThreadPoolAdapterReqDTO requestParameter) { + if (UserContext.getUserRole().equals("ROLE_ADMIN")) { + List actual = Optional.ofNullable(THREAD_POOL_ADAPTER_MAP.get(requestParameter.getMark())) + .map(each -> each.get(requestParameter.getTenant() + IDENTIFY_SLICER_SYMBOL + requestParameter.getItem())) + .map(each -> each.get(requestParameter.getThreadPoolKey())) + .orElse(new ArrayList<>()); + actual.forEach(t -> { + if (Boolean.TRUE.equals(t.getEnableRpc())) { + String nettyServerAddress = t.getNettyServerAddress(); + ThreadPoolAdapterParameter parameter = BeanUtil.convert(requestParameter, ThreadPoolAdapterParameter.class); + ClientSupport.clientSend(nettyServerAddress, "updateAdapterThreadPool", parameter); + } else { + for (String each : requestParameter.getClientAddressList()) { + String urlString = StringUtil.newBuilder("http://", each, "/adapter/thread-pool/update"); + HttpUtil.post(urlString, requestParameter); + } + } + }); + } else { + ConfigModifySaveReqDTO modifySaveReqDTO = BeanUtil.convert(requestParameter, ConfigModifySaveReqDTO.class); + modifySaveReqDTO.setModifyUser(UserContext.getUserName()); + modifySaveReqDTO.setTenantId(requestParameter.getTenant()); + modifySaveReqDTO.setItemId(requestParameter.getItem()); + modifySaveReqDTO.setTpId(requestParameter.getThreadPoolKey()); + modifySaveReqDTO.setType(ConfigModifyTypeConstants.ADAPTER_THREAD_POOL); + configModificationVerifyServiceChoose.choose(modifySaveReqDTO.getType()).saveConfigModifyApplication(modifySaveReqDTO); + } + } + public static void remove(String identify) { synchronized (ThreadPoolAdapterService.class) { - THREAD_POOL_ADAPTER_MAP.values() - .forEach(each -> each.forEach((key, val) -> val.forEach((threadPoolKey, states) -> states.removeIf(adapterState -> Objects.equals(adapterState.getIdentify(), identify))))); + THREAD_POOL_ADAPTER_MAP.values().forEach(each -> each.forEach((key, val) -> val.forEach((threadPoolKey, states) -> { + states.stream() + .filter(s -> Objects.equals(s.getIdentify(), identify)) + .forEach(t -> { + if (Boolean.TRUE.equals(t.getEnableRpc())) { + ClientSupport.closeClient(AddressUtil.getInetAddress(t.getNettyServerAddress())); + } + }); + states.removeIf(s -> Objects.equals(s.getIdentify(), identify)); + }))); } } diff --git a/threadpool/server/config/src/main/java/cn/hippo4j/config/service/biz/impl/AbstractConfigModificationVerifyService.java b/threadpool/server/config/src/main/java/cn/hippo4j/config/service/biz/impl/AbstractConfigModificationVerifyService.java index 62315301..5369e15a 100644 --- a/threadpool/server/config/src/main/java/cn/hippo4j/config/service/biz/impl/AbstractConfigModificationVerifyService.java +++ b/threadpool/server/config/src/main/java/cn/hippo4j/config/service/biz/impl/AbstractConfigModificationVerifyService.java @@ -18,11 +18,14 @@ package cn.hippo4j.config.service.biz.impl; import cn.hippo4j.common.extension.enums.VerifyEnum; +import cn.hippo4j.adapter.base.ThreadPoolAdapterParameter; import cn.hippo4j.common.model.InstanceInfo; import cn.hippo4j.common.toolkit.BeanUtil; import cn.hippo4j.common.toolkit.ConditionUtil; import cn.hippo4j.common.toolkit.JSONUtil; import cn.hippo4j.common.toolkit.UserContext; +import cn.hippo4j.common.toolkit.StringUtil; +import cn.hippo4j.common.toolkit.http.HttpUtil; import cn.hippo4j.config.mapper.HisConfigVerifyMapper; import cn.hippo4j.config.model.HisConfigVerifyInfo; import cn.hippo4j.config.model.biz.threadpool.ConfigModifySaveReqDTO; @@ -30,6 +33,7 @@ import cn.hippo4j.config.model.biz.threadpool.ConfigModifyVerifyReqDTO; import cn.hippo4j.config.service.biz.ConfigModificationVerifyService; import cn.hippo4j.discovery.core.BaseInstanceRegistry; import cn.hippo4j.discovery.core.Lease; +import cn.hippo4j.rpc.client.ClientSupport; import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import javax.annotation.Resource; @@ -113,5 +117,20 @@ public abstract class AbstractConfigModificationVerifyService implements ConfigM * * @param reqDTO */ - protected abstract void updateThreadPoolParameter(ConfigModifyVerifyReqDTO reqDTO); + protected void updateThreadPoolParameter(ConfigModifyVerifyReqDTO reqDTO) { + for (String each : getClientAddress(reqDTO)) { + ThreadPoolAdapterParameter parameter = new ThreadPoolAdapterParameter(); + parameter.setMark(reqDTO.getMark()); + parameter.setMaximumPoolSize(reqDTO.getMaximumPoolSize()); + parameter.setCorePoolSize(reqDTO.getCorePoolSize()); + parameter.setThreadPoolKey(reqDTO.getThreadPoolKey()); + if (baseInstanceRegistry.getInstanceSupport(each)) { + String callUrl = baseInstanceRegistry.getInstanceCallUrl(each); + ClientSupport.clientSend(callUrl, "updateAdapterThreadPool", parameter); + } else { + String urlString = StringUtil.newBuilder("http://", each, "/adapter/thread-pool/update"); + HttpUtil.post(urlString, reqDTO); + } + } + } } diff --git a/threadpool/server/config/src/main/java/cn/hippo4j/config/service/biz/impl/AdapterThreadPoolConfigModificationVerifyServiceImpl.java b/threadpool/server/config/src/main/java/cn/hippo4j/config/service/biz/impl/AdapterThreadPoolConfigModificationVerifyServiceImpl.java index fb6cf00f..70a497bd 100644 --- a/threadpool/server/config/src/main/java/cn/hippo4j/config/service/biz/impl/AdapterThreadPoolConfigModificationVerifyServiceImpl.java +++ b/threadpool/server/config/src/main/java/cn/hippo4j/config/service/biz/impl/AdapterThreadPoolConfigModificationVerifyServiceImpl.java @@ -18,9 +18,6 @@ package cn.hippo4j.config.service.biz.impl; import cn.hippo4j.common.constant.ConfigModifyTypeConstants; -import cn.hippo4j.common.toolkit.StringUtil; -import cn.hippo4j.common.toolkit.http.HttpUtil; -import cn.hippo4j.config.model.biz.threadpool.ConfigModifyVerifyReqDTO; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; @@ -36,11 +33,4 @@ public class AdapterThreadPoolConfigModificationVerifyServiceImpl extends Abstra return ConfigModifyTypeConstants.ADAPTER_THREAD_POOL; } - @Override - protected void updateThreadPoolParameter(ConfigModifyVerifyReqDTO reqDTO) { - for (String each : getClientAddress(reqDTO)) { - String urlString = StringUtil.newBuilder("http://", each, "/adapter/thread-pool/update"); - HttpUtil.post(urlString, reqDTO); - } - } } diff --git a/threadpool/server/config/src/main/java/cn/hippo4j/config/service/biz/impl/WebThreadPoolConfigModificationVerifyServiceImpl.java b/threadpool/server/config/src/main/java/cn/hippo4j/config/service/biz/impl/WebThreadPoolConfigModificationVerifyServiceImpl.java index e4fb6b17..9e527b77 100644 --- a/threadpool/server/config/src/main/java/cn/hippo4j/config/service/biz/impl/WebThreadPoolConfigModificationVerifyServiceImpl.java +++ b/threadpool/server/config/src/main/java/cn/hippo4j/config/service/biz/impl/WebThreadPoolConfigModificationVerifyServiceImpl.java @@ -18,9 +18,6 @@ package cn.hippo4j.config.service.biz.impl; import cn.hippo4j.common.constant.ConfigModifyTypeConstants; -import cn.hippo4j.common.toolkit.StringUtil; -import cn.hippo4j.common.toolkit.http.HttpUtil; -import cn.hippo4j.config.model.biz.threadpool.ConfigModifyVerifyReqDTO; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; @@ -36,11 +33,4 @@ public class WebThreadPoolConfigModificationVerifyServiceImpl extends AbstractCo return ConfigModifyTypeConstants.WEB_THREAD_POOL; } - @Override - protected void updateThreadPoolParameter(ConfigModifyVerifyReqDTO reqDTO) { - for (String each : getClientAddress(reqDTO)) { - String urlString = StringUtil.newBuilder("http://", each, "/web/update/pool"); - HttpUtil.post(urlString, reqDTO); - } - } } diff --git a/threadpool/server/console/src/main/java/cn/hippo4j/console/controller/ThreadPoolAdapterController.java b/threadpool/server/console/src/main/java/cn/hippo4j/console/controller/ThreadPoolAdapterController.java index 1cd0e43c..81fd297d 100644 --- a/threadpool/server/console/src/main/java/cn/hippo4j/console/controller/ThreadPoolAdapterController.java +++ b/threadpool/server/console/src/main/java/cn/hippo4j/console/controller/ThreadPoolAdapterController.java @@ -17,18 +17,11 @@ package cn.hippo4j.console.controller; -import cn.hippo4j.common.constant.ConfigModifyTypeConstants; -import cn.hippo4j.common.toolkit.BeanUtil; -import cn.hippo4j.common.toolkit.StringUtil; -import cn.hippo4j.common.toolkit.UserContext; -import cn.hippo4j.common.toolkit.http.HttpUtil; import cn.hippo4j.common.model.Result; import cn.hippo4j.server.common.base.Results; import cn.hippo4j.config.model.biz.adapter.ThreadPoolAdapterReqDTO; import cn.hippo4j.config.model.biz.adapter.ThreadPoolAdapterRespDTO; -import cn.hippo4j.config.model.biz.threadpool.ConfigModifySaveReqDTO; import cn.hippo4j.config.service.ThreadPoolAdapterService; -import cn.hippo4j.config.verify.ConfigModificationVerifyServiceChoose; import lombok.RequiredArgsConstructor; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PostMapping; @@ -49,8 +42,6 @@ public class ThreadPoolAdapterController { private final ThreadPoolAdapterService threadPoolAdapterService; - private final ConfigModificationVerifyServiceChoose configModificationVerifyServiceChoose; - @GetMapping(REGISTER_ADAPTER_BASE_PATH + "/query") public Result> queryAdapterThreadPool(ThreadPoolAdapterReqDTO requestParameter) { List result = threadPoolAdapterService.query(requestParameter); @@ -65,20 +56,7 @@ public class ThreadPoolAdapterController { @PostMapping(REGISTER_ADAPTER_BASE_PATH + "/update") public Result updateAdapterThreadPool(@RequestBody ThreadPoolAdapterReqDTO requestParameter) { - if (UserContext.getUserRole().equals("ROLE_ADMIN")) { - for (String each : requestParameter.getClientAddressList()) { - String urlString = StringUtil.newBuilder("http://", each, "/adapter/thread-pool/update"); - HttpUtil.post(urlString, requestParameter); - } - } else { - ConfigModifySaveReqDTO modifySaveReqDTO = BeanUtil.convert(requestParameter, ConfigModifySaveReqDTO.class); - modifySaveReqDTO.setModifyUser(UserContext.getUserName()); - modifySaveReqDTO.setTenantId(requestParameter.getTenant()); - modifySaveReqDTO.setItemId(requestParameter.getItem()); - modifySaveReqDTO.setTpId(requestParameter.getThreadPoolKey()); - modifySaveReqDTO.setType(ConfigModifyTypeConstants.ADAPTER_THREAD_POOL); - configModificationVerifyServiceChoose.choose(modifySaveReqDTO.getType()).saveConfigModifyApplication(modifySaveReqDTO); - } + threadPoolAdapterService.updateThreadPool(requestParameter); return Results.success(); } diff --git a/threadpool/server/console/src/main/java/cn/hippo4j/console/controller/ThreadPoolController.java b/threadpool/server/console/src/main/java/cn/hippo4j/console/controller/ThreadPoolController.java index 8084b50a..6833b230 100644 --- a/threadpool/server/console/src/main/java/cn/hippo4j/console/controller/ThreadPoolController.java +++ b/threadpool/server/console/src/main/java/cn/hippo4j/console/controller/ThreadPoolController.java @@ -20,6 +20,7 @@ package cn.hippo4j.console.controller; import cn.hippo4j.common.constant.ConfigModifyTypeConstants; import cn.hippo4j.common.constant.Constants; import cn.hippo4j.common.model.InstanceInfo; +import cn.hippo4j.common.model.ThreadPoolParameterInfo; import cn.hippo4j.common.toolkit.BeanUtil; import cn.hippo4j.common.toolkit.CollectionUtil; import cn.hippo4j.common.toolkit.StringUtil; @@ -28,11 +29,11 @@ import cn.hippo4j.common.toolkit.http.HttpUtil; import cn.hippo4j.common.model.Result; import cn.hippo4j.server.common.base.Results; import cn.hippo4j.config.model.CacheItem; -import cn.hippo4j.config.model.biz.threadpool.ConfigModifySaveReqDTO; import cn.hippo4j.config.model.biz.threadpool.ThreadPoolDelReqDTO; +import cn.hippo4j.config.model.biz.threadpool.ThreadPoolSaveOrUpdateReqDTO; +import cn.hippo4j.config.model.biz.threadpool.ConfigModifySaveReqDTO; import cn.hippo4j.config.model.biz.threadpool.ThreadPoolQueryReqDTO; import cn.hippo4j.config.model.biz.threadpool.ThreadPoolRespDTO; -import cn.hippo4j.config.model.biz.threadpool.ThreadPoolSaveOrUpdateReqDTO; import cn.hippo4j.config.service.ConfigCacheService; import cn.hippo4j.config.service.biz.ThreadPoolService; import cn.hippo4j.config.verify.ConfigModificationVerifyServiceChoose; @@ -42,16 +43,17 @@ import cn.hippo4j.console.model.WebThreadPoolRespDTO; import cn.hippo4j.discovery.core.BaseInstanceRegistry; import cn.hippo4j.discovery.core.Lease; import cn.hippo4j.server.common.base.exception.ErrorCodeEnum; +import cn.hippo4j.rpc.client.ClientSupport; import com.baomidou.mybatisplus.core.metadata.IPage; import lombok.RequiredArgsConstructor; import org.springframework.validation.annotation.Validated; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.DeleteMapping; import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.PostMapping; -import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; import java.util.ArrayList; @@ -122,6 +124,10 @@ public class ThreadPoolController { @GetMapping("/run/state/{tpId}") public Result runState(@PathVariable("tpId") String tpId, @RequestParam(value = "clientAddress") String clientAddress) { + if (baseInstanceRegistry.getInstanceSupport(clientAddress)) { + String callUrl = baseInstanceRegistry.getInstanceCallUrl(clientAddress); + return ClientSupport.clientSend(callUrl, "getWebPoolRunState", tpId); + } String urlString = StringUtil.newBuilder(HTTP, clientAddress, "/run/state/", tpId); return HttpUtil.get(urlString, Result.class); } @@ -129,6 +135,10 @@ public class ThreadPoolController { @GetMapping("/run/thread/state/{tpId}") public Result runThreadState(@PathVariable("tpId") String tpId, @RequestParam(value = "clientAddress") String clientAddress) { + if (baseInstanceRegistry.getInstanceSupport(clientAddress)) { + String callUrl = baseInstanceRegistry.getInstanceCallUrl(clientAddress); + return ClientSupport.clientSend(callUrl, "getThreadStateDetail", tpId); + } String urlString = StringUtil.newBuilder(HTTP, clientAddress, "/run/thread/state/", tpId); return HttpUtil.get(urlString, Result.class); } @@ -167,12 +177,22 @@ public class ThreadPoolController { @GetMapping("/web/base/info") public Result getPoolBaseState(@RequestParam(value = "mark") String mark, @RequestParam(value = "clientAddress") String clientAddress) { + boolean supportRpc = baseInstanceRegistry.getInstanceSupport(clientAddress); + if (supportRpc) { + String callUrl = baseInstanceRegistry.getInstanceCallUrl(clientAddress); + return ClientSupport.clientSend(callUrl, "getPoolBaseState", mark); + } String urlString = StringUtil.newBuilder(HTTP, clientAddress, "/web/base/info", "?mark=", mark); return HttpUtil.get(urlString, Result.class); } @GetMapping("/web/run/state") public Result getPoolRunState(@RequestParam(value = "clientAddress") String clientAddress) { + boolean supportRpc = baseInstanceRegistry.getInstanceSupport(clientAddress); + if (supportRpc) { + String callUrl = baseInstanceRegistry.getInstanceCallUrl(clientAddress); + return ClientSupport.clientSend(callUrl, "getPoolRunState"); + } String urlString = StringUtil.newBuilder(HTTP, clientAddress, "/web/run/state"); return HttpUtil.get(urlString, Result.class); } @@ -181,8 +201,14 @@ public class ThreadPoolController { public Result updateWebThreadPool(@RequestBody WebThreadPoolReqDTO requestParam) { if (UserContext.getUserRole().equals("ROLE_ADMIN")) { for (String each : requestParam.getClientAddressList()) { - String urlString = StringUtil.newBuilder(HTTP, each, "/web/update/pool"); - HttpUtil.post(urlString, requestParam); + ThreadPoolParameterInfo parameterInfo = BeanUtil.convert(requestParam, ThreadPoolParameterInfo.class); + if (baseInstanceRegistry.getInstanceSupport(each)) { + String callUrl = baseInstanceRegistry.getInstanceCallUrl(each); + ClientSupport.clientSend(callUrl, "updateWebThreadPool", parameterInfo); + } else { + String urlString = StringUtil.newBuilder(HTTP, each, "/web/update/pool"); + HttpUtil.post(urlString, requestParam); + } } } else { ConfigModifySaveReqDTO modifySaveReqDTO = BeanUtil.convert(requestParam, ConfigModifySaveReqDTO.class); @@ -206,9 +232,9 @@ public class ThreadPoolController { String groupKey = getGroupKey(tpId, itemTenantKey); Map content = ConfigCacheService.getContent(groupKey); Map activeMap = - leases.stream().map(each -> each.getHolder()).filter(each -> StringUtil.isNotBlank(each.getActive())) + leases.stream().map(Lease::getHolder).filter(each -> StringUtil.isNotBlank(each.getActive())) .collect(Collectors.toMap(InstanceInfo::getIdentify, InstanceInfo::getActive)); - Map clientBasePathMap = leases.stream().map(each -> each.getHolder()) + Map clientBasePathMap = leases.stream().map(Lease::getHolder) .filter(each -> StringUtil.isNotBlank(each.getClientBasePath())) .collect(Collectors.toMap(InstanceInfo::getIdentify, InstanceInfo::getClientBasePath)); List returnThreadPool = new ArrayList<>(); diff --git a/threadpool/server/discovery/src/main/java/cn/hippo4j/discovery/core/BaseInstanceRegistry.java b/threadpool/server/discovery/src/main/java/cn/hippo4j/discovery/core/BaseInstanceRegistry.java index 3c05f901..99b0e52a 100644 --- a/threadpool/server/discovery/src/main/java/cn/hippo4j/discovery/core/BaseInstanceRegistry.java +++ b/threadpool/server/discovery/src/main/java/cn/hippo4j/discovery/core/BaseInstanceRegistry.java @@ -19,18 +19,22 @@ package cn.hippo4j.discovery.core; import cn.hippo4j.common.executor.ThreadFactoryBuilder; import cn.hippo4j.common.extension.design.AbstractSubjectCenter; +import cn.hippo4j.common.constant.Constants; import cn.hippo4j.common.model.InstanceInfo; import cn.hippo4j.common.model.InstanceInfo.InstanceStatus; import cn.hippo4j.common.toolkit.CollectionUtil; +import cn.hippo4j.common.toolkit.StringUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import org.springframework.util.CollectionUtils; +import java.util.Collection; +import java.util.Objects; +import java.util.TimerTask; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; @@ -154,6 +158,45 @@ public class BaseInstanceRegistry implements InstanceRegistry { return true; } + /** + * obtain whether a client supports rpc + * + * @param clientAddress address + * @return InstanceInfo + */ + public boolean getInstanceSupport(String clientAddress) { + return registry.values().stream().map(Map::values) + .flatMap(Collection::stream) + .map(Lease::getHolder) + .filter(Objects::nonNull) + .anyMatch(i -> { + String s = StringUtil.subBefore(i.getIdentify(), Constants.IDENTIFY_SLICER_SYMBOL); + return (Objects.equals(clientAddress, s) || Objects.equals(clientAddress, i.getCallBackUrl())) + && i.getEnableRpc(); + }); + } + + /** + * get server address + * + * @param clientAddress address + * @return address + */ + public String getInstanceCallUrl(String clientAddress) { + return registry.values().stream().map(Map::values) + .flatMap(Collection::stream) + .map(Lease::getHolder) + .filter(Objects::nonNull) + .filter(i -> { + String s = StringUtil.subBefore(i.getIdentify(), Constants.IDENTIFY_SLICER_SYMBOL); + return Objects.equals(clientAddress, s) || Objects.equals(clientAddress, i.getCallBackUrl()) + && i.getEnableRpc(); + }) + .findFirst() + .map(InstanceInfo::getCallBackUrl) + .orElse(null); + } + /** * EvictionTask */