Merge remote-tracking branch 'upstream/develop' into develop

pull/872/head
weihu 3 years ago
commit fe979378c2

@ -16,6 +16,7 @@ docker run -d -p 6691:6691 --name hippo4j-server hippo4j/hippo4j-server
```shell
docker run -d -p 6691:6691 --name hippo4j-server \
-e DATASOURCE_MODE=mysql \
-e DATASOURCE_HOST=xxx.xxx.xxx.xxx \
-e DATASOURCE_PORT=3306 \
-e DATASOURCE_DB=hippo4j_manager \

@ -22,87 +22,202 @@ package cn.hippo4j.common.constant;
*
* @author Rongzhen Yan
*/
public interface HttpResponseCode {
int SC_CONTINUE = 100;
int SC_SWITCHING_PROTOCOLS = 101;
int SC_OK = 200;
int SC_CREATED = 201;
int SC_ACCEPTED = 202;
int SC_NON_AUTHORITATIVE_INFORMATION = 203;
int SC_NO_CONTENT = 204;
int SC_RESET_CONTENT = 205;
int SC_PARTIAL_CONTENT = 206;
int SC_MULTIPLE_CHOICES = 300;
int SC_MOVED_PERMANENTLY = 301;
int SC_MOVED_TEMPORARILY = 302;
int SC_FOUND = 302;
int SC_SEE_OTHER = 303;
int SC_NOT_MODIFIED = 304;
int SC_USE_PROXY = 305;
int SC_TEMPORARY_REDIRECT = 307;
int SC_BAD_REQUEST = 400;
int SC_UNAUTHORIZED = 401;
int SC_PAYMENT_REQUIRED = 402;
int SC_FORBIDDEN = 403;
int SC_NOT_FOUND = 404;
int SC_METHOD_NOT_ALLOWED = 405;
int SC_NOT_ACCEPTABLE = 406;
int SC_PROXY_AUTHENTICATION_REQUIRED = 407;
int SC_REQUEST_TIMEOUT = 408;
int SC_CONFLICT = 409;
int SC_GONE = 410;
int SC_LENGTH_REQUIRED = 411;
int SC_PRECONDITION_FAILED = 412;
int SC_REQUEST_ENTITY_TOO_LARGE = 413;
int SC_REQUEST_URI_TOO_LONG = 414;
int SC_UNSUPPORTED_MEDIA_TYPE = 415;
int SC_REQUESTED_RANGE_NOT_SATISFIABLE = 416;
int SC_EXPECTATION_FAILED = 417;
int SC_INTERNAL_SERVER_ERROR = 500;
int SC_NOT_IMPLEMENTED = 501;
int SC_BAD_GATEWAY = 502;
int SC_SERVICE_UNAVAILABLE = 503;
int SC_GATEWAY_TIMEOUT = 504;
int SC_HTTP_VERSION_NOT_SUPPORTED = 505;
public class HttpResponseCode {
/* 2XX: generally "OK" */
/**
* HTTP Status-Code 200: OK.
*/
public static final int SC_OK = 200;
/**
* HTTP Status-Code 201: Created.
*/
public static final int SC_HTTP_CREATED = 201;
/**
* HTTP Status-Code 202: Accepted.
*/
public static final int SC_HTTP_ACCEPTED = 202;
/**
* HTTP Status-Code 203: Non-Authoritative Information.
*/
public static final int SC_HTTP_NOT_AUTHORITATIVE = 203;
/**
* HTTP Status-Code 204: No Content.
*/
public static final int SC_HTTP_NO_CONTENT = 204;
/**
* HTTP Status-Code 205: Reset Content.
*/
public static final int SC_HTTP_RESET = 205;
/**
* HTTP Status-Code 206: Partial Content.
*/
public static final int SC_HTTP_PARTIAL = 206;
/* 3XX: relocation/redirect */
/**
* HTTP Status-Code 300: Multiple Choices.
*/
public static final int SC_HTTP_MULT_CHOICE = 300;
/**
* HTTP Status-Code 301: Moved Permanently.
*/
public static final int SC_HTTP_MOVED_PERM = 301;
/**
* HTTP Status-Code 302: Temporary Redirect.
*/
public static final int SC_HTTP_MOVED_TEMP = 302;
/**
* HTTP Status-Code 303: See Other.
*/
public static final int SC_HTTP_SEE_OTHER = 303;
/**
* HTTP Status-Code 304: Not Modified.
*/
public static final int SC_HTTP_NOT_MODIFIED = 304;
/**
* HTTP Status-Code 305: Use Proxy.
*/
public static final int SC_HTTP_USE_PROXY = 305;
/**
* HTTP 1.1 Status-Code 307: Temporary Redirect.
*/
public static final int SC_HTTP_TEMP_REDIRECT = 307;
/**
* HTTP 1.1 Status-Code 308: Permanent Redirect
*/
public static final int SC_HTTP_PERMANENT_REDIRECT = 308;
/* 4XX: client error */
/**
* HTTP Status-Code 400: Bad Request.
*/
public static final int SC_HTTP_BAD_REQUEST = 400;
/**
* HTTP Status-Code 401: Unauthorized.
*/
public static final int SC_HTTP_UNAUTHORIZED = 401;
/**
* HTTP Status-Code 402: Payment Required.
*/
public static final int SC_HTTP_PAYMENT_REQUIRED = 402;
/**
* HTTP Status-Code 403: Forbidden.
*/
public static final int SC_HTTP_FORBIDDEN = 403;
/**
* HTTP Status-Code 404: Not Found.
*/
public static final int SC_HTTP_NOT_FOUND = 404;
/**
* HTTP Status-Code 405: Method Not Allowed.
*/
public static final int SC_HTTP_BAD_METHOD = 405;
/**
* HTTP Status-Code 406: Not Acceptable.
*/
public static final int SC_HTTP_NOT_ACCEPTABLE = 406;
/**
* HTTP Status-Code 407: Proxy Authentication Required.
*/
public static final int SC_HTTP_PROXY_AUTH = 407;
/**
* HTTP Status-Code 408: Request Time-Out.
*/
public static final int SC_HTTP_CLIENT_TIMEOUT = 408;
/**
* HTTP Status-Code 409: Conflict.
*/
public static final int SC_HTTP_CONFLICT = 409;
/**
* HTTP Status-Code 410: Gone.
*/
public static final int SC_HTTP_GONE = 410;
/**
* HTTP Status-Code 411: Length Required.
*/
public static final int SC_HTTP_LENGTH_REQUIRED = 411;
/**
* HTTP Status-Code 412: Precondition Failed.
*/
public static final int SC_HTTP_PRECON_FAILED = 412;
/**
* HTTP Status-Code 413: Request Entity Too Large.
*/
public static final int SC_HTTP_ENTITY_TOO_LARGE = 413;
/**
* HTTP Status-Code 414: Request-URI Too Large.
*/
public static final int SC_HTTP_REQ_TOO_LONG = 414;
/**
* HTTP Status-Code 415: Unsupported Media Type.
*/
public static final int SC_HTTP_UNSUPPORTED_TYPE = 415;
/* 5XX: server error */
/**
* HTTP Status-Code 500: Internal Server Error.
*/
public static final int SC_HTTP_INTERNAL_ERROR = 500;
/**
* HTTP Status-Code 501: Not Implemented.
*/
public static final int SC_HTTP_NOT_IMPLEMENTED = 501;
/**
* HTTP Status-Code 502: Bad Gateway.
*/
public static final int SC_HTTP_BAD_GATEWAY = 502;
/**
* HTTP Status-Code 503: Service Unavailable.
*/
public static final int SC_HTTP_UNAVAILABLE = 503;
/**
* HTTP Status-Code 504: Gateway Timeout.
*/
public static final int SC_HTTP_GATEWAY_TIMEOUT = 504;
/**
* HTTP Status-Code 505: HTTP Version Not Supported.
*/
public static final int SC_HTTP_VERSION = 505;
public static boolean isOk(int code) {
return (code >= SC_OK && code < SC_HTTP_MULT_CHOICE) || code == SC_HTTP_NOT_MODIFIED;
}
}

@ -20,6 +20,7 @@ package cn.hippo4j.common.toolkit;
import cn.hippo4j.common.api.JsonFacade;
import com.fasterxml.jackson.core.type.TypeReference;
import java.util.Collections;
import java.util.List;
/**
@ -52,7 +53,7 @@ public class JSONUtil {
public static <T> List<T> parseArray(String text, Class<T> clazz) {
if (StringUtil.isBlank(text)) {
return null;
return Collections.emptyList();
}
return JSON_FACADE.parseArray(text, clazz);
}

@ -319,7 +319,7 @@ public class HttpUtil {
}
connection.connect();
JdkHttpClientResponse response = new JdkHttpClientResponse(connection);
if (HttpResponseCode.SC_OK != response.getStatusCode()) {
if (!HttpResponseCode.isOk(response.getStatusCode())) {
String msg = String.format("HttpPost response code error. [code] %s [url] %s [body] %s", response.getStatusCode(), connection.getURL(), response.getBodyString());
throw new ServiceException(msg);
}

@ -26,6 +26,7 @@ import org.junit.Assert;
import org.junit.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
public class JSONUtilTest {
@ -65,8 +66,8 @@ public class JSONUtilTest {
@Test
public void assertParseArray() {
Assert.assertNull(JSONUtil.parseArray(null, Foo.class));
Assert.assertNull(JSONUtil.parseArray(" ", Foo.class));
Assert.assertEquals(Collections.emptyList(), JSONUtil.parseArray(null, Foo.class));
Assert.assertEquals(Collections.emptyList(), JSONUtil.parseArray(" ", Foo.class));
Assert.assertEquals(
EXPECTED_FOO_ARRAY,
JSONUtil.parseArray(EXPECTED_FOO_JSON_ARRAY, Foo.class));

@ -51,6 +51,7 @@ public class DynamicThreadPoolWrapper implements DisposableBean {
public DynamicThreadPoolWrapper(String threadPoolId, ThreadPoolExecutor threadPoolExecutor) {
this.threadPoolId = threadPoolId;
this.executor = threadPoolExecutor;
this.subscribeFlag = true;
}
public void execute(Runnable command) {
@ -67,7 +68,7 @@ public class DynamicThreadPoolWrapper implements DisposableBean {
@Override
public void destroy() throws Exception {
if (executor != null && executor instanceof AbstractDynamicExecutorSupport) {
if (executor instanceof AbstractDynamicExecutorSupport) {
((AbstractDynamicExecutorSupport) executor).destroy();
}
}

@ -30,6 +30,7 @@ RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
ADD conf/hippo4j-logback.xml ${BASE_DIR}/conf/hippo4j-logback.xml
ADD conf/application.properties ${BASE_DIR}/conf/application.properties
ADD conf/application-h2.properties ${BASE_DIR}/conf/application-h2.properties
ADD target/hippo4j-server.jar ${BASE_DIR}/hippo4j-server.jar
ADD docker-startup.sh ${BASE_DIR}/docker-startup.sh

@ -0,0 +1,9 @@
### Data source customization section
hippo4j.database.dialect=h2
hippo4j.database.init_enable=true
hippo4j.database.init_script=sql-script/h2/hippo4j_manager.sql
spring.datasource.driver-class-name=org.h2.Driver
spring.datasource.url=jdbc:h2:file:{your storage address}/h2_hippo4j_test_file;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MYSQL;
spring.datasource.username=sa
spring.datasource.password=sa

@ -4,6 +4,7 @@
### Server Startup Port
server.port=6691
spring.profiles.active=mysql
### Server Tomcat
server.tomcat.accesslog.enabled=true
@ -23,17 +24,22 @@ hippo4j.core.clean-history-data-enable=true
### Whether to enable authentication.
hippo4j.core.auth.enabled=true
### Initialize the database dialect class.
hippo4j.database.dialect=mysql
hippo4j.database.init_enable=false
hippo4j.database.init_script=sql-script/mysql/hippo4j_manager.sql
### Use netty to report thread pool monitoring data. The default is http.
# hippo4j.core.monitor.report-type=netty
#*************** Config Module Related Configurations ***************#
### Data source customization section
### Default database
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/hippo4j_manager?characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&transformedBitIsBoolean=true&serverTimezone=GMT%2B8
spring.datasource.username=root
spring.datasource.password=root
### Hikari Datasource
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.hikari.pool-name=Hikari
spring.datasource.hikari.connectionTimeout=30000
spring.datasource.hikari.idleTimeout=30000

@ -19,7 +19,8 @@ JAVA_OPT="${JAVA_OPT} --server.max-http-header-size=524288"
JAVA_OPT="${JAVA_OPT} --server.tomcat.basedir=${BASE_DIR}/bin"
if [[ "${DATASOURCE_MODE}" == "mysql" ]]; then
JAVA_OPT="${JAVA_OPT} --spring.datasource.url=\"jdbc:mysql://${DATASOURCE_HOST}:${DATASOURCE_PORT}/${DATASOURCE_DB}?characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&transformedBitIsBoolean=true&serverTimezone=GMT%2B8\" "
JAVA_OPT="${JAVA_OPT} --spring.profiles.active=mysql --hippo4j.database.init_enable=false "
JAVA_OPT="${JAVA_OPT} --spring.datasource.url=jdbc:mysql://${DATASOURCE_HOST}:${DATASOURCE_PORT}/${DATASOURCE_DB}?characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&transformedBitIsBoolean=true&serverTimezone=GMT%2B8 "
JAVA_OPT="${JAVA_OPT} --spring.datasource.username=${DATASOURCE_USERNAME} --spring.datasource.password=${DATASOURCE_PASSWORD} "
elif [[ "${DATASOURCE_MODE}" == "h2" ]]; then
JAVA_OPT="${JAVA_OPT} --spring.profiles.active=h2 --spring.datasource.url=jdbc:h2:file:${BASE_DIR}/h2_hippo4j;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MYSQL"

@ -109,7 +109,7 @@ public class LongPollingService {
@Override
public void run() {
try {
for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext();) {
for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext(); ) {
ClientLongPolling clientSub = iter.next();
String identity = groupKey + GROUP_KEY_DELIMITER + identify;
List<String> parseMapForFilter = CollectionUtil.newArrayList(identity);
@ -150,7 +150,7 @@ public class LongPollingService {
timeout = Math.max(10000, getFixedPollingInterval());
} else {
List<String> changedGroups = Md5ConfigUtil.compareMd5(req, clientMd5Map);
if (changedGroups.size() > 0) {
if (!changedGroups.isEmpty()) {
generateResponse(rsp, changedGroups);
return;
} else if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) {
@ -203,7 +203,7 @@ public class LongPollingService {
allSubs.remove(ClientLongPolling.this);
if (isFixedPolling()) {
List<String> changedGroups = Md5ConfigUtil.compareMd5((HttpServletRequest) asyncContext.getRequest(), clientMd5Map);
if (changedGroups.size() > 0) {
if (!changedGroups.isEmpty()) {
sendResponse(changedGroups);
} else {
sendResponse(null);
@ -237,12 +237,13 @@ public class LongPollingService {
* @param changedGroups Changed thread pool group key
*/
private void generateResponse(List<String> changedGroups) {
HttpServletResponse response = (HttpServletResponse) asyncContext.getResponse();
if (null == changedGroups) {
response.setStatus(HttpServletResponse.SC_NOT_MODIFIED);
// Tell web container to send http response.
asyncContext.complete();
return;
}
HttpServletResponse response = (HttpServletResponse) asyncContext.getResponse();
try {
String respStr = buildRespStr(changedGroups);
response.setHeader("Pragma", "no-cache");

@ -204,17 +204,17 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
private ThreadPoolNotifyAlarm buildThreadPoolNotifyAlarm(ExecutorProperties executorProperties) {
DynamicThreadPoolNotifyProperties notify = Optional.ofNullable(executorProperties).map(ExecutorProperties::getNotify).orElse(null);
boolean isAlarm = Optional.ofNullable(executorProperties.getAlarm())
.orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(each -> each.getAlarm()).orElse(true));
.orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(ExecutorProperties::getAlarm).orElse(true));
int activeAlarm = Optional.ofNullable(executorProperties.getActiveAlarm())
.orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(each -> each.getActiveAlarm()).orElse(80));
.orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(ExecutorProperties::getActiveAlarm).orElse(80));
int capacityAlarm = Optional.ofNullable(executorProperties.getCapacityAlarm())
.orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(each -> each.getCapacityAlarm()).orElse(80));
.orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(ExecutorProperties::getCapacityAlarm).orElse(80));
int interval = Optional.ofNullable(notify)
.map(each -> each.getInterval())
.orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(each -> each.getNotify()).map(each -> each.getInterval()).orElse(5));
.map(DynamicThreadPoolNotifyProperties::getInterval)
.orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(ExecutorProperties::getNotify).map(DynamicThreadPoolNotifyProperties::getInterval).orElse(5));
String receive = Optional.ofNullable(notify)
.map(each -> each.getReceives())
.orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(each -> each.getNotify()).map(each -> each.getReceives()).orElse(""));
.map(DynamicThreadPoolNotifyProperties::getReceives)
.orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(ExecutorProperties::getNotify).map(DynamicThreadPoolNotifyProperties::getReceives).orElse(""));
ThreadPoolNotifyAlarm threadPoolNotifyAlarm = new ThreadPoolNotifyAlarm(isAlarm, activeAlarm, capacityAlarm);
threadPoolNotifyAlarm.setInterval(interval);
threadPoolNotifyAlarm.setReceives(receive);

@ -35,17 +35,17 @@ public class BootstrapProperties implements BootstrapPropertiesInterface {
public static final String PREFIX = "spring.dynamic.thread-pool";
/**
* Username.
* Username
*/
private String username;
/**
* Password.
* Password
*/
private String password;
/**
* Server addr
* Server address
*/
private String serverAddr;

@ -38,7 +38,7 @@ import static cn.hippo4j.common.constant.Constants.IDENTIFY_SLICER_SYMBOL;
import static cn.hippo4j.core.toolkit.IdentifyUtil.CLIENT_IDENTIFICATION_VALUE;
/**
* Dynamic threadPool discovery config.
* Dynamic thread-pool discovery config.
*/
@AllArgsConstructor
public class DiscoveryConfiguration {

@ -53,6 +53,7 @@ import cn.hippo4j.springboot.starter.notify.ServerNotifyConfigBuilder;
import cn.hippo4j.springboot.starter.remote.HttpAgent;
import cn.hippo4j.springboot.starter.remote.HttpScheduledHealthCheck;
import cn.hippo4j.springboot.starter.remote.ServerHealthCheck;
import cn.hippo4j.springboot.starter.remote.ServerHttpAgent;
import cn.hippo4j.springboot.starter.support.DynamicThreadPoolConfigService;
import cn.hippo4j.springboot.starter.support.DynamicThreadPoolPostProcessor;
import lombok.AllArgsConstructor;
@ -75,8 +76,7 @@ import org.springframework.core.env.ConfigurableEnvironment;
@ConditionalOnBean(MarkerConfiguration.Marker.class)
@EnableConfigurationProperties(BootstrapProperties.class)
@ConditionalOnProperty(prefix = BootstrapProperties.PREFIX, value = "enable", matchIfMissing = true, havingValue = "true")
@ImportAutoConfiguration({WebAdapterConfiguration.class, HttpClientConfiguration.class, NettyClientConfiguration.class, DiscoveryConfiguration.class, MessageConfiguration.class,
UtilAutoConfiguration.class})
@ImportAutoConfiguration({WebAdapterConfiguration.class, NettyClientConfiguration.class, DiscoveryConfiguration.class, MessageConfiguration.class, UtilAutoConfiguration.class})
public class DynamicThreadPoolAutoConfiguration {
private final BootstrapProperties properties;
@ -210,4 +210,9 @@ public class DynamicThreadPoolAutoConfiguration {
public ThreadPoolNotifyAlarmHandler threadPoolNotifyAlarmHandler(Hippo4jSendMessageService hippo4jSendMessageService) {
return new ThreadPoolNotifyAlarmHandler(hippo4jSendMessageService);
}
@Bean
public HttpAgent httpAgent(BootstrapProperties properties) {
return new ServerHttpAgent(properties);
}
}

@ -1,34 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.springboot.starter.config;
import cn.hippo4j.springboot.starter.remote.HttpAgent;
import cn.hippo4j.springboot.starter.remote.ServerHttpAgent;
import org.springframework.context.annotation.Bean;
/**
* Http client config.
*/
public class HttpClientConfiguration {
@Bean
@SuppressWarnings("all")
public HttpAgent httpAgent(BootstrapProperties properties) {
return new ServerHttpAgent(properties);
}
}

@ -45,14 +45,9 @@ public class CacheData {
public final String threadPoolId;
@Setter
private int taskId;
@Setter
private volatile boolean isInitializing = true;
private volatile long localConfigLastModified;
private final CopyOnWriteArrayList<ManagerListenerWrapper> listeners;
public CacheData(String tenantId, String itemId, String threadPoolId) {

@ -245,8 +245,6 @@ public class ClientWorker {
} catch (Exception ex) {
log.error("Cache Data Error. Service Unavailable: {}", ex.getMessage());
}
int taskId = cacheMap.size() / CONFIG_LONG_POLL_TIMEOUT;
cacheData.setTaskId(taskId);
lastCacheData = cacheData;
}
return lastCacheData;

@ -53,7 +53,7 @@ public class DynamicThreadPoolSubscribeConfig {
.build();
public void subscribeConfig(String threadPoolId) {
subscribeConfig(threadPoolId, config -> threadPoolDynamicRefresh.dynamicRefresh(config));
subscribeConfig(threadPoolId, threadPoolDynamicRefresh::dynamicRefresh);
}
public void subscribeConfig(String threadPoolId, ThreadPoolSubscribeCallback threadPoolSubscribeCallback) {

@ -27,14 +27,14 @@ public interface Listener {
/**
* Get executor.
*
* @return
* @return executor
*/
Executor getExecutor();
/**
* Receive config info.
*
* @param configInfo
* @param configInfo config info
*/
void receiveConfigInfo(String configInfo);
}

@ -25,7 +25,7 @@ public interface ThreadPoolSubscribeCallback {
/**
* Callback.
*
* @param config
* @param config config info
*/
void callback(String config);
}

@ -27,7 +27,7 @@ public interface Collector {
/**
* Collect message.
*
* @return
* @return message
*/
Message collectMessage();
}

@ -80,7 +80,6 @@ public class ServerNotifyConfigBuilder implements NotifyConfigBuilder {
String resultDataStr = JSONUtil.toJSONString(result.getData());
List<ThreadPoolNotifyDTO> resultData = JSONUtil.parseArray(resultDataStr, ThreadPoolNotifyDTO.class);
resultData.forEach(each -> resultMap.put(each.getNotifyKey(), each.getNotifyList()));
resultMap.forEach((key, val) -> val.stream().filter(each -> Objects.equals("ALARM", each.getType()))
.forEach(each -> alarmControlHandler.initCacheAndLock(each.getTpId(), each.getPlatform(), each.getInterval())));
}

@ -130,7 +130,6 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
queryStrMap.put(TP_ID, threadPoolId);
queryStrMap.put(ITEM_ID, properties.getItemId());
queryStrMap.put(NAMESPACE, properties.getNamespace());
boolean isSubscribe = false;
ThreadPoolExecutor newDynamicThreadPoolExecutor = null;
ThreadPoolParameterInfo threadPoolParameterInfo = new ThreadPoolParameterInfo();
try {
@ -167,7 +166,6 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
((DynamicThreadPoolExecutor) newDynamicThreadPoolExecutor).setExecuteTimeOut(executeTimeOut);
}
dynamicThreadPoolWrapper.setExecutor(newDynamicThreadPoolExecutor);
isSubscribe = true;
}
} else {
// DynamicThreadPool configuration undefined in server
@ -197,8 +195,6 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
if (Objects.isNull(executor)) {
dynamicThreadPoolWrapper.setExecutor(CommonDynamicThreadPool.getInstance(threadPoolId));
}
// Set whether to subscribe to the remote thread pool configuration.
dynamicThreadPoolWrapper.setSubscribeFlag(isSubscribe);
}
GlobalThreadPoolManage.register(dynamicThreadPoolWrapper.getThreadPoolId(), threadPoolParameterInfo, dynamicThreadPoolWrapper);
return newDynamicThreadPoolExecutor;

Loading…
Cancel
Save