fix : use rpc to complete the call

pull/1312/head
pizihao 2 years ago
parent 630e8402cd
commit 5ecb5926e7

@ -48,12 +48,16 @@ public class InstanceInfo {
private String clientBasePath; private String clientBasePath;
private Boolean enableRpc;
private String callBackUrl; private String callBackUrl;
private String identify; private String identify;
private String active; private String active;
private String clientVersion;
private volatile String vipAddress; private volatile String vipAddress;
private volatile String secureVipAddress; private volatile String secureVipAddress;

@ -20,6 +20,9 @@ package cn.hippo4j.common.model;
import lombok.Data; import lombok.Data;
import lombok.experimental.Accessors; import lombok.experimental.Accessors;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable; import java.io.Serializable;
/** /**
@ -49,7 +52,7 @@ public class Result<T> implements Serializable {
/** /**
* Response data. * Response data.
*/ */
private T data; private transient T data;
/** /**
* Is success. * Is success.
@ -59,4 +62,37 @@ public class Result<T> implements Serializable {
public boolean isSuccess() { public boolean isSuccess() {
return SUCCESS_CODE.equals(code); return SUCCESS_CODE.equals(code);
} }
/**
* Redefine the behavior of serialization, that is, re-acquire the initially serialized
* data from the stream and re-serialize it. Simple serialization will result in the
* loss of the field identified by transient.
*/
private void writeObject(ObjectOutputStream s) throws IOException {
s.defaultWriteObject();
if (data == null) {
return;
}
// Serialization obj
s.writeObject(this.data);
}
/**
* Redefine the deserialization behavior, and sequentially deserialize the data specified during
* serialization, because there is data that is not deserialized during initial deserialization,
* such as fields defined by transient
*/
@SuppressWarnings("unchecked")
private void readObject(ObjectInputStream s) throws IOException, ClassNotFoundException {
s.defaultReadObject();
try {
// Deserialization obj
if (isSuccess()) {
this.data = (T) s.readObject();
}
} catch (IOException e) {
// data may also be null when successful
}
}
} }

