default subscribe to the remote thread pool configuration. (#850)

* feat:default subscribe to the remote thread pool configuration.

* feat:default empty list
pull/857/head
WuLang 2 years ago committed by GitHub
parent 4e68645ccb
commit 8060e5f239
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -20,6 +20,7 @@ package cn.hippo4j.common.toolkit;
import cn.hippo4j.common.api.JsonFacade; import cn.hippo4j.common.api.JsonFacade;
import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.core.type.TypeReference;
import java.util.Collections;
import java.util.List; import java.util.List;
/** /**
@ -52,7 +53,7 @@ public class JSONUtil {
public static <T> List<T> parseArray(String text, Class<T> clazz) { public static <T> List<T> parseArray(String text, Class<T> clazz) {
if (StringUtil.isBlank(text)) { if (StringUtil.isBlank(text)) {
return null; return Collections.emptyList();
} }
return JSON_FACADE.parseArray(text, clazz); return JSON_FACADE.parseArray(text, clazz);
} }

@ -26,6 +26,7 @@ import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections;
import java.util.List; import java.util.List;
public class JSONUtilTest { public class JSONUtilTest {
@ -65,8 +66,8 @@ public class JSONUtilTest {
@Test @Test
public void assertParseArray() { public void assertParseArray() {
Assert.assertNull(JSONUtil.parseArray(null, Foo.class)); Assert.assertEquals(Collections.emptyList(), JSONUtil.parseArray(null, Foo.class));
Assert.assertNull(JSONUtil.parseArray(" ", Foo.class)); Assert.assertEquals(Collections.emptyList(), JSONUtil.parseArray(" ", Foo.class));
Assert.assertEquals( Assert.assertEquals(
EXPECTED_FOO_ARRAY, EXPECTED_FOO_ARRAY,
JSONUtil.parseArray(EXPECTED_FOO_JSON_ARRAY, Foo.class)); JSONUtil.parseArray(EXPECTED_FOO_JSON_ARRAY, Foo.class));

@ -51,6 +51,7 @@ public class DynamicThreadPoolWrapper implements DisposableBean {
public DynamicThreadPoolWrapper(String threadPoolId, ThreadPoolExecutor threadPoolExecutor) { public DynamicThreadPoolWrapper(String threadPoolId, ThreadPoolExecutor threadPoolExecutor) {
this.threadPoolId = threadPoolId; this.threadPoolId = threadPoolId;
this.executor = threadPoolExecutor; this.executor = threadPoolExecutor;
this.subscribeFlag = true;
} }
public void execute(Runnable command) { public void execute(Runnable command) {
@ -67,7 +68,7 @@ public class DynamicThreadPoolWrapper implements DisposableBean {
@Override @Override
public void destroy() throws Exception { public void destroy() throws Exception {
if (executor != null && executor instanceof AbstractDynamicExecutorSupport) { if (executor instanceof AbstractDynamicExecutorSupport) {
((AbstractDynamicExecutorSupport) executor).destroy(); ((AbstractDynamicExecutorSupport) executor).destroy();
} }
} }

@ -204,17 +204,17 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
private ThreadPoolNotifyAlarm buildThreadPoolNotifyAlarm(ExecutorProperties executorProperties) { private ThreadPoolNotifyAlarm buildThreadPoolNotifyAlarm(ExecutorProperties executorProperties) {
DynamicThreadPoolNotifyProperties notify = Optional.ofNullable(executorProperties).map(ExecutorProperties::getNotify).orElse(null); DynamicThreadPoolNotifyProperties notify = Optional.ofNullable(executorProperties).map(ExecutorProperties::getNotify).orElse(null);
boolean isAlarm = Optional.ofNullable(executorProperties.getAlarm()) boolean isAlarm = Optional.ofNullable(executorProperties.getAlarm())
.orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(each -> each.getAlarm()).orElse(true)); .orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(ExecutorProperties::getAlarm).orElse(true));
int activeAlarm = Optional.ofNullable(executorProperties.getActiveAlarm()) int activeAlarm = Optional.ofNullable(executorProperties.getActiveAlarm())
.orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(each -> each.getActiveAlarm()).orElse(80)); .orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(ExecutorProperties::getActiveAlarm).orElse(80));
int capacityAlarm = Optional.ofNullable(executorProperties.getCapacityAlarm()) int capacityAlarm = Optional.ofNullable(executorProperties.getCapacityAlarm())
.orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(each -> each.getCapacityAlarm()).orElse(80)); .orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(ExecutorProperties::getCapacityAlarm).orElse(80));
int interval = Optional.ofNullable(notify) int interval = Optional.ofNullable(notify)
.map(each -> each.getInterval()) .map(DynamicThreadPoolNotifyProperties::getInterval)
.orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(each -> each.getNotify()).map(each -> each.getInterval()).orElse(5)); .orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(ExecutorProperties::getNotify).map(DynamicThreadPoolNotifyProperties::getInterval).orElse(5));
String receive = Optional.ofNullable(notify) String receive = Optional.ofNullable(notify)
.map(each -> each.getReceives()) .map(DynamicThreadPoolNotifyProperties::getReceives)
.orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(each -> each.getNotify()).map(each -> each.getReceives()).orElse("")); .orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(ExecutorProperties::getNotify).map(DynamicThreadPoolNotifyProperties::getReceives).orElse(""));
ThreadPoolNotifyAlarm threadPoolNotifyAlarm = new ThreadPoolNotifyAlarm(isAlarm, activeAlarm, capacityAlarm); ThreadPoolNotifyAlarm threadPoolNotifyAlarm = new ThreadPoolNotifyAlarm(isAlarm, activeAlarm, capacityAlarm);
threadPoolNotifyAlarm.setInterval(interval); threadPoolNotifyAlarm.setInterval(interval);
threadPoolNotifyAlarm.setReceives(receive); threadPoolNotifyAlarm.setReceives(receive);

@ -53,7 +53,7 @@ public class DynamicThreadPoolSubscribeConfig {
.build(); .build();
public void subscribeConfig(String threadPoolId) { public void subscribeConfig(String threadPoolId) {
subscribeConfig(threadPoolId, config -> threadPoolDynamicRefresh.dynamicRefresh(config)); subscribeConfig(threadPoolId, threadPoolDynamicRefresh::dynamicRefresh);
} }
public void subscribeConfig(String threadPoolId, ThreadPoolSubscribeCallback threadPoolSubscribeCallback) { public void subscribeConfig(String threadPoolId, ThreadPoolSubscribeCallback threadPoolSubscribeCallback) {

@ -130,7 +130,6 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
queryStrMap.put(TP_ID, threadPoolId); queryStrMap.put(TP_ID, threadPoolId);
queryStrMap.put(ITEM_ID, properties.getItemId()); queryStrMap.put(ITEM_ID, properties.getItemId());
queryStrMap.put(NAMESPACE, properties.getNamespace()); queryStrMap.put(NAMESPACE, properties.getNamespace());
boolean isSubscribe = false;
ThreadPoolExecutor newDynamicThreadPoolExecutor = null; ThreadPoolExecutor newDynamicThreadPoolExecutor = null;
ThreadPoolParameterInfo threadPoolParameterInfo = new ThreadPoolParameterInfo(); ThreadPoolParameterInfo threadPoolParameterInfo = new ThreadPoolParameterInfo();
try { try {
@ -167,7 +166,6 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
((DynamicThreadPoolExecutor) newDynamicThreadPoolExecutor).setExecuteTimeOut(executeTimeOut); ((DynamicThreadPoolExecutor) newDynamicThreadPoolExecutor).setExecuteTimeOut(executeTimeOut);
} }
dynamicThreadPoolWrapper.setExecutor(newDynamicThreadPoolExecutor); dynamicThreadPoolWrapper.setExecutor(newDynamicThreadPoolExecutor);
isSubscribe = true;
} }
} else { } else {
// DynamicThreadPool configuration undefined in server // DynamicThreadPool configuration undefined in server
@ -197,8 +195,6 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
if (Objects.isNull(executor)) { if (Objects.isNull(executor)) {
dynamicThreadPoolWrapper.setExecutor(CommonDynamicThreadPool.getInstance(threadPoolId)); dynamicThreadPoolWrapper.setExecutor(CommonDynamicThreadPool.getInstance(threadPoolId));
} }
// Set whether to subscribe to the remote thread pool configuration.
dynamicThreadPoolWrapper.setSubscribeFlag(isSubscribe);
} }
GlobalThreadPoolManage.register(dynamicThreadPoolWrapper.getThreadPoolId(), threadPoolParameterInfo, dynamicThreadPoolWrapper); GlobalThreadPoolManage.register(dynamicThreadPoolWrapper.getThreadPoolId(), threadPoolParameterInfo, dynamicThreadPoolWrapper);
return newDynamicThreadPoolExecutor; return newDynamicThreadPoolExecutor;

Loading…
Cancel
Save