Merge pull request #477 from mabaiwan/develop

hippo4j-spring-boot-starter code naming and log optimization
pull/483/head
小马哥 2 years ago committed by GitHub
commit f4a3f3a966
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -17,7 +17,10 @@
package cn.hippo4j.common.model;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.Accessors;
import java.util.List;
@ -26,6 +29,9 @@ import java.util.List;
* Thread detail state info.
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Accessors(chain = true)
public class ThreadDetailStateInfo {

@ -0,0 +1,44 @@
package cn.hippo4j.common.toolkit;
import cn.hutool.core.util.StrUtil;
import java.util.HashSet;
import java.util.Set;
/**
* Boolean util.
*/
public class BooleanUtil {
private static final Set<String> TREE_SET = new HashSet(3);
static {
TREE_SET.add("true");
TREE_SET.add("yes");
TREE_SET.add("1");
}
/**
* To boolean.
*
* @param valueStr
* @return
*/
public static boolean toBoolean(String valueStr) {
if (StrUtil.isNotBlank(valueStr)) {
valueStr = valueStr.trim().toLowerCase();
return TREE_SET.contains(valueStr);
}
return false;
}
/**
* Is true.
*
* @param bool
* @return
*/
public static boolean isTrue(Boolean bool) {
return Boolean.TRUE.equals(bool);
}
}

@ -29,7 +29,7 @@ import cn.hippo4j.config.model.biz.notify.NotifyRespDTO;
import cn.hippo4j.config.service.biz.NotifyService;
import cn.hippo4j.config.toolkit.BeanUtil;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.BooleanUtil;
import cn.hippo4j.common.toolkit.BooleanUtil;
import cn.hutool.core.util.StrUtil;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;

@ -103,9 +103,4 @@ public class BootstrapProperties implements BootstrapPropertiesInterface {
* Time interval for client to collect monitoring data. unit: ms
*/
private Long collectInterval = 5000L;
/**
* JSON serialization type.
*/
private String jsonSerializeType = "JACKSON";
}

@ -43,23 +43,21 @@ public class DiscoveryConfiguration {
private final ConfigurableEnvironment environment;
private final BootstrapProperties properties;
private final BootstrapProperties bootstrapProperties;
private final InetUtils hippo4JInetUtils;
@Bean
@SneakyThrows
public InstanceInfo instanceConfig() {
String namespace = properties.getNamespace();
String itemId = properties.getItemId();
String namespace = bootstrapProperties.getNamespace();
String itemId = bootstrapProperties.getItemId();
String port = environment.getProperty("server.port", "8080");
String applicationName = environment.getProperty("spring.dynamic.thread-pool.item-id");
String active = environment.getProperty("spring.profiles.active", "UNKNOWN");
InstanceInfo instanceInfo = new InstanceInfo();
String instanceId = CloudCommonIdUtil.getDefaultInstanceId(environment, hippo4JInetUtils);
instanceId = StrBuilder.create().append(instanceId).append(IDENTIFY_SLICER_SYMBOL).append(CLIENT_IDENTIFICATION_VALUE).toString();
String contextPath = environment.getProperty("server.servlet.context-path", "");
instanceInfo.setInstanceId(instanceId)
.setIpApplicationName(CloudCommonIdUtil.getIpApplicationName(environment, hippo4JInetUtils))
@ -68,16 +66,13 @@ public class DiscoveryConfiguration {
.setPort(port)
.setClientBasePath(contextPath)
.setGroupKey(ContentUtil.getGroupKey(itemId, namespace));
String callBackUrl = new StringBuilder().append(instanceInfo.getHostName()).append(":")
.append(port).append(instanceInfo.getClientBasePath())
.toString();
instanceInfo.setCallBackUrl(callBackUrl);
String identify = IdentifyUtil.generate(environment, hippo4JInetUtils);
instanceInfo.setIdentify(identify);
instanceInfo.setActive(active.toUpperCase());
return instanceInfo;
}

@ -24,9 +24,9 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
/**
* Netty ClientCon figuration
* Netty client configuration.
*/
@ConditionalOnProperty(prefix = BootstrapProperties.PREFIX, name = "report-type", matchIfMissing = false, havingValue = "netty")
@ConditionalOnProperty(prefix = BootstrapProperties.PREFIX, name = "report-type", havingValue = "netty")
public class NettyClientConfiguration {
@Bean

@ -40,14 +40,14 @@ public class WebThreadPoolController {
@GetMapping("/web/base/info")
public Result<ThreadPoolBaseInfo> getPoolBaseState() {
ThreadPoolBaseInfo poolBaseInfo = webThreadPoolServiceChoose.choose().simpleInfo();
return Results.success(poolBaseInfo);
ThreadPoolBaseInfo result = webThreadPoolServiceChoose.choose().simpleInfo();
return Results.success(result);
}
@GetMapping("/web/run/state")
public Result<ThreadPoolRunStateInfo> getPoolRunState() {
ThreadPoolRunStateInfo poolRunState = webThreadPoolServiceChoose.choose().getWebRunStateInfo();
return Results.success(poolRunState);
ThreadPoolRunStateInfo result = webThreadPoolServiceChoose.choose().getWebRunStateInfo();
return Results.success(result);
}
@PostMapping("/web/update/pool")

@ -45,13 +45,13 @@ public class WebThreadPoolRunStateController {
@GetMapping("/run/state/{threadPoolId}")
public Result<ThreadPoolRunStateInfo> getPoolRunState(@PathVariable("threadPoolId") String threadPoolId) {
ThreadPoolRunStateInfo poolRunState = threadPoolRunStateHandler.getPoolRunState(threadPoolId);
return Results.success(poolRunState);
ThreadPoolRunStateInfo result = threadPoolRunStateHandler.getPoolRunState(threadPoolId);
return Results.success(result);
}
@GetMapping("/run/thread/state/{threadPoolId}")
public Result<List<ThreadDetailStateInfo>> getThreadStateDetail(@PathVariable("threadPoolId") String threadPoolId) {
List<ThreadDetailStateInfo> detailStateInfo = threadDetailState.getThreadDetailStateInfo(threadPoolId);
return Results.success(detailStateInfo);
List<ThreadDetailStateInfo> result = threadDetailState.getThreadDetailStateInfo(threadPoolId);
return Results.success(result);
}
}

@ -44,50 +44,50 @@ public class BaseThreadDetailStateHandler implements ThreadDetailState {
@Override
public List<ThreadDetailStateInfo> getThreadDetailStateInfo(String threadPoolId) {
DynamicThreadPoolWrapper poolWrapper = GlobalThreadPoolManage.getExecutorService(threadPoolId);
ThreadPoolExecutor executor = poolWrapper.getExecutor();
return getThreadDetailStateInfo(executor);
DynamicThreadPoolWrapper dynamicThreadPoolWrapper = GlobalThreadPoolManage.getExecutorService(threadPoolId);
ThreadPoolExecutor threadPoolExecutor = dynamicThreadPoolWrapper.getExecutor();
return getThreadDetailStateInfo(threadPoolExecutor);
}
@Override
public List<ThreadDetailStateInfo> getThreadDetailStateInfo(ThreadPoolExecutor threadPoolExecutor) {
List<ThreadDetailStateInfo> resultThreadState = new ArrayList();
List<ThreadDetailStateInfo> resultThreadStates = new ArrayList();
try {
// TODO: Should the object be copied deeply to avoid the destruction of the worker
HashSet<Object> workers = (HashSet<Object>) ReflectUtil.getFieldValue(threadPoolExecutor, WORKERS);
if (CollectionUtil.isEmpty(workers)) {
return resultThreadState;
return resultThreadStates;
}
for (Object worker : workers) {
Thread thread;
try {
thread = (Thread) ReflectUtil.getFieldValue(worker, THREAD);
if (thread == null) {
log.warn("Reflection get worker thread is null. Worker :: {}", worker);
log.warn("Reflection get worker thread is null. Worker: {}", worker);
continue;
}
} catch (Exception ex) {
log.error("Reflection get worker thread exception. Worker :: {}", worker, ex);
log.error("Reflection get worker thread exception. Worker: {}", worker, ex);
continue;
}
long threadId = thread.getId();
String threadName = thread.getName();
String threadStatus = thread.getState().name();
StackTraceElement[] stackTrace = thread.getStackTrace();
List<String> stacks = new ArrayList(stackTrace.length);
List<String> threadStack = new ArrayList(stackTrace.length);
for (int i = 0; i < stackTrace.length; i++) {
stacks.add(stackTrace[i].toString());
threadStack.add(stackTrace[i].toString());
}
ThreadDetailStateInfo threadState = new ThreadDetailStateInfo();
threadState.setThreadId(threadId)
.setThreadName(threadName)
.setThreadStatus(threadStatus)
.setThreadStack(stacks);
resultThreadState.add(threadState);
ThreadDetailStateInfo threadState = ThreadDetailStateInfo.builder()
.threadId(threadId)
.threadName(threadName)
.threadStatus(threadStatus)
.threadStack(threadStack)
.build();
resultThreadStates.add(threadState);
}
} catch (Exception ex) {
log.error("Failed to get thread status.", ex);
}
return resultThreadState;
return resultThreadStates;
}
}

@ -22,6 +22,8 @@ import cn.hippo4j.springboot.starter.wrapper.ManagerListenerWrapper;
import cn.hippo4j.common.toolkit.ContentUtil;
import cn.hippo4j.common.toolkit.Md5Util;
import cn.hippo4j.common.constant.Constants;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CopyOnWriteArrayList;
@ -32,6 +34,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
@Slf4j
public class CacheData {
@Getter
public volatile String md5;
public volatile String content;
@ -40,21 +43,23 @@ public class CacheData {
public final String itemId;
public final String tpId;
public final String threadPoolId;
@Setter
private int taskId;
@Setter
private volatile boolean isInitializing = true;
private volatile long localConfigLastModified;
private final CopyOnWriteArrayList<ManagerListenerWrapper> listeners;
public CacheData(String tenantId, String itemId, String tpId) {
public CacheData(String tenantId, String itemId, String threadPoolId) {
this.tenantId = tenantId;
this.itemId = itemId;
this.tpId = tpId;
this.content = ContentUtil.getPoolContent(GlobalThreadPoolManage.getPoolParameter(tpId));
this.threadPoolId = threadPoolId;
this.content = ContentUtil.getPoolContent(GlobalThreadPoolManage.getPoolParameter(threadPoolId));
this.md5 = getMd5String(content);
this.listeners = new CopyOnWriteArrayList();
}
@ -65,28 +70,28 @@ public class CacheData {
}
ManagerListenerWrapper managerListenerWrap = new ManagerListenerWrapper(md5, listener);
if (listeners.addIfAbsent(managerListenerWrap)) {
log.info("Add listener status :: ok, thread pool id :: {}, listeners count :: {}", tpId, listeners.size());
log.info("Add listener status: ok, thread pool id: {}, listeners count: {}", threadPoolId, listeners.size());
}
}
public void checkListenerMd5() {
for (ManagerListenerWrapper wrap : listeners) {
if (!md5.equals(wrap.getLastCallMd5())) {
safeNotifyListener(content, md5, wrap);
for (ManagerListenerWrapper managerListenerWrapper : listeners) {
if (!md5.equals(managerListenerWrapper.getLastCallMd5())) {
safeNotifyListener(content, md5, managerListenerWrapper);
}
}
}
private void safeNotifyListener(String content, String md5, ManagerListenerWrapper wrap) {
Listener listener = wrap.getListener();
private void safeNotifyListener(String content, String md5, ManagerListenerWrapper managerListenerWrapper) {
Listener listener = managerListenerWrapper.getListener();
Runnable runnable = () -> {
wrap.setLastCallMd5(md5);
managerListenerWrapper.setLastCallMd5(md5);
listener.receiveConfigInfo(content);
};
try {
listener.getExecutor().execute(runnable);
} catch (Exception ex) {
log.error("Failed to execute listener. message :: {}", ex.getMessage());
log.error("Failed to execute listener. message: {}", ex.getMessage());
}
}
@ -99,19 +104,7 @@ public class CacheData {
return (null == config) ? Constants.NULL : Md5Util.md5Hex(config, Constants.ENCODE);
}
public String getMd5() {
return this.md5;
}
public void setTaskId(Integer taskId) {
this.taskId = taskId;
}
public boolean isInitializing() {
return isInitializing;
}
public void setInitializing(boolean isInitializing) {
this.isInitializing = isInitializing;
}
}

@ -48,7 +48,7 @@ public class ClientWorker {
private final HttpAgent agent;
private final String identification;
private final String identify;
private final ServerHealthCheck serverHealthCheck;
@ -61,26 +61,26 @@ public class ClientWorker {
private final ConcurrentHashMap<String, CacheData> cacheMap = new ConcurrentHashMap(16);
@SuppressWarnings("all")
public ClientWorker(HttpAgent httpAgent, String identification, ServerHealthCheck serverHealthCheck) {
public ClientWorker(HttpAgent httpAgent, String identify, ServerHealthCheck serverHealthCheck) {
this.agent = httpAgent;
this.identification = identification;
this.identify = identify;
this.timeout = CONFIG_LONG_POLL_TIMEOUT;
this.serverHealthCheck = serverHealthCheck;
this.executor = Executors.newScheduledThreadPool(1, r -> {
Thread t = new Thread(r);
t.setName("client.worker.executor");
t.setDaemon(true);
return t;
this.executor = Executors.newScheduledThreadPool(1, runnable -> {
Thread thread = new Thread(runnable);
thread.setName("client.worker.executor");
thread.setDaemon(true);
return thread;
});
this.executorService = Executors.newSingleThreadScheduledExecutor(
ThreadFactoryBuilder.builder().prefix("client.long.polling.executor").daemon(true).build());
log.info("Client identity :: {}", identification);
log.info("Client identify: {}", identify);
this.executor.scheduleWithFixedDelay(() -> {
try {
awaitApplicationComplete.await();
checkConfigInfo();
} catch (Throwable e) {
log.error("Sub check rotate check error.", e);
} catch (Throwable ex) {
log.error("Sub check rotate check error.", ex);
}
}, 1L, 1024L, TimeUnit.MILLISECONDS);
}
@ -123,7 +123,7 @@ public class ClientWorker {
}
for (CacheData cacheData : cacheDataList) {
if (!cacheData.isInitializing() || inInitializingCacheList
.contains(GroupKey.getKeyTenant(cacheData.tpId, cacheData.itemId, cacheData.tenantId))) {
.contains(GroupKey.getKeyTenant(cacheData.threadPoolId, cacheData.itemId, cacheData.tenantId))) {
cacheData.checkListenerMd5();
cacheData.setInitializing(false);
}
@ -136,13 +136,13 @@ public class ClientWorker {
private List<String> checkUpdateDataIds(List<CacheData> cacheDataList, List<String> inInitializingCacheList) {
StringBuilder sb = new StringBuilder();
for (CacheData cacheData : cacheDataList) {
sb.append(cacheData.tpId).append(WORD_SEPARATOR);
sb.append(cacheData.threadPoolId).append(WORD_SEPARATOR);
sb.append(cacheData.itemId).append(WORD_SEPARATOR);
sb.append(cacheData.tenantId).append(WORD_SEPARATOR);
sb.append(identification).append(WORD_SEPARATOR);
sb.append(identify).append(WORD_SEPARATOR);
sb.append(cacheData.getMd5()).append(LINE_SEPARATOR);
if (cacheData.isInitializing()) {
inInitializingCacheList.add(GroupKey.getKeyTenant(cacheData.tpId, cacheData.itemId, cacheData.tenantId));
inInitializingCacheList.add(GroupKey.getKeyTenant(cacheData.threadPoolId, cacheData.itemId, cacheData.tenantId));
}
}
boolean isInitializingCacheList = !inInitializingCacheList.isEmpty();
@ -156,7 +156,7 @@ public class ClientWorker {
Map<String, String> headers = new HashMap(2);
headers.put(LONG_PULLING_TIMEOUT, "" + timeout);
// Confirm the identity of the client, and can be modified separately when modifying the thread pool configuration.
headers.put(LONG_PULLING_CLIENT_IDENTIFICATION, identification);
headers.put(LONG_PULLING_CLIENT_IDENTIFICATION, identify);
// told server do not hang me up if new initializing cacheData added in
if (isInitializingCacheList) {
headers.put(LONG_PULLING_TIMEOUT_NO_HANGUP, "true");
@ -172,23 +172,22 @@ public class ClientWorker {
}
} catch (Exception ex) {
setHealthServer(false);
log.error("Check update get changed dataId exception. error message :: {}", ex.getMessage());
log.error("Check update get changed dataId exception. error message: {}", ex.getMessage());
}
return Collections.emptyList();
}
public String getServerConfig(String namespace, String itemId, String tpId, long readTimeout) {
public String getServerConfig(String namespace, String itemId, String threadPoolId, long readTimeout) {
Map<String, String> params = new HashMap(3);
params.put("namespace", namespace);
params.put("itemId", itemId);
params.put("tpId", tpId);
params.put("instanceId", identification);
params.put("tpId", threadPoolId);
params.put("instanceId", identify);
Result result = agent.httpGetByConfig(CONFIG_CONTROLLER_PATH, null, params, readTimeout);
if (result.isSuccess()) {
return JSONUtil.toJSONString(result.getData());
}
log.error("Sub server namespace :: {}, itemId :: {}, tpId :: {}, result code :: {}",
namespace, itemId, tpId, result.getCode());
log.error("Sub server namespace: {}, itemId: {}, threadPoolId: {}, result code: {}", namespace, itemId, threadPoolId, result.getCode());
return NULL;
}
@ -219,28 +218,28 @@ public class ClientWorker {
return updateList;
}
public void addTenantListeners(String namespace, String itemId, String tpId, List<? extends Listener> listeners) {
CacheData cacheData = addCacheDataIfAbsent(namespace, itemId, tpId);
public void addTenantListeners(String namespace, String itemId, String threadPoolId, List<? extends Listener> listeners) {
CacheData cacheData = addCacheDataIfAbsent(namespace, itemId, threadPoolId);
for (Listener listener : listeners) {
cacheData.addListener(listener);
}
}
public CacheData addCacheDataIfAbsent(String namespace, String itemId, String tpId) {
CacheData cacheData = cacheMap.get(tpId);
public CacheData addCacheDataIfAbsent(String namespace, String itemId, String threadPoolId) {
CacheData cacheData = cacheMap.get(threadPoolId);
if (cacheData != null) {
return cacheData;
}
cacheData = new CacheData(namespace, itemId, tpId);
CacheData lastCacheData = cacheMap.putIfAbsent(tpId, cacheData);
cacheData = new CacheData(namespace, itemId, threadPoolId);
CacheData lastCacheData = cacheMap.putIfAbsent(threadPoolId, cacheData);
if (lastCacheData == null) {
String serverConfig;
try {
serverConfig = getServerConfig(namespace, itemId, tpId, 3000L);
serverConfig = getServerConfig(namespace, itemId, threadPoolId, 3000L);
ThreadPoolParameterInfo poolInfo = JSONUtil.parseObject(serverConfig, ThreadPoolParameterInfo.class);
cacheData.setContent(ContentUtil.getPoolContent(poolInfo));
} catch (Exception ex) {
log.error("Cache Data Error. Service Unavailable :: {}", ex.getMessage());
log.error("Cache Data Error. Service Unavailable: {}", ex.getMessage());
}
int taskId = cacheMap.size() / CONFIG_LONG_POLL_TIMEOUT;
cacheData.setTaskId(taskId);

@ -27,7 +27,7 @@ import org.springframework.boot.diagnostics.FailureAnalysis;
public class ConfigEmptyAnalyzer extends AbstractFailureAnalyzer<ConfigEmptyException> {
@Override
protected FailureAnalysis analyze(Throwable rootFailure, ConfigEmptyException cause) {
return new FailureAnalysis(cause.getDescription(), cause.getAction(), cause);
protected FailureAnalysis analyze(Throwable rootFailure, ConfigEmptyException configEmptyException) {
return new FailureAnalysis(configEmptyException.getDescription(), configEmptyException.getAction(), configEmptyException);
}
}

@ -27,10 +27,10 @@ public interface ConfigService {
*
* @param tenantId
* @param itemId
* @param tpId
* @param threadPoolId
* @param listener
*/
void addListener(String tenantId, String itemId, String tpId, Listener listener);
void addListener(String tenantId, String itemId, String threadPoolId, Listener listener);
/**
* Get server status.

@ -63,7 +63,7 @@ public class DiscoveryClient implements DisposableBean {
new Integer(1),
ThreadFactoryBuilder.builder().daemon(true).prefix("client.discovery.scheduler").build());
register();
// init the schedule tasks
// Init the schedule tasks.
initScheduledTasks();
}
@ -79,10 +79,10 @@ public class DiscoveryClient implements DisposableBean {
registerResult = httpAgent.httpPostByDiscovery(urlPath, instanceInfo);
} catch (Exception ex) {
registerResult = Results.failure(ErrorCodeEnum.SERVICE_ERROR);
log.error("{}{} - registration failed :: {}", PREFIX, appPathIdentifier, ex.getMessage());
log.error("{}{} - registration failed: {}", PREFIX, appPathIdentifier, ex.getMessage());
}
if (log.isInfoEnabled()) {
log.info("{}{} - registration status :: {}", PREFIX, appPathIdentifier, registerResult.isSuccess() ? "success" : "fail");
log.info("{}{} - registration status: {}", PREFIX, appPathIdentifier, registerResult.isSuccess() ? "success" : "fail");
}
return registerResult.isSuccess();
}
@ -124,7 +124,7 @@ public class DiscoveryClient implements DisposableBean {
}
}
boolean renew() {
private boolean renew() {
Result renewResult;
try {
InstanceInfo.InstanceRenew instanceRenew = new InstanceInfo.InstanceRenew()

@ -21,6 +21,7 @@ import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.common.constant.Constants;
import cn.hippo4j.common.enums.EnableEnum;
import cn.hippo4j.common.model.ThreadPoolParameterInfo;
import cn.hippo4j.common.toolkit.BooleanUtil;
import cn.hippo4j.common.toolkit.JSONUtil;
import cn.hippo4j.common.web.base.Result;
import cn.hippo4j.core.executor.DynamicThreadPool;
@ -33,7 +34,6 @@ import cn.hippo4j.core.toolkit.inet.DynamicThreadPoolAnnotationUtil;
import cn.hippo4j.message.service.ThreadPoolNotifyAlarm;
import cn.hippo4j.springboot.starter.config.BootstrapProperties;
import cn.hippo4j.springboot.starter.remote.HttpAgent;
import cn.hutool.core.util.BooleanUtil;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
@ -63,7 +63,7 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
private final ThreadPoolOperation threadPoolOperation;
private final ServerThreadPoolDynamicRefresh threadPoolDynamicRefresh;
private final ServerThreadPoolDynamicRefresh serverThreadPoolDynamicRefresh;
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) {
@ -88,14 +88,14 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
return bean;
}
DynamicThreadPoolExecutor dynamicThreadPoolExecutor = (DynamicThreadPoolExecutor) bean;
DynamicThreadPoolWrapper wrap = new DynamicThreadPoolWrapper(dynamicThreadPoolExecutor.getThreadPoolId(), dynamicThreadPoolExecutor);
ThreadPoolExecutor remoteExecutor = fillPoolAndRegister(wrap);
subscribeConfig(wrap);
return remoteExecutor;
DynamicThreadPoolWrapper dynamicThreadPoolWrapper = new DynamicThreadPoolWrapper(dynamicThreadPoolExecutor.getThreadPoolId(), dynamicThreadPoolExecutor);
ThreadPoolExecutor remoteThreadPoolExecutor = fillPoolAndRegister(dynamicThreadPoolWrapper);
subscribeConfig(dynamicThreadPoolWrapper);
return remoteThreadPoolExecutor;
}
if (bean instanceof DynamicThreadPoolWrapper) {
DynamicThreadPoolWrapper wrap = (DynamicThreadPoolWrapper) bean;
registerAndSubscribe(wrap);
DynamicThreadPoolWrapper dynamicThreadPoolWrapper = (DynamicThreadPoolWrapper) bean;
registerAndSubscribe(dynamicThreadPoolWrapper);
}
return bean;
}
@ -103,22 +103,22 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
/**
* Register and subscribe.
*
* @param dynamicThreadPoolWrap
* @param dynamicThreadPoolWrapper
*/
protected void registerAndSubscribe(DynamicThreadPoolWrapper dynamicThreadPoolWrap) {
fillPoolAndRegister(dynamicThreadPoolWrap);
subscribeConfig(dynamicThreadPoolWrap);
protected void registerAndSubscribe(DynamicThreadPoolWrapper dynamicThreadPoolWrapper) {
fillPoolAndRegister(dynamicThreadPoolWrapper);
subscribeConfig(dynamicThreadPoolWrapper);
}
/**
* Fill the thread pool and register.
*
* @param dynamicThreadPoolWrap
* @param dynamicThreadPoolWrapper
*/
protected ThreadPoolExecutor fillPoolAndRegister(DynamicThreadPoolWrapper dynamicThreadPoolWrap) {
String tpId = dynamicThreadPoolWrap.getThreadPoolId();
protected ThreadPoolExecutor fillPoolAndRegister(DynamicThreadPoolWrapper dynamicThreadPoolWrapper) {
String threadPoolId = dynamicThreadPoolWrapper.getThreadPoolId();
Map<String, String> queryStrMap = new HashMap(3);
queryStrMap.put(TP_ID, tpId);
queryStrMap.put(TP_ID, threadPoolId);
queryStrMap.put(ITEM_ID, properties.getItemId());
queryStrMap.put(NAMESPACE, properties.getNamespace());
boolean isSubscribe = false;
@ -134,43 +134,43 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
newDynamicThreadPoolExecutor = ThreadPoolBuilder.builder()
.dynamicPool()
.workQueue(workQueue)
.threadFactory(tpId)
.threadFactory(threadPoolId)
.poolThreadSize(threadPoolParameterInfo.corePoolSizeAdapt(), threadPoolParameterInfo.maximumPoolSizeAdapt())
.keepAliveTime(threadPoolParameterInfo.getKeepAliveTime(), TimeUnit.SECONDS)
.rejected(RejectedTypeEnum.createPolicy(threadPoolParameterInfo.getRejectedType()))
.allowCoreThreadTimeOut(EnableEnum.getBool(threadPoolParameterInfo.getAllowCoreThreadTimeOut()))
.build();
// Set dynamic thread pool enhancement parameters.
if (dynamicThreadPoolWrap.getExecutor() instanceof AbstractDynamicExecutorSupport) {
if (dynamicThreadPoolWrapper.getExecutor() instanceof AbstractDynamicExecutorSupport) {
ThreadPoolNotifyAlarm threadPoolNotifyAlarm = new ThreadPoolNotifyAlarm(
BooleanUtil.toBoolean(threadPoolParameterInfo.getIsAlarm().toString()),
threadPoolParameterInfo.getCapacityAlarm(),
threadPoolParameterInfo.getLivenessAlarm());
GlobalNotifyAlarmManage.put(tpId, threadPoolNotifyAlarm);
TaskDecorator taskDecorator = ((DynamicThreadPoolExecutor) dynamicThreadPoolWrap.getExecutor()).getTaskDecorator();
GlobalNotifyAlarmManage.put(threadPoolId, threadPoolNotifyAlarm);
TaskDecorator taskDecorator = ((DynamicThreadPoolExecutor) dynamicThreadPoolWrapper.getExecutor()).getTaskDecorator();
((DynamicThreadPoolExecutor) newDynamicThreadPoolExecutor).setTaskDecorator(taskDecorator);
long awaitTerminationMillis = ((DynamicThreadPoolExecutor) dynamicThreadPoolWrap.getExecutor()).awaitTerminationMillis;
boolean waitForTasksToCompleteOnShutdown = ((DynamicThreadPoolExecutor) dynamicThreadPoolWrap.getExecutor()).waitForTasksToCompleteOnShutdown;
long awaitTerminationMillis = ((DynamicThreadPoolExecutor) dynamicThreadPoolWrapper.getExecutor()).awaitTerminationMillis;
boolean waitForTasksToCompleteOnShutdown = ((DynamicThreadPoolExecutor) dynamicThreadPoolWrapper.getExecutor()).waitForTasksToCompleteOnShutdown;
((DynamicThreadPoolExecutor) newDynamicThreadPoolExecutor).setSupportParam(awaitTerminationMillis, waitForTasksToCompleteOnShutdown);
long executeTimeOut = ((DynamicThreadPoolExecutor) dynamicThreadPoolWrap.getExecutor()).getExecuteTimeOut();
long executeTimeOut = ((DynamicThreadPoolExecutor) dynamicThreadPoolWrapper.getExecutor()).getExecuteTimeOut();
((DynamicThreadPoolExecutor) newDynamicThreadPoolExecutor).setExecuteTimeOut(executeTimeOut);
}
dynamicThreadPoolWrap.setExecutor(newDynamicThreadPoolExecutor);
dynamicThreadPoolWrapper.setExecutor(newDynamicThreadPoolExecutor);
isSubscribe = true;
}
}
} catch (Exception ex) {
newDynamicThreadPoolExecutor = dynamicThreadPoolWrap.getExecutor() != null ? dynamicThreadPoolWrap.getExecutor() : CommonDynamicThreadPool.getInstance(tpId);
dynamicThreadPoolWrap.setExecutor(newDynamicThreadPoolExecutor);
log.error("Failed to initialize thread pool configuration. error message :: {}", ex.getMessage());
newDynamicThreadPoolExecutor = dynamicThreadPoolWrapper.getExecutor() != null ? dynamicThreadPoolWrapper.getExecutor() : CommonDynamicThreadPool.getInstance(threadPoolId);
dynamicThreadPoolWrapper.setExecutor(newDynamicThreadPoolExecutor);
log.error("Failed to initialize thread pool configuration. error message: {}", ex.getMessage());
} finally {
if (Objects.isNull(dynamicThreadPoolWrap.getExecutor())) {
dynamicThreadPoolWrap.setExecutor(CommonDynamicThreadPool.getInstance(tpId));
if (Objects.isNull(dynamicThreadPoolWrapper.getExecutor())) {
dynamicThreadPoolWrapper.setExecutor(CommonDynamicThreadPool.getInstance(threadPoolId));
}
// Set whether to subscribe to the remote thread pool configuration.
dynamicThreadPoolWrap.setSubscribeFlag(isSubscribe);
dynamicThreadPoolWrapper.setSubscribeFlag(isSubscribe);
}
GlobalThreadPoolManage.register(dynamicThreadPoolWrap.getThreadPoolId(), threadPoolParameterInfo, dynamicThreadPoolWrap);
GlobalThreadPoolManage.register(dynamicThreadPoolWrapper.getThreadPoolId(), threadPoolParameterInfo, dynamicThreadPoolWrapper);
return newDynamicThreadPoolExecutor;
}
@ -192,7 +192,7 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
*/
protected void subscribeConfig(DynamicThreadPoolWrapper dynamicThreadPoolWrap) {
if (dynamicThreadPoolWrap.isSubscribeFlag()) {
threadPoolOperation.subscribeConfig(dynamicThreadPoolWrap.getThreadPoolId(), configRefreshExecutorService, config -> threadPoolDynamicRefresh.dynamicRefresh(config));
threadPoolOperation.subscribeConfig(dynamicThreadPoolWrap.getThreadPoolId(), configRefreshExecutorService, config -> serverThreadPoolDynamicRefresh.dynamicRefresh(config));
}
}
}

@ -145,7 +145,7 @@ public class ServerThreadPoolDynamicRefresh implements ThreadPoolDynamicRefresh
ResizableCapacityLinkedBlockingQueue queue = (ResizableCapacityLinkedBlockingQueue) executor.getQueue();
queue.setCapacity(parameter.getCapacity());
} else {
log.warn("The queue length cannot be modified. Queue type mismatch. Current queue type :: {}", executor.getQueue().getClass().getSimpleName());
log.warn("The queue length cannot be modified. Queue type mismatch. Current queue type: {}", executor.getQueue().getClass().getSimpleName());
}
}
if (parameter.getKeepAliveTime() != null) {

@ -17,7 +17,10 @@
package cn.hippo4j.springboot.starter.core;
import cn.hippo4j.adapter.base.*;
import cn.hippo4j.adapter.base.ThreadPoolAdapter;
import cn.hippo4j.adapter.base.ThreadPoolAdapterCacheConfig;
import cn.hippo4j.adapter.base.ThreadPoolAdapterRegisterAction;
import cn.hippo4j.adapter.base.ThreadPoolAdapterState;
import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.common.toolkit.CollectionUtil;
import cn.hippo4j.common.web.base.Result;
@ -28,7 +31,6 @@ import cn.hippo4j.springboot.starter.remote.HttpAgent;
import cn.hippo4j.springboot.starter.toolkit.CloudCommonIdUtil;
import com.google.common.collect.Lists;
import lombok.AllArgsConstructor;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
@ -36,9 +38,6 @@ import org.springframework.core.env.ConfigurableEnvironment;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static cn.hippo4j.common.constant.Constants.IDENTIFY_SLICER_SYMBOL;
import static cn.hippo4j.common.constant.Constants.REGISTER_ADAPTER_PATH;
@ -65,24 +64,24 @@ public class ThreadPoolAdapterRegister implements ApplicationRunner, ThreadPoolA
@Override
public List<ThreadPoolAdapterCacheConfig> getThreadPoolAdapterCacheConfigs(Map<String, ThreadPoolAdapter> threadPoolAdapterMap) {
List<ThreadPoolAdapterCacheConfig> cacheConfigList = Lists.newArrayList();
List<ThreadPoolAdapterCacheConfig> adapterCacheConfigList = Lists.newArrayList();
for (Map.Entry<String, ThreadPoolAdapter> threadPoolAdapterEntry : threadPoolAdapterMap.entrySet()) {
ThreadPoolAdapter val = threadPoolAdapterEntry.getValue();
List<ThreadPoolAdapterState> threadPoolStates = val.getThreadPoolStates();
ThreadPoolAdapter threadPoolAdapter = threadPoolAdapterEntry.getValue();
List<ThreadPoolAdapterState> threadPoolStates = threadPoolAdapter.getThreadPoolStates();
if (CollectionUtil.isEmpty(threadPoolStates) || threadPoolStates.size() == 0) {
continue;
}
ThreadPoolAdapterCacheConfig cacheConfig = new ThreadPoolAdapterCacheConfig();
cacheConfig.setMark(val.mark());
cacheConfig.setMark(threadPoolAdapter.mark());
String tenantItemKey = properties.getNamespace() + IDENTIFY_SLICER_SYMBOL + properties.getItemId();
cacheConfig.setTenantItemKey(tenantItemKey);
cacheConfig.setClientIdentify(IdentifyUtil.getIdentify());
String clientAddress = CloudCommonIdUtil.getClientIpPort(environment, hippo4JInetUtils);
cacheConfig.setClientAddress(clientAddress);
cacheConfig.setThreadPoolAdapterStates(threadPoolStates);
cacheConfigList.add(cacheConfig);
adapterCacheConfigList.add(cacheConfig);
}
return cacheConfigList;
return adapterCacheConfigList;
}
@Override

@ -33,14 +33,14 @@ public class ThreadPoolConfigService implements ConfigService, ApplicationListen
private final ServerHealthCheck serverHealthCheck;
public ThreadPoolConfigService(HttpAgent httpAgent, String identification, ServerHealthCheck serverHealthCheck) {
public ThreadPoolConfigService(HttpAgent httpAgent, String identify, ServerHealthCheck serverHealthCheck) {
this.serverHealthCheck = serverHealthCheck;
this.clientWorker = new ClientWorker(httpAgent, identification, serverHealthCheck);
this.clientWorker = new ClientWorker(httpAgent, identify, serverHealthCheck);
}
@Override
public void addListener(String tenantId, String itemId, String tpId, Listener listener) {
clientWorker.addTenantListeners(tenantId, itemId, tpId, Arrays.asList(listener));
public void addListener(String tenantId, String itemId, String threadPoolId, Listener listener) {
clientWorker.addTenantListeners(tenantId, itemId, threadPoolId, Arrays.asList(listener));
}
@Override

@ -35,7 +35,7 @@ public class ThreadPoolOperation {
this.configService = configService;
}
public Listener subscribeConfig(String tpId, Executor executor, ThreadPoolSubscribeCallback threadPoolSubscribeCallback) {
public Listener subscribeConfig(String threadPoolId, Executor executor, ThreadPoolSubscribeCallback threadPoolSubscribeCallback) {
Listener configListener = new Listener() {
@Override
@ -48,7 +48,7 @@ public class ThreadPoolOperation {
return executor;
}
};
configService.addListener(properties.getNamespace(), properties.getItemId(), tpId, configListener);
configService.addListener(properties.getNamespace(), properties.getItemId(), threadPoolId, configListener);
return configListener;
}
}

@ -99,7 +99,7 @@ public class ReportingEventExecutor implements Runnable, CommandLineRunner, Disp
Message message = messageCollectVessel.take();
messageSender.send(message);
} catch (Throwable ex) {
log.error("Consumption buffer container task failed. Number of buffer container tasks :: {}", messageCollectVessel.size(), ex);
log.error("Consumption buffer container task failed. Number of buffer container tasks: {}", messageCollectVessel.size(), ex);
}
}
}
@ -143,7 +143,7 @@ public class ReportingEventExecutor implements Runnable, CommandLineRunner, Disp
ThreadUtil.newThread(this, "client.thread.reporting.task", Boolean.TRUE).start();
}
if (GlobalThreadPoolManage.getThreadPoolNum() > 0) {
log.info("Dynamic thread pool :: [{}]. The dynamic thread pool starts data collection and reporting. ", getThreadPoolNum());
log.info("Dynamic thread pool: [{}]. The dynamic thread pool starts data collection and reporting.", getThreadPoolNum());
}
}

@ -64,7 +64,7 @@ public class NettyConnectSender implements MessageSender {
});
bootstrap.connect(serverNettyAgent.getNettyServerAddress(), serverNettyAgent.getNettyServerPort()).sync();
} catch (Exception e) {
log.error("netty send error ", e);
log.error("Netty send error.", e);
}
}
}

@ -70,7 +70,7 @@ public class ServerNotifyConfigBuilder implements NotifyConfigBuilder {
try {
result = httpAgent.httpPostByDiscovery(BASE_PATH + "/notify/list/config", new ThreadPoolNotifyRequest(groupKeys));
} catch (Throwable ex) {
log.error("Get dynamic thread pool notify configuration error. message :: {}", ex.getMessage());
log.error("Get dynamic thread pool notify configuration error. message: {}", ex.getMessage());
}
if (result != null && result.isSuccess() && result.getData() != null) {
String resultDataStr = JSONUtil.toJSONString(result.getData());

@ -110,8 +110,7 @@ public abstract class AbstractHealthCheck implements ServerHealthCheck, Initiali
@Override
@SneakyThrows
public boolean isHealthStatus() {
while (contextInitComplete
&& !healthStatus && !clientShutdownHook) {
while (contextInitComplete && !healthStatus && !clientShutdownHook) {
healthMainLock.lock();
try {
healthCondition.await();
@ -152,7 +151,7 @@ public abstract class AbstractHealthCheck implements ServerHealthCheck, Initiali
public void afterPropertiesSet() throws Exception {
/**
* Add a hook function, when the client stops, if the server is in an unhealthy state,
* the client destroy function will suspend operation
* the client destroy function will suspend operation.
*/
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
clientShutdownHook = true;

@ -61,8 +61,7 @@ public class SecurityProxy {
public boolean applyToken(List<String> servers) {
try {
if ((System.currentTimeMillis() - lastRefreshTime) < TimeUnit.SECONDS
.toMillis(tokenTtl - tokenRefreshWindow)) {
if ((System.currentTimeMillis() - lastRefreshTime) < TimeUnit.SECONDS.toMillis(tokenTtl - tokenRefreshWindow)) {
return true;
}
for (String server : servers) {
@ -85,7 +84,7 @@ public class SecurityProxy {
try {
Result<String> result = httpClientUtil.restApiPost(url, bodyMap, Result.class);
if (!result.isSuccess()) {
log.error("Error getting access token. message :: {}", result.getMessage());
log.error("Error getting access token. message: {}", result.getMessage());
return false;
}
String tokenJsonStr = JSONUtil.toJSONString(result.getData());
@ -94,7 +93,7 @@ public class SecurityProxy {
tokenTtl = tokenInfo.getTokenTtl();
tokenRefreshWindow = tokenTtl / 10;
} catch (Throwable ex) {
log.error("Failed to apply for token. message :: {}", ex.getMessage());
log.error("Failed to apply for token. message: {}", ex.getMessage());
return false;
}
}

Loading…
Cancel
Save