Merge pull request #222 from weihubeats/develop

优化spring事件解耦
pull/233/head
龙台 Long Tai 2 years ago committed by GitHub
commit fe0c5895f5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -0,0 +1,202 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.springboot.starter.event;
import cn.hippo4j.common.notify.request.ChangeParameterNotifyRequest;
import cn.hippo4j.core.executor.DynamicThreadPoolExecutor;
import cn.hippo4j.core.executor.ThreadPoolNotifyAlarmHandler;
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.core.executor.support.AbstractDynamicExecutorSupport;
import cn.hippo4j.core.executor.support.QueueTypeEnum;
import cn.hippo4j.core.executor.support.RejectedTypeEnum;
import cn.hippo4j.core.executor.support.ResizableCapacityLinkedBlockIngQueue;
import cn.hippo4j.core.proxy.RejectedProxyUtil;
import cn.hippo4j.core.springboot.starter.config.BootstrapCoreProperties;
import cn.hippo4j.core.springboot.starter.config.ExecutorProperties;
import cn.hippo4j.core.springboot.starter.support.GlobalCoreThreadPoolManage;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationListener;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_DELIMITER;
import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_THREAD_POOL_TEXT;
import static cn.hippo4j.core.springboot.starter.event.ThreadPoolDynamicRefreshEventOrder.EXECUTORS_LISTENER;
/**
* @author : wh
* @date : 2022/5/13 10:06
* @description:
*/
@Slf4j
@Component
@RequiredArgsConstructor
@Order(EXECUTORS_LISTENER)
public class ExecutorsListener implements ApplicationListener<ThreadPoolDynamicRefreshEvent> {
private final ThreadPoolNotifyAlarmHandler threadPoolNotifyAlarmHandler;
@Override
public void onApplicationEvent(ThreadPoolDynamicRefreshEvent threadPoolDynamicRefreshEvent) {
BootstrapCoreProperties bindableCoreProperties = threadPoolDynamicRefreshEvent.getBootstrapCoreProperties();
List<ExecutorProperties> executors = bindableCoreProperties.getExecutors();
for (ExecutorProperties properties : executors) {
String threadPoolId = properties.getThreadPoolId();
if (!checkConsistency(threadPoolId, properties)) {
continue;
}
// refresh executor pool
dynamicRefreshPool(threadPoolId, properties);
// old properties
ExecutorProperties beforeProperties = GlobalCoreThreadPoolManage.getProperties(properties.getThreadPoolId());
// refresh executor properties
GlobalCoreThreadPoolManage.refresh(threadPoolId, properties);
log.info(CHANGE_THREAD_POOL_TEXT,
threadPoolId.toUpperCase(),
String.format(CHANGE_DELIMITER, beforeProperties.getCorePoolSize(), properties.getCorePoolSize()),
String.format(CHANGE_DELIMITER, beforeProperties.getMaximumPoolSize(), properties.getMaximumPoolSize()),
String.format(CHANGE_DELIMITER, beforeProperties.getBlockingQueue(), properties.getBlockingQueue()),
String.format(CHANGE_DELIMITER, beforeProperties.getQueueCapacity(), properties.getQueueCapacity()),
String.format(CHANGE_DELIMITER, beforeProperties.getKeepAliveTime(), properties.getKeepAliveTime()),
String.format(CHANGE_DELIMITER, beforeProperties.getExecuteTimeOut(), properties.getExecuteTimeOut()),
String.format(CHANGE_DELIMITER, beforeProperties.getRejectedHandler(), properties.getRejectedHandler()),
String.format(CHANGE_DELIMITER, beforeProperties.getAllowCoreThreadTimeOut(), properties.getAllowCoreThreadTimeOut()));
try {
threadPoolNotifyAlarmHandler.sendPoolConfigChange(newChangeRequest(beforeProperties, properties));
} catch (Throwable ex) {
log.error("Failed to send changSmartApplicationListenere notice. Message :: {}", ex.getMessage());
}
}
}
/**
* Construct ChangeParameterNotifyRequest instance
*
* @param beforeProperties old properties
* @param properties new properties
* @return instance
*/
private ChangeParameterNotifyRequest newChangeRequest(ExecutorProperties beforeProperties, ExecutorProperties properties) {
ChangeParameterNotifyRequest changeRequest = new ChangeParameterNotifyRequest();
changeRequest.setBeforeCorePoolSize(beforeProperties.getCorePoolSize());
changeRequest.setBeforeMaximumPoolSize(beforeProperties.getMaximumPoolSize());
changeRequest.setBeforeAllowsCoreThreadTimeOut(beforeProperties.getAllowCoreThreadTimeOut());
changeRequest.setBeforeKeepAliveTime(beforeProperties.getKeepAliveTime());
changeRequest.setBlockingQueueName(beforeProperties.getBlockingQueue());
changeRequest.setBeforeQueueCapacity(beforeProperties.getQueueCapacity());
changeRequest.setBeforeRejectedName(beforeProperties.getRejectedHandler());
changeRequest.setBeforeExecuteTimeOut(beforeProperties.getExecuteTimeOut());
changeRequest.setThreadPoolId(beforeProperties.getThreadPoolId());
changeRequest.setNowCorePoolSize(properties.getCorePoolSize());
changeRequest.setNowMaximumPoolSize(properties.getMaximumPoolSize());
changeRequest.setNowAllowsCoreThreadTimeOut(properties.getAllowCoreThreadTimeOut());
changeRequest.setNowKeepAliveTime(properties.getKeepAliveTime());
changeRequest.setNowQueueCapacity(properties.getQueueCapacity());
changeRequest.setNowRejectedName(properties.getRejectedHandler());
changeRequest.setNowExecuteTimeOut(properties.getExecuteTimeOut());
return changeRequest;
}
/**
* Check consistency.
*
* @param threadPoolId
* @param properties
*/
private boolean checkConsistency(String threadPoolId, ExecutorProperties properties) {
ExecutorProperties beforeProperties = GlobalCoreThreadPoolManage.getProperties(properties.getThreadPoolId());
ThreadPoolExecutor executor = GlobalThreadPoolManage.getExecutorService(threadPoolId).getExecutor();
boolean result = !Objects.equals(beforeProperties.getCorePoolSize(), properties.getCorePoolSize())
|| !Objects.equals(beforeProperties.getMaximumPoolSize(), properties.getMaximumPoolSize())
|| !Objects.equals(beforeProperties.getAllowCoreThreadTimeOut(), properties.getAllowCoreThreadTimeOut())
|| !Objects.equals(beforeProperties.getExecuteTimeOut(), properties.getExecuteTimeOut())
|| !Objects.equals(beforeProperties.getKeepAliveTime(), properties.getKeepAliveTime())
|| !Objects.equals(beforeProperties.getRejectedHandler(), properties.getRejectedHandler())
||
(!Objects.equals(beforeProperties.getQueueCapacity(), properties.getQueueCapacity())
&& Objects.equals(QueueTypeEnum.RESIZABLE_LINKED_BLOCKING_QUEUE.name, executor.getQueue().getClass().getSimpleName()));
return result;
}
/**
* Dynamic refresh pool.
*
* @param threadPoolId
* @param properties
*/
private void dynamicRefreshPool(String threadPoolId, ExecutorProperties properties) {
ExecutorProperties beforeProperties = GlobalCoreThreadPoolManage.getProperties(properties.getThreadPoolId());
ThreadPoolExecutor executor = GlobalThreadPoolManage.getExecutorService(threadPoolId).getExecutor();
if (properties.getMaximumPoolSize() != null && properties.getCorePoolSize() != null) {
if (properties.getMaximumPoolSize() < executor.getMaximumPoolSize()) {
executor.setCorePoolSize(properties.getCorePoolSize());
executor.setMaximumPoolSize(properties.getMaximumPoolSize());
} else {
executor.setMaximumPoolSize(properties.getMaximumPoolSize());
executor.setCorePoolSize(properties.getCorePoolSize());
}
} else {
if (properties.getMaximumPoolSize() != null) {
executor.setMaximumPoolSize(properties.getMaximumPoolSize());
}
if (properties.getCorePoolSize() != null) {
executor.setCorePoolSize(properties.getCorePoolSize());
}
}
if (!Objects.equals(beforeProperties.getAllowCoreThreadTimeOut(), properties.getAllowCoreThreadTimeOut())) {
executor.allowCoreThreadTimeOut(properties.getAllowCoreThreadTimeOut());
}
if (!Objects.equals(beforeProperties.getExecuteTimeOut(), properties.getExecuteTimeOut())) {
if (executor instanceof AbstractDynamicExecutorSupport) {
((DynamicThreadPoolExecutor) executor).setExecuteTimeOut(properties.getExecuteTimeOut());
}
}
if (!Objects.equals(beforeProperties.getRejectedHandler(), properties.getRejectedHandler())) {
RejectedExecutionHandler rejectedExecutionHandler = RejectedTypeEnum.createPolicy(properties.getRejectedHandler());
if (executor instanceof AbstractDynamicExecutorSupport) {
DynamicThreadPoolExecutor dynamicExecutor = (DynamicThreadPoolExecutor) executor;
dynamicExecutor.setRedundancyHandler(rejectedExecutionHandler);
AtomicLong rejectCount = dynamicExecutor.getRejectCount();
rejectedExecutionHandler = RejectedProxyUtil.createProxy(rejectedExecutionHandler, threadPoolId, rejectCount);
}
executor.setRejectedExecutionHandler(rejectedExecutionHandler);
}
if (!Objects.equals(beforeProperties.getKeepAliveTime(), properties.getKeepAliveTime())) {
executor.setKeepAliveTime(properties.getKeepAliveTime(), TimeUnit.SECONDS);
}
if (!Objects.equals(beforeProperties.getQueueCapacity(), properties.getQueueCapacity())
&& Objects.equals(QueueTypeEnum.RESIZABLE_LINKED_BLOCKING_QUEUE.name, executor.getQueue().getClass().getSimpleName())) {
if (executor.getQueue() instanceof ResizableCapacityLinkedBlockIngQueue) {
ResizableCapacityLinkedBlockIngQueue<?> queue = (ResizableCapacityLinkedBlockIngQueue<?>) executor.getQueue();
queue.setCapacity(properties.getQueueCapacity());
} else {
log.warn("The queue length cannot be modified. Queue type mismatch. Current queue type :: {}", executor.getQueue().getClass().getSimpleName());
}
}
}
}

