Merge pull request #519 from mabaiwan/develop

hippo4j server dynamic registration thread pool refactoring
pull/536/head
小马哥 2 years ago committed by GitHub
commit c65db10926
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -37,9 +37,9 @@ public class DynamicThreadPoolRegisterServerNotifyParameter {
private String platform; private String platform;
/** /**
* Secret key * Access token
*/ */
private String secretKey; private String accessToken;
/** /**
* Interval * Interval

@ -165,7 +165,7 @@ public class ConfigServiceImpl implements ConfigService {
.setTpId(configAllInfo.getTpId()) .setTpId(configAllInfo.getTpId())
.setPlatform(serverNotifyParameter.getPlatform()) .setPlatform(serverNotifyParameter.getPlatform())
.setReceives(serverNotifyParameter.getReceives()) .setReceives(serverNotifyParameter.getReceives())
.setSecretKey(serverNotifyParameter.getSecretKey()); .setSecretKey(serverNotifyParameter.getAccessToken());
if (Objects.equals(each, "ALARM")) { if (Objects.equals(each, "ALARM")) {
notifyReqDTO.setInterval(serverNotifyParameter.getInterval()); notifyReqDTO.setInterval(serverNotifyParameter.getInterval());
notifyReqDTO.setAlarmType(true); notifyReqDTO.setAlarmType(true);

@ -63,7 +63,7 @@ public class RegisterDynamicThreadPoolTest {
.build(); .build();
DynamicThreadPoolRegisterServerNotifyParameter serverNotifyParameter = DynamicThreadPoolRegisterServerNotifyParameter.builder() DynamicThreadPoolRegisterServerNotifyParameter serverNotifyParameter = DynamicThreadPoolRegisterServerNotifyParameter.builder()
.platform(NotifyPlatformEnum.WECHAT.name()) .platform(NotifyPlatformEnum.WECHAT.name())
.secretKey("xxx") .accessToken("7487d0a0-20ec-40ab-b67b-ce68db406b37")
.interval(5) .interval(5)
.receives("chen.ma") .receives("chen.ma")
.build(); .build();

@ -47,6 +47,7 @@ import cn.hippo4j.springboot.starter.notify.ServerNotifyConfigBuilder;
import cn.hippo4j.springboot.starter.remote.HttpAgent; import cn.hippo4j.springboot.starter.remote.HttpAgent;
import cn.hippo4j.springboot.starter.remote.HttpScheduledHealthCheck; import cn.hippo4j.springboot.starter.remote.HttpScheduledHealthCheck;
import cn.hippo4j.springboot.starter.remote.ServerHealthCheck; import cn.hippo4j.springboot.starter.remote.ServerHealthCheck;
import cn.hippo4j.springboot.starter.support.DynamicThreadPoolPostProcessor;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import org.springframework.boot.autoconfigure.ImportAutoConfiguration; import org.springframework.boot.autoconfigure.ImportAutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;

@ -21,12 +21,15 @@ import cn.hippo4j.common.model.ThreadPoolParameterInfo;
import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterParameter; import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterParameter;
import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterWrapper; import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterWrapper;
import cn.hippo4j.common.toolkit.Assert; import cn.hippo4j.common.toolkit.Assert;
import cn.hippo4j.common.toolkit.BooleanUtil;
import cn.hippo4j.common.toolkit.JSONUtil; import cn.hippo4j.common.toolkit.JSONUtil;
import cn.hippo4j.common.web.base.Result; import cn.hippo4j.common.web.base.Result;
import cn.hippo4j.common.web.exception.ServiceException; import cn.hippo4j.common.web.exception.ServiceException;
import cn.hippo4j.core.executor.DynamicThreadPoolWrapper; import cn.hippo4j.core.executor.DynamicThreadPoolWrapper;
import cn.hippo4j.core.executor.manage.GlobalNotifyAlarmManage;
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage; import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.core.executor.support.service.AbstractDynamicThreadPoolService; import cn.hippo4j.core.executor.support.service.AbstractDynamicThreadPoolService;
import cn.hippo4j.message.service.ThreadPoolNotifyAlarm;
import cn.hippo4j.springboot.starter.config.BootstrapProperties; import cn.hippo4j.springboot.starter.config.BootstrapProperties;
import cn.hippo4j.springboot.starter.event.ApplicationCompleteEvent; import cn.hippo4j.springboot.starter.event.ApplicationCompleteEvent;
import cn.hippo4j.springboot.starter.remote.HttpAgent; import cn.hippo4j.springboot.starter.remote.HttpAgent;
@ -55,9 +58,20 @@ public class DynamicThreadPoolConfigService extends AbstractDynamicThreadPoolSer
@Override @Override
public ThreadPoolExecutor registerDynamicThreadPool(DynamicThreadPoolRegisterWrapper registerWrapper) { public ThreadPoolExecutor registerDynamicThreadPool(DynamicThreadPoolRegisterWrapper registerWrapper) {
ThreadPoolExecutor dynamicThreadPoolExecutor = registerExecutor(registerWrapper);
subscribeConfig(registerWrapper);
putNotifyAlarmConfig(registerWrapper);
return dynamicThreadPoolExecutor;
}
@Override
public void onApplicationEvent(ApplicationCompleteEvent event) {
clientWorker.notifyApplicationComplete();
}
private ThreadPoolExecutor registerExecutor(DynamicThreadPoolRegisterWrapper registerWrapper) {
DynamicThreadPoolRegisterParameter registerParameter = registerWrapper.getDynamicThreadPoolRegisterParameter(); DynamicThreadPoolRegisterParameter registerParameter = registerWrapper.getDynamicThreadPoolRegisterParameter();
checkThreadPoolParameter(registerParameter); checkThreadPoolParameter(registerParameter);
ThreadPoolParameterInfo parameter = JSONUtil.parseObject(JSONUtil.toJSONString(registerParameter), ThreadPoolParameterInfo.class);
String threadPoolId = registerParameter.getThreadPoolId(); String threadPoolId = registerParameter.getThreadPoolId();
try { try {
failDynamicThreadPoolRegisterWrapper(registerWrapper); failDynamicThreadPoolRegisterWrapper(registerWrapper);
@ -69,20 +83,27 @@ public class DynamicThreadPoolConfigService extends AbstractDynamicThreadPoolSer
log.error("Dynamic thread pool registration execution error: {}", threadPoolId, ex); log.error("Dynamic thread pool registration execution error: {}", threadPoolId, ex);
throw ex; throw ex;
} }
ThreadPoolParameterInfo parameter = JSONUtil.parseObject(JSONUtil.toJSONString(registerParameter), ThreadPoolParameterInfo.class);
ThreadPoolExecutor dynamicThreadPoolExecutor = buildDynamicThreadPoolExecutor(registerParameter); ThreadPoolExecutor dynamicThreadPoolExecutor = buildDynamicThreadPoolExecutor(registerParameter);
DynamicThreadPoolWrapper dynamicThreadPoolWrapper = DynamicThreadPoolWrapper.builder() DynamicThreadPoolWrapper dynamicThreadPoolWrapper = DynamicThreadPoolWrapper.builder()
.threadPoolId(threadPoolId) .threadPoolId(threadPoolId)
.executor(dynamicThreadPoolExecutor) .executor(dynamicThreadPoolExecutor)
.build(); .build();
GlobalThreadPoolManage.register(threadPoolId, parameter, dynamicThreadPoolWrapper); GlobalThreadPoolManage.register(threadPoolId, parameter, dynamicThreadPoolWrapper);
dynamicThreadPoolSubscribeConfig.subscribeConfig(threadPoolId);
clientWorker.addCacheDataIfAbsent(properties.getNamespace(), properties.getItemId(), threadPoolId);
return dynamicThreadPoolExecutor; return dynamicThreadPoolExecutor;
} }
@Override private void subscribeConfig(DynamicThreadPoolRegisterWrapper registerWrapper) {
public void onApplicationEvent(ApplicationCompleteEvent event) { dynamicThreadPoolSubscribeConfig.subscribeConfig(registerWrapper.getDynamicThreadPoolRegisterParameter().getThreadPoolId());
clientWorker.notifyApplicationComplete(); }
private void putNotifyAlarmConfig(DynamicThreadPoolRegisterWrapper registerWrapper) {
DynamicThreadPoolRegisterParameter registerParameter = registerWrapper.getDynamicThreadPoolRegisterParameter();
ThreadPoolNotifyAlarm threadPoolNotifyAlarm = new ThreadPoolNotifyAlarm(
BooleanUtil.toBoolean(String.valueOf(registerParameter.getIsAlarm())),
registerParameter.getActiveAlarm(),
registerParameter.getCapacityAlarm());
GlobalNotifyAlarmManage.put(registerParameter.getThreadPoolId(), threadPoolNotifyAlarm);
} }
private void checkThreadPoolParameter(DynamicThreadPoolRegisterParameter registerParameter) { private void checkThreadPoolParameter(DynamicThreadPoolRegisterParameter registerParameter) {

@ -15,7 +15,7 @@
* limitations under the License. * limitations under the License.
*/ */
package cn.hippo4j.springboot.starter.core; package cn.hippo4j.springboot.starter.support;
import cn.hippo4j.common.config.ApplicationContextHolder; import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.common.constant.Constants; import cn.hippo4j.common.constant.Constants;
@ -34,6 +34,7 @@ import cn.hippo4j.core.executor.support.adpter.DynamicThreadPoolAdapterChoose;
import cn.hippo4j.core.toolkit.inet.DynamicThreadPoolAnnotationUtil; import cn.hippo4j.core.toolkit.inet.DynamicThreadPoolAnnotationUtil;
import cn.hippo4j.message.service.ThreadPoolNotifyAlarm; import cn.hippo4j.message.service.ThreadPoolNotifyAlarm;
import cn.hippo4j.springboot.starter.config.BootstrapProperties; import cn.hippo4j.springboot.starter.config.BootstrapProperties;
import cn.hippo4j.springboot.starter.core.DynamicThreadPoolSubscribeConfig;
import cn.hippo4j.springboot.starter.remote.HttpAgent; import cn.hippo4j.springboot.starter.remote.HttpAgent;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
Loading…
Cancel
Save