@ -23,6 +23,7 @@ import lombok.Data;
import lombok.NoArgsConstructor; import lombok.NoArgsConstructor;
import lombok.experimental.Accessors; import lombok.experimental.Accessors;
import java.io.Serializable;
import java.util.List; import java.util.List;
/** /**
@ -33,7 +34,7 @@ import java.util.List;
@NoArgsConstructor @NoArgsConstructor
@AllArgsConstructor @AllArgsConstructor
@Accessors(chain = true) @Accessors(chain = true)
public class ThreadDetailStateInfo { public class ThreadDetailStateInfo implements Serializable {
/** /**
* threadId * threadId

@ -20,12 +20,14 @@ package cn.hippo4j.common.model;
import lombok.Data; import lombok.Data;
import lombok.experimental.Accessors; import lombok.experimental.Accessors;
import java.io.Serializable;
/** /**
* Thread-pool base info. * Thread-pool base info.
*/ */
@Data @Data
@Accessors(chain = true) @Accessors(chain = true)
public class ThreadPoolBaseInfo { public class ThreadPoolBaseInfo implements Serializable {
/** /**
* coreSize * coreSize

@ -33,6 +33,11 @@
<artifactId>hippo4j-threadpool-message</artifactId> <artifactId>hippo4j-threadpool-message</artifactId>
<version>${project.version}</version> <version>${project.version}</version>
</dependency> </dependency>
<dependency>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-threadpool-rpc</artifactId>
<version>${revision}</version>
</dependency>
<dependency> <dependency>
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId> <artifactId>spring-boot-configuration-processor</artifactId>

@ -53,6 +53,11 @@ public class BootstrapProperties implements BootstrapPropertiesInterface {
*/ */
private String nettyServerPort; private String nettyServerPort;
/**
* is the rpc switch turned on
*/
private Boolean enableRpc = false;
/** /**
* Report type * Report type
*/ */

@ -92,7 +92,8 @@ import org.springframework.core.env.ConfigurableEnvironment;
@ConditionalOnBean(MarkerConfiguration.Marker.class) @ConditionalOnBean(MarkerConfiguration.Marker.class)
@EnableConfigurationProperties(BootstrapProperties.class) @EnableConfigurationProperties(BootstrapProperties.class)
@ConditionalOnProperty(prefix = Constants.CONFIGURATION_PROPERTIES_PREFIX, value = "enable", matchIfMissing = true, havingValue = "true") @ConditionalOnProperty(prefix = Constants.CONFIGURATION_PROPERTIES_PREFIX, value = "enable", matchIfMissing = true, havingValue = "true")
@ImportAutoConfiguration({WebAdapterConfiguration.class, NettyClientConfiguration.class, DiscoveryConfiguration.class, MessageConfiguration.class, UtilAutoConfiguration.class}) @ImportAutoConfiguration({WebAdapterConfiguration.class, DiscoveryConfiguration.class, MessageConfiguration.class, UtilAutoConfiguration.class,
NettyServerConfiguration.class})
public class DynamicThreadPoolAutoConfiguration { public class DynamicThreadPoolAutoConfiguration {
private final BootstrapProperties properties; private final BootstrapProperties properties;

@ -0,0 +1,130 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.springboot.starter.config;
import cn.hippo4j.adapter.base.ThreadPoolAdapterParameter;
import cn.hippo4j.adapter.base.ThreadPoolAdapterState;
import cn.hippo4j.common.model.*;
import cn.hippo4j.rpc.discovery.ServerPort;
import cn.hippo4j.rpc.handler.ServerBareTakeHandler;
import cn.hippo4j.rpc.handler.ServerTakeHandler;
import cn.hippo4j.rpc.connection.SimpleServerConnection;
import cn.hippo4j.rpc.server.Server;
import cn.hippo4j.rpc.server.ServerSupport;
import cn.hippo4j.springboot.starter.controller.ThreadPoolAdapterController;
import cn.hippo4j.springboot.starter.controller.WebThreadPoolController;
import cn.hippo4j.springboot.starter.controller.WebThreadPoolRunStateController;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import javax.annotation.PostConstruct;
import java.io.IOException;
import java.net.ServerSocket;
import java.util.List;
@Slf4j
@RequiredArgsConstructor
public class NettyServerConfiguration {
ServerPort serverPort;
final BootstrapProperties properties;
final ThreadPoolAdapterController threadPoolAdapterController;
final WebThreadPoolController webThreadPoolController;
final WebThreadPoolRunStateController webThreadPoolRunStateController;
private static final String GET_POOL_BASE_STATE = "getPoolBaseState";
private static final String GET_POOL_RUN_STATE = "getPoolRunState";
private static final String UPDATE_WEB_THREAD_POOL = "updateWebThreadPool";
private static final String GET_ADAPTER_THREAD_POOL = "getAdapterThreadPool";
private static final String UPDATE_ADAPTER_THREAD_POOL = "updateAdapterThreadPool";
private static final String GET_WEB_POOL_RUN_STATE = "getWebPoolRunState";
private static final String GET_THREAD_STATE_DETAIL = "getThreadStateDetail";
@PostConstruct
public void nettyServerPort() throws IOException {
if (Boolean.FALSE.equals(properties.getEnableRpc())) {
return;
}
this.serverPort = new ServerRandomPort();
// getPoolBaseState
ServerTakeHandler<String, Result<ThreadPoolBaseInfo>> getPoolBaseState =
new ServerTakeHandler<>(GET_POOL_BASE_STATE, webThreadPoolController::getPoolBaseState);
// getPoolRunState
ServerBareTakeHandler<Result<ThreadPoolRunStateInfo>> getPoolRunState =
new ServerBareTakeHandler<>(GET_POOL_RUN_STATE, webThreadPoolController::getPoolRunState);
// updateWebThreadPool
ServerTakeHandler<ThreadPoolParameterInfo, Result<Void>> updateWebThreadPool =
new ServerTakeHandler<>(UPDATE_WEB_THREAD_POOL, webThreadPoolController::updateWebThreadPool);
// getAdapterThreadPool
ServerTakeHandler<ThreadPoolAdapterParameter, Result<ThreadPoolAdapterState>> getAdapterThreadPool =
new ServerTakeHandler<>(GET_ADAPTER_THREAD_POOL, threadPoolAdapterController::getAdapterThreadPool);
// updateAdapterThreadPool
ServerTakeHandler<ThreadPoolAdapterParameter, Result<Void>> updateAdapterThreadPool =
new ServerTakeHandler<>(UPDATE_ADAPTER_THREAD_POOL, threadPoolAdapterController::updateAdapterThreadPool);
// getWebPoolRunState
ServerTakeHandler<String, Result<ThreadPoolRunStateInfo>> getWebPoolRunState =
new ServerTakeHandler<>(GET_WEB_POOL_RUN_STATE, webThreadPoolRunStateController::getPoolRunState);
// getThreadStateDetail
ServerTakeHandler<String, Result<List<ThreadDetailStateInfo>>> getThreadStateDetail =
new ServerTakeHandler<>(GET_THREAD_STATE_DETAIL, webThreadPoolRunStateController::getThreadStateDetail);
try (
SimpleServerConnection connection = new SimpleServerConnection(
getPoolBaseState,
getPoolRunState,
updateWebThreadPool,
getAdapterThreadPool,
updateAdapterThreadPool,
getWebPoolRunState,
getThreadStateDetail);
Server server = new ServerSupport(serverPort, connection)) {
if (log.isInfoEnabled()) {
log.info("Netty server started, binding to port {}", serverPort.getPort());
}
server.bind();
}
}
public int getServerPort() {
return serverPort == null ? 0 : serverPort.getPort();
}
/**
* Safely create random ports
*/
static class ServerRandomPort implements ServerPort {
final int port = getRandomPort();
@Override
public int getPort() {
return port;
}
private int getRandomPort() {
try (ServerSocket socket = new ServerSocket(0)) {
return socket.getLocalPort();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
}

@ -205,7 +205,9 @@ public class ClientWorker implements DisposableBean {
if (isInitializingCacheList) { if (isInitializingCacheList) {
headers.put(LONG_PULLING_TIMEOUT_NO_HANGUP, "true"); headers.put(LONG_PULLING_TIMEOUT_NO_HANGUP, "true");
} }
headers.put(CLIENT_VERSION, version); if (version != null) {
headers.put(CLIENT_VERSION, version);
}
try { try {
long readTimeoutMs = timeout + Math.round(timeout >> 1); long readTimeoutMs = timeout + Math.round(timeout >> 1);
Result result = agent.httpPostByConfig(LISTENER_PATH, headers, params, readTimeoutMs); Result result = agent.httpPostByConfig(LISTENER_PATH, headers, params, readTimeoutMs);

@ -27,6 +27,7 @@ import cn.hippo4j.common.toolkit.CollectionUtil;
import cn.hippo4j.core.toolkit.IdentifyUtil; import cn.hippo4j.core.toolkit.IdentifyUtil;
import cn.hippo4j.core.toolkit.inet.InetUtils; import cn.hippo4j.core.toolkit.inet.InetUtils;
import cn.hippo4j.springboot.starter.config.BootstrapProperties; import cn.hippo4j.springboot.starter.config.BootstrapProperties;
import cn.hippo4j.springboot.starter.config.NettyServerConfiguration;
import cn.hippo4j.springboot.starter.remote.HttpAgent; import cn.hippo4j.springboot.starter.remote.HttpAgent;
import cn.hippo4j.springboot.starter.toolkit.CloudCommonIdUtil; import cn.hippo4j.springboot.starter.toolkit.CloudCommonIdUtil;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
@ -68,7 +69,7 @@ public class ThreadPoolAdapterRegister implements ApplicationRunner, ThreadPoolA
for (Map.Entry<String, ThreadPoolAdapter> threadPoolAdapterEntry : threadPoolAdapterMap.entrySet()) { for (Map.Entry<String, ThreadPoolAdapter> threadPoolAdapterEntry : threadPoolAdapterMap.entrySet()) {
ThreadPoolAdapter threadPoolAdapter = threadPoolAdapterEntry.getValue(); ThreadPoolAdapter threadPoolAdapter = threadPoolAdapterEntry.getValue();
List<ThreadPoolAdapterState> threadPoolStates = threadPoolAdapter.getThreadPoolStates(); List<ThreadPoolAdapterState> threadPoolStates = threadPoolAdapter.getThreadPoolStates();
if (CollectionUtil.isEmpty(threadPoolStates) || threadPoolStates.size() == 0) { if (CollectionUtil.isEmpty(threadPoolStates)) {
continue; continue;
} }
ThreadPoolAdapterCacheConfig cacheConfig = new ThreadPoolAdapterCacheConfig(); ThreadPoolAdapterCacheConfig cacheConfig = new ThreadPoolAdapterCacheConfig();
@ -79,6 +80,14 @@ public class ThreadPoolAdapterRegister implements ApplicationRunner, ThreadPoolA
String clientAddress = CloudCommonIdUtil.getClientIpPort(environment, hippo4jInetUtils); String clientAddress = CloudCommonIdUtil.getClientIpPort(environment, hippo4jInetUtils);
cacheConfig.setClientAddress(clientAddress); cacheConfig.setClientAddress(clientAddress);
cacheConfig.setThreadPoolAdapterStates(threadPoolStates); cacheConfig.setThreadPoolAdapterStates(threadPoolStates);
// tell the server whether this client supports rpc
boolean enableRpc = properties.getEnableRpc();
cacheConfig.setEnableRpc(enableRpc);
if (enableRpc) {
// the client registers its own IP address with the server
NettyServerConfiguration nettyServer = ApplicationContextHolder.getBean(NettyServerConfiguration.class);
cacheConfig.setNettyServerAddress(CloudCommonIdUtil.getNettyServerIpPort(nettyServer.getServerPort(), hippo4jInetUtils));
}
adapterCacheConfigList.add(cacheConfig); adapterCacheConfigList.add(cacheConfig);
} }
return adapterCacheConfigList; return adapterCacheConfigList;

@ -21,9 +21,13 @@ import cn.hippo4j.core.api.ClientNetworkService;
import cn.hippo4j.common.model.InstanceInfo; import cn.hippo4j.common.model.InstanceInfo;
import cn.hippo4j.common.extension.spi.ServiceLoaderRegistry; import cn.hippo4j.common.extension.spi.ServiceLoaderRegistry;
import cn.hippo4j.common.toolkit.ContentUtil; import cn.hippo4j.common.toolkit.ContentUtil;
import cn.hippo4j.common.toolkit.StringUtil;
import cn.hippo4j.core.config.ApplicationContextHolder;
import cn.hippo4j.core.executor.handler.DynamicThreadPoolBannerHandler;
import cn.hippo4j.core.toolkit.IdentifyUtil; import cn.hippo4j.core.toolkit.IdentifyUtil;
import cn.hippo4j.core.toolkit.inet.InetUtils; import cn.hippo4j.core.toolkit.inet.InetUtils;
import cn.hippo4j.springboot.starter.config.BootstrapProperties; import cn.hippo4j.springboot.starter.config.BootstrapProperties;
import cn.hippo4j.springboot.starter.config.NettyServerConfiguration;
import cn.hippo4j.springboot.starter.toolkit.CloudCommonIdUtil; import cn.hippo4j.springboot.starter.toolkit.CloudCommonIdUtil;
import lombok.SneakyThrows; import lombok.SneakyThrows;
import org.springframework.core.env.ConfigurableEnvironment; import org.springframework.core.env.ConfigurableEnvironment;
@ -62,8 +66,7 @@ public final class InstanceInfoProviderFactory {
String active = environment.getProperty("spring.profiles.active", "UNKNOWN"); String active = environment.getProperty("spring.profiles.active", "UNKNOWN");
InstanceInfo instanceInfo = new InstanceInfo(); InstanceInfo instanceInfo = new InstanceInfo();
String instanceId = CloudCommonIdUtil.getDefaultInstanceId(environment, inetUtils); String instanceId = CloudCommonIdUtil.getDefaultInstanceId(environment, inetUtils);
instanceId = new StringBuilder() instanceId = instanceId + IDENTIFY_SLICER_SYMBOL + CLIENT_IDENTIFICATION_VALUE;
.append(instanceId).append(IDENTIFY_SLICER_SYMBOL).append(CLIENT_IDENTIFICATION_VALUE).toString();
String contextPath = environment.getProperty("server.servlet.context-path", ""); String contextPath = environment.getProperty("server.servlet.context-path", "");
instanceInfo.setInstanceId(instanceId) instanceInfo.setInstanceId(instanceId)
.setIpApplicationName(CloudCommonIdUtil.getIpApplicationName(environment, inetUtils)) .setIpApplicationName(CloudCommonIdUtil.getIpApplicationName(environment, inetUtils))
@ -71,13 +74,27 @@ public final class InstanceInfoProviderFactory {
.setPort(port).setClientBasePath(contextPath).setGroupKey(ContentUtil.getGroupKey(itemId, namespace)); .setPort(port).setClientBasePath(contextPath).setGroupKey(ContentUtil.getGroupKey(itemId, namespace));
String[] customerNetwork = ServiceLoaderRegistry.getSingletonServiceInstances(ClientNetworkService.class) String[] customerNetwork = ServiceLoaderRegistry.getSingletonServiceInstances(ClientNetworkService.class)
.stream().findFirst().map(each -> each.getNetworkIpPort(environment)).orElse(null); .stream().findFirst().map(each -> each.getNetworkIpPort(environment)).orElse(null);
String callBackUrl = new StringBuilder().append(Optional.ofNullable(customerNetwork).map(each -> each[0]).orElse(instanceInfo.getHostName())).append(":") String serverPort;
.append(Optional.ofNullable(customerNetwork).map(each -> each[1]).orElse(port)).append(instanceInfo.getClientBasePath()) if (Boolean.FALSE.equals(bootstrapProperties.getEnableRpc())) {
.toString(); serverPort = Optional.ofNullable(customerNetwork).map(each -> each[1]).orElse(port);
} else {
NettyServerConfiguration nettyServer = ApplicationContextHolder.getBean(NettyServerConfiguration.class);
serverPort = String.valueOf(nettyServer.getServerPort());
}
String callBackUrl = StringUtil.newBuilder(
Optional.ofNullable(customerNetwork).map(each -> each[0]).orElse(instanceInfo.getHostName()),
":",
serverPort,
instanceInfo.getClientBasePath());
// notify server side clients of version information
DynamicThreadPoolBannerHandler bannerHandler = ApplicationContextHolder.getBean(DynamicThreadPoolBannerHandler.class);
instanceInfo.setClientVersion(bannerHandler.getVersion());
instanceInfo.setCallBackUrl(callBackUrl); instanceInfo.setCallBackUrl(callBackUrl);
String identify = IdentifyUtil.generate(environment, inetUtils); String identify = IdentifyUtil.generate(environment, inetUtils);
instanceInfo.setIdentify(identify); instanceInfo.setIdentify(identify);
instanceInfo.setActive(active.toUpperCase()); instanceInfo.setActive(active.toUpperCase());
instanceInfo.setEnableRpc(bootstrapProperties.getEnableRpc());
return instanceInfo; return instanceInfo;
} }
} }

@ -44,6 +44,19 @@ public class CloudCommonIdUtil {
return combineParts(hostname, SEPARATOR, port); return combineParts(hostname, SEPARATOR, port);
} }
/**
* Get netty server ip port.
*
* @param serverPort serverPort
* @param inetUtils inet utils
* @return ip and port
*/
public static String getNettyServerIpPort(int serverPort, InetUtils inetUtils) {
String hostname = inetUtils.findFirstNonLoopBackHostInfo().getIpAddress();
String port = String.valueOf(serverPort);
return combineParts(hostname, SEPARATOR, port);
}
/** /**
* Get default instance id. * Get default instance id.
* *

@ -12,6 +12,12 @@
"defaultValue": "8899", "defaultValue": "8899",
"description": "dynamic thread-pool server netty port." "description": "dynamic thread-pool server netty port."
}, },
{
"name": "spring.dynamic.thread-pool.enable-rpc",
"type": "java.lang.Boolean",
"defaultValue": false,
"description": "dynamic thread-pool rpc switch."
},
{ {
"name": "spring.dynamic.thread-pool.report-type", "name": "spring.dynamic.thread-pool.report-type",
"type": "java.lang.String", "type": "java.lang.String",

@ -52,6 +52,16 @@ public class ThreadPoolAdapterCacheConfig {
*/ */
private String clientAddress; private String clientAddress;
/**
* Open server address
*/
private String nettyServerAddress;
/**
* rpc switch
*/
private Boolean enableRpc = false;
/** /**
* Thread-pool adapter states * Thread-pool adapter states
*/ */

@ -45,6 +45,16 @@ public class ThreadPoolAdapterState {
*/ */
private String clientAddress; private String clientAddress;
/**
* Open server address
*/
private String nettyServerAddress;
/**
* rpc switch
*/
private Boolean enableRpc = false;
/** /**
* Core size * Core size
*/ */

@ -19,6 +19,11 @@
<artifactId>hippo4j-threadpool-infra-common</artifactId> <artifactId>hippo4j-threadpool-infra-common</artifactId>
<version>${project.version}</version> <version>${project.version}</version>
</dependency> </dependency>
<dependency>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-threadpool-rpc</artifactId>
<version>${revision}</version>
</dependency>
<dependency> <dependency>
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId> <artifactId>spring-boot-starter</artifactId>

@ -18,18 +18,27 @@
package cn.hippo4j.config.service; package cn.hippo4j.config.service;
import cn.hippo4j.adapter.base.ThreadPoolAdapterCacheConfig; import cn.hippo4j.adapter.base.ThreadPoolAdapterCacheConfig;
import cn.hippo4j.adapter.base.ThreadPoolAdapterParameter;
import cn.hippo4j.adapter.base.ThreadPoolAdapterState; import cn.hippo4j.adapter.base.ThreadPoolAdapterState;
import cn.hippo4j.common.constant.ConfigModifyTypeConstants;
import cn.hippo4j.common.extension.design.AbstractSubjectCenter; import cn.hippo4j.common.extension.design.AbstractSubjectCenter;
import cn.hippo4j.common.extension.design.Observer; import cn.hippo4j.common.extension.design.Observer;
import cn.hippo4j.common.extension.design.ObserverMessage; import cn.hippo4j.common.extension.design.ObserverMessage;
import cn.hippo4j.common.toolkit.CollectionUtil; import cn.hippo4j.common.toolkit.CollectionUtil;
import cn.hippo4j.common.toolkit.JSONUtil; import cn.hippo4j.common.toolkit.JSONUtil;
import cn.hippo4j.common.toolkit.UserContext;
import cn.hippo4j.common.toolkit.BeanUtil;
import cn.hippo4j.common.toolkit.StringUtil; import cn.hippo4j.common.toolkit.StringUtil;
import cn.hippo4j.common.toolkit.http.HttpUtil; import cn.hippo4j.common.toolkit.http.HttpUtil;
import cn.hippo4j.common.model.Result; import cn.hippo4j.common.model.Result;
import cn.hippo4j.config.model.biz.adapter.ThreadPoolAdapterReqDTO; import cn.hippo4j.config.model.biz.adapter.ThreadPoolAdapterReqDTO;
import cn.hippo4j.config.model.biz.adapter.ThreadPoolAdapterRespDTO; import cn.hippo4j.config.model.biz.adapter.ThreadPoolAdapterRespDTO;
import cn.hippo4j.config.model.biz.threadpool.ConfigModifySaveReqDTO;
import cn.hippo4j.config.verify.ConfigModificationVerifyServiceChoose;
import cn.hippo4j.rpc.support.AddressUtil;
import cn.hippo4j.rpc.client.ClientSupport;
import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.core.type.TypeReference;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
@ -51,6 +60,7 @@ import static cn.hippo4j.common.constant.Constants.IDENTIFY_SLICER_SYMBOL;
*/ */
@Slf4j @Slf4j
@Service @Service
@RequiredArgsConstructor
public class ThreadPoolAdapterService { public class ThreadPoolAdapterService {
/** /**
@ -58,6 +68,8 @@ public class ThreadPoolAdapterService {
*/ */
private static final Map<String, Map<String, Map<String, List<ThreadPoolAdapterState>>>> THREAD_POOL_ADAPTER_MAP = new ConcurrentHashMap<>(); private static final Map<String, Map<String, Map<String, List<ThreadPoolAdapterState>>>> THREAD_POOL_ADAPTER_MAP = new ConcurrentHashMap<>();
private final ConfigModificationVerifyServiceChoose configModificationVerifyServiceChoose;
static { static {
AbstractSubjectCenter.register(AbstractSubjectCenter.SubjectType.CLEAR_CONFIG_CACHE, new ClearThreadPoolAdapterCache()); AbstractSubjectCenter.register(AbstractSubjectCenter.SubjectType.CLEAR_CONFIG_CACHE, new ClearThreadPoolAdapterCache());
} }
@ -83,11 +95,15 @@ public class ThreadPoolAdapterService {
adapterStateList = new ArrayList<>(); adapterStateList = new ArrayList<>();
tenantItemMap.put(adapterState.getThreadPoolKey(), adapterStateList); tenantItemMap.put(adapterState.getThreadPoolKey(), adapterStateList);
} }
Optional<ThreadPoolAdapterState> first = adapterStateList.stream().filter(state -> Objects.equals(state.getClientAddress(), each.getClientAddress())).findFirst(); String clientAddress = each.getClientAddress();
String nettyServerAddress = each.getNettyServerAddress();
Optional<ThreadPoolAdapterState> first = adapterStateList.stream().filter(state -> Objects.equals(state.getClientAddress(), clientAddress)).findFirst();
if (!first.isPresent()) { if (!first.isPresent()) {
ThreadPoolAdapterState state = new ThreadPoolAdapterState(); ThreadPoolAdapterState state = new ThreadPoolAdapterState();
state.setClientAddress(each.getClientAddress()); state.setClientAddress(clientAddress);
state.setIdentify(each.getClientIdentify()); state.setIdentify(each.getClientIdentify());
state.setNettyServerAddress(nettyServerAddress);
state.setEnableRpc(each.getEnableRpc());
adapterStateList.add(state); adapterStateList.add(state);
} }
} }
@ -100,25 +116,34 @@ public class ThreadPoolAdapterService {
.map(each -> each.get(requestParameter.getTenant() + IDENTIFY_SLICER_SYMBOL + requestParameter.getItem())) .map(each -> each.get(requestParameter.getTenant() + IDENTIFY_SLICER_SYMBOL + requestParameter.getItem()))
.map(each -> each.get(requestParameter.getThreadPoolKey())) .map(each -> each.get(requestParameter.getThreadPoolKey()))
.orElse(new ArrayList<>()); .orElse(new ArrayList<>());
List<String> addressList = actual.stream().map(ThreadPoolAdapterState::getClientAddress).collect(Collectors.toList()); return actual.stream()
List<ThreadPoolAdapterRespDTO> result = new ArrayList<>(addressList.size()); .map(t -> {
addressList.forEach(each -> { try {
String url = StringUtil.newBuilder("http://", each, "/adapter/thread-pool/info"); if (Boolean.TRUE.equals(t.getEnableRpc())) {
Map<String, String> param = new HashMap<>(); ThreadPoolAdapterParameter parameter = new ThreadPoolAdapterParameter();
param.put("mark", requestParameter.getMark()); parameter.setThreadPoolKey(requestParameter.getThreadPoolKey());
param.put("threadPoolKey", requestParameter.getThreadPoolKey()); parameter.setMark(requestParameter.getMark());
try { String nettyServerAddress = t.getNettyServerAddress();
String resultStr = HttpUtil.get(url, param); Result<ThreadPoolAdapterState> result = ClientSupport.clientSend(
if (StringUtil.isNotBlank(resultStr)) { nettyServerAddress, "getAdapterThreadPool", parameter);
Result<ThreadPoolAdapterRespDTO> restResult = JSONUtil.parseObject(resultStr, new TypeReference<Result<ThreadPoolAdapterRespDTO>>() { return BeanUtil.convert(result.getData(), ThreadPoolAdapterRespDTO.class);
}); }
result.add(restResult.getData()); String clientAddress = t.getClientAddress();
} String url = StringUtil.newBuilder("http://", clientAddress, "/adapter/thread-pool/info");
} catch (Throwable ex) { Map<String, String> param = new HashMap<>();
log.error("Failed to get third-party thread pool data.", ex); param.put("mark", requestParameter.getMark());
} param.put("threadPoolKey", requestParameter.getThreadPoolKey());
}); String resultStr = HttpUtil.get(url, param);
return result; if (StringUtil.isNotBlank(resultStr)) {
Result<ThreadPoolAdapterRespDTO> restResult = JSONUtil.parseObject(resultStr, new TypeReference<Result<ThreadPoolAdapterRespDTO>>() {
});
return restResult.getData();
}
} catch (Throwable ex) {
log.error("Failed to get third-party thread pool data.", ex);
}
return null;
}).filter(Objects::nonNull).collect(Collectors.toList());
} }
public Set<String> queryThreadPoolKey(ThreadPoolAdapterReqDTO requestParameter) { public Set<String> queryThreadPoolKey(ThreadPoolAdapterReqDTO requestParameter) {
@ -133,10 +158,52 @@ public class ThreadPoolAdapterService {
return new HashSet<>(); return new HashSet<>();
} }
/**
* If the user's authority is the administrator authority, modify it directly, if not, add it to the database and wait for review
*
* @param requestParameter ThreadPoolAdapterReqDTO
*/
public void updateThreadPool(ThreadPoolAdapterReqDTO requestParameter) {
if (UserContext.getUserRole().equals("ROLE_ADMIN")) {
List<ThreadPoolAdapterState> actual = Optional.ofNullable(THREAD_POOL_ADAPTER_MAP.get(requestParameter.getMark()))
.map(each -> each.get(requestParameter.getTenant() + IDENTIFY_SLICER_SYMBOL + requestParameter.getItem()))
.map(each -> each.get(requestParameter.getThreadPoolKey()))
.orElse(new ArrayList<>());
actual.forEach(t -> {
if (Boolean.TRUE.equals(t.getEnableRpc())) {
String nettyServerAddress = t.getNettyServerAddress();
ThreadPoolAdapterParameter parameter = BeanUtil.convert(requestParameter, ThreadPoolAdapterParameter.class);
ClientSupport.clientSend(nettyServerAddress, "updateAdapterThreadPool", parameter);
} else {
for (String each : requestParameter.getClientAddressList()) {
String urlString = StringUtil.newBuilder("http://", each, "/adapter/thread-pool/update");
HttpUtil.post(urlString, requestParameter);
}
}
});
} else {
ConfigModifySaveReqDTO modifySaveReqDTO = BeanUtil.convert(requestParameter, ConfigModifySaveReqDTO.class);
modifySaveReqDTO.setModifyUser(UserContext.getUserName());
modifySaveReqDTO.setTenantId(requestParameter.getTenant());
modifySaveReqDTO.setItemId(requestParameter.getItem());
modifySaveReqDTO.setTpId(requestParameter.getThreadPoolKey());
modifySaveReqDTO.setType(ConfigModifyTypeConstants.ADAPTER_THREAD_POOL);
configModificationVerifyServiceChoose.choose(modifySaveReqDTO.getType()).saveConfigModifyApplication(modifySaveReqDTO);
}
}
public static void remove(String identify) { public static void remove(String identify) {
synchronized (ThreadPoolAdapterService.class) { synchronized (ThreadPoolAdapterService.class) {
THREAD_POOL_ADAPTER_MAP.values() THREAD_POOL_ADAPTER_MAP.values().forEach(each -> each.forEach((key, val) -> val.forEach((threadPoolKey, states) -> {
.forEach(each -> each.forEach((key, val) -> val.forEach((threadPoolKey, states) -> states.removeIf(adapterState -> Objects.equals(adapterState.getIdentify(), identify))))); states.stream()
.filter(s -> Objects.equals(s.getIdentify(), identify))
.forEach(t -> {
if (Boolean.TRUE.equals(t.getEnableRpc())) {
ClientSupport.closeClient(AddressUtil.getInetAddress(t.getNettyServerAddress()));
}
});
states.removeIf(s -> Objects.equals(s.getIdentify(), identify));
})));
} }
} }

@ -18,11 +18,14 @@
package cn.hippo4j.config.service.biz.impl; package cn.hippo4j.config.service.biz.impl;
import cn.hippo4j.common.extension.enums.VerifyEnum; import cn.hippo4j.common.extension.enums.VerifyEnum;
import cn.hippo4j.adapter.base.ThreadPoolAdapterParameter;
import cn.hippo4j.common.model.InstanceInfo; import cn.hippo4j.common.model.InstanceInfo;
import cn.hippo4j.common.toolkit.BeanUtil; import cn.hippo4j.common.toolkit.BeanUtil;
import cn.hippo4j.common.toolkit.ConditionUtil; import cn.hippo4j.common.toolkit.ConditionUtil;
import cn.hippo4j.common.toolkit.JSONUtil; import cn.hippo4j.common.toolkit.JSONUtil;
import cn.hippo4j.common.toolkit.UserContext; import cn.hippo4j.common.toolkit.UserContext;
import cn.hippo4j.common.toolkit.StringUtil;
import cn.hippo4j.common.toolkit.http.HttpUtil;
import cn.hippo4j.config.mapper.HisConfigVerifyMapper; import cn.hippo4j.config.mapper.HisConfigVerifyMapper;
import cn.hippo4j.config.model.HisConfigVerifyInfo; import cn.hippo4j.config.model.HisConfigVerifyInfo;
import cn.hippo4j.config.model.biz.threadpool.ConfigModifySaveReqDTO; import cn.hippo4j.config.model.biz.threadpool.ConfigModifySaveReqDTO;
@ -30,6 +33,7 @@ import cn.hippo4j.config.model.biz.threadpool.ConfigModifyVerifyReqDTO;
import cn.hippo4j.config.service.biz.ConfigModificationVerifyService; import cn.hippo4j.config.service.biz.ConfigModificationVerifyService;
import cn.hippo4j.discovery.core.BaseInstanceRegistry; import cn.hippo4j.discovery.core.BaseInstanceRegistry;
import cn.hippo4j.discovery.core.Lease; import cn.hippo4j.discovery.core.Lease;
import cn.hippo4j.rpc.client.ClientSupport;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import javax.annotation.Resource; import javax.annotation.Resource;
@ -113,5 +117,20 @@ public abstract class AbstractConfigModificationVerifyService implements ConfigM
* *
* @param reqDTO * @param reqDTO
*/ */
protected abstract void updateThreadPoolParameter(ConfigModifyVerifyReqDTO reqDTO); protected void updateThreadPoolParameter(ConfigModifyVerifyReqDTO reqDTO) {
for (String each : getClientAddress(reqDTO)) {
ThreadPoolAdapterParameter parameter = new ThreadPoolAdapterParameter();
parameter.setMark(reqDTO.getMark());
parameter.setMaximumPoolSize(reqDTO.getMaximumPoolSize());
parameter.setCorePoolSize(reqDTO.getCorePoolSize());
parameter.setThreadPoolKey(reqDTO.getThreadPoolKey());
if (baseInstanceRegistry.getInstanceSupport(each)) {
String callUrl = baseInstanceRegistry.getInstanceCallUrl(each);
ClientSupport.clientSend(callUrl, "updateAdapterThreadPool", parameter);
} else {
String urlString = StringUtil.newBuilder("http://", each, "/adapter/thread-pool/update");
HttpUtil.post(urlString, reqDTO);
}
}
}
} }

@ -18,9 +18,6 @@
package cn.hippo4j.config.service.biz.impl; package cn.hippo4j.config.service.biz.impl;
import cn.hippo4j.common.constant.ConfigModifyTypeConstants; import cn.hippo4j.common.constant.ConfigModifyTypeConstants;
import cn.hippo4j.common.toolkit.StringUtil;
import cn.hippo4j.common.toolkit.http.HttpUtil;
import cn.hippo4j.config.model.biz.threadpool.ConfigModifyVerifyReqDTO;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
@ -36,11 +33,4 @@ public class AdapterThreadPoolConfigModificationVerifyServiceImpl extends Abstra
return ConfigModifyTypeConstants.ADAPTER_THREAD_POOL; return ConfigModifyTypeConstants.ADAPTER_THREAD_POOL;
} }
@Override
protected void updateThreadPoolParameter(ConfigModifyVerifyReqDTO reqDTO) {
for (String each : getClientAddress(reqDTO)) {
String urlString = StringUtil.newBuilder("http://", each, "/adapter/thread-pool/update");
HttpUtil.post(urlString, reqDTO);
}
}
} }

@ -18,9 +18,6 @@
package cn.hippo4j.config.service.biz.impl; package cn.hippo4j.config.service.biz.impl;
import cn.hippo4j.common.constant.ConfigModifyTypeConstants; import cn.hippo4j.common.constant.ConfigModifyTypeConstants;
import cn.hippo4j.common.toolkit.StringUtil;
import cn.hippo4j.common.toolkit.http.HttpUtil;
import cn.hippo4j.config.model.biz.threadpool.ConfigModifyVerifyReqDTO;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
@ -36,11 +33,4 @@ public class WebThreadPoolConfigModificationVerifyServiceImpl extends AbstractCo
return ConfigModifyTypeConstants.WEB_THREAD_POOL; return ConfigModifyTypeConstants.WEB_THREAD_POOL;
} }
@Override
protected void updateThreadPoolParameter(ConfigModifyVerifyReqDTO reqDTO) {
for (String each : getClientAddress(reqDTO)) {
String urlString = StringUtil.newBuilder("http://", each, "/web/update/pool");
HttpUtil.post(urlString, reqDTO);
}
}
} }