@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.springboot.starter.event;
import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.common.notify.HippoBaseSendMessageService;
import cn.hippo4j.common.notify.NotifyConfigDTO;
import cn.hippo4j.core.executor.DynamicThreadPoolWrapper;
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.core.springboot.starter.config.BootstrapCoreProperties;
import cn.hippo4j.core.springboot.starter.config.ExecutorProperties;
import cn.hippo4j.core.springboot.starter.notify.CoreNotifyConfigBuilder;
import org.springframework.context.ApplicationListener;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Map;
import static cn.hippo4j.core.springboot.starter.event.ThreadPoolDynamicRefreshEventOrder.PLATFORMS_LISTENER;
/**
* @author : wh
* @date : 2022/5/13 10:03
* @description:
*/
@Component
@Order(PLATFORMS_LISTENER)
public class PlatformsListener implements ApplicationListener<ThreadPoolDynamicRefreshEvent> {
@Override
public void onApplicationEvent(ThreadPoolDynamicRefreshEvent threadPoolDynamicRefreshEvent) {
BootstrapCoreProperties bindableCoreProperties = threadPoolDynamicRefreshEvent.getBootstrapCoreProperties();
List<ExecutorProperties> executors = bindableCoreProperties.getExecutors();
for (ExecutorProperties executor : executors) {
String threadPoolId = executor.getThreadPoolId();
DynamicThreadPoolWrapper wrapper = GlobalThreadPoolManage.getExecutorService(threadPoolId);
if (!wrapper.isInitFlag()) {
HippoBaseSendMessageService sendMessageService = ApplicationContextHolder.getBean(HippoBaseSendMessageService.class);
CoreNotifyConfigBuilder configBuilder = ApplicationContextHolder.getBean(CoreNotifyConfigBuilder.class);
Map<String, List<NotifyConfigDTO>> notifyConfig = configBuilder.buildSingleNotifyConfig(executor);
sendMessageService.putPlatform(notifyConfig);
wrapper.setInitFlag(Boolean.TRUE);
}
}
}
}

