The RPC module is used to modify the invocation between server modes (#983)

* fix : Change the log level to debug

* fix : ThreadPoolAdapterController WebThreadPoolController, WebThreadPoolRunStateController method in does not affect the use of the three interfaces of modify the calls for the use of RPC modules
pull/986/head
pizihao 2 years ago committed by GitHub
parent 6f0889435b
commit d0ab6c932d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.adapter.base;
import cn.hippo4j.common.web.base.Result;
/**
* Thread-pool adapter api.
*/
public interface ThreadPoolAdapterApi {
/**
* Get thread pool information for the third-party framework
*
* @param requestParameter Third party frame identification and other info
* @return thread pool info
*/
Result<ThreadPoolAdapterState> getAdapterThreadPool(ThreadPoolAdapterParameter requestParameter);
/**
* Example Modify the thread pool information
*
* @param requestParameter update info
*/
Result<Void> updateAdapterThreadPool(ThreadPoolAdapterParameter requestParameter);
}

@ -52,6 +52,11 @@ public class ThreadPoolAdapterCacheConfig {
*/
private String clientAddress;
/**
* Open server address
*/
private String localServerAddress;
/**
* Thread-pool adapter states
*/

@ -19,11 +19,13 @@ package cn.hippo4j.adapter.base;
import lombok.Data;
import java.io.Serializable;
/**
* Thread pool adapter parameter info.
*/
@Data
public class ThreadPoolAdapterParameter {
public class ThreadPoolAdapterParameter implements Serializable {
/**
* Mark

@ -19,11 +19,13 @@ package cn.hippo4j.adapter.base;
import lombok.Data;
import java.io.Serializable;
/**
* Thread pool adapter state info.
*/
@Data
public class ThreadPoolAdapterState {
public class ThreadPoolAdapterState implements Serializable {
/**
* Thread-pool keu
@ -45,6 +47,11 @@ public class ThreadPoolAdapterState {
*/
private String clientAddress;
/**
* Open server address
*/
private String localServerAddress;
/**
* Core size
*/

@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.common.api;
import cn.hippo4j.common.model.ThreadPoolBaseInfo;
import cn.hippo4j.common.model.ThreadPoolParameterInfo;
import cn.hippo4j.common.model.ThreadPoolRunStateInfo;
import cn.hippo4j.common.web.base.Result;
/**
* Web thread pool api.
*/
public interface WebThreadPoolApi {
/**
* Get thread pool information by identifying the mark
*
* @param mark Third party frame identification
* @return thread pool info
*/
Result<ThreadPoolBaseInfo> getPoolBaseState(String mark);
/**
* Get all thread pool information
*
* @return thread pool info
*/
Result<ThreadPoolRunStateInfo> getPoolRunState();
/**
* Example Modify the thread pool information
*
* @param threadPoolParameterInfo update info
*/
Result<Void> updateWebThreadPool(ThreadPoolParameterInfo threadPoolParameterInfo);
}

@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.common.api;
import cn.hippo4j.common.model.ThreadDetailStateInfo;
import cn.hippo4j.common.model.ThreadPoolRunStateInfo;
import cn.hippo4j.common.web.base.Result;
import java.util.List;
/**
* Web thread-pool run state api.
*/
public interface WebThreadPoolRunStateApi {
/**
* Get the run state info of the web thread pool
*
* @param threadPoolId the thread pool id
* @return the info
*/
Result<ThreadPoolRunStateInfo> getPoolRunState(String threadPoolId);
/**
* Get the run state detail of the web thread pool
*
* @param threadPoolId the thread pool id
* @return the detail
*/
Result<List<ThreadDetailStateInfo>> getThreadStateDetail(String threadPoolId);
}

@ -50,6 +50,8 @@ public class InstanceInfo {
private String callBackUrl;
private String serverUrl;
private String identify;
private String active;

@ -23,6 +23,7 @@ import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.Accessors;
import java.io.Serializable;
import java.util.List;
/**
@ -33,7 +34,7 @@ import java.util.List;
@NoArgsConstructor
@AllArgsConstructor
@Accessors(chain = true)
public class ThreadDetailStateInfo {
public class ThreadDetailStateInfo implements Serializable {
/**
* threadId

@ -20,12 +20,14 @@ package cn.hippo4j.common.model;
import lombok.Data;
import lombok.experimental.Accessors;
import java.io.Serializable;
/**
* Thread-pool base info.
*/
@Data
@Accessors(chain = true)
public class ThreadPoolBaseInfo {
public class ThreadPoolBaseInfo implements Serializable {
/**
* coreSize

@ -73,11 +73,14 @@ public class NettyClientConnection implements ClientConnection {
public Response connect(Request request) {
activeProcessChain.applyPreHandle(request);
this.channel = connectionPool.acquire(timeout);
boolean debugEnabled = log.isDebugEnabled();
Response response = null;
try {
String key = request.getKey();
this.future = channel.writeAndFlush(request);
log.info("Call successful, target address is {}:{}, request key is {}", address.getHostName(), address.getPort(), key);
if (debugEnabled) {
log.debug("Call successful, target address is {}:{}, request key is {}", address.getHostName(), address.getPort(), key);
}
// Wait for execution to complete
ResultHolder.putThread(key, Thread.currentThread());
LockSupport.parkNanos(timeout() * 1000000);
@ -86,7 +89,9 @@ public class NettyClientConnection implements ClientConnection {
throw new TimeOutException("Timeout waiting for server-side response");
}
activeProcessChain.applyPostHandle(request, response);
log.info("The response from {}:{} was received successfully with the response key {}.", address.getHostName(), address.getPort(), key);
if (debugEnabled) {
log.debug("The response from {}:{} was received successfully with the response key {}.", address.getHostName(), address.getPort(), key);
}
return response;
} catch (Exception ex) {
activeProcessChain.afterCompletion(request, response, ex);

@ -17,7 +17,6 @@
package cn.hippo4j.rpc.handler;
import cn.hippo4j.rpc.coder.NettyDecoder;
import cn.hippo4j.rpc.coder.NettyEncoder;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
@ -26,6 +25,7 @@ import io.netty.channel.ChannelPipeline;
import io.netty.channel.pool.ChannelPoolHandler;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
@ -75,7 +75,9 @@ public class NettyClientPoolHandler extends AbstractNettyHandlerManager implemen
@Override
public void channelReleased(Channel ch) {
ch.writeAndFlush(Unpooled.EMPTY_BUFFER);
log.info("The connection buffer has been emptied of data");
if (log.isDebugEnabled()) {
log.debug("The connection buffer has been emptied of data");
}
}
@Override
@ -90,7 +92,7 @@ public class NettyClientPoolHandler extends AbstractNettyHandlerManager implemen
.setTcpNoDelay(false);
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new NettyEncoder());
pipeline.addLast(new NettyDecoder(ClassResolvers.cacheDisabled(null)));
pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
this.handlerEntities.stream()
.sorted()
.forEach(h -> {

@ -19,13 +19,13 @@ package cn.hippo4j.rpc.handler;
import cn.hippo4j.common.toolkit.Assert;
import cn.hippo4j.common.toolkit.ReflectUtil;
import cn.hippo4j.rpc.process.ActivePostProcess;
import cn.hippo4j.rpc.process.ActiveProcessChain;
import cn.hippo4j.rpc.model.Request;
import cn.hippo4j.rpc.model.DefaultResponse;
import cn.hippo4j.rpc.model.Response;
import cn.hippo4j.rpc.discovery.ClassRegistry;
import cn.hippo4j.rpc.discovery.Instance;
import cn.hippo4j.rpc.model.DefaultResponse;
import cn.hippo4j.rpc.model.Request;
import cn.hippo4j.rpc.model.Response;
import cn.hippo4j.rpc.process.ActivePostProcess;
import cn.hippo4j.rpc.process.ActiveProcessChain;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;

@ -94,7 +94,9 @@ public final class ActiveProcessChain {
try {
handle.afterCompletion(request, response, ex);
} catch (Throwable e) {
log.error("HandlerInterceptor.afterCompletion threw exception", e);
if (log.isErrorEnabled()) {
log.error("HandlerInterceptor.afterCompletion threw exception", e);
}
}
}
}

@ -18,7 +18,6 @@
package cn.hippo4j.rpc.server;
import cn.hippo4j.common.toolkit.Assert;
import cn.hippo4j.rpc.coder.NettyDecoder;
import cn.hippo4j.rpc.coder.NettyEncoder;
import cn.hippo4j.rpc.discovery.ServerPort;
import cn.hippo4j.rpc.exception.ConnectionException;
@ -29,6 +28,7 @@ import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import lombok.extern.slf4j.Slf4j;
import java.util.Arrays;
@ -80,7 +80,7 @@ public class NettyServerConnection extends AbstractNettyHandlerManager implement
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new NettyEncoder());
pipeline.addLast(new NettyDecoder(ClassResolvers.cacheDisabled(null)));
pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
handlerEntities.stream()
.sorted()
.forEach(h -> {
@ -95,7 +95,9 @@ public class NettyServerConnection extends AbstractNettyHandlerManager implement
try {
this.future = server.bind(port.getPort()).sync();
this.channel = this.future.channel();
log.info("The server is started and can receive requests. The listening port is {}", port.getPort());
if (log.isDebugEnabled()) {
log.debug("The server is started and can receive requests. The listening port is {}", port.getPort());
}
this.port = port;
this.future.channel().closeFuture().sync();
} catch (InterruptedException ex) {
@ -112,7 +114,9 @@ public class NettyServerConnection extends AbstractNettyHandlerManager implement
leader.shutdownGracefully();
worker.shutdownGracefully();
this.future.channel().close();
log.info("The server is shut down and no more requests are received. The release port is {}", port.getPort());
if (log.isDebugEnabled()) {
log.debug("The server is shut down and no more requests are received. The release port is {}", port.getPort());
}
}
@Override

@ -58,7 +58,9 @@ public class NettyConnectPool {
this.handler = handler;
this.pool = new FixedChannelPool(bootstrap, handler, healthCheck, acquireTimeoutAction,
timeout, maxConnect, maxPendingAcquires, true, true);
log.info("The connection pool is established with the connection target {}:{}", address.getHostName(), address.getPort());
if (log.isDebugEnabled()) {
log.info("The connection pool is established with the connection target {}:{}", address.getHostName(), address.getPort());
}
NettyConnectPoolHolder.createPool(address, this);
}

@ -19,6 +19,11 @@
<artifactId>hippo4j-common</artifactId>
<version>${revision}</version>
</dependency>
<dependency>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-rpc</artifactId>
<version>${revision}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>

@ -17,19 +17,24 @@
package cn.hippo4j.config.service;
import cn.hippo4j.adapter.base.ThreadPoolAdapterApi;
import cn.hippo4j.adapter.base.ThreadPoolAdapterCacheConfig;
import cn.hippo4j.adapter.base.ThreadPoolAdapterParameter;
import cn.hippo4j.adapter.base.ThreadPoolAdapterState;
import cn.hippo4j.common.constant.ConfigModifyTypeConstants;
import cn.hippo4j.common.design.observer.AbstractSubjectCenter;
import cn.hippo4j.common.design.observer.Observer;
import cn.hippo4j.common.design.observer.ObserverMessage;
import cn.hippo4j.common.toolkit.BeanUtil;
import cn.hippo4j.common.toolkit.CollectionUtil;
import cn.hippo4j.common.toolkit.JSONUtil;
import cn.hippo4j.common.toolkit.StringUtil;
import cn.hippo4j.common.toolkit.http.HttpUtil;
import cn.hippo4j.common.toolkit.UserContext;
import cn.hippo4j.common.web.base.Result;
import cn.hippo4j.config.model.biz.adapter.ThreadPoolAdapterReqDTO;
import cn.hippo4j.config.model.biz.adapter.ThreadPoolAdapterRespDTO;
import com.fasterxml.jackson.core.type.TypeReference;
import cn.hippo4j.config.model.biz.threadpool.ConfigModifySaveReqDTO;
import cn.hippo4j.config.verify.ConfigModificationVerifyServiceChoose;
import cn.hippo4j.rpc.support.NettyProxyCenter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
@ -44,6 +49,7 @@ import static cn.hippo4j.common.constant.Constants.IDENTIFY_SLICER_SYMBOL;
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class ThreadPoolAdapterService {
/**
@ -51,6 +57,10 @@ public class ThreadPoolAdapterService {
*/
private static final Map<String, Map<String, Map<String, List<ThreadPoolAdapterState>>>> THREAD_POOL_ADAPTER_MAP = new ConcurrentHashMap<>();
private static final Class<ThreadPoolAdapterApi> CLASS = ThreadPoolAdapterApi.class;
private final ConfigModificationVerifyServiceChoose configModificationVerifyServiceChoose;
static {
AbstractSubjectCenter.register(AbstractSubjectCenter.SubjectType.CLEAR_CONFIG_CACHE, new ClearThreadPoolAdapterCache());
}
@ -76,12 +86,16 @@ public class ThreadPoolAdapterService {
adapterStateList = new ArrayList<>();
tenantItemMap.put(adapterState.getThreadPoolKey(), adapterStateList);
}
Optional<ThreadPoolAdapterState> first = adapterStateList.stream().filter(state -> Objects.equals(state.getClientAddress(), each.getClientAddress())).findFirst();
String clientAddress = each.getClientAddress();
String localServerAddress = each.getLocalServerAddress();
Optional<ThreadPoolAdapterState> first = adapterStateList.stream().filter(state -> Objects.equals(state.getClientAddress(), clientAddress)).findFirst();
if (!first.isPresent()) {
ThreadPoolAdapterState state = new ThreadPoolAdapterState();
state.setClientAddress(each.getClientAddress());
state.setClientAddress(clientAddress);
state.setIdentify(each.getClientIdentify());
state.setLocalServerAddress(localServerAddress);
adapterStateList.add(state);
NettyProxyCenter.getProxy(CLASS, localServerAddress);
}
}
}
@ -93,20 +107,16 @@ public class ThreadPoolAdapterService {
.map(each -> each.get(requestParameter.getTenant() + IDENTIFY_SLICER_SYMBOL + requestParameter.getItem()))
.map(each -> each.get(requestParameter.getThreadPoolKey()))
.orElse(new ArrayList<>());
List<String> addressList = actual.stream().map(ThreadPoolAdapterState::getClientAddress).collect(Collectors.toList());
List<String> addressList = actual.stream().map(ThreadPoolAdapterState::getLocalServerAddress).collect(Collectors.toList());
List<ThreadPoolAdapterRespDTO> result = new ArrayList<>(addressList.size());
addressList.forEach(each -> {
String url = StringUtil.newBuilder("http://", each, "/adapter/thread-pool/info");
Map<String, String> param = new HashMap<>();
param.put("mark", requestParameter.getMark());
param.put("threadPoolKey", requestParameter.getThreadPoolKey());
try {
String resultStr = HttpUtil.get(url, param);
if (StringUtil.isNotBlank(resultStr)) {
Result<ThreadPoolAdapterRespDTO> restResult = JSONUtil.parseObject(resultStr, new TypeReference<Result<ThreadPoolAdapterRespDTO>>() {
});
result.add(restResult.getData());
}
ThreadPoolAdapterApi adapterApi = NettyProxyCenter.getProxy(CLASS, each);
ThreadPoolAdapterParameter parameter = new ThreadPoolAdapterParameter();
parameter.setThreadPoolKey(requestParameter.getThreadPoolKey());
parameter.setMark(requestParameter.getMark());
Result<ThreadPoolAdapterState> adapterThreadPool = adapterApi.getAdapterThreadPool(parameter);
result.add(BeanUtil.convert(adapterThreadPool.getData(), ThreadPoolAdapterRespDTO.class));
} catch (Throwable ex) {
log.error("Failed to get third-party thread pool data.", ex);
}
@ -126,10 +136,37 @@ public class ThreadPoolAdapterService {
return new HashSet<>();
}
public void updateThreadPool(ThreadPoolAdapterReqDTO requestParameter) {
if (UserContext.getUserRole().equals("ROLE_ADMIN")) {
List<ThreadPoolAdapterState> actual = Optional.ofNullable(THREAD_POOL_ADAPTER_MAP.get(requestParameter.getMark()))
.map(each -> each.get(requestParameter.getTenant() + IDENTIFY_SLICER_SYMBOL + requestParameter.getItem()))
.map(each -> each.get(requestParameter.getThreadPoolKey()))
.orElse(new ArrayList<>());
actual.forEach(t -> {
String localServerAddress = t.getLocalServerAddress();
ThreadPoolAdapterApi adapterApi = NettyProxyCenter.getProxy(CLASS, localServerAddress);
ThreadPoolAdapterParameter parameter = BeanUtil.convert(requestParameter, ThreadPoolAdapterParameter.class);
adapterApi.updateAdapterThreadPool(parameter);
});
} else {
ConfigModifySaveReqDTO modifySaveReqDTO = BeanUtil.convert(requestParameter, ConfigModifySaveReqDTO.class);
modifySaveReqDTO.setModifyUser(UserContext.getUserName());
modifySaveReqDTO.setTenantId(requestParameter.getTenant());
modifySaveReqDTO.setItemId(requestParameter.getItem());
modifySaveReqDTO.setTpId(requestParameter.getThreadPoolKey());
modifySaveReqDTO.setType(ConfigModifyTypeConstants.ADAPTER_THREAD_POOL);
configModificationVerifyServiceChoose.choose(modifySaveReqDTO.getType()).saveConfigModifyApplication(modifySaveReqDTO);
}
}
public static void remove(String identify) {
synchronized (ThreadPoolAdapterService.class) {
THREAD_POOL_ADAPTER_MAP.values()
.forEach(each -> each.forEach((key, val) -> val.forEach((threadPoolKey, states) -> states.removeIf(adapterState -> Objects.equals(adapterState.getIdentify(), identify)))));
THREAD_POOL_ADAPTER_MAP.values().forEach(each -> each.forEach((key, val) -> val.forEach((threadPoolKey, states) -> {
states.stream()
.filter(s -> Objects.equals(s.getIdentify(), identify))
.forEach(t -> NettyProxyCenter.removeProxy(CLASS, t.getLocalServerAddress()));
states.removeIf(s -> Objects.equals(s.getIdentify(), identify));
})));
}
}

@ -17,6 +17,8 @@
package cn.hippo4j.config.service.biz.impl;
import cn.hippo4j.adapter.base.ThreadPoolAdapterApi;
import cn.hippo4j.adapter.base.ThreadPoolAdapterParameter;
import cn.hippo4j.common.enums.VerifyEnum;
import cn.hippo4j.common.model.InstanceInfo;
import cn.hippo4j.common.toolkit.BeanUtil;
@ -30,6 +32,7 @@ import cn.hippo4j.config.model.biz.threadpool.ConfigModifyVerifyReqDTO;
import cn.hippo4j.config.service.biz.ConfigModificationVerifyService;
import cn.hippo4j.discovery.core.BaseInstanceRegistry;
import cn.hippo4j.discovery.core.Lease;
import cn.hippo4j.rpc.support.NettyProxyCenter;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import javax.annotation.Resource;
@ -45,6 +48,8 @@ public abstract class AbstractConfigModificationVerifyService implements ConfigM
@Resource
protected HisConfigVerifyMapper hisConfigVerifyMapper;
private static final Class<ThreadPoolAdapterApi> CLASS = ThreadPoolAdapterApi.class;
@Resource
private BaseInstanceRegistry baseInstanceRegistry;
@ -100,11 +105,11 @@ public abstract class AbstractConfigModificationVerifyService implements ConfigM
List<Lease<InstanceInfo>> leases = baseInstanceRegistry.listInstance(reqDTO.getItemId());
ConditionUtil
.condition(reqDTO.getModifyAll(),
() -> leases.forEach(lease -> clientAddressList.add(lease.getHolder().getCallBackUrl())),
() -> leases.forEach(lease -> clientAddressList.add(lease.getHolder().getServerUrl())),
() -> clientAddressList.add(
leases.stream()
.filter(lease -> lease.getHolder().getIdentify().equals(reqDTO.getIdentify())).findAny().orElseThrow(() -> new RuntimeException("该线程池实例不存在")).getHolder()
.getCallBackUrl()));
.getServerUrl()));
return clientAddressList;
}
@ -113,5 +118,15 @@ public abstract class AbstractConfigModificationVerifyService implements ConfigM
*
* @param reqDTO
*/
protected abstract void updateThreadPoolParameter(ConfigModifyVerifyReqDTO reqDTO);
protected void updateThreadPoolParameter(ConfigModifyVerifyReqDTO reqDTO) {
for (String each : getClientAddress(reqDTO)) {
ThreadPoolAdapterApi poolAdapterApi = NettyProxyCenter.getProxy(CLASS, each);
ThreadPoolAdapterParameter parameter = new ThreadPoolAdapterParameter();
parameter.setMark(reqDTO.getMark());
parameter.setMaximumPoolSize(reqDTO.getMaximumPoolSize());
parameter.setCorePoolSize(reqDTO.getCorePoolSize());
parameter.setThreadPoolKey(reqDTO.getThreadPoolKey());
poolAdapterApi.updateAdapterThreadPool(parameter);
}
}
}

@ -18,9 +18,6 @@
package cn.hippo4j.config.service.biz.impl;
import cn.hippo4j.common.constant.ConfigModifyTypeConstants;
import cn.hippo4j.common.toolkit.StringUtil;
import cn.hippo4j.common.toolkit.http.HttpUtil;
import cn.hippo4j.config.model.biz.threadpool.ConfigModifyVerifyReqDTO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
@ -36,11 +33,4 @@ public class AdapterThreadPoolConfigModificationVerifyServiceImpl extends Abstra
return ConfigModifyTypeConstants.ADAPTER_THREAD_POOL;
}
@Override
protected void updateThreadPoolParameter(ConfigModifyVerifyReqDTO reqDTO) {
for (String each : getClientAddress(reqDTO)) {
String urlString = StringUtil.newBuilder("http://", each, "/adapter/thread-pool/update");
HttpUtil.post(urlString, reqDTO);
}
}
}

@ -18,9 +18,6 @@
package cn.hippo4j.config.service.biz.impl;
import cn.hippo4j.common.constant.ConfigModifyTypeConstants;
import cn.hippo4j.common.toolkit.StringUtil;
import cn.hippo4j.common.toolkit.http.HttpUtil;
import cn.hippo4j.config.model.biz.threadpool.ConfigModifyVerifyReqDTO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
@ -36,11 +33,4 @@ public class WebThreadPoolConfigModificationVerifyServiceImpl extends AbstractCo
return ConfigModifyTypeConstants.WEB_THREAD_POOL;
}
@Override
protected void updateThreadPoolParameter(ConfigModifyVerifyReqDTO reqDTO) {
for (String each : getClientAddress(reqDTO)) {
String urlString = StringUtil.newBuilder("http://", each, "/web/update/pool");
HttpUtil.post(urlString, reqDTO);
}
}
}

@ -17,18 +17,11 @@
package cn.hippo4j.console.controller;
import cn.hippo4j.common.constant.ConfigModifyTypeConstants;
import cn.hippo4j.common.toolkit.BeanUtil;
import cn.hippo4j.common.toolkit.StringUtil;
import cn.hippo4j.common.toolkit.UserContext;
import cn.hippo4j.common.toolkit.http.HttpUtil;
import cn.hippo4j.common.web.base.Result;
import cn.hippo4j.common.web.base.Results;
import cn.hippo4j.config.model.biz.adapter.ThreadPoolAdapterReqDTO;
import cn.hippo4j.config.model.biz.adapter.ThreadPoolAdapterRespDTO;
import cn.hippo4j.config.model.biz.threadpool.ConfigModifySaveReqDTO;
import cn.hippo4j.config.service.ThreadPoolAdapterService;
import cn.hippo4j.config.verify.ConfigModificationVerifyServiceChoose;
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
@ -49,8 +42,6 @@ public class ThreadPoolAdapterController {
private final ThreadPoolAdapterService threadPoolAdapterService;
private final ConfigModificationVerifyServiceChoose configModificationVerifyServiceChoose;
@GetMapping(REGISTER_ADAPTER_BASE_PATH + "/query")
public Result<List<ThreadPoolAdapterRespDTO>> queryAdapterThreadPool(ThreadPoolAdapterReqDTO requestParameter) {
List<ThreadPoolAdapterRespDTO> result = threadPoolAdapterService.query(requestParameter);
@ -65,20 +56,7 @@ public class ThreadPoolAdapterController {
@PostMapping(REGISTER_ADAPTER_BASE_PATH + "/update")
public Result<Void> updateAdapterThreadPool(@RequestBody ThreadPoolAdapterReqDTO requestParameter) {
if (UserContext.getUserRole().equals("ROLE_ADMIN")) {
for (String each : requestParameter.getClientAddressList()) {
String urlString = StringUtil.newBuilder("http://", each, "/adapter/thread-pool/update");
HttpUtil.post(urlString, requestParameter);
}
} else {
ConfigModifySaveReqDTO modifySaveReqDTO = BeanUtil.convert(requestParameter, ConfigModifySaveReqDTO.class);
modifySaveReqDTO.setModifyUser(UserContext.getUserName());
modifySaveReqDTO.setTenantId(requestParameter.getTenant());
modifySaveReqDTO.setItemId(requestParameter.getItem());
modifySaveReqDTO.setTpId(requestParameter.getThreadPoolKey());
modifySaveReqDTO.setType(ConfigModifyTypeConstants.ADAPTER_THREAD_POOL);
configModificationVerifyServiceChoose.choose(modifySaveReqDTO.getType()).saveConfigModifyApplication(modifySaveReqDTO);
}
threadPoolAdapterService.updateThreadPool(requestParameter);
return Results.success();
}

@ -17,23 +17,21 @@
package cn.hippo4j.console.controller;
import cn.hippo4j.common.api.WebThreadPoolApi;
import cn.hippo4j.common.api.WebThreadPoolRunStateApi;
import cn.hippo4j.common.constant.ConfigModifyTypeConstants;
import cn.hippo4j.common.constant.Constants;
import cn.hippo4j.common.model.InstanceInfo;
import cn.hippo4j.common.model.ThreadPoolParameterInfo;
import cn.hippo4j.common.toolkit.BeanUtil;
import cn.hippo4j.common.toolkit.CollectionUtil;
import cn.hippo4j.common.toolkit.StringUtil;
import cn.hippo4j.common.toolkit.UserContext;
import cn.hippo4j.common.toolkit.http.HttpUtil;
import cn.hippo4j.common.web.base.Result;
import cn.hippo4j.common.web.base.Results;
import cn.hippo4j.common.web.exception.ErrorCodeEnum;
import cn.hippo4j.config.model.CacheItem;
import cn.hippo4j.config.model.biz.threadpool.ConfigModifySaveReqDTO;
import cn.hippo4j.config.model.biz.threadpool.ThreadPoolDelReqDTO;
import cn.hippo4j.config.model.biz.threadpool.ThreadPoolQueryReqDTO;
import cn.hippo4j.config.model.biz.threadpool.ThreadPoolRespDTO;
import cn.hippo4j.config.model.biz.threadpool.ThreadPoolSaveOrUpdateReqDTO;
import cn.hippo4j.config.model.biz.threadpool.*;
import cn.hippo4j.config.service.ConfigCacheService;
import cn.hippo4j.config.service.biz.ThreadPoolService;
import cn.hippo4j.config.verify.ConfigModificationVerifyServiceChoose;
@ -42,17 +40,11 @@ import cn.hippo4j.console.model.WebThreadPoolReqDTO;
import cn.hippo4j.console.model.WebThreadPoolRespDTO;
import cn.hippo4j.discovery.core.BaseInstanceRegistry;
import cn.hippo4j.discovery.core.Lease;
import cn.hippo4j.rpc.support.NettyProxyCenter;
import com.baomidou.mybatisplus.core.metadata.IPage;
import lombok.RequiredArgsConstructor;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.bind.annotation.*;
import java.util.ArrayList;
import java.util.List;
@ -75,7 +67,9 @@ public class ThreadPoolController {
private final ConfigModificationVerifyServiceChoose configModificationVerifyServiceChoose;
private static final String HTTP = "http://";
private static final Class<WebThreadPoolRunStateApi> STATE_API_CLASS = WebThreadPoolRunStateApi.class;
private static final Class<WebThreadPoolApi> POOL_API_CLASS = WebThreadPoolApi.class;
@PostMapping("/query/page")
public Result<IPage<ThreadPoolRespDTO>> queryNameSpacePage(@RequestBody ThreadPoolQueryReqDTO reqDTO) {
@ -122,15 +116,15 @@ public class ThreadPoolController {
@GetMapping("/run/state/{tpId}")
public Result runState(@PathVariable("tpId") String tpId,
@RequestParam(value = "clientAddress") String clientAddress) {
String urlString = StringUtil.newBuilder(HTTP, clientAddress, "/run/state/", tpId);
return HttpUtil.get(urlString, Result.class);
WebThreadPoolRunStateApi threadPoolRunStateApi = NettyProxyCenter.getProxy(STATE_API_CLASS, clientAddress);
return threadPoolRunStateApi.getPoolRunState(tpId);
}
@GetMapping("/run/thread/state/{tpId}")
public Result runThreadState(@PathVariable("tpId") String tpId,
@RequestParam(value = "clientAddress") String clientAddress) {
String urlString = StringUtil.newBuilder(HTTP, clientAddress, "/run/thread/state/", tpId);
return HttpUtil.get(urlString, Result.class);
WebThreadPoolRunStateApi threadPoolRunStateApi = NettyProxyCenter.getProxy(STATE_API_CLASS, clientAddress);
return threadPoolRunStateApi.getThreadStateDetail(tpId);
}
@GetMapping("/list/client/instance/{itemId}")
@ -145,7 +139,7 @@ public class ThreadPoolController {
for (Lease<InstanceInfo> each : leases) {
Result poolBaseState;
try {
poolBaseState = getPoolBaseState(mark, each.getHolder().getCallBackUrl());
poolBaseState = getPoolBaseState(mark, each.getHolder().getServerUrl());
} catch (Throwable ignored) {
continue;
}
@ -158,7 +152,7 @@ public class ThreadPoolController {
result.setTenantId(each.getHolder().getGroupKey().split("[+]")[1]);
result.setActive(each.getHolder().getActive());
result.setIdentify(each.getHolder().getIdentify());
result.setClientAddress(each.getHolder().getCallBackUrl());
result.setClientAddress(each.getHolder().getServerUrl());
returnThreadPool.add(result);
}
return Results.success(returnThreadPool);
@ -167,22 +161,23 @@ public class ThreadPoolController {
@GetMapping("/web/base/info")
public Result getPoolBaseState(@RequestParam(value = "mark") String mark,
@RequestParam(value = "clientAddress") String clientAddress) {
String urlString = StringUtil.newBuilder(HTTP, clientAddress, "/web/base/info", "?mark=", mark);
return HttpUtil.get(urlString, Result.class);
WebThreadPoolApi threadPoolApi = NettyProxyCenter.getProxy(POOL_API_CLASS, clientAddress);
return threadPoolApi.getPoolBaseState(mark);
}
@GetMapping("/web/run/state")
public Result getPoolRunState(@RequestParam(value = "clientAddress") String clientAddress) {
String urlString = StringUtil.newBuilder(HTTP, clientAddress, "/web/run/state");
return HttpUtil.get(urlString, Result.class);
WebThreadPoolApi threadPoolApi = NettyProxyCenter.getProxy(POOL_API_CLASS, clientAddress);
return threadPoolApi.getPoolRunState();
}
@PostMapping("/web/update/pool")
public Result<Void> updateWebThreadPool(@RequestBody WebThreadPoolReqDTO requestParam) {
if (UserContext.getUserRole().equals("ROLE_ADMIN")) {
for (String each : requestParam.getClientAddressList()) {
String urlString = StringUtil.newBuilder(HTTP, each, "/web/update/pool");
HttpUtil.post(urlString, requestParam);
WebThreadPoolApi threadPoolApi = NettyProxyCenter.getProxy(POOL_API_CLASS, each);
ThreadPoolParameterInfo parameterInfo = BeanUtil.convert(requestParam, ThreadPoolParameterInfo.class);
threadPoolApi.updateWebThreadPool(parameterInfo);
}
} else {
ConfigModifySaveReqDTO modifySaveReqDTO = BeanUtil.convert(requestParam, ConfigModifySaveReqDTO.class);
@ -206,9 +201,9 @@ public class ThreadPoolController {
String groupKey = getGroupKey(tpId, itemTenantKey);
Map<String, CacheItem> content = ConfigCacheService.getContent(groupKey);
Map<String, String> activeMap =
leases.stream().map(each -> each.getHolder()).filter(each -> StringUtil.isNotBlank(each.getActive()))
leases.stream().map(Lease::getHolder).filter(each -> StringUtil.isNotBlank(each.getActive()))
.collect(Collectors.toMap(InstanceInfo::getIdentify, InstanceInfo::getActive));
Map<String, String> clientBasePathMap = leases.stream().map(each -> each.getHolder())
Map<String, String> clientBasePathMap = leases.stream().map(Lease::getHolder)
.filter(each -> StringUtil.isNotBlank(each.getClientBasePath()))
.collect(Collectors.toMap(InstanceInfo::getIdentify, InstanceInfo::getClientBasePath));
List<ThreadPoolInstanceInfo> returnThreadPool = new ArrayList<>();

@ -33,6 +33,11 @@
<artifactId>hippo4j-message</artifactId>
<version>${revision}</version>
</dependency>
<dependency>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-rpc</artifactId>
<version>${revision}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>

@ -54,6 +54,11 @@ public class BootstrapProperties implements BootstrapPropertiesInterface {
*/
private String nettyServerPort;
/**
* The service port that is open to the outside world, The default value is 16691
*/
private Integer localServerPort;
/**
* Report type
*/

@ -82,7 +82,8 @@ import org.springframework.core.env.ConfigurableEnvironment;
@ConditionalOnBean(MarkerConfiguration.Marker.class)
@EnableConfigurationProperties(BootstrapProperties.class)
@ConditionalOnProperty(prefix = BootstrapProperties.PREFIX, value = "enable", matchIfMissing = true, havingValue = "true")
@ImportAutoConfiguration({WebAdapterConfiguration.class, NettyClientConfiguration.class, DiscoveryConfiguration.class, MessageConfiguration.class, UtilAutoConfiguration.class})
@ImportAutoConfiguration({WebAdapterConfiguration.class, NettyClientConfiguration.class, DiscoveryConfiguration.class, MessageConfiguration.class, UtilAutoConfiguration.class,
NettyServerConfiguration.class})
public class DynamicThreadPoolAutoConfiguration {
private final BootstrapProperties properties;

@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.springboot.starter.config;
import cn.hippo4j.adapter.base.ThreadPoolAdapterApi;
import cn.hippo4j.common.api.WebThreadPoolApi;
import cn.hippo4j.common.api.WebThreadPoolRunStateApi;
import cn.hippo4j.rpc.discovery.ServerPort;
import cn.hippo4j.rpc.discovery.SpringContextInstance;
import cn.hippo4j.rpc.handler.NettyServerTakeHandler;
import cn.hippo4j.rpc.server.NettyServerConnection;
import cn.hippo4j.rpc.server.Server;
import cn.hippo4j.rpc.support.NettyServerSupport;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.context.EnvironmentAware;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;
@Component
public class NettyServerConfiguration implements ApplicationRunner, EnvironmentAware {
Environment environment;
private static final Class<?>[] classes = {
WebThreadPoolApi.class,
ThreadPoolAdapterApi.class,
WebThreadPoolRunStateApi.class
};
@Override
public void run(ApplicationArguments args) throws Exception {
Integer port = environment.getProperty("spring.dynamic.thread-pool.local-server-port", Integer.class);
ServerPort serverPort = () -> port == null ? 16691 : port;
NettyServerTakeHandler handler = new NettyServerTakeHandler(new SpringContextInstance());
try (
NettyServerConnection connection = new NettyServerConnection(handler);
Server server = new NettyServerSupport(serverPort, connection, classes)) {
server.bind();
}
}
@Override
public void setEnvironment(Environment environment) {
this.environment = environment;
}
}

@ -18,6 +18,7 @@
package cn.hippo4j.springboot.starter.controller;
import cn.hippo4j.adapter.base.ThreadPoolAdapter;
import cn.hippo4j.adapter.base.ThreadPoolAdapterApi;
import cn.hippo4j.adapter.base.ThreadPoolAdapterParameter;
import cn.hippo4j.adapter.base.ThreadPoolAdapterState;
import cn.hippo4j.common.api.ClientNetworkService;
@ -31,10 +32,7 @@ import cn.hippo4j.springboot.starter.toolkit.CloudCommonIdUtil;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import java.util.Optional;
@ -44,15 +42,16 @@ import static cn.hippo4j.adapter.base.ThreadPoolAdapterBeanContainer.THREAD_POOL
* Thread-pool adapter controller.
*/
@Slf4j
@RestController
// @RestController
@AllArgsConstructor
public class ThreadPoolAdapterController {
public class ThreadPoolAdapterController implements ThreadPoolAdapterApi {
private final ConfigurableEnvironment environment;
private final InetUtils hippo4JInetUtils;
@GetMapping("/adapter/thread-pool/info")
@Override
// @GetMapping("/adapter/thread-pool/info")
public Result<ThreadPoolAdapterState> getAdapterThreadPool(ThreadPoolAdapterParameter requestParameter) {
ThreadPoolAdapter threadPoolAdapter = THREAD_POOL_ADAPTER_BEAN_CONTAINER.get(requestParameter.getMark());
ThreadPoolAdapterState result = Optional.ofNullable(threadPoolAdapter).map(each -> {
@ -74,7 +73,8 @@ public class ThreadPoolAdapterController {
return Results.success(result);
}
@PostMapping("/adapter/thread-pool/update")
@Override
// @PostMapping("/adapter/thread-pool/update")
public Result<Void> updateAdapterThreadPool(@RequestBody ThreadPoolAdapterParameter requestParameter) {
log.info("[{}] Change third-party thread pool data. key: {}, coreSize: {}, maximumSize: {}",
requestParameter.getMark(), requestParameter.getThreadPoolKey(), requestParameter.getCorePoolSize(), requestParameter.getMaximumPoolSize());

@ -19,6 +19,7 @@ package cn.hippo4j.springboot.starter.controller;
import cn.hippo4j.adapter.web.WebThreadPoolHandlerChoose;
import cn.hippo4j.adapter.web.WebThreadPoolService;
import cn.hippo4j.common.api.WebThreadPoolApi;
import cn.hippo4j.common.model.ThreadPoolBaseInfo;
import cn.hippo4j.common.model.ThreadPoolParameterInfo;
import cn.hippo4j.common.model.ThreadPoolRunStateInfo;
@ -33,13 +34,14 @@ import org.springframework.web.bind.annotation.*;
* <p> At present, only Tomcat is well supported, and other web containers need to be improved.
*/
@CrossOrigin
@RestController
// @RestController
@AllArgsConstructor
public class WebThreadPoolController {
public class WebThreadPoolController implements WebThreadPoolApi {
private final WebThreadPoolHandlerChoose webThreadPoolServiceChoose;
@GetMapping("/web/base/info")
@Override
// @GetMapping("/web/base/info")
public Result<ThreadPoolBaseInfo> getPoolBaseState(@RequestParam(value = "mark") String mark) {
WebThreadPoolService webThreadPoolService = webThreadPoolServiceChoose.choose();
if (webThreadPoolService != null && webThreadPoolService.getClass().getSimpleName().contains(mark)) {
@ -48,13 +50,15 @@ public class WebThreadPoolController {
return Results.success(null);
}
@GetMapping("/web/run/state")
@Override
// @GetMapping("/web/run/state")
public Result<ThreadPoolRunStateInfo> getPoolRunState() {
ThreadPoolRunStateInfo result = webThreadPoolServiceChoose.choose().getWebRunStateInfo();
return Results.success(result);
}
@PostMapping("/web/update/pool")
@Override
// @PostMapping("/web/update/pool")
public Result<Void> updateWebThreadPool(@RequestBody ThreadPoolParameterInfo threadPoolParameterInfo) {
webThreadPoolServiceChoose.choose().updateWebThreadPool(threadPoolParameterInfo);
return Results.success();

@ -18,16 +18,15 @@
package cn.hippo4j.springboot.starter.controller;
import cn.hippo4j.common.api.ThreadDetailState;
import cn.hippo4j.common.model.ThreadPoolRunStateInfo;
import cn.hippo4j.common.api.WebThreadPoolRunStateApi;
import cn.hippo4j.common.model.ThreadDetailStateInfo;
import cn.hippo4j.common.model.ThreadPoolRunStateInfo;
import cn.hippo4j.common.web.base.Result;
import cn.hippo4j.common.web.base.Results;
import cn.hippo4j.core.executor.state.ThreadPoolRunStateHandler;
import lombok.AllArgsConstructor;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;
@ -35,21 +34,23 @@ import java.util.List;
* Web thread-pool run state controller.
*/
@CrossOrigin
@RestController
// @RestController
@AllArgsConstructor
public class WebThreadPoolRunStateController {
public class WebThreadPoolRunStateController implements WebThreadPoolRunStateApi {
private final ThreadPoolRunStateHandler threadPoolRunStateHandler;
private final ThreadDetailState threadDetailState;
@GetMapping("/run/state/{threadPoolId}")
@Override
// @GetMapping("/run/state/{threadPoolId}")
public Result<ThreadPoolRunStateInfo> getPoolRunState(@PathVariable("threadPoolId") String threadPoolId) {
ThreadPoolRunStateInfo result = threadPoolRunStateHandler.getPoolRunState(threadPoolId);
return Results.success(result);
}
@GetMapping("/run/thread/state/{threadPoolId}")
@Override
// @GetMapping("/run/thread/state/{threadPoolId}")
public Result<List<ThreadDetailStateInfo>> getThreadStateDetail(@PathVariable("threadPoolId") String threadPoolId) {
List<ThreadDetailStateInfo> result = threadDetailState.getThreadDetailStateInfo(threadPoolId);
return Results.success(result);

@ -79,6 +79,8 @@ public class ThreadPoolAdapterRegister implements ApplicationRunner, ThreadPoolA
String clientAddress = CloudCommonIdUtil.getClientIpPort(environment, hippo4JInetUtils);
cacheConfig.setClientAddress(clientAddress);
cacheConfig.setThreadPoolAdapterStates(threadPoolStates);
String localServerAddress = CloudCommonIdUtil.getLocalServerIpPort(environment, hippo4JInetUtils);
cacheConfig.setLocalServerAddress(localServerAddress);
adapterCacheConfigList.add(cacheConfig);
}
return adapterCacheConfigList;

@ -21,6 +21,7 @@ import cn.hippo4j.common.api.ClientNetworkService;
import cn.hippo4j.common.model.InstanceInfo;
import cn.hippo4j.common.spi.DynamicThreadPoolServiceLoader;
import cn.hippo4j.common.toolkit.ContentUtil;
import cn.hippo4j.common.toolkit.StringUtil;
import cn.hippo4j.core.toolkit.IdentifyUtil;
import cn.hippo4j.core.toolkit.inet.InetUtils;
import cn.hippo4j.springboot.starter.config.BootstrapProperties;
@ -58,12 +59,12 @@ public final class InstanceInfoProviderFactory {
String namespace = bootstrapProperties.getNamespace();
String itemId = bootstrapProperties.getItemId();
String port = environment.getProperty("server.port", "8080");
String serverPort = environment.getProperty("spring.dynamic.thread-pool.local-server-port", "16691");
String applicationName = environment.getProperty("spring.dynamic.thread-pool.item-id");
String active = environment.getProperty("spring.profiles.active", "UNKNOWN");
InstanceInfo instanceInfo = new InstanceInfo();
String instanceId = CloudCommonIdUtil.getDefaultInstanceId(environment, inetUtils);
instanceId = new StringBuilder()
.append(instanceId).append(IDENTIFY_SLICER_SYMBOL).append(CLIENT_IDENTIFICATION_VALUE).toString();
instanceId = instanceId + IDENTIFY_SLICER_SYMBOL + CLIENT_IDENTIFICATION_VALUE;
String contextPath = environment.getProperty("server.servlet.context-path", "");
instanceInfo.setInstanceId(instanceId)
.setIpApplicationName(CloudCommonIdUtil.getIpApplicationName(environment, inetUtils))
@ -71,10 +72,19 @@ public final class InstanceInfoProviderFactory {
.setPort(port).setClientBasePath(contextPath).setGroupKey(ContentUtil.getGroupKey(itemId, namespace));
String[] customerNetwork = DynamicThreadPoolServiceLoader.getSingletonServiceInstances(ClientNetworkService.class)
.stream().findFirst().map(each -> each.getNetworkIpPort(environment)).orElse(null);
String callBackUrl = new StringBuilder().append(Optional.ofNullable(customerNetwork).map(each -> each[0]).orElse(instanceInfo.getHostName())).append(":")
.append(Optional.ofNullable(customerNetwork).map(each -> each[1]).orElse(port)).append(instanceInfo.getClientBasePath())
.toString();
String callBackUrl = StringUtil.newBuilder(
Optional.ofNullable(customerNetwork).map(each -> each[0]).orElse(instanceInfo.getHostName()),
":",
Optional.ofNullable(customerNetwork).map(each -> each[1]).orElse(port),
instanceInfo.getClientBasePath());
String serverUrl = StringUtil.newBuilder(
Optional.ofNullable(customerNetwork).map(each -> each[0]).orElse(instanceInfo.getHostName()),
":",
serverPort,
instanceInfo.getClientBasePath());
instanceInfo.setCallBackUrl(callBackUrl);
instanceInfo.setServerUrl(serverUrl);
String identify = IdentifyUtil.generate(environment, inetUtils);
instanceInfo.setIdentify(identify);
instanceInfo.setActive(active.toUpperCase());

@ -44,6 +44,19 @@ public class CloudCommonIdUtil {
return combineParts(hostname, SEPARATOR, port);
}
/**
* Get local server ip port.
*
* @param resolver resolver
* @param inetUtils inet utils
* @return ip and port
*/
public static String getLocalServerIpPort(PropertyResolver resolver, InetUtils inetUtils) {
String hostname = inetUtils.findFirstNonLoopBackHostInfo().getIpAddress();
String port = resolver.getProperty("spring.dynamic.thread-pool.local-server-port", "16691");
return combineParts(hostname, SEPARATOR, port);
}
/**
* Get default instance id.
*

@ -12,6 +12,12 @@
"defaultValue": "8899",
"description": "dynamic thread-pool server netty port."
},
{
"name": "spring.dynamic.thread-pool.local-server-port",
"type": "java.lang.String",
"defaultValue": "19961",
"description": "dynamic thread-pool local server port."
},
{
"name": "spring.dynamic.thread-pool.report-type",
"type": "java.lang.String",

Loading…
Cancel
Save