@ -17,18 +17,11 @@
package cn.hippo4j.console.controller; package cn.hippo4j.console.controller;
import cn.hippo4j.common.constant.ConfigModifyTypeConstants;
import cn.hippo4j.common.toolkit.BeanUtil;
import cn.hippo4j.common.toolkit.StringUtil;
import cn.hippo4j.common.toolkit.UserContext;
import cn.hippo4j.common.toolkit.http.HttpUtil;
import cn.hippo4j.common.model.Result; import cn.hippo4j.common.model.Result;
import cn.hippo4j.server.common.base.Results; import cn.hippo4j.server.common.base.Results;
import cn.hippo4j.config.model.biz.adapter.ThreadPoolAdapterReqDTO; import cn.hippo4j.config.model.biz.adapter.ThreadPoolAdapterReqDTO;
import cn.hippo4j.config.model.biz.adapter.ThreadPoolAdapterRespDTO; import cn.hippo4j.config.model.biz.adapter.ThreadPoolAdapterRespDTO;
import cn.hippo4j.config.model.biz.threadpool.ConfigModifySaveReqDTO;
import cn.hippo4j.config.service.ThreadPoolAdapterService; import cn.hippo4j.config.service.ThreadPoolAdapterService;
import cn.hippo4j.config.verify.ConfigModificationVerifyServiceChoose;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.PostMapping;
@ -49,8 +42,6 @@ public class ThreadPoolAdapterController {
private final ThreadPoolAdapterService threadPoolAdapterService; private final ThreadPoolAdapterService threadPoolAdapterService;
private final ConfigModificationVerifyServiceChoose configModificationVerifyServiceChoose;
@GetMapping(REGISTER_ADAPTER_BASE_PATH + "/query") @GetMapping(REGISTER_ADAPTER_BASE_PATH + "/query")
public Result<List<ThreadPoolAdapterRespDTO>> queryAdapterThreadPool(ThreadPoolAdapterReqDTO requestParameter) { public Result<List<ThreadPoolAdapterRespDTO>> queryAdapterThreadPool(ThreadPoolAdapterReqDTO requestParameter) {
List<ThreadPoolAdapterRespDTO> result = threadPoolAdapterService.query(requestParameter); List<ThreadPoolAdapterRespDTO> result = threadPoolAdapterService.query(requestParameter);
@ -65,20 +56,7 @@ public class ThreadPoolAdapterController {
@PostMapping(REGISTER_ADAPTER_BASE_PATH + "/update") @PostMapping(REGISTER_ADAPTER_BASE_PATH + "/update")
public Result<Void> updateAdapterThreadPool(@RequestBody ThreadPoolAdapterReqDTO requestParameter) { public Result<Void> updateAdapterThreadPool(@RequestBody ThreadPoolAdapterReqDTO requestParameter) {
if (UserContext.getUserRole().equals("ROLE_ADMIN")) { threadPoolAdapterService.updateThreadPool(requestParameter);
for (String each : requestParameter.getClientAddressList()) {
String urlString = StringUtil.newBuilder("http://", each, "/adapter/thread-pool/update");
HttpUtil.post(urlString, requestParameter);
}
} else {
ConfigModifySaveReqDTO modifySaveReqDTO = BeanUtil.convert(requestParameter, ConfigModifySaveReqDTO.class);
modifySaveReqDTO.setModifyUser(UserContext.getUserName());
modifySaveReqDTO.setTenantId(requestParameter.getTenant());
modifySaveReqDTO.setItemId(requestParameter.getItem());
modifySaveReqDTO.setTpId(requestParameter.getThreadPoolKey());
modifySaveReqDTO.setType(ConfigModifyTypeConstants.ADAPTER_THREAD_POOL);
configModificationVerifyServiceChoose.choose(modifySaveReqDTO.getType()).saveConfigModifyApplication(modifySaveReqDTO);
}
return Results.success(); return Results.success();
} }

@ -20,6 +20,7 @@ package cn.hippo4j.console.controller;
import cn.hippo4j.common.constant.ConfigModifyTypeConstants; import cn.hippo4j.common.constant.ConfigModifyTypeConstants;
import cn.hippo4j.common.constant.Constants; import cn.hippo4j.common.constant.Constants;
import cn.hippo4j.common.model.InstanceInfo; import cn.hippo4j.common.model.InstanceInfo;
import cn.hippo4j.common.model.ThreadPoolParameterInfo;
import cn.hippo4j.common.toolkit.BeanUtil; import cn.hippo4j.common.toolkit.BeanUtil;
import cn.hippo4j.common.toolkit.CollectionUtil; import cn.hippo4j.common.toolkit.CollectionUtil;
import cn.hippo4j.common.toolkit.StringUtil; import cn.hippo4j.common.toolkit.StringUtil;
@ -28,11 +29,11 @@ import cn.hippo4j.common.toolkit.http.HttpUtil;
import cn.hippo4j.common.model.Result; import cn.hippo4j.common.model.Result;
import cn.hippo4j.server.common.base.Results; import cn.hippo4j.server.common.base.Results;
import cn.hippo4j.config.model.CacheItem; import cn.hippo4j.config.model.CacheItem;
import cn.hippo4j.config.model.biz.threadpool.ConfigModifySaveReqDTO;
import cn.hippo4j.config.model.biz.threadpool.ThreadPoolDelReqDTO; import cn.hippo4j.config.model.biz.threadpool.ThreadPoolDelReqDTO;
import cn.hippo4j.config.model.biz.threadpool.ThreadPoolSaveOrUpdateReqDTO;
import cn.hippo4j.config.model.biz.threadpool.ConfigModifySaveReqDTO;
import cn.hippo4j.config.model.biz.threadpool.ThreadPoolQueryReqDTO; import cn.hippo4j.config.model.biz.threadpool.ThreadPoolQueryReqDTO;
import cn.hippo4j.config.model.biz.threadpool.ThreadPoolRespDTO; import cn.hippo4j.config.model.biz.threadpool.ThreadPoolRespDTO;
import cn.hippo4j.config.model.biz.threadpool.ThreadPoolSaveOrUpdateReqDTO;
import cn.hippo4j.config.service.ConfigCacheService; import cn.hippo4j.config.service.ConfigCacheService;
import cn.hippo4j.config.service.biz.ThreadPoolService; import cn.hippo4j.config.service.biz.ThreadPoolService;
import cn.hippo4j.config.verify.ConfigModificationVerifyServiceChoose; import cn.hippo4j.config.verify.ConfigModificationVerifyServiceChoose;
@ -42,16 +43,17 @@ import cn.hippo4j.console.model.WebThreadPoolRespDTO;
import cn.hippo4j.discovery.core.BaseInstanceRegistry; import cn.hippo4j.discovery.core.BaseInstanceRegistry;
import cn.hippo4j.discovery.core.Lease; import cn.hippo4j.discovery.core.Lease;
import cn.hippo4j.server.common.base.exception.ErrorCodeEnum; import cn.hippo4j.server.common.base.exception.ErrorCodeEnum;
import cn.hippo4j.rpc.client.ClientSupport;
import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.core.metadata.IPage;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import org.springframework.validation.annotation.Validated; import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.DeleteMapping; import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController; import org.springframework.web.bind.annotation.RestController;
import java.util.ArrayList; import java.util.ArrayList;
@ -122,6 +124,10 @@ public class ThreadPoolController {
@GetMapping("/run/state/{tpId}") @GetMapping("/run/state/{tpId}")
public Result runState(@PathVariable("tpId") String tpId, public Result runState(@PathVariable("tpId") String tpId,
@RequestParam(value = "clientAddress") String clientAddress) { @RequestParam(value = "clientAddress") String clientAddress) {
if (baseInstanceRegistry.getInstanceSupport(clientAddress)) {
String callUrl = baseInstanceRegistry.getInstanceCallUrl(clientAddress);
return ClientSupport.clientSend(callUrl, "getWebPoolRunState", tpId);
}
String urlString = StringUtil.newBuilder(HTTP, clientAddress, "/run/state/", tpId); String urlString = StringUtil.newBuilder(HTTP, clientAddress, "/run/state/", tpId);
return HttpUtil.get(urlString, Result.class); return HttpUtil.get(urlString, Result.class);
} }
@ -129,6 +135,10 @@ public class ThreadPoolController {
@GetMapping("/run/thread/state/{tpId}") @GetMapping("/run/thread/state/{tpId}")
public Result runThreadState(@PathVariable("tpId") String tpId, public Result runThreadState(@PathVariable("tpId") String tpId,
@RequestParam(value = "clientAddress") String clientAddress) { @RequestParam(value = "clientAddress") String clientAddress) {
if (baseInstanceRegistry.getInstanceSupport(clientAddress)) {
String callUrl = baseInstanceRegistry.getInstanceCallUrl(clientAddress);
return ClientSupport.clientSend(callUrl, "getThreadStateDetail", tpId);
}
String urlString = StringUtil.newBuilder(HTTP, clientAddress, "/run/thread/state/", tpId); String urlString = StringUtil.newBuilder(HTTP, clientAddress, "/run/thread/state/", tpId);
return HttpUtil.get(urlString, Result.class); return HttpUtil.get(urlString, Result.class);
} }
@ -167,12 +177,22 @@ public class ThreadPoolController {
@GetMapping("/web/base/info") @GetMapping("/web/base/info")
public Result getPoolBaseState(@RequestParam(value = "mark") String mark, public Result getPoolBaseState(@RequestParam(value = "mark") String mark,
@RequestParam(value = "clientAddress") String clientAddress) { @RequestParam(value = "clientAddress") String clientAddress) {
boolean supportRpc = baseInstanceRegistry.getInstanceSupport(clientAddress);
if (supportRpc) {
String callUrl = baseInstanceRegistry.getInstanceCallUrl(clientAddress);
return ClientSupport.clientSend(callUrl, "getPoolBaseState", mark);
}
String urlString = StringUtil.newBuilder(HTTP, clientAddress, "/web/base/info", "?mark=", mark); String urlString = StringUtil.newBuilder(HTTP, clientAddress, "/web/base/info", "?mark=", mark);
return HttpUtil.get(urlString, Result.class); return HttpUtil.get(urlString, Result.class);
} }
@GetMapping("/web/run/state") @GetMapping("/web/run/state")
public Result getPoolRunState(@RequestParam(value = "clientAddress") String clientAddress) { public Result getPoolRunState(@RequestParam(value = "clientAddress") String clientAddress) {
boolean supportRpc = baseInstanceRegistry.getInstanceSupport(clientAddress);
if (supportRpc) {
String callUrl = baseInstanceRegistry.getInstanceCallUrl(clientAddress);
return ClientSupport.clientSend(callUrl, "getPoolRunState");
}
String urlString = StringUtil.newBuilder(HTTP, clientAddress, "/web/run/state"); String urlString = StringUtil.newBuilder(HTTP, clientAddress, "/web/run/state");
return HttpUtil.get(urlString, Result.class); return HttpUtil.get(urlString, Result.class);
} }
@ -181,8 +201,14 @@ public class ThreadPoolController {
public Result<Void> updateWebThreadPool(@RequestBody WebThreadPoolReqDTO requestParam) { public Result<Void> updateWebThreadPool(@RequestBody WebThreadPoolReqDTO requestParam) {
if (UserContext.getUserRole().equals("ROLE_ADMIN")) { if (UserContext.getUserRole().equals("ROLE_ADMIN")) {
for (String each : requestParam.getClientAddressList()) { for (String each : requestParam.getClientAddressList()) {
String urlString = StringUtil.newBuilder(HTTP, each, "/web/update/pool"); ThreadPoolParameterInfo parameterInfo = BeanUtil.convert(requestParam, ThreadPoolParameterInfo.class);
HttpUtil.post(urlString, requestParam); if (baseInstanceRegistry.getInstanceSupport(each)) {
String callUrl = baseInstanceRegistry.getInstanceCallUrl(each);
ClientSupport.clientSend(callUrl, "updateWebThreadPool", parameterInfo);
} else {
String urlString = StringUtil.newBuilder(HTTP, each, "/web/update/pool");
HttpUtil.post(urlString, requestParam);
}
} }
} else { } else {
ConfigModifySaveReqDTO modifySaveReqDTO = BeanUtil.convert(requestParam, ConfigModifySaveReqDTO.class); ConfigModifySaveReqDTO modifySaveReqDTO = BeanUtil.convert(requestParam, ConfigModifySaveReqDTO.class);
@ -206,9 +232,9 @@ public class ThreadPoolController {
String groupKey = getGroupKey(tpId, itemTenantKey); String groupKey = getGroupKey(tpId, itemTenantKey);
Map<String, CacheItem> content = ConfigCacheService.getContent(groupKey); Map<String, CacheItem> content = ConfigCacheService.getContent(groupKey);
Map<String, String> activeMap = Map<String, String> activeMap =
leases.stream().map(each -> each.getHolder()).filter(each -> StringUtil.isNotBlank(each.getActive())) leases.stream().map(Lease::getHolder).filter(each -> StringUtil.isNotBlank(each.getActive()))
.collect(Collectors.toMap(InstanceInfo::getIdentify, InstanceInfo::getActive)); .collect(Collectors.toMap(InstanceInfo::getIdentify, InstanceInfo::getActive));
Map<String, String> clientBasePathMap = leases.stream().map(each -> each.getHolder()) Map<String, String> clientBasePathMap = leases.stream().map(Lease::getHolder)
.filter(each -> StringUtil.isNotBlank(each.getClientBasePath())) .filter(each -> StringUtil.isNotBlank(each.getClientBasePath()))
.collect(Collectors.toMap(InstanceInfo::getIdentify, InstanceInfo::getClientBasePath)); .collect(Collectors.toMap(InstanceInfo::getIdentify, InstanceInfo::getClientBasePath));
List<ThreadPoolInstanceInfo> returnThreadPool = new ArrayList<>(); List<ThreadPoolInstanceInfo> returnThreadPool = new ArrayList<>();

@ -19,18 +19,22 @@ package cn.hippo4j.discovery.core;
import cn.hippo4j.common.executor.ThreadFactoryBuilder; import cn.hippo4j.common.executor.ThreadFactoryBuilder;
import cn.hippo4j.common.extension.design.AbstractSubjectCenter; import cn.hippo4j.common.extension.design.AbstractSubjectCenter;
import cn.hippo4j.common.constant.Constants;
import cn.hippo4j.common.model.InstanceInfo; import cn.hippo4j.common.model.InstanceInfo;
import cn.hippo4j.common.model.InstanceInfo.InstanceStatus; import cn.hippo4j.common.model.InstanceInfo.InstanceStatus;
import cn.hippo4j.common.toolkit.CollectionUtil; import cn.hippo4j.common.toolkit.CollectionUtil;
import cn.hippo4j.common.toolkit.StringUtil;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
import java.util.Collection;
import java.util.Objects;
import java.util.TimerTask;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ScheduledThreadPoolExecutor;
@ -154,6 +158,45 @@ public class BaseInstanceRegistry implements InstanceRegistry<InstanceInfo> {
return true; return true;
} }
/**
* obtain whether a client supports rpc
*
* @param clientAddress address
* @return InstanceInfo
*/
public boolean getInstanceSupport(String clientAddress) {
return registry.values().stream().map(Map::values)
.flatMap(Collection::stream)
.map(Lease::getHolder)
.filter(Objects::nonNull)
.anyMatch(i -> {
String s = StringUtil.subBefore(i.getIdentify(), Constants.IDENTIFY_SLICER_SYMBOL);
return (Objects.equals(clientAddress, s) || Objects.equals(clientAddress, i.getCallBackUrl()))
&& i.getEnableRpc();
});
}
/**
* get server address
*
* @param clientAddress address
* @return address
*/
public String getInstanceCallUrl(String clientAddress) {
return registry.values().stream().map(Map::values)
.flatMap(Collection::stream)
.map(Lease::getHolder)
.filter(Objects::nonNull)
.filter(i -> {
String s = StringUtil.subBefore(i.getIdentify(), Constants.IDENTIFY_SLICER_SYMBOL);
return Objects.equals(clientAddress, s) || Objects.equals(clientAddress, i.getCallBackUrl())
&& i.getEnableRpc();
})
.findFirst()
.map(InstanceInfo::getCallBackUrl)
.orElse(null);
}
/** /**
* EvictionTask * EvictionTask
*/ */

Loading…
Cancel
Save