@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.springboot.starter.event;
import cn.hippo4j.core.springboot.starter.config.BootstrapCoreProperties;
import lombok.Getter;
import lombok.Setter;
import org.springframework.context.ApplicationEvent;
/**
* @author : wh
* @date : 2022/5/13 09:49
* @description:
*/
public class ThreadPoolDynamicRefreshEvent extends ApplicationEvent {
@Getter
@Setter
private BootstrapCoreProperties bootstrapCoreProperties;
public ThreadPoolDynamicRefreshEvent(Object source) {
super(source);
}
}

@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.springboot.starter.event;
/**
* @author : wh
* @date : 2022/5/13 10:25
* @description:
*/
public interface ThreadPoolDynamicRefreshEventOrder {
Integer WEB_EXECUTOR_LISTENER = 0;
Integer PLATFORMS_LISTENER = 1;
Integer EXECUTORS_LISTENER = 2;
}

@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.springboot.starter.event;
import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.common.model.ThreadPoolParameter;
import cn.hippo4j.common.model.ThreadPoolParameterInfo;
import cn.hippo4j.core.executor.web.WebThreadPoolHandlerChoose;
import cn.hippo4j.core.executor.web.WebThreadPoolService;
import cn.hippo4j.core.springboot.starter.config.BootstrapCoreProperties;
import cn.hippo4j.core.springboot.starter.config.WebThreadPoolProperties;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationListener;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import java.util.Objects;
import static cn.hippo4j.core.springboot.starter.event.ThreadPoolDynamicRefreshEventOrder.WEB_EXECUTOR_LISTENER;
/**
* @author : wh
* @date : 2022/5/13 09:53
* @description:
*/
@Slf4j
@Component
@Order(WEB_EXECUTOR_LISTENER)
public class WebExecutorListener implements ApplicationListener<ThreadPoolDynamicRefreshEvent> {
@Override
public void onApplicationEvent(ThreadPoolDynamicRefreshEvent threadPoolDynamicRefreshEvent) {
BootstrapCoreProperties bindableCoreProperties = threadPoolDynamicRefreshEvent.getBootstrapCoreProperties();
boolean isNullFlag = bindableCoreProperties.getJetty() == null
|| bindableCoreProperties.getUndertow() == null
|| bindableCoreProperties.getTomcat() == null;
if (isNullFlag) {
return;
}
try {
ThreadPoolParameterInfo nowParameter = buildWebPoolParameter(bindableCoreProperties);
if (nowParameter != null) {
WebThreadPoolHandlerChoose webThreadPoolHandlerChoose = ApplicationContextHolder.getBean(WebThreadPoolHandlerChoose.class);
WebThreadPoolService webThreadPoolService = webThreadPoolHandlerChoose.choose();
ThreadPoolParameter beforeParameter = webThreadPoolService.getWebThreadPoolParameter();
if (!Objects.equals(beforeParameter.getCoreSize(), nowParameter.getCoreSize())
|| !Objects.equals(beforeParameter.getMaxSize(), nowParameter.getMaxSize())
|| !Objects.equals(beforeParameter.getKeepAliveTime(), nowParameter.getKeepAliveTime())) {
webThreadPoolService.updateWebThreadPool(nowParameter);
}
}
} catch (Exception ex) {
log.error("Failed to modify web thread pool.", ex);
}
}
private ThreadPoolParameterInfo buildWebPoolParameter(BootstrapCoreProperties bindableCoreProperties) {
ThreadPoolParameterInfo parameterInfo = null;
WebThreadPoolProperties poolProperties = null;
if (bindableCoreProperties.getTomcat() != null) {
poolProperties = bindableCoreProperties.getTomcat();
} else if (bindableCoreProperties.getUndertow() != null) {
poolProperties = bindableCoreProperties.getUndertow();
} else if (bindableCoreProperties.getJetty() != null) {
poolProperties = bindableCoreProperties.getJetty();
}
if (poolProperties != null) {
parameterInfo = new ThreadPoolParameterInfo();
parameterInfo.setCoreSize(poolProperties.getCorePoolSize());
parameterInfo.setMaxSize(poolProperties.getMaximumPoolSize());
parameterInfo.setKeepAliveTime(poolProperties.getKeepAliveTime());
}
return parameterInfo;
}
}

@ -18,44 +18,23 @@
package cn.hippo4j.core.springboot.starter.refresher;
import cn.hippo4j.common.api.ThreadPoolDynamicRefresh;
import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.common.model.ThreadPoolParameter;
import cn.hippo4j.common.model.ThreadPoolParameterInfo;
import cn.hippo4j.common.notify.HippoBaseSendMessageService;
import cn.hippo4j.common.notify.NotifyConfigDTO;
import cn.hippo4j.common.notify.ThreadPoolNotifyAlarm;
import cn.hippo4j.common.notify.request.ChangeParameterNotifyRequest;
import cn.hippo4j.core.executor.web.WebThreadPoolHandlerChoose;
import cn.hippo4j.core.executor.web.WebThreadPoolService;
import cn.hippo4j.core.executor.DynamicThreadPoolExecutor;
import cn.hippo4j.core.executor.DynamicThreadPoolWrapper;
import cn.hippo4j.core.executor.ThreadPoolNotifyAlarmHandler;
import cn.hippo4j.core.executor.manage.GlobalNotifyAlarmManage;
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.core.executor.support.*;
import cn.hippo4j.core.proxy.RejectedProxyUtil;
import cn.hippo4j.core.springboot.starter.config.WebThreadPoolProperties;
import cn.hippo4j.core.springboot.starter.support.GlobalCoreThreadPoolManage;
import cn.hippo4j.core.executor.support.ThreadPoolBuilder;
import cn.hippo4j.core.springboot.starter.config.BootstrapCoreProperties;
import cn.hippo4j.core.springboot.starter.config.ExecutorProperties;
import cn.hippo4j.core.springboot.starter.notify.CoreNotifyConfigBuilder;
import cn.hippo4j.core.springboot.starter.event.ThreadPoolDynamicRefreshEvent;
import cn.hippo4j.core.springboot.starter.parser.ConfigParserHandler;
import lombok.AllArgsConstructor;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_DELIMITER;
import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_THREAD_POOL_TEXT;
/**
* Abstract core thread-pool dynamic refresh.
@ -64,8 +43,8 @@ import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_THREAD
* @date 2022/2/26 12:42
*/
@Slf4j
@AllArgsConstructor
public abstract class AbstractCoreThreadPoolDynamicRefresh implements ThreadPoolDynamicRefresh, InitializingBean {
@RequiredArgsConstructor
public abstract class AbstractCoreThreadPoolDynamicRefresh implements ThreadPoolDynamicRefresh, InitializingBean, ApplicationContextAware {
private final ThreadPoolNotifyAlarmHandler threadPoolNotifyAlarmHandler;
@ -73,6 +52,8 @@ public abstract class AbstractCoreThreadPoolDynamicRefresh implements ThreadPool
protected final ExecutorService dynamicRefreshExecutorService = ThreadPoolBuilder.builder().singlePool("client.dynamic.refresh").build();
private ApplicationContext applicationContext;
@Override
public void dynamicRefresh(String content) {
Map<Object, Object> configInfo;
@ -83,14 +64,12 @@ public abstract class AbstractCoreThreadPoolDynamicRefresh implements ThreadPool
content, bootstrapCoreProperties.getConfigFileType(), e);
return;
}
BootstrapCoreProperties bindableCoreProperties = BootstrapCorePropertiesBinderAdapt.bootstrapCorePropertiesBinder(configInfo, bootstrapCoreProperties);
// web pool
refreshWebExecutor(bindableCoreProperties);
// platforms
refreshPlatforms(bindableCoreProperties);
// executors
refreshExecutors(bindableCoreProperties);
ThreadPoolDynamicRefreshEvent event = new ThreadPoolDynamicRefreshEvent(this);
event.setBootstrapCoreProperties(bindableCoreProperties);
applicationContext.publishEvent(event);
}
/**
@ -107,220 +86,11 @@ public abstract class AbstractCoreThreadPoolDynamicRefresh implements ThreadPool
GlobalNotifyAlarmManage.put(executorProperties.getThreadPoolId(), threadPoolNotifyAlarm);
});
}
/**
* Refresh web executor.
*
* @param bindableCoreProperties
*/
private void refreshWebExecutor(BootstrapCoreProperties bindableCoreProperties) {
boolean isNullFlag = bindableCoreProperties.getJetty() == null
|| bindableCoreProperties.getUndertow() == null
|| bindableCoreProperties.getTomcat() == null;
if (isNullFlag) {
return;
}
try {
ThreadPoolParameterInfo nowParameter = buildWebPoolParameter(bindableCoreProperties);
if (nowParameter != null) {
WebThreadPoolHandlerChoose webThreadPoolHandlerChoose = ApplicationContextHolder.getBean(WebThreadPoolHandlerChoose.class);
WebThreadPoolService webThreadPoolService = webThreadPoolHandlerChoose.choose();
ThreadPoolParameter beforeParameter = webThreadPoolService.getWebThreadPoolParameter();
if (!Objects.equals(beforeParameter.getCoreSize(), nowParameter.getCoreSize())
|| !Objects.equals(beforeParameter.getMaxSize(), nowParameter.getMaxSize())
|| !Objects.equals(beforeParameter.getKeepAliveTime(), nowParameter.getKeepAliveTime())) {
webThreadPoolService.updateWebThreadPool(nowParameter);
}
}
} catch (Exception ex) {
log.error("Failed to modify web thread pool.", ex);
}
}
/**
* Refresh platform.
*
* @param bindableCoreProperties
*/
private void refreshPlatforms(BootstrapCoreProperties bindableCoreProperties) {
List<ExecutorProperties> executors = bindableCoreProperties.getExecutors();
for (ExecutorProperties executor : executors) {
String threadPoolId = executor.getThreadPoolId();
DynamicThreadPoolWrapper wrapper = GlobalThreadPoolManage.getExecutorService(threadPoolId);
if (!wrapper.isInitFlag()) {
HippoBaseSendMessageService sendMessageService = ApplicationContextHolder.getBean(HippoBaseSendMessageService.class);
CoreNotifyConfigBuilder configBuilder = ApplicationContextHolder.getBean(CoreNotifyConfigBuilder.class);
Map<String, List<NotifyConfigDTO>> notifyConfig = configBuilder.buildSingleNotifyConfig(executor);
sendMessageService.putPlatform(notifyConfig);
wrapper.setInitFlag(Boolean.TRUE);
}
}
}
/**
* Refresh executors.
*
* @param bindableCoreProperties
*/
private void refreshExecutors(BootstrapCoreProperties bindableCoreProperties) {
List<ExecutorProperties> executors = bindableCoreProperties.getExecutors();
for (ExecutorProperties properties : executors) {
String threadPoolId = properties.getThreadPoolId();
if (!checkConsistency(threadPoolId, properties)) {
continue;
}
// refresh executor pool
dynamicRefreshPool(threadPoolId, properties);
// old properties
ExecutorProperties beforeProperties = GlobalCoreThreadPoolManage.getProperties(properties.getThreadPoolId());
// refresh executor properties
GlobalCoreThreadPoolManage.refresh(threadPoolId, properties);
log.info(CHANGE_THREAD_POOL_TEXT,
threadPoolId.toUpperCase(),
String.format(CHANGE_DELIMITER, beforeProperties.getCorePoolSize(), properties.getCorePoolSize()),
String.format(CHANGE_DELIMITER, beforeProperties.getMaximumPoolSize(), properties.getMaximumPoolSize()),
String.format(CHANGE_DELIMITER, beforeProperties.getBlockingQueue(), properties.getBlockingQueue()),
String.format(CHANGE_DELIMITER, beforeProperties.getQueueCapacity(), properties.getQueueCapacity()),
String.format(CHANGE_DELIMITER, beforeProperties.getKeepAliveTime(), properties.getKeepAliveTime()),
String.format(CHANGE_DELIMITER, beforeProperties.getExecuteTimeOut(), properties.getExecuteTimeOut()),
String.format(CHANGE_DELIMITER, beforeProperties.getRejectedHandler(), properties.getRejectedHandler()),
String.format(CHANGE_DELIMITER, beforeProperties.getAllowCoreThreadTimeOut(), properties.getAllowCoreThreadTimeOut()));
try {
threadPoolNotifyAlarmHandler.sendPoolConfigChange(newChangeRequest(beforeProperties, properties));
} catch (Throwable ex) {
log.error("Failed to send change notice. Message :: {}", ex.getMessage());
}
}
}
/**
* Construct ChangeParameterNotifyRequest instance
*
* @param beforeProperties old properties
* @param properties new properties
* @return instance
*/
private ChangeParameterNotifyRequest newChangeRequest(ExecutorProperties beforeProperties, ExecutorProperties properties) {
ChangeParameterNotifyRequest changeRequest = new ChangeParameterNotifyRequest();
changeRequest.setBeforeCorePoolSize(beforeProperties.getCorePoolSize());
changeRequest.setBeforeMaximumPoolSize(beforeProperties.getMaximumPoolSize());
changeRequest.setBeforeAllowsCoreThreadTimeOut(beforeProperties.getAllowCoreThreadTimeOut());
changeRequest.setBeforeKeepAliveTime(beforeProperties.getKeepAliveTime());
changeRequest.setBlockingQueueName(beforeProperties.getBlockingQueue());
changeRequest.setBeforeQueueCapacity(beforeProperties.getQueueCapacity());
changeRequest.setBeforeRejectedName(beforeProperties.getRejectedHandler());
changeRequest.setBeforeExecuteTimeOut(beforeProperties.getExecuteTimeOut());
changeRequest.setThreadPoolId(beforeProperties.getThreadPoolId());
changeRequest.setNowCorePoolSize(properties.getCorePoolSize());
changeRequest.setNowMaximumPoolSize(properties.getMaximumPoolSize());
changeRequest.setNowAllowsCoreThreadTimeOut(properties.getAllowCoreThreadTimeOut());
changeRequest.setNowKeepAliveTime(properties.getKeepAliveTime());
changeRequest.setNowQueueCapacity(properties.getQueueCapacity());
changeRequest.setNowRejectedName(properties.getRejectedHandler());
changeRequest.setNowExecuteTimeOut(properties.getExecuteTimeOut());
return changeRequest;
}
/**
* Check consistency.
*
* @param threadPoolId
* @param properties
*/
private boolean checkConsistency(String threadPoolId, ExecutorProperties properties) {
ExecutorProperties beforeProperties = GlobalCoreThreadPoolManage.getProperties(properties.getThreadPoolId());
ThreadPoolExecutor executor = GlobalThreadPoolManage.getExecutorService(threadPoolId).getExecutor();
boolean result = !Objects.equals(beforeProperties.getCorePoolSize(), properties.getCorePoolSize())
|| !Objects.equals(beforeProperties.getMaximumPoolSize(), properties.getMaximumPoolSize())
|| !Objects.equals(beforeProperties.getAllowCoreThreadTimeOut(), properties.getAllowCoreThreadTimeOut())
|| !Objects.equals(beforeProperties.getExecuteTimeOut(), properties.getExecuteTimeOut())
|| !Objects.equals(beforeProperties.getKeepAliveTime(), properties.getKeepAliveTime())
|| !Objects.equals(beforeProperties.getRejectedHandler(), properties.getRejectedHandler())
||
(!Objects.equals(beforeProperties.getQueueCapacity(), properties.getQueueCapacity())
&& Objects.equals(QueueTypeEnum.RESIZABLE_LINKED_BLOCKING_QUEUE.name, executor.getQueue().getClass().getSimpleName()));
return result;
}
/**
* Dynamic refresh pool.
*
* @param threadPoolId
* @param properties
*/
private void dynamicRefreshPool(String threadPoolId, ExecutorProperties properties) {
ExecutorProperties beforeProperties = GlobalCoreThreadPoolManage.getProperties(properties.getThreadPoolId());
ThreadPoolExecutor executor = GlobalThreadPoolManage.getExecutorService(threadPoolId).getExecutor();
if (properties.getMaximumPoolSize() != null && properties.getCorePoolSize() != null) {
if (properties.getMaximumPoolSize() < executor.getMaximumPoolSize()) {
executor.setCorePoolSize(properties.getCorePoolSize());
executor.setMaximumPoolSize(properties.getMaximumPoolSize());
} else {
executor.setMaximumPoolSize(properties.getMaximumPoolSize());
executor.setCorePoolSize(properties.getCorePoolSize());
}
} else {
if (properties.getMaximumPoolSize() != null) {
executor.setMaximumPoolSize(properties.getMaximumPoolSize());
}
if (properties.getCorePoolSize() != null) {
executor.setCorePoolSize(properties.getCorePoolSize());
}
}
if (!Objects.equals(beforeProperties.getAllowCoreThreadTimeOut(), properties.getAllowCoreThreadTimeOut())) {
executor.allowCoreThreadTimeOut(properties.getAllowCoreThreadTimeOut());
}
if (!Objects.equals(beforeProperties.getExecuteTimeOut(), properties.getExecuteTimeOut())) {
if (executor instanceof AbstractDynamicExecutorSupport) {
((DynamicThreadPoolExecutor) executor).setExecuteTimeOut(properties.getExecuteTimeOut());
}
}
if (!Objects.equals(beforeProperties.getRejectedHandler(), properties.getRejectedHandler())) {
RejectedExecutionHandler rejectedExecutionHandler = RejectedTypeEnum.createPolicy(properties.getRejectedHandler());
if (executor instanceof AbstractDynamicExecutorSupport) {
DynamicThreadPoolExecutor dynamicExecutor = (DynamicThreadPoolExecutor) executor;
dynamicExecutor.setRedundancyHandler(rejectedExecutionHandler);
AtomicLong rejectCount = dynamicExecutor.getRejectCount();
rejectedExecutionHandler = RejectedProxyUtil.createProxy(rejectedExecutionHandler, threadPoolId, rejectCount);
}
executor.setRejectedExecutionHandler(rejectedExecutionHandler);
}
if (!Objects.equals(beforeProperties.getKeepAliveTime(), properties.getKeepAliveTime())) {
executor.setKeepAliveTime(properties.getKeepAliveTime(), TimeUnit.SECONDS);
}
if (!Objects.equals(beforeProperties.getQueueCapacity(), properties.getQueueCapacity())
&& Objects.equals(QueueTypeEnum.RESIZABLE_LINKED_BLOCKING_QUEUE.name, executor.getQueue().getClass().getSimpleName())) {
if (executor.getQueue() instanceof ResizableCapacityLinkedBlockIngQueue) {
ResizableCapacityLinkedBlockIngQueue queue = (ResizableCapacityLinkedBlockIngQueue) executor.getQueue();
queue.setCapacity(properties.getQueueCapacity());
} else {
log.warn("The queue length cannot be modified. Queue type mismatch. Current queue type :: {}", executor.getQueue().getClass().getSimpleName());
}
}
}
/**
* Build web pool parameter.
*
* @param bindableCoreProperties
* @return
*/
private ThreadPoolParameterInfo buildWebPoolParameter(BootstrapCoreProperties bindableCoreProperties) {
ThreadPoolParameterInfo parameterInfo = null;
WebThreadPoolProperties poolProperties = null;
if (bindableCoreProperties.getTomcat() != null) {
poolProperties = bindableCoreProperties.getTomcat();
} else if (bindableCoreProperties.getUndertow() != null) {
poolProperties = bindableCoreProperties.getUndertow();
} else if (bindableCoreProperties.getJetty() != null) {
poolProperties = bindableCoreProperties.getJetty();
}
if (poolProperties != null) {
parameterInfo = new ThreadPoolParameterInfo();
parameterInfo.setCoreSize(poolProperties.getCorePoolSize());
parameterInfo.setMaxSize(poolProperties.getMaximumPoolSize());
parameterInfo.setKeepAliveTime(poolProperties.getKeepAliveTime());
}
return parameterInfo;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
}

Loading…
Cancel